前言
在 eBPF 生态系统中,新的 MPSC(多生产者单消费者)Ring Buffer 的实现允许多个 CPU 向同一个共享的 Ring Buffer 提交数据,而在消费端则只假定一个消费者。本文介绍了该 Ringbuf 的设计动机、语法和 API,并与其他替代方案进行了比较。
动机
有两个关键的动机促使了这个 Ring Buffer 的创建,这些动机并不能通过现有的 perf buffer
得到满足,从而推动了新 Ringbuf 的产生:
更高效的内存利用:通过多个 CPU 共享同一个 Ring Buffer 实现;
事件顺序保持:即使是跨多个 CPU 的顺序事件(例如
fork/exec/exit
事件),也能按时间顺序保留。
现有的 perf buffer
由于是 per-CPU 设计,因此在解决这两个问题时都不够理想。而通过 MPSC Ringbuf 实现可以很好地解决这两点需求,尤其是在跨 CPU 保持事件顺序方面, perf buffer
的 per-CPU 设计容易导致事件顺序错乱。
语法与 API
Ringbuf 在 BPF 程序中被表示为 BPF_MAP_TYPE_RINGBUF
类型的 BPF map。此设计比创建每个 CPU 一个独立的 Ringbuf 更加高效和简单。
核心 API
BPF 程序使用以下 API 来与 Ringbuf 进行交互:
bpf_ringbuf_output()
:将数据从一个位置复制到 Ringbuf 中,类似于bpf_perf_event_output()
;bpf_ringbuf_reserve()/bpf_ringbuf_commit()/bpf_ringbuf_discard()
:通过分两步完成数据提交。首先调用bpf_ringbuf_reserve()
预留固定大小的空间,如果成功,返回一个指向 Ringbuf 内存区域的指针,程序可以使用这个指针操作数据,完成后使用bpf_ringbuf_commit()
提交数据或bpf_ringbuf_discard()
丢弃数据。
需要注意的是, bpf_ringbuf_reserve()
必须与 bpf_ringbuf_commit()
或 bpf_ringbuf_discard()
成对使用,否则无法通过 eBPF 校验器。此外,由于 bpf_ringbuf_reserve()
中可能存在锁的争用,为了减少性能损失,建议尽可能晚地调用该函数以申请内存空间。
示例代码
以下是一个简单的 BPF 程序示例,展示如何使用 Ringbuf:
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
struct {
// specify the type, eBPF specific syntax
__uint(type, BPF_MAP_TYPE_RINGBUF);
// specify the size of the buffer
// has to be a multiple of the page size
__uint(max_entries, 256 * 4096);
} my_ringbuf SEC(".maps"); /* placed in maps section */
SEC("tracepoint/sched/sched_switch")
int handle_sched_switch(struct bpf_pt_regs *ctx) {
void *record;
// 预留空间
record = bpf_ringbuf_reserve(&my_ringbuf, sizeof(struct sched_event), 0);
if (!record) {
return 0; // 预留失败
}
// 在这里填充数据
struct sched_event *event = record;
event->pid = bpf_get_current_pid_tgid() >> 32;
event->cpu = bpf_get_smp_processor_id();
// 提交数据
bpf_ringbuf_commit(record,0);
return 0;
}
char _license[] SEC("license") = "GPL";
在用户空间,可以使用 ring_buffer__poll
来读取数据。下面是一个示例代码:
#include <stdio.h>
#include <bpf/libbpf.h>
#include <bpf/ringbuf.h>
struct sched_event {
__u32 pid;
__u32 cpu;
};
int main() {
struct ring_buffer *ring_buf;
struct bpf_object *obj;
int err;
// 加载 BPF 程序
......
// 获取 ring buffer
ring_buf = ring_buffer__new(bpf_map__fd(obj->maps->my_ringbuf), NULL, NULL);
if (!ring_buf) {
fprintf(stderr, "Failed to create ring buffer\n");
return 1;
}
// 轮询数据
while (true) {
err = ring_buffer__poll(ring_buf, 100);
if (err < 0) {
fprintf(stderr, "Polling error: %d\n", err);
break;
}
}
ring_buffer__free(ring_buf);
bpf_object__close(obj);
return 0;
}
在这个示例中,我们定义了一个简单的 BPF 程序,该程序在任务调度点记录进程 ID 和 CPU 信息。在用户空间,我们通过 ringbuffer_poll 函数来读取 Ringbuf 中的数据。
设计与实现
这种预留/提交机制允许多个生产者(无论是在不同的 CPU 上还是在同一 CPU/同一 BPF 程序中)独立地预留记录并进行操作,而不会阻塞其他生产者。这意味着,如果一个 BPF 程序被另一个共享同一 Ringbuf 的 BPF 程序中断,它们都会成功预留记录(只要有足够的空间),并可以独立工作并提交。
Ringbuf 内部实现为大小为 2 的幂的循环缓冲区,使用两个逻辑上不断增加的计数器:
消费者计数器:表示消费者消费到的数据的逻辑位置;
生产者计数器:表示所有生产者保留的数据的数量。每当一个记录被预留时,"拥有"该记录的生产者会成功推进生产者计数器。此时,数据尚未准备好供消费。每个记录都有一个 8 字节的头部,其中包含保留记录的长度,以及两个附加位:忙碌位和丢弃位。
与替代方案的比较
实现 BPF Ring Buffer 之前,作者评估了内核中现有的替代方案,但发现它们无法满足需求。现有方案主要分为几类:
per-CPU 缓冲区(如 perf、ftrace 等),无法满足排序和内存消耗的需求;
基于链表的实现,尽管有些是多生产者设计,但从用户空间消费时可能非常复杂且性能较差;
io_uring 是 SPSC,但也要求固定大小的元素。简单地将 SPSC 队列转换为 MPSC 会导致性能不足;
专用实现(例如新的 printk Ring Buffer),具有很多特定的限制,无法很好地适应 BPF 程序的需求。
通过引入 MPSC Ring Buffer,eBPF 的使用场景更加灵活,同时保持了高效的内存使用和事件顺序。