快速上手 Rust 中的 Channel

科技   2024-09-23 09:58   广东  

通道是在 Rust 中实现线程间通信的绝佳方式。基于通道消息,您可以委派任务并在需要时执行。当谈到跨线程通信时,将线程可视化为可以收集来自多个任务的信息的小队列更容易,您可以聚合信息并在现有应用程序的基础上构建。


设计

我们希望持续将指标报告到所需的后台。因此,我们自然需要一个单独的线程专门用于指标。这个单独的线程将利用一个公共通道,该通道充当来自读取器线程的接收器。读取器线程需要为我们的通道拥有发送器句柄。

但是,应该使用哪种类型的通道呢?通道有两种类型:有界通道(您可以在其中存储有限数量的数据)和无界通道(可以存储无限数量的数据)。使用无界通道似乎是显而易见的方法,因为它为我们提供了大量的缓冲区。但是,如果我们的任何指标线程死亡,缓冲区将继续增长,从而导致内存问题。即使没有内存问题,如果存在大量消息积压,系统也会开始失去性能!我们不希望因内存不足 (OOM) 问题而崩溃。因此,我们将使用有界通道。我们可以假设我们发送的数据并不复杂,因此我们可以设置所需的阈值。

Image 1: None

实现

首先,我们需要在我们的生态系统中添加一个新线程。我们使用 Rust 中的 join_set。让我们在 metrics.rs 中创建一个名为 metrics 的新模块。

// metrics.rs
pub async fn handle_receiver(){
        // 写入一些指标后台。
        println!("Received : {}", metricinfo)
}

现在,我们可以在 join_set 中导入此函数并生成任务。

join_set.spawn(async move {
        metrics::handle_receiver(&mut rx).await;
    });


while let Some(_) = join_set.join_next().await {
    println!("Process Complete in Join Handle");
}

此函数什么也不做,一旦到达这里就会退出。提醒一下,join_set 将收集所有线程,并一起运行它们,并等待所有这些线程完成。

现在,我们使用 Tokio 的 mpsc 库创建一个有界通道。这里的 mpsc 代表多生产者,单消费者通道。我们将从多个线程生成指标,并将其发送到指标线程中的单个接收器。

let (multi_producer, mut single_receiver) = mpsc::channel(32);

由于接收器将接收消息并改变状态,因此它被保留为一个可变变量。对于每个工作线程,我们需要发送多生产者的副本作为发送器。此时,我们将明确告诉通道应该处理的消息类型。我们将此副本传递给线程函数,以便它们可以处理它。

for path in paths {
        let thread_channel:tokio::sync::mpsc::Sender<String
                   = multi_producer.clone();
        
        ...
        
        // 启动线程以处理文件。
        join_set.spawn(async move {
            thread_process(&thread_channel, &native_counter_map, &counter_file, &aggregate_file, &tail_file_name, &tail_folder, &audit_file, &output_folder, &rotation_threshold, &counter, &aggregate_file_counter).await;
        });

        counter = counter + 1;
    }

请注意,在将信息发送到函数时,我们通过引用传递。我们可以再次从聚合器函数传递引用到读取文件函数。

async fn thread_process(thread_channel: &tokio::sync::mpsc::Sender<String>, counter_map: &HashMap<Stringi32>, counter_file: &String, aggregate_file: &String, tail_file_name: &String, tail_folder: &String, audit_file: &String, output_folder: &String, rotation_threshold: &i32, counter: &i32, aggregate_counter_mutex: &Arc<Mutex<i32>>) -> i32 {
    // 计算要跟踪的文件的路径。

    ...

    // 开始读取日志文件
    read_large_file(&thread_channel, &f_name, f_counter, aggregate_file, tail_folder, audit_file, counter_file, output_folder, rotation_threshold, aggregate_counter_mutex).await;

    return counter.clone();
}

在读取文件函数中,我们可以使用发送器句柄发送信息。例如,我想要报告时间戳、正在读取的文件和计数器位置。因此,我们可以像这样添加发送器:-

async fn read_large_file(thread_channel: &tokio::sync::mpsc::Sender<String>, f_name: &String, f_counter: i32, aggregate_file: &String, tail_folder: &String, audit_file: &String, counter_file: &String, output_folder: &String, rotation_threshold: &i32, aggregate_counter_mutex: &Arc<Mutex<i32>>) {
    // 打开要跟踪的文件

    ...

    for file_line in file_reader.lines() {

        ...

        if counter % 5 == 0{
            let current_timestamp = chrono::offset::Utc::now();
            let metric_string = current_timestamp.to_string() + " : file_counter : {" + f_name + "," + &counter.clone().to_string() + "}";
            thread_channel.send(metric_string).await.unwrap();
        }

        ...
    }

    ...
}

在这里,我们导入了 chrono crate 以获取当前时间戳,并正在创建指标字符串。我们一当计数器达到 5 的倍数时,就会将指标报告到通道。

如您所见,发送到通道的代码非常简单。让我们处理接收器部分。我们将重新访问 metrics.rs 文件并添加以下代码。

pub async fn handle_receiver(rx: &mut tokio::sync::mpsc::Receiver<String>){
    while let Some(metricinfo) = rx.recv().await {
        // 写入一些指标后台。
        println!("Received : {}", metricinfo)
    }
}

接收器等待来自线程的信息。即使对于有界通道,此过程也不会退出。

就是这样!在 Rust 代码中添加一个简单的通道就这么简单。

结果

我们正在读取多个文件,并且我们获得了所有这些文件的并行指标报告!

Image 2: None

这是运行输出。我们可以扩展应用程序以将指标报告到 Prometheus,甚至在 Rust 上创建自己的指标存储!

总结

在本文中,我们构建了一个基于通道的线程间安全同步。您可以根据需要使用通道扩展此应用程序,并将通道无缝集成到您的应用程序中。

本文的代码可以关注公众号并回复 “rust-channel” 获取。

如果您喜欢这篇文章,请为它点赞关注并分享给您的好友!感谢您阅读到此。

我创建了一个Rust的知识星球,现在可以免费加入,除去官方收取的5元费用,剩下20元加我返现。

Rust编程笔记
与你一起在Rust的世界里探索、学习、成长!
 最新文章