eBPF Talk: BPF 读取 TOA 的 4 种方式

文摘   2024-08-12 08:10   新加坡  

在 4 层 tcp 代理的场景下,一般使用 TOA option 将真实的客户端地址和端口信息传递给后端服务。

在后端服务里,如果不使用内核模块读取 TOA,而是使用 eBPF 来读取 TOA,该如何实现呢?

太长不读:以下提供了 4 种 eBPF 读取 TOA 的方式,仅供参考;不过它们只是通过了 verifier,未验证是否能读取到正确的 TOA。

内核模块读取 TOA

想要使用 eBPF 读取 TOA,先看看内核模块是如何读取 TOA 的。

比如 UCloud 的实现方式,核心读取 TOA 的函数:

// https://github.com/ucloud/ucloud-toa/blob/master/toa.c#L108

/* Parse TCP options in skb, try to get client ip, port
 * @param skb [in] received skb, it should be a ack/get-ack packet.
 * @return NULL if we don't get client ip/port;
 *         value of toa_data in ret_ptr if we get client ip/port.
 */

static void *get_toa_data(struct sk_buff *skb)
{
    struct tcphdr *th;
    int length;
    unsigned char *ptr;

    struct toa_data tdata;

    void *ret_ptr = NULL;

    //TOA_DBG("get_toa_data called\n");

    if (skb != NULL) {
        th = tcp_hdr(skb);
        length = (th->doff * 4) - sizeof(struct tcphdr);
        ptr = (unsigned char *) (th + 1);

        while (length > 0) {
            int opcode = *ptr++;
            int opsize;

            switch (opcode) {
            case TCPOPT_EOL:
                return NULL;
            case TCPOPT_NOP:        /* Ref: RFC 793 section 3.1 */
                length--;
                continue;
            default:
                opsize = *ptr++;
                if (opsize < 2)     /* "silly options" */
                    return NULL;
                if (opsize > length)
                    return NULL;    /* don't parse partial options */
                if ((opcode == TCPOPT_TOA_UCLOUD || opcode == TCPOPT_TOA_COMPAT || opcode == TCPOPT_TOA_AKAMAI) &&
                    opsize == TCPOLEN_TOA) {
                    memcpy(&tdata, ptr - 2sizeof(tdata));
                    //TOA_DBG("find toa data: ip = %u.%u.%u.%u, port = %u\n", NIPQUAD(tdata.ip),
                        //ntohs(tdata.port));
                    memcpy(&ret_ptr, &tdata, sizeof(ret_ptr));
                    //TOA_DBG("coded toa data: %p\n", ret_ptr);
                    return ret_ptr;
                }
                ptr += opsize - 2;
                length -= opsize;
            }
        }
    }
    return NULL;
}

这个函数的处理逻辑:

  1. 读取 opcode,并判断 opcode 的值。
  2. 读取 opsize,并判断 opsize 的值是否合理。
  3. 判断 opcodeopsize 是否为预期的 TOA option。
  4. 调整 ptrlength

看着不难呀,直接抄一下吧。

eBPF 读取 TOA

eBPF 读取 TOA 的代码:

static __always_inline bool
__read_toa_3(struct tcphdr *tcp, struct toa_data *toa)
{
    void *buff;
    int length;

    length  = BPF_CORE_READ_BITFIELD_PROBED(tcp, doff) << 2;
    length -= sizeof(struct tcphdr);

    buff = (void *) tcp + sizeof(struct tcphdr);

    while (length > 0) {
        __u8 opcode, opsize;

        if (bpf_probe_read_kernel(&opcode, sizeof(opcode), buff) < 0)
            return false;

        if (opcode == TCPOPT_EOL)
            return false;
        if (opcode == TCPOPT_NOP) { /* Ref: RFC 793 section 3.1 */
            buff++;
            length--;
            continue;
        }

        if (bpf_probe_read_kernel(&opsize, sizeof(opsize), buff + 1) < 0)
            return false;

        if (opsize < 2)             /* "silly options" */
            return false;
        if (opsize > length)        /* don't parse partial options */
            return false;

        if (opcode == TCPOPT_TOA_COMPAT || opcode == TCPOPT_TOA_AKAMAI) {
            if (opsize == TCPOLEN_TOA) {
                if (bpf_probe_read_kernel(toa, sizeof(*toa), buff-2) < 0)
                    return false;
                return true;
            }
        }

        buff += opsize;
        length -= opsize;
    }

    return false;
}

基本照抄了内核模块的逻辑,有几个不一样的地方:

  1. bpf_probe_read_kernel() 读取 opcodeopsize
  2. bpf_probe_read_kernel() 读取 toa

可以看出,每一次读取 option 时,至少需要两次 bpf_probe_read_kernel()

有没有办法减少 bpf_probe_read_kernel() 的次数呢?

eBPF 读取 TOA 优化

优化后的代码:

static __always_inline bool
__read_toa_4(struct tcphdr *tcp, struct toa_data *toa)
{
    void *buff;
    int length;

    length  = BPF_CORE_READ_BITFIELD_PROBED(tcp, doff) << 2;
    length -= sizeof(struct tcphdr);

    buff = (void *) tcp + sizeof(struct tcphdr);

    while (length > 0) {
        __u8 opcode, opsize;

        /* bpf_probe_read_kernel() expects to read a toa option. If it fails to
         * read it, it won't be a toa option; then return false because there's
         * not enough data to read as a toa option.
         */

        if (bpf_probe_read_kernel(toa, sizeof(*toa), buff) < 0)
            return false;

        opcode = toa->opcode;
        opsize = toa->opsize;

        if (opcode == TCPOPT_EOL)
            return false;
        if (opcode == TCPOPT_NOP) { /* Ref: RFC 793 section 3.1 */
            buff++;
            length--;
            continue;
        }

        if (opsize < 2)             /* "silly options" */
            return false;
        if (opsize > length)        /* don't parse partial options */
            return false;

        if (opcode == TCPOPT_TOA_COMPAT || opcode == TCPOPT_TOA_AKAMAI) {
            if (opsize == TCPOLEN_TOA)
                return true;
        }

        buff += opsize;
        length -= opsize;
    }

    return false;
}

优化的思路是:一次性读取整个 option,然后再判断是否为 TOA option。

  1. 如果读取失败、异常的 option,直接返回 false
  2. 如果是 TOA option,则直接返回 true
  3. 如果不是 TOA option,则继续读取下一个 option。

eBPF 读取 TOA 优化 2

while 循环重构为迭代器:

struct toa_prober {
    struct toa_data *toa;
    int length;
    int offset;
};

static int
__probe_toa(struct toa_prober *prober, void *buff)
{
    int ret;

    if (prober->offset + sizeof(*prober->toa) > prober->length)
        return -1;

    ret = bpf_probe_read_kernel(prober->toa, sizeof(*prober->toa), buff + prober->offset);
    if (ret < 0)
        return ret;

    if (prober->toa->opcode == TCPOPT_EOL)
        return -1;
    if (prober->toa->opcode == TCPOPT_NOP) {
        prober->offset++;
        return 0;
    }
    if (prober->toa->opsize < 2)                                /* "silly options" */
        return -1;
    if (prober->offset + prober->toa->opsize > prober->length)  /* don't parse partial options */
        return -1;
    if (prober->toa->opcode == TCPOPT_TOA_COMPAT || prober->toa->opcode == TCPOPT_TOA_AKAMAI) {
        if (prober->toa->opsize == TCPOLEN_TOA)
            return 1;
    }

    prober->offset += prober->toa->opsize;

    return 0;
}

static __always_inline bool
__read_toa_2(struct tcphdr *tcp, struct toa_data *toa)
{
    void *buff = (void *) tcp + sizeof(struct tcphdr);
    struct toa_prober prober = {
        .toa = toa,
        .length = (BPF_CORE_READ_BITFIELD_PROBED(tcp, doff) << 2) - sizeof(struct tcphdr),
        .offset = 0,
    };

    /* Tell compiler and verifier that this for loop tries 32 times at most. */
    for (int i = 0; i < MAX_TCPOPT_LEN - sizeof(*toa); i++) {
        int ret = __probe_toa(&prober, buff);
        if (ret < 0)
            return false;
        if (ret > 0)
            return true;
    }

    return false;
}

优化思路:

  1. while 循环转为有限次数的 for 循环。
  2. 抽象 struct toa_prober 作为迭代器。
  3. 每次循环都使用 bpf_probe_read_kernel() 读取整个 option。

有没有更加极致的优化呢?

eBPF 读取 TOA 优化 3

毕竟 bpf_probe_read_kernel() 调用是有代价的,能尽量减少调用次数,就尽量减少调用次数。

static __always_inline bool
__read_toa(struct tcphdr *tcp, struct toa_data *toa)
{
    __u8 buff[MAX_TCPOPT_LEN];
    int length;

    length = BPF_CORE_READ_BITFIELD_PROBED(tcp, doff) << 2;
    length -= sizeof(struct tcphdr);

    if (bpf_probe_read_kernel(buff, MAX_TCPOPT_LEN, (void *) tcp + sizeof(struct tcphdr)) < 0)
        return false;

    for (int i = 0; i < MAX_TCPOPT_LEN - sizeof(*toa); ) {
        if (i > length)
            return false;
        barrier_var(i);
        if (i > MAX_TCPOPT_LEN - sizeof(*toa))
            return false;

        __u8 opcode = buff[i];
        if (opcode == TCPOPT_EOL)
            break;
        if (opcode == TCPOPT_NOP) {
            i++;
            continue;
        }

        /* i becomes variable because of i++ */
        /* it's necessary to check range of i again, or
         * invalid variable-offset read from stack R0 var_off=(0x0; 0x1ff) off=-40 size=1
         */

        barrier_var(i);
        if (i + 1 >= MAX_TCPOPT_LEN)
            return false;

        __u8 opsize = buff[i + 1];
        if (opsize < 2)
            return false;

        if ((opcode == TCPOPT_TOA_COMPAT || opcode == TCPOPT_TOA_AKAMAI) && opsize == TCPOLEN_TOA) {
            /* it's necessary to check narrow range of i, or
             * invalid variable-offset read from stack R0 var_off=(0x0; 0x1ff) off=-40 size=1
             */

            barrier_var(i);
            if (i > MAX_TCPOPT_LEN - sizeof(*toa))
                return false;

            /* it fails because of misalign:
             * misaligned stack access off (0x10; 0xf)+-40+0 size 8
             */

            /* *(__u64 *) toa = *(__u64 *) (buff + i); */

            bpf_probe_read_kernel(toa, sizeof(*toa), buff + i);
            return true;
        }

        i += opsize;
    }

    return false;
}

优化思路:

  1. 一次性读取 40 字节的包含所有 option 的数据到 bpf 栈上;只要是内核空间的内存地址,一般不会读取失败。
  2. 按需解析 bpf 栈上的数据。
  3. 其中多次使用 barrier_var()i 的范围判断,是为了避免 verifier 的报错。
  4. 最后,使用 bpf_probe_read_kernel() 读取 TOA option,是为了避免 verifier 的 "misaligned stack access" 的报错。

最终,最多只需要 2 次 bpf_probe_read_kernel(),就能读取到 TOA option。

总结

eBPF 读取 TOA 的 4 种方式:

  1. 方式 1: 按需调用 bpf_probe_read_kernel() 去读取内存,但 bpf_probe_read_kernel() 的调用次数较多。
  2. 方式 2: 每次调用 bpf_probe_read_kernel() 读取整个 TOA option,减少 bpf_probe_read_kernel() 的调用次数。
  3. 方式 3: 方式 2 的迭代器版本。
  4. 方式 4: 一次性读取所有 option,然后按需解析。

这 4 种方式,都通过了 verifier 校验。

完整的源代码:learn-by-example toa.c[1]

参考资料
[1]

learn-by-example toa.c: https://github.com/Asphaltt/learn-by-example/blob/main/ebpf/toa/toa.c

eBPF Talk
专注于 eBPF 技术,以及 Linux 网络上的 eBPF 技术应用