安全研究员onepwnman报告了在BMW车辆MGU 中使用SOME/IP的应用程序中的几个安全漏洞。因此,被列入宝马名人堂。在本文中,将介绍SOME/IP协议并分享如何模糊SOME/IP应用程序以查找漏洞。
What is SOME/IP Protocol?
SOME/IP消息可以使用TCP或UDP协议。服务接口通过端口号来区分,因此相同的接口不能在同一端口上提供多个实例。
消息类型如下图所示,分为四种支持方式。
Request/Response与Fire & Forget的区别在于是否有响应消息。Events是一种消息类型,当服务器发生事件时会回调。
关于Subscribe的更多信息,请参见SOME/IP-SD协议。我们将在第二部分中详细解释,首先让我们了解一下SOME/IP的数据包结构。
SOME/IP Packet Structure
首先,为了发送一个普通的SOME/IP消息,你需要了解SOME/IP头部的结构。下图是从标准中摘录的SOME/IP头部格式。
• Message ID
• 每项服务都有一个唯一的Service ID。
• 每项服务可以有多种方法,每种方法都有自己的ID。
• Length
• 从Request ID到有效负载末尾的长度。
• Request ID
• Client ID允许ECU区分对同一方法来自多个客户端的调用。
• Session ID允许区分来自同一发送者的顺序消息或请求。
• Protocol Version
• 它通常有一个固定值。
• Interface Version
• 它通常有一个固定值。
• Message Type
• Message Type字段根据上述四种消息类型的不通而不同。例如,在事件消息的情况下,消息类型是0x2 notification,并且发送方不期望收到响应。
在宝马MGU 22的实际案例中,它使用了COVESA的SOME/IP相关库,每个数据类型的实现都在以下链接的代码中。(https://github.com/COVESA/vsomeip/wiki/vsomeip-in-10-minutes#prep)
Data Type | Byte Count |
---|---|
UInt8 | 1 |
UInt16 | 2 |
UInt32 | 4 |
UInt64 | 8 |
Bool | 1 |
Enum | 1, 2 |
Utf8 | Depends on the length of the string |
Utf16 | Depends on the length of the string |
Array | Depends on the length of the string |
Map | Depends on the length of the string |
Struct | 0 |
以下示例是为每种数据类型用Python实现的代码,返回值是一个十六进制字符串。
def _uint8(p):
return "{:02X}".format(p)
def _uint16(p):
return "{:04X}".format(p)
def _uint32(p):
return "{:08X}".format(p)
def _uint64(p):
return "{:016X}".format(p)
def _bool(p):
return "{:02X}".format(p)
def _enum(p, w=False):
if w:
return "{:04X}".format(p)
else:
return "{:02X}".format(p)
def _utf8(p):
data = "EFBBBF"
data += p.encode("utf-8").hex()
data += "00"
return "{:08X}".format(len(data)//2) + data
def _utf16(p):
data = "FEFF"
data += p.encode("utf-16-be").hex()
data += "0000"
return "{:08X}".format(len(data)//2) + data
def _array(p):
return "{:08X}".format(len(p)//2) + p
def _map(k, v):
data = k + v
return "{:08X}".format(len(data)//2) + data
例如,UTF-16字符序列“TEST”可以表示为如下形式。
print(_utf16("TEST"))
# the length of a string 0000000C
# actual string content "FEFF" + "0054004500530054" + "0000"
# return value 0000000CFEFF00540045005300540000
此外,有效负载是由多种类型的序列化数据组成的。序列化数据仅仅是一系列的数据类型。如果一个数据类型需要长度,那么长度会被加在这个数据之前。例如,我们考虑以下几种数据类型的序列化:
Struct
{
UInt32,
Array
{
Struct
{
Utf8,
Utf8
},
},
Map
{
Enum,
Utf8
},
Bool
}
Struct - UInt32 - Array - Struct - Utf8 - Utf8 - Map - Enum - Utf8 - Bool
Array、Map 和 Bool
_array(_utf8("str1") + _utf8("str2"))
_map(_enum(data1), _utf8("str3"))
payload = _uint32(data1)
payload += _array(_utf8("str1") + _utf8("str2"))
payload += _map(_enum(data2) + _utf8("str3"))
payload += _bool(data3)
总之,发送序列化后的SOME/IP消息的代码如下:
import socket
import datetime
class SomeIP:
def __init__(self, service_id, method_id, client_id, session_id, protocol_version, interface_version, message_type, return_code):
self.service_id = service_id
self.method_id = method_id
self.client_id = client_id
self.session_id = session_id
self.protocol_version = protocol_version
self.interface_version = interface_version
self.message_type = message_type
self.return_code = return_code
def assemble_packet(self, payload):
self.header = "{:04X}".format(self.service_id)
self.header += "{:04X}".format(self.method_id)
self.header += "{:08X}".format(len(payload) + 8)
self.header += "{:04X}".format(self.client_id)
self.header += "{:04X}".format(self.session_id)
self.header += "{:02X}".format(self.protocol_version)
self.header += "{:02X}".format(self.interface_version)
self.header += "{:02X}".format(self.message_type)
self.header += "{:02X}".format(self.return_code)
self.packet = bytes.fromhex(self.header) + payload
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.connect(("160.48.199.99", 32504))
sock.settimeout(0.1)
def _uint8(p):
return "{:02X}".format(p)
def _uint16(p):
return "{:04X}".format(p)
def _uint32(p):
return "{:08X}".format(p)
def _uint64(p):
return "{:016X}".format(p)
def _bool(p):
return "{:02X}".format(p)
def _enum(p, w=False):
if w:
return "{:04X}".format(p)
else:
return "{:02X}".format(p)
def _utf8(p):
data = "EFBBBF"
data += p.encode("utf-8").hex()
data += "00"
return "{:08X}".format(len(data)//2) + data
def _utf16(p):
data = "FEFF"
data += p.encode("utf-16-be").hex()
data += "0000"
return "{:08X}".format(len(data)//2) + data
def _array(p):
return "{:08X}".format(len(p)//2) + p
def _map(k, v):
data = k + v
return "{:08X}".format(len(data)//2) + data
def someip_func(data1, str1, str2, data2, str3, data3) -> SomeIP:
someip = SomeIP(45246, 1, 0, 0, 1, 1, 0, 0)
payload = _uint32(data1)
payload = _array(_utf8(str1) + _utf8(str2))
payload += _map(_enum(data2), _utf8(str3))
payload += _bool(data3)
someip.assemble_packet(bytes.fromhex(payload))
return someip
def send_and_recv(someip):
try:
sock.send(someip.packet)
print(f"{datetime.datetime.now()}: {someip.packet.hex()}")
res = sock.recv(1024)
print(f"Recv: {res.hex()}")
except socket.timeout:
pass
send_and_recv(someip_func(1, "aaa", "bbb", 2, "ccc", 3))
if __name__ == "__main__":
main()
b0be00010000003300000000010100000000001600000007efbbbf6161610000000007efbbbf626262000000000c0200000007efbbbf6363630003