探索 Gabriel2,基于 Tokio 的高性能 Actor 模型库

文摘   2024-07-05 00:02   江苏  

探索 Gabriel2,基于 Tokio 的高性能 Actor 模型库

unsetunset前言:unsetunset

在当今快速发展的技术时代,软件架构和设计模式的创新是推动行业进步的关键因素。Actor 模型,作为一种并发编程范式,因其在处理高并发和分布式系统方面的优势而日益受到重视。Rust 语言以其安全性和性能优势,在系统编程领域占据着重要地位。本文介绍的 Gabriel2,正是在这样的背景下应运而生,它是一个基于 Tokio 的 Rust 编写的 Actor 库,旨在为开发者提供一个高效、灵活且易于使用的并发处理工具。

Gabriel2 的设计哲学是简洁而强大,它不仅支持异步消息发送和处理,还提供了丰富的 Actor 生命周期管理功能,包括但不限于状态的可变性、自我引用、以及远程 Actor 的支持。此外,Gabriel2 还引入了事件总线和流式处理的概念,进一步增强了其在复杂系统中的应用潜力。通过本文的详细介绍和示例代码,读者将能够深入理解 Gabriel2 的核心特性和使用方式,为构建高性能的并发应用程序打下坚实的基础。

Gabriel2:确实,这是一款基于 Tokio 的 Rust 编写的 Actor 库。

unsetunset特性unsetunset

  • [x] 异步消息发送
  • [x] 异步处理 Actor 消息
  • [x] 支持"发送即忘"的消息模式
  • [x] 支持"发送并等待"响应的消息模式
  • [x] Actor 的动态状态管理
  • [x] 在上下文中实现 Actor 自引用
  • [x] 完整的 Actor 生命周期管理(启动前(pre_start),关闭前(pre_stop))
  • [x] Actor 作为数据汇聚点(Sink)
  • [x] 从 Actor 中获取数据流(Stream)
  • [x] 支持远程 Actor 通信
  • [x] 集成事件总线机制

unsetunset待办事项unsetunset

  • [ ] 实现负载均衡功能

unsetunset使用方法unsetunset

Cargo.toml

[dependencies]
gabriel2 = { version = "1.4.1", features = ["remote", "sink-stream", "broadcast"] }

echo.rs

use std::sync::Arc;
use gabriel2::*;

use bincode::{Decode, Encode};
use derive_more::{Display, Error};

#[derive(Debug)]
pub struct EchoActor;

#[derive(Debug)]
pub enum EchoMessage {
    Ping,
}

#[derive(Debug)]
pub enum EchoResponse {
    Pong {counter: u32},
}

#[derive(Debug, Clone)]
pub struct EchoState {
    pub counter: u32,
}

#[derive(Debug, Display, Error)]
pub enum EchoError {
    #[display(fmt = "未知错误")]
    Unknown,
}

impl From<std::io::Error> for EchoError {
    fn from(_err: std::io::Error) -> Self {
        EchoError::Unknown
    }
}

impl Handler for EchoActor {
    type Actor = EchoActor;
    type Message = EchoMessage;
    type State = EchoState;
    type Response = EchoResponse;
    type Error = EchoError;

    async fn receive(&self, ctx: Arc<Context<Self::Actor, Self::Message, Self::State, Self::Response, Self::Error>>) -> Result<EchoResponse, EchoError> {
        match ctx.mgs {
            EchoMessage::Ping => {
                println!("收到Ping");
                let mut state_lock = ctx.state.lock().await;
                state_lock.counter += 1;
                if state_lock.counter > 10 {
                    Err(EchoError::Unknown)
                } else {
                    Ok(EchoResponse::Pong{counter: state_lock.counter})
                }
            }
        }
    }
}

main.rs

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", EchoActor {}, state, 100000).await?;

    println!("发送Ping");
    echo_ref.send(EchoMessage::Ping).await?;

    println!("发送Ping并请求响应");
    let pong = echo_ref.ask(EchoMessage::Ping).await?;
    println!("收到 {:?}", pong);

    _ = echo_ref.stop().await;
    Ok(())
}

示例输出:

发送Ping
发送Ping并请求响应
收到Ping
收到Ping
收到 Pong { counter: 2 }

示例源代码:Gabriel2 测试[1]

unsetunset下沉unsetunset

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let echo_sink = ActorSink::sink(echo_ref.clone());
    let message_stream = futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping]).map(Ok);
    _ = message_stream.forward(echo_sink).await;
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    Ok(())
}

示例输出:

收到Ping
收到Ping
收到Ping

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let (echo_sink, echo_stream) = ActorSink::sink_stream(echo_ref.clone());
    let message_stream = futures::stream::iter(vec![EchoMessage::Ping, EchoMessage::Ping, EchoMessage::Ping]).map(Ok);
    _ = message_stream.forward(echo_sink).await;
    echo_stream.for_each(|message| async move {
        println!("收到 {:?}", message.unwrap());
    }).await;
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    Ok(())
}

示例输出:

收到Ping
收到Ping
收到Ping
收到 Pong { counter: 1 }
收到 Pong { counter: 2 }
收到 Pong { counter: 3 }

unsetunset远程unsetunset

远程准备:

将“bincode”中的 Encode, Decode 添加到 EchoActor, EchoMessage, EchoResponse, EchoState 和 EchoError 的 derive(..)中

远程版本:

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;
    let echo_server = ActorServer::new("echo_server""127.0.0.1"9001, echo_ref).await?;
    let echo_client: Arc<ActorClient<EchoActor, EchoMessage, EchoState, EchoResponse, EchoError >> = ActorClient::new("echo_client""127.0.0.1"9001).await?;

    println!("发送Ping");
    echo_client.send(EchoMessage::Ping).await?;

    println!("发送Ping并请求响应");
    let pong = echo_client.ask(EchoMessage::Ping).await?;
    println!("收到 {:?}", pong);

    _ = echo_client.stop().await;
    _ = echo_server.stop().await;
    Ok(())
}

事件总线

#[tokio::main]
async fn main() -> Result<(), EchoError> {
    let state = EchoState {
        counter: 0,
    };

    #[derive(Debug, Copy, Clone)]
    enum EventElement {
        Fire,
        Water
    }

    let echo_ref = ActorRef::new("echo", crate::echo::EchoActor {}, state, 100000).await?;

    let event_bus: Arc<EventBus<EventElement>> = Arc::new(EventBus::new());

    let subscriber_id = event_bus.subscribe(move |event: EventElement| {
        async move {
            match event {
                EventElement::Fire => {
                    let _ = echo_ref.send(EchoMessage::Ping).await;
                    ()
                },
                _ => ()
            }
        }}).await;

    event_bus.publish(EventElement::Fire).await;

    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

    event_bus.unsubscribe(subscriber_id).await;

    Ok(())
}

unsetunset附录unsetunset

https://github.com/igumnoff/gabriel2/tree/main

unsetunset结语:unsetunset

随着本文对 Gabriel2 特性和应用的深入探讨,我们可以看到,这一 Actor 库为 Rust 开发者提供了一个强大的工具,以应对日益增长的并发编程需求。Gabriel2 的灵活性和扩展性使其成为构建复杂系统的理想选择,无论是在本地环境还是分布式系统中。

参考资料
[1]

Gabriel2 测试: https://github.com/igumnoff/gabriel2/tree/main/test


编程悟道
自制软件研发、软件商店,全栈,ARTS 、架构,模型,原生系统,后端(Node、React)以及跨平台技术(Flutter、RN).vue.js react.js next.js express koa hapi uniapp Astro
 最新文章