在 4 层 tcp 代理的场景下,一般使用 TOA option 将真实的客户端地址和端口信息传递给后端服务。
在后端服务里,如果不使用内核模块读取 TOA,而是使用 eBPF 来读取 TOA,该如何实现呢?
太长不读:以下提供了 4 种 eBPF 读取 TOA 的方式,仅供参考;不过它们只是通过了 verifier,未验证是否能读取到正确的 TOA。
内核模块读取 TOA
想要使用 eBPF 读取 TOA,先看看内核模块是如何读取 TOA 的。
比如 UCloud 的实现方式,核心读取 TOA 的函数:
// https://github.com/ucloud/ucloud-toa/blob/master/toa.c#L108
/* Parse TCP options in skb, try to get client ip, port
* @param skb [in] received skb, it should be a ack/get-ack packet.
* @return NULL if we don't get client ip/port;
* value of toa_data in ret_ptr if we get client ip/port.
*/
static void *get_toa_data(struct sk_buff *skb)
{
struct tcphdr *th;
int length;
unsigned char *ptr;
struct toa_data tdata;
void *ret_ptr = NULL;
//TOA_DBG("get_toa_data called\n");
if (skb != NULL) {
th = tcp_hdr(skb);
length = (th->doff * 4) - sizeof(struct tcphdr);
ptr = (unsigned char *) (th + 1);
while (length > 0) {
int opcode = *ptr++;
int opsize;
switch (opcode) {
case TCPOPT_EOL:
return NULL;
case TCPOPT_NOP: /* Ref: RFC 793 section 3.1 */
length--;
continue;
default:
opsize = *ptr++;
if (opsize < 2) /* "silly options" */
return NULL;
if (opsize > length)
return NULL; /* don't parse partial options */
if ((opcode == TCPOPT_TOA_UCLOUD || opcode == TCPOPT_TOA_COMPAT || opcode == TCPOPT_TOA_AKAMAI) &&
opsize == TCPOLEN_TOA) {
memcpy(&tdata, ptr - 2, sizeof(tdata));
//TOA_DBG("find toa data: ip = %u.%u.%u.%u, port = %u\n", NIPQUAD(tdata.ip),
//ntohs(tdata.port));
memcpy(&ret_ptr, &tdata, sizeof(ret_ptr));
//TOA_DBG("coded toa data: %p\n", ret_ptr);
return ret_ptr;
}
ptr += opsize - 2;
length -= opsize;
}
}
}
return NULL;
}
这个函数的处理逻辑:
读取 opcode
,并判断opcode
的值。读取 opsize
,并判断opsize
的值是否合理。判断 opcode
和opsize
是否为预期的 TOA option。调整 ptr
和length
。
看着不难呀,直接抄一下吧。
eBPF 读取 TOA
eBPF 读取 TOA 的代码:
static __always_inline bool
__read_toa_3(struct tcphdr *tcp, struct toa_data *toa)
{
void *buff;
int length;
length = BPF_CORE_READ_BITFIELD_PROBED(tcp, doff) << 2;
length -= sizeof(struct tcphdr);
buff = (void *) tcp + sizeof(struct tcphdr);
while (length > 0) {
__u8 opcode, opsize;
if (bpf_probe_read_kernel(&opcode, sizeof(opcode), buff) < 0)
return false;
if (opcode == TCPOPT_EOL)
return false;
if (opcode == TCPOPT_NOP) { /* Ref: RFC 793 section 3.1 */
buff++;
length--;
continue;
}
if (bpf_probe_read_kernel(&opsize, sizeof(opsize), buff + 1) < 0)
return false;
if (opsize < 2) /* "silly options" */
return false;
if (opsize > length) /* don't parse partial options */
return false;
if (opcode == TCPOPT_TOA_COMPAT || opcode == TCPOPT_TOA_AKAMAI) {
if (opsize == TCPOLEN_TOA) {
if (bpf_probe_read_kernel(toa, sizeof(*toa), buff-2) < 0)
return false;
return true;
}
}
buff += opsize;
length -= opsize;
}
return false;
}
基本照抄了内核模块的逻辑,有几个不一样的地方:
bpf_probe_read_kernel()
读取opcode
和opsize
。bpf_probe_read_kernel()
读取toa
。
可以看出,每一次读取 option 时,至少需要两次 bpf_probe_read_kernel()
。
有没有办法减少 bpf_probe_read_kernel()
的次数呢?
eBPF 读取 TOA 优化
优化后的代码:
static __always_inline bool
__read_toa_4(struct tcphdr *tcp, struct toa_data *toa)
{
void *buff;
int length;
length = BPF_CORE_READ_BITFIELD_PROBED(tcp, doff) << 2;
length -= sizeof(struct tcphdr);
buff = (void *) tcp + sizeof(struct tcphdr);
while (length > 0) {
__u8 opcode, opsize;
/* bpf_probe_read_kernel() expects to read a toa option. If it fails to
* read it, it won't be a toa option; then return false because there's
* not enough data to read as a toa option.
*/
if (bpf_probe_read_kernel(toa, sizeof(*toa), buff) < 0)
return false;
opcode = toa->opcode;
opsize = toa->opsize;
if (opcode == TCPOPT_EOL)
return false;
if (opcode == TCPOPT_NOP) { /* Ref: RFC 793 section 3.1 */
buff++;
length--;
continue;
}
if (opsize < 2) /* "silly options" */
return false;
if (opsize > length) /* don't parse partial options */
return false;
if (opcode == TCPOPT_TOA_COMPAT || opcode == TCPOPT_TOA_AKAMAI) {
if (opsize == TCPOLEN_TOA)
return true;
}
buff += opsize;
length -= opsize;
}
return false;
}
优化的思路是:一次性读取整个 option,然后再判断是否为 TOA option。
如果读取失败、异常的 option,直接返回 false
。如果是 TOA option,则直接返回 true
。如果不是 TOA option,则继续读取下一个 option。
eBPF 读取 TOA 优化 2
将 while
循环重构为迭代器:
struct toa_prober {
struct toa_data *toa;
int length;
int offset;
};
static int
__probe_toa(struct toa_prober *prober, void *buff)
{
int ret;
if (prober->offset + sizeof(*prober->toa) > prober->length)
return -1;
ret = bpf_probe_read_kernel(prober->toa, sizeof(*prober->toa), buff + prober->offset);
if (ret < 0)
return ret;
if (prober->toa->opcode == TCPOPT_EOL)
return -1;
if (prober->toa->opcode == TCPOPT_NOP) {
prober->offset++;
return 0;
}
if (prober->toa->opsize < 2) /* "silly options" */
return -1;
if (prober->offset + prober->toa->opsize > prober->length) /* don't parse partial options */
return -1;
if (prober->toa->opcode == TCPOPT_TOA_COMPAT || prober->toa->opcode == TCPOPT_TOA_AKAMAI) {
if (prober->toa->opsize == TCPOLEN_TOA)
return 1;
}
prober->offset += prober->toa->opsize;
return 0;
}
static __always_inline bool
__read_toa_2(struct tcphdr *tcp, struct toa_data *toa)
{
void *buff = (void *) tcp + sizeof(struct tcphdr);
struct toa_prober prober = {
.toa = toa,
.length = (BPF_CORE_READ_BITFIELD_PROBED(tcp, doff) << 2) - sizeof(struct tcphdr),
.offset = 0,
};
/* Tell compiler and verifier that this for loop tries 32 times at most. */
for (int i = 0; i < MAX_TCPOPT_LEN - sizeof(*toa); i++) {
int ret = __probe_toa(&prober, buff);
if (ret < 0)
return false;
if (ret > 0)
return true;
}
return false;
}
优化思路:
将 while
循环转为有限次数的for
循环。抽象 struct toa_prober
作为迭代器。每次循环都使用 bpf_probe_read_kernel()
读取整个 option。
有没有更加极致的优化呢?
eBPF 读取 TOA 优化 3
毕竟 bpf_probe_read_kernel()
调用是有代价的,能尽量减少调用次数,就尽量减少调用次数。
static __always_inline bool
__read_toa(struct tcphdr *tcp, struct toa_data *toa)
{
__u8 buff[MAX_TCPOPT_LEN];
int length;
length = BPF_CORE_READ_BITFIELD_PROBED(tcp, doff) << 2;
length -= sizeof(struct tcphdr);
if (bpf_probe_read_kernel(buff, MAX_TCPOPT_LEN, (void *) tcp + sizeof(struct tcphdr)) < 0)
return false;
for (int i = 0; i < MAX_TCPOPT_LEN - sizeof(*toa); ) {
if (i > length)
return false;
barrier_var(i);
if (i > MAX_TCPOPT_LEN - sizeof(*toa))
return false;
__u8 opcode = buff[i];
if (opcode == TCPOPT_EOL)
break;
if (opcode == TCPOPT_NOP) {
i++;
continue;
}
/* i becomes variable because of i++ */
/* it's necessary to check range of i again, or
* invalid variable-offset read from stack R0 var_off=(0x0; 0x1ff) off=-40 size=1
*/
barrier_var(i);
if (i + 1 >= MAX_TCPOPT_LEN)
return false;
__u8 opsize = buff[i + 1];
if (opsize < 2)
return false;
if ((opcode == TCPOPT_TOA_COMPAT || opcode == TCPOPT_TOA_AKAMAI) && opsize == TCPOLEN_TOA) {
/* it's necessary to check narrow range of i, or
* invalid variable-offset read from stack R0 var_off=(0x0; 0x1ff) off=-40 size=1
*/
barrier_var(i);
if (i > MAX_TCPOPT_LEN - sizeof(*toa))
return false;
/* it fails because of misalign:
* misaligned stack access off (0x10; 0xf)+-40+0 size 8
*/
/* *(__u64 *) toa = *(__u64 *) (buff + i); */
bpf_probe_read_kernel(toa, sizeof(*toa), buff + i);
return true;
}
i += opsize;
}
return false;
}
优化思路:
一次性读取 40 字节的包含所有 option 的数据到 bpf 栈上;只要是内核空间的内存地址,一般不会读取失败。 按需解析 bpf 栈上的数据。 其中多次使用 barrier_var()
和i
的范围判断,是为了避免 verifier 的报错。最后,使用 bpf_probe_read_kernel()
读取 TOA option,是为了避免 verifier 的 "misaligned stack access" 的报错。
最终,最多只需要 2 次 bpf_probe_read_kernel()
,就能读取到 TOA option。
总结
eBPF 读取 TOA 的 4 种方式:
方式 1: 按需调用 bpf_probe_read_kernel()
去读取内存,但bpf_probe_read_kernel()
的调用次数较多。方式 2: 每次调用 bpf_probe_read_kernel()
读取整个 TOA option,减少bpf_probe_read_kernel()
的调用次数。方式 3: 方式 2 的迭代器版本。 方式 4: 一次性读取所有 option,然后按需解析。
这 4 种方式,都通过了 verifier 校验。
完整的源代码:learn-by-example toa.c[1]。
learn-by-example toa.c: https://github.com/Asphaltt/learn-by-example/blob/main/ebpf/toa/toa.c