通道是在 Rust 中实现线程间通信的绝佳方式。基于通道消息,您可以委派任务并在需要时执行。当谈到跨线程通信时,将线程可视化为可以收集来自多个任务的信息的小队列更容易,您可以聚合信息并在现有应用程序的基础上构建。
设计
我们希望持续将指标报告到所需的后台。因此,我们自然需要一个单独的线程专门用于指标。这个单独的线程将利用一个公共通道,该通道充当来自读取器线程的接收器。读取器线程需要为我们的通道拥有发送器句柄。
但是,应该使用哪种类型的通道呢?通道有两种类型:有界通道(您可以在其中存储有限数量的数据)和无界通道(可以存储无限数量的数据)。使用无界通道似乎是显而易见的方法,因为它为我们提供了大量的缓冲区。但是,如果我们的任何指标线程死亡,缓冲区将继续增长,从而导致内存问题。即使没有内存问题,如果存在大量消息积压,系统也会开始失去性能!我们不希望因内存不足 (OOM) 问题而崩溃。因此,我们将使用有界通道。我们可以假设我们发送的数据并不复杂,因此我们可以设置所需的阈值。
实现
首先,我们需要在我们的生态系统中添加一个新线程。我们使用 Rust 中的 join_set
。让我们在 metrics.rs
中创建一个名为 metrics 的新模块。
// metrics.rs
pub async fn handle_receiver(){
// 写入一些指标后台。
println!("Received : {}", metricinfo)
}
现在,我们可以在 join_set
中导入此函数并生成任务。
join_set.spawn(async move {
metrics::handle_receiver(&mut rx).await;
});
while let Some(_) = join_set.join_next().await {
println!("Process Complete in Join Handle");
}
此函数什么也不做,一旦到达这里就会退出。提醒一下,join_set
将收集所有线程,并一起运行它们,并等待所有这些线程完成。
现在,我们使用 Tokio 的 mpsc
库创建一个有界通道。这里的 mpsc
代表多生产者,单消费者通道。我们将从多个线程生成指标,并将其发送到指标线程中的单个接收器。
let (multi_producer, mut single_receiver) = mpsc::channel(32);
由于接收器将接收消息并改变状态,因此它被保留为一个可变变量。对于每个工作线程,我们需要发送多生产者的副本作为发送器。此时,我们将明确告诉通道应该处理的消息类型。我们将此副本传递给线程函数,以便它们可以处理它。
for path in paths {
let thread_channel:tokio::sync::mpsc::Sender<String>
= multi_producer.clone();
...
// 启动线程以处理文件。
join_set.spawn(async move {
thread_process(&thread_channel, &native_counter_map, &counter_file, &aggregate_file, &tail_file_name, &tail_folder, &audit_file, &output_folder, &rotation_threshold, &counter, &aggregate_file_counter).await;
});
counter = counter + 1;
}
请注意,在将信息发送到函数时,我们通过引用传递。我们可以再次从聚合器函数传递引用到读取文件函数。
async fn thread_process(thread_channel: &tokio::sync::mpsc::Sender<String>, counter_map: &HashMap<String, i32>, counter_file: &String, aggregate_file: &String, tail_file_name: &String, tail_folder: &String, audit_file: &String, output_folder: &String, rotation_threshold: &i32, counter: &i32, aggregate_counter_mutex: &Arc<Mutex<i32>>) -> i32 {
// 计算要跟踪的文件的路径。
...
// 开始读取日志文件
read_large_file(&thread_channel, &f_name, f_counter, aggregate_file, tail_folder, audit_file, counter_file, output_folder, rotation_threshold, aggregate_counter_mutex).await;
return counter.clone();
}
在读取文件函数中,我们可以使用发送器句柄发送信息。例如,我想要报告时间戳、正在读取的文件和计数器位置。因此,我们可以像这样添加发送器:-
async fn read_large_file(thread_channel: &tokio::sync::mpsc::Sender<String>, f_name: &String, f_counter: i32, aggregate_file: &String, tail_folder: &String, audit_file: &String, counter_file: &String, output_folder: &String, rotation_threshold: &i32, aggregate_counter_mutex: &Arc<Mutex<i32>>) {
// 打开要跟踪的文件
...
for file_line in file_reader.lines() {
...
if counter % 5 == 0{
let current_timestamp = chrono::offset::Utc::now();
let metric_string = current_timestamp.to_string() + " : file_counter : {" + f_name + "," + &counter.clone().to_string() + "}";
thread_channel.send(metric_string).await.unwrap();
}
...
}
...
}
在这里,我们导入了 chrono
crate 以获取当前时间戳,并正在创建指标字符串。我们一当计数器达到 5 的倍数时,就会将指标报告到通道。
如您所见,发送到通道的代码非常简单。让我们处理接收器部分。我们将重新访问 metrics.rs
文件并添加以下代码。
pub async fn handle_receiver(rx: &mut tokio::sync::mpsc::Receiver<String>){
while let Some(metricinfo) = rx.recv().await {
// 写入一些指标后台。
println!("Received : {}", metricinfo)
}
}
接收器等待来自线程的信息。即使对于有界通道,此过程也不会退出。
就是这样!在 Rust 代码中添加一个简单的通道就这么简单。
结果
我们正在读取多个文件,并且我们获得了所有这些文件的并行指标报告!
这是运行输出。我们可以扩展应用程序以将指标报告到 Prometheus,甚至在 Rust 上创建自己的指标存储!
总结
在本文中,我们构建了一个基于通道的线程间安全同步。您可以根据需要使用通道扩展此应用程序,并将通道无缝集成到您的应用程序中。
本文的代码可以关注公众号并回复 “rust-channel” 获取。
如果您喜欢这篇文章,请为它点赞关注并分享给您的好友!感谢您阅读到此。
我创建了一个Rust的知识星球,现在可以免费加入,除去官方收取的5元费用,剩下20元加我返现。