Rust 中的单元工作模式

科技   2024-09-20 21:38   广东  

在编写整洁的代码时,你可能熟悉存储库模式,它可以帮助隐藏你正在使用的数据库的细节。这意味着你可以切换不同的数据库,比如 PostgreSQL、MongoDB 或 Cassandra,而无需在代码中进行太多修改。

当你保存像订单这样的复杂数据时,通常还需要保存相关数据,比如审计日志和消息队列的出站事件。通常,订单及其相关的出站消息会分组在一个存储库中,而审计日志则在另一个存储库中。

然而,简单地使用两个独立的存储库进行保存可能会导致问题。

order_repository.insert(tx, order_id).await?;
auditlog_repository.insert(tx, auditlog_message).await?;
order_repository.insert_outbox(tx, order_id).await?;

例如,如果在保存订单后保存审计日志失败,你最终会在数据库中获得一个订单,但没有相关的审计日志。这种不完整的数据可能会阻止进一步处理订单。

为了解决这个问题,我们可以在关系型数据库中使用事务,它允许你将相关的保存步骤作为一个单一操作来处理。如果其中一部分失败,则不会保存任何数据,这可以防止不一致。在非关系型数据库中,你可以使用反规范化——将所有数据组合到一个单一文档或结构中——来确保一次性保存所有数据或不保存任何数据。

但是,我们如何将这些解决方案融入到我们的整洁代码实践中呢?这种逻辑应该是存储库的一部分,还是应该单独处理,以避免存储库承担过多职责,使其难以测试?

将此逻辑直接添加到服务层也不理想,因为它将基础架构问题与业务逻辑混合在一起,而我们通常希望避免这种情况。

这就是“单元工作”概念的用武之地。单元工作模式不是仅仅通过存储库抽象数据库交互,而是抽象出一组需要一起完成的完整操作。然后,这个单元在技术上与存储库相链接。

让我们看一个例子,更好地理解它是如何工作的。

pub struct OrderService {
    order_uow: OrderUnitOfWork,
}

impl OrderService {
    pub fn new(order_uow: OrderUnitOfWork) -> Self {
        OrderService { order_uow }
    }

    pub async fn create_order(&self, order_id: i32, auditlog_message: String) -> Result<(), Error> {
        // 进行验证并执行业务规则,例如检查产品可用性
        // 保存订单
        self.order_uow.create_order(order_id, auditlog_message).await
    }
}

通常,你可能会看到一个订单存储库通过依赖注入直接在服务中使用,但现在我们引入了单元工作。

在 Rust 中实现这个单元工作抽象可能相当具有挑战性。让我们深入了解它是如何工作的:

pub struct UnitOfWork {
    pool: PgPool,
}

impl UnitOfWork {
    pub fn new(pool: PgPool) -> Self {
        UnitOfWork { pool }
    }

    pub async fn execute<F>(&self, operation: F) -> Result<(), Error>
        where
            F: for<'cFnOnce(&'c mut Transaction<'_, Postgres>) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'c>> + Send,
    {
        let mut tx: Transaction<'_, Postgres> = self.pool.begin().await?;

        // 在事务中执行操作
        let result = operation(&mut tx).await;

        match result {
            Ok(res) => {
                tx.commit().await?;
                Ok(res)
            }
            Err(e) => {
                tx.rollback().await?;
                Err(e)
            }
        }
    }
}

pub struct AuditlogRepository {}

pub struct OrderRepository {}

impl AuditlogRepository {
    pub async fn insert(&self, tx: &mut Transaction<'_, Postgres>, message: String) -> Result<(), Error> {
        sqlx::query("INSERT INTO auditlog (message) VALUES ($1)")
            .bind(&message)
            .execute(&mut **tx)
            .await?;

        Ok(())
    }
}

impl OrderRepository {
    pub async fn insert(&self, tx: &mut Transaction<'_, Postgres>, order_id: i32) -> Result<(), Error> {
        sqlx::query("INSERT INTO orders (order_id) VALUES ($1)")
            .bind(&order_id)
            .execute(&mut **tx)
            .await?;
        Ok(())
    }

    pub async fn insert_outbox(&self, tx: &mut Transaction<'_, Postgres>, order_id: i32) -> Result<(), Error> {
        sqlx::query("INSERT INTO outbox (order_id) VALUES ($1)")
            .bind(&order_id)
            .execute(&mut **tx)
            .await?;
        Ok(())
    }
}

pub struct OrderUnitOfWork {
    uow: UnitOfWork,
    auditlog_repository: Arc<AuditlogRepository>,
    order_repository: Arc<OrderRepository>,
}

impl OrderUnitOfWork {
    pub fn new(
        uow: UnitOfWork,
        auditlog_repository: Arc<AuditlogRepository>,
        order_repository: Arc<OrderRepository>,
    ) -> Self {
        OrderUnitOfWork {
            uow,
            auditlog_repository,
            order_repository,
        }
    }

    pub async fn create_order(&self, order_id: i32, auditlog_message: String) -> Result<(), Error> {
        let order_repository = Arc::clone(&self.order_repository);
        let auditlog_repository = Arc::clone(&self.auditlog_repository);
        let auditlog_message = auditlog_message.clone();

        self.uow
            .execute(|tx| {
                Box::pin(async move {
                    order_repository.insert(tx, order_id).await?;
                    auditlog_repository.insert(tx, auditlog_message).await?;
                    order_repository.insert_outbox(tx, order_id).await?;

                    Ok(())
                })
            })
            .await
    }
}

UnitOfWork 结构管理整个事务。它确保事务中的所有操作都完全执行,或者如果出现故障,则回滚。该结构中的 execute 函数特别有趣。它接受一个闭包,更准确地说是 FnOnce,它将所有需要执行的操作分组起来。这听起来可能有点复杂,实际上,定义正确的约束以使这个设置正常工作被证明是相当困难的。

当你检查存储库中使用的方法时,你可以看到单元工作的集成。现在,每个方法都从单元工作中接收一个事务对象,突出了它们之间的紧密耦合。

在我们的示例的最后,我们实现了 OrderUnitOfWork。这部分真正将整个概念整合在一起。它有一个方法,接受我们需要同时保存的数据——比如审计日志事件和订单本身。然后,这个方法通过调用 UnitOfWork 和必要的存储库来协调操作。

那么,为什么这里需要 Pin 呢?这是由于我们遇到的一个特定的编译器错误,这是由于 Rust 的安全保证造成的,我们将在下面讨论。

error: lifetime may not live long enough
   --> src/main.rs:101:17
    |
99  |             .execute(|tx| {
    |                       --- return type of closure impl Future<Output = Result<(), sqlx::Error>> contains a lifetime '2
    |                       |
    |                       has type &'1 mut Transaction<'_, Postgres>
100 |                 // Call a helper function that performs the transaction asynchronously
101 |                 async_execute_transaction(tx, order_repository, auditlog_repository, order_id, auditlog_message)
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that '1 must outlive '2

error: lifetime may not live long enough
   --> src/main.rs:101:17
    |
99  |             .execute(|tx| {
    |                       --- return type of closure impl Future<Output = Result<(), sqlx::Error>> contains a lifetime '4
    |                       |
    |                       has type &mut Transaction<'3, Postgres>
100 |                 // Call a helper function that performs the transaction asynchronously
101 |                 async_execute_transaction(tx, order_repository, auditlog_repository, order_id, auditlog_message)
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that '3 must outlive '4

为了解决生命周期问题,一种方法是将闭包传递给 execute 方法时返回的 Future 放入一个盒子里。这种技术确保异步块正确地处理它捕获的变量的生命周期。

为什么我们要克隆存储库?原因是我们在调用单元工作的 execute 函数时使用了 async move。如果我们不移动数据,我们会遇到错误,因为异步块需要拥有它使用的数据。

error[E0597]: `auditlog_repository` does not live long enough
   --> src/main.rs:103:21
    |
96  |           let auditlog_repository = Arc::clone(&self.auditlog_repository);
    |               ------------------- binding `auditlog_repository` declared here
...
100 |               .execute(|tx| {
    |                        ---- value captured here
101 | /                 Box::pin(async {
102 | |                     order_repository.insert(tx, order_id).await?;
103 | |                     auditlog_repository.insert(tx, auditlog_message).await?;
    | |                     ^^^^^^^^^^^^^^^^^^^ borrowed value does not live long enough
104 | |                     order_repository.insert_outbox(tx, order_id).await?;
105 | |
106 | |                     Ok(())
107 | |                 })
    | |__________________- returning this value requires that `auditlog_repository` is borrowed for `'static`
...
110 |       }
    |       - `auditlog_repository` dropped here while still borrowed

实现这种模式允许管理复杂的事务,并明确地将技术问题与业务逻辑分离。例如,如果我们将数据库切换到 Cassandra,我们需要调整存储库和单元工作以适应不同的数据存储方法。

根据我的个人经验,虽然这种实现有效,但还有改进和优化的空间,特别是在减少克隆的使用方面。克隆会降低速度,虽然使用 Arc (原子引用计数)在一定程度上缓解了这个问题,但它并不是完美的解决方案。

文章精选

Tailspin:用 Rust 打造的炫彩日志查看器

Rust: 重塑系统编程的安全壁垒

Youki:用Rust编写的容器运行时,性能超越runc

使用C2Rust将C代码迁移到Rust

Rust语言中如何优雅地应对错误


Rust编程笔记
与你一起在Rust的世界里探索、学习、成长!
 最新文章