亲爱的 Rust 学习者们,今天我们将一起探索 Rust 中异步运行时的奥秘。本文将带你从零开始实现一个简单的事件循环,让你深入理解异步编程的核心概念。无论你是 Rust 新手还是有经验的开发者,这篇文章都将为你揭开异步编程的神秘面纱。
什么是异步运行时?
异步运行时是现代编程中的重要概念,它允许程序高效地处理大量并发操作。在 Rust 中,异步运行时通常包含以下核心组件:
Reactor:负责监听底层文件描述符的事件 Executor:负责执行和管理异步任务 Event Loop:事件循环,是整个异步系统的心脏
实现一个简单的 Reactor
Reactor 是异步运行时的基础,它使用系统调用(如 kqueue、epoll 或 IOCP)来监听文件描述符的事件。让我们看看如何实现一个简单的 Reactor:
use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use libc::{kevent, kqueue, EV_ADD, EV_DELETE, EVFILT_READ, EVFILT_WRITE};
pub struct Event {
pub fd: usize,
pub readable: bool,
pub writable: bool,
}
impl Event {
pub fn readable(fd: RawFd) -> Self {
Self {
fd: fd as usize,
readable: true,
writable: false,
}
}
pub fn none(fd: RawFd) -> Self {
Self {
fd: fd as usize,
readable: false,
writable: false,
}
}
}
pub struct Reactor {
kq: RawFd, // kqueue 文件描述符
events: Vec<libc::kevent>,
capacity: usize,
notifier: (UnixStream, UnixStream),
}
impl Reactor {
pub fn new() -> std::io::Result<Self> {
// 初始化 kqueue
let kq = unsafe { libc::kqueue() };
if kq < 0 {
return Err(std::io::Error::last_os_error());
}
// 创建通知器
let (read_stream, write_stream) = UnixStream::pair()?;
read_stream.set_nonblocking(true)?;
write_stream.set_nonblocking(true)?;
let mut reactor = Self {
kq,
events: Vec::new(),
capacity: 1,
notifier: (read_stream, write_stream),
};
reactor.modify(reactor.notifier.0.as_raw_fd(), Event::readable(reactor.notifier.0.as_raw_fd()))?;
Ok(reactor)
}
pub fn add(&mut self, fd: RawFd, ev: Event) -> std::io::Result<()> {
self.modify(fd, ev)
}
pub fn delete(&mut self, fd: RawFd) -> std::io::Result<()> {
self.modify(fd, Event::none(fd))
}
pub fn notify(&mut self) -> std::io::Result<()> {
self.notifier.1.write(&[1])?;
Ok(())
}
fn modify(&self, fd: RawFd, ev: Event) -> std::io::Result<()> {
let read_flags = if ev.readable { EV_ADD | EV_ONESHOT } else { EV_DELETE };
let changes = [libc::kevent {
ident: fd as usize,
filter: EVFILT_READ,
flags: read_flags,
fflags: 0,
data: 0,
udata: std::ptr::null_mut(),
}];
let result = unsafe {
kevent(
self.kq,
changes.as_ptr(),
1,
std::ptr::null_mut(),
0,
std::ptr::null_mut(),
)
};
if result < 0 {
return Err(std::io::Error::last_os_error());
}
Ok(())
}
pub fn poll(&mut self) -> std::io::Result<Vec<Event>> {
let max_capacity = self.capacity as i32;
self.events.clear();
let result = unsafe {
self.events.resize(max_capacity as usize, std::mem::zeroed());
kevent(
self.kq,
std::ptr::null(),
0,
self.events.as_mut_ptr(),
max_capacity,
std::ptr::null(),
)
};
if result < 0 {
return Err(std::io::Error::last_os_error());
}
let mut mapped_events = Vec::new();
for i in 0..result as usize {
let kevent = &self.events[i];
let ident = kevent.ident;
let filter = kevent.filter;
let mut buf = [0; 8];
if ident == self.notifier.0.as_raw_fd() as usize {
self.notifier.0.read(&mut buf)?;
self.modify(self.notifier.0.as_raw_fd(), Event::readable(self.notifier.0.as_raw_fd()))?;
}
let mut event = Event {
fd: ident,
readable: false,
writable: false,
};
match filter {
EVFILT_READ => event.readable = true,
EVFILT_WRITE => event.writable = true,
_ => {}
};
self.modify(event.fd as i32, Event::readable(event.fd as i32))?;
mapped_events.push(event);
}
Ok(mapped_events)
}
}
这个 Reactor 使用 kqueue(在 macOS 上)来监听事件。它还包含一个通知器,用于在需要时唤醒事件循环。
构建事件循环
事件循环是异步运行时的核心,它负责处理 I/O 事件和执行任务。让我们来实现一个简单的事件循环:
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
pub trait EventHandler: Send {
fn event(&mut self, event: Event);
fn fd(&self) -> usize;
}
pub struct RegistrationTask {
pub fd: usize,
pub reference: Box<dyn EventHandler>,
}
pub struct UnregistrationTask {
pub fd: usize,
}
pub struct ScheduledTask {
pub fd: usize,
}
pub enum Task {
RegistrationTask(RegistrationTask),
UnregistrationTask(UnregistrationTask),
ScheduledTask(ScheduledTask),
}
pub struct TaskQueue {
pub queue: Vec<Task>,
}
impl TaskQueue {
pub fn new() -> Self {
Self { queue: Vec::new() }
}
pub fn add_task(&mut self, task: Task) {
self.queue.push(task);
}
}
struct EventLoop {
reactor: Arc<Mutex<Reactor>>,
task_queue: Arc<Mutex<TaskQueue>>,
references: HashMap<usize, Box<dyn EventHandler>>,
}
impl EventLoop {
fn new(reactor: Arc<Mutex<Reactor>>, task_queue: Arc<Mutex<TaskQueue>>) -> Self {
Self {
reactor,
task_queue,
references: HashMap::new(),
}
}
fn register(&mut self, fd: usize, reference: Box<dyn EventHandler>) {
self.references.insert(fd, reference);
}
fn unregister(&mut self, fd: usize) {
self.references.remove(&fd);
}
fn process_tasks(&mut self) {
let mut tasks_to_process = Vec::new();
{
let mut task_queue = self.task_queue.lock().unwrap();
while let Some(task) = task_queue.queue.pop() {
tasks_to_process.push(task);
}
}
for task in tasks_to_process {
match task {
Task::RegistrationTask(reg_task) => {
self.register(reg_task.fd, reg_task.reference);
self.reactor.lock().unwrap().add(reg_task.fd as i32, Event::readable(reg_task.fd as i32)).unwrap();
}
Task::UnregistrationTask(unreg_task) => {
self.unregister(unreg_task.fd);
self.reactor.lock().unwrap().delete(unreg_task.fd as i32).unwrap();
}
Task::ScheduledTask(sched_task) => {
if let Some(reference) = self.references.get_mut(&sched_task.fd) {
reference.event(Event::readable(sched_task.fd as i32));
}
}
}
}
}
fn handle_events(&mut self, events: Vec<Event>) {
for event in events {
if let Some(reference) = self.references.get_mut(&event.fd) {
reference.event(event);
}
}
}
fn run(&mut self) {
loop {
// 处理任务队列
self.process_tasks();
// 从 Reactor 获取事件
let events = self.reactor.lock().unwrap().poll()
.expect("Error polling the reactor");
// 处理事件
self.handle_events(events);
}
}
}
这个事件循环不断地处理任务队列、轮询 Reactor 获取事件,并处理这些事件。
实现异步 TCP 监听器
为了展示如何使用我们的异步运行时,让我们实现一个异步 TCP 监听器:
use std::net::{TcpListener, TcpStream};
enum AsyncTcpListenerState {
Accepting(TcpStream),
Idle,
}
struct AsyncTcpListener {
listener: TcpListener,
fd: usize,
reactor: Arc<Mutex<Reactor>>,
task_queue: Arc<Mutex<TaskQueue>>,
state: Option<AsyncTcpListenerState>,
}
impl AsyncTcpListener {
fn new(addr: &str, reactor: Arc<Mutex<Reactor>>, task_queue: Arc<Mutex<TaskQueue>>) -> io::Result<Self> {
let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(true)?;
let fd = listener.as_raw_fd() as usize;
Ok(Self {
listener,
fd,
reactor,
task_queue,
state: Some(AsyncTcpListenerState::Idle),
})
}
}
impl EventHandler for AsyncTcpListener {
fn event(&mut self, event: Event) {
if event.readable {
match self.listener.accept() {
Ok((client, addr)) => {
println!("Accepted connection from: {}", addr);
self.state.replace(AsyncTcpListenerState::Accepting(client));
self.task_queue.lock().unwrap().add_task(Task::ScheduledTask(ScheduledTask { fd: self.fd }));
}
Err(e) => eprintln!("Error accepting connection: {}", e),
}
}
}
fn fd(&self) -> usize {
self.fd
}
}
这个异步 TCP 监听器实现了 EventHandler
trait,允许它与我们的事件循环交互。
使用示例
让我们看看如何使用我们创建的异步运行时:
fn main() -> io::Result<()> {
let reactor = Arc::new(Mutex::new(Reactor::new()?));
let task_queue = Arc::new(Mutex::new(TaskQueue::new()));
let mut event_loop = EventLoop::new(reactor.clone(), task_queue.clone());
let listener = AsyncTcpListener::new("127.0.0.1:8080", reactor.clone(), task_queue.clone())?;
let listener_fd = listener.fd();
task_queue.lock().unwrap().add_task(Task::RegistrationTask(RegistrationTask {
fd: listener_fd,
reference: Box::new(listener),
}));
event_loop.run();
Ok(())
}
这个示例创建了一个异步 TCP 监听器,并将其注册到事件循环中。事件循环会持续运行,处理传入的连接。
总结
通过这个简单的实现,我们深入了解了 Rust 异步运行时的核心组件和工作原理。我们实现了:
一个基于 kqueue 的 Reactor 一个简单的事件循环 一个任务队列系统 一个异步 TCP 监听器
虽然这个实现相对简单,但它展示了异步编程的基本概念,包括事件驱动的 I/O、任务调度和事件循环。
记住,实际的异步运行时(如 tokio 或 async-std)要复杂得多,它们处理了更多的边缘情况,并提供了更高的性能和更丰富的功能。但是,理解这些基本概念将帮助你更好地使用和理解这些成熟的库。
继续探索、实践,相信你很快就能掌握 Rust 异步编程的精髓!
参考文章
Async Runtimes Part II | Zaid Humayun's Blog:https://redixhumayun.github.io/async/2024/09/18/async-runtimes-part-ii.html Asynchronous Programming in Rust:https://rust-lang.github.io/async-book tokio - An asynchronous Rust runtime:https://tokio.rs
书籍推荐
各位 Rust 爱好者,今天为大家介绍一本《Programming Rust: Fast, Safe Systems Development》(第二版) 是由 Jim Blandy、Jason Orendorff 和 Leonora Tindall 合著的 Rust 编程指南。本书深入探讨了 Rust 语言在系统编程中的应用,着重介绍如何利用 Rust 的独特特性来平衡性能和安全性。书中涵盖了 Rust 的基础数据类型、所有权和借用概念、特征和泛型、并发编程、闭包、迭代器以及异步编程等核心内容。这本更新版基于 Rust 2021 版本,为系统程序员提供了全面而实用的 Rust 编程指导。