各位 Rust 爱好者好!今天我们来探讨一个有趣的话题:Rust 的异步运行时。对于 Rust 新手来说,异步编程可能看起来有点复杂,但别担心,我们会用简单的方式来理解它。本文将带你一步步实现一个基础的异步运行时,让你对 Rust 的异步机制有更深入的认识。
什么是异步运行时?
在开始之前,让我们先简单了解一下什么是异步运行时。异步运行时是一个执行异步代码的环境,它管理和调度异步任务,使得程序可以高效地处理并发操作。
从 Future 开始
在 Rust 中,异步编程的核心是 Future
trait。让我们来创建一个简单的 Future:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::thread;
struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
struct SharedState {
completed: bool,
waker: Option<std::task::Waker>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// 检查定时器是否完成
let mut state = self.shared_state.lock().unwrap();
if state.completed {
Poll::Ready(())
} else {
// 设置 waker,以便任务完成时可以通知执行器
state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl TimerFuture {
fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
let thread_shared_state = Arc::clone(&shared_state);
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
这个 TimerFuture
实现了一个简单的定时器,它会在指定时间后完成。
构建执行器
现在我们有了 Future,接下来需要一个执行器来运行它。我们将创建一个简单的单线程执行器:
use futures::future::{BoxFuture, FutureExt};
use futures::task::{waker_ref, ArcWake};
use std::sync::mpsc::{sync_channel, SyncSender, Receiver};
struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
task_sender: SyncSender<Arc<Task>>,
}
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self.task_sender.send(cloned).expect("任务队列已满");
}
}
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("任务队列已满");
}
}
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
if let Poll::Pending = future.as_mut().poll(context) {
*future_slot = Some(future);
}
}
}
}
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
这个执行器可以接收任务并执行它们,直到所有任务完成。
运行我们的异步代码
现在让我们把所有东西放在一起,运行我们的异步代码:
需要在 Cargo.toml
文件中引入 futures
这个 crate:
[dependencies]
futures = "0.3.31"
fn main() {
let (executor, spawner) = new_executor_and_spawner();
spawner.spawn(async {
println!("你好,");
TimerFuture::new(Duration::new(2, 0)).await;
println!("世界!");
});
drop(spawner);
executor.run();
}
运行这段代码,你会看到它先打印 "你好,",然后等待 2 秒,最后打印 "世界!"。
总结
通过这个简单的例子,我们实现了 Rust 异步运行时的基本组件:
Future:表示异步计算 执行器:管理和运行 Future 任务:包装 Future 并提供唤醒机制
虽然这个实现非常基础,但它展示了 Rust 异步系统的核心概念。在实际应用中,我们通常会使用更成熟的异步运行时,如 tokio 或 async-std,它们提供了更多功能和更好的性能。
希望这篇文章能帮助你更好地理解 Rust 的异步编程!如果你对更深入的异步编程感兴趣,可以查看 Rust 异步编程的官方文档和更多相关资源。
参考文章
Async Runtimes | Zaid Humayun's Blog:https://redixhumayun.github.io/async/2024/08/05/async-runtimes.html Asynchronous Programming in Rust:https://rust-lang.github.io/async-book Tokio Documentation:https://tokio.rs
书籍推荐
各位 Rust 爱好者,今天为大家介绍一本《Programming Rust: Fast, Safe Systems Development》(第二版) 是由 Jim Blandy、Jason Orendorff 和 Leonora Tindall 合著的 Rust 编程指南。本书深入探讨了 Rust 语言在系统编程中的应用,着重介绍如何利用 Rust 的独特特性来平衡性能和安全性。书中涵盖了 Rust 的基础数据类型、所有权和借用概念、特征和泛型、并发编程、闭包、迭代器以及异步编程等核心内容。这本更新版基于 Rust 2021 版本,为系统程序员提供了全面而实用的 Rust 编程指导。