在编写整洁的代码时,你可能熟悉存储库模式,它可以帮助隐藏你正在使用的数据库的细节。这意味着你可以切换不同的数据库,比如 PostgreSQL、MongoDB 或 Cassandra,而无需在代码中进行太多修改。
当你保存像订单这样的复杂数据时,通常还需要保存相关数据,比如审计日志和消息队列的出站事件。通常,订单及其相关的出站消息会分组在一个存储库中,而审计日志则在另一个存储库中。
然而,简单地使用两个独立的存储库进行保存可能会导致问题。
order_repository.insert(tx, order_id).await?;
auditlog_repository.insert(tx, auditlog_message).await?;
order_repository.insert_outbox(tx, order_id).await?;
例如,如果在保存订单后保存审计日志失败,你最终会在数据库中获得一个订单,但没有相关的审计日志。这种不完整的数据可能会阻止进一步处理订单。
为了解决这个问题,我们可以在关系型数据库中使用事务,它允许你将相关的保存步骤作为一个单一操作来处理。如果其中一部分失败,则不会保存任何数据,这可以防止不一致。在非关系型数据库中,你可以使用反规范化——将所有数据组合到一个单一文档或结构中——来确保一次性保存所有数据或不保存任何数据。
但是,我们如何将这些解决方案融入到我们的整洁代码实践中呢?这种逻辑应该是存储库的一部分,还是应该单独处理,以避免存储库承担过多职责,使其难以测试?
将此逻辑直接添加到服务层也不理想,因为它将基础架构问题与业务逻辑混合在一起,而我们通常希望避免这种情况。
这就是“单元工作”概念的用武之地。单元工作模式不是仅仅通过存储库抽象数据库交互,而是抽象出一组需要一起完成的完整操作。然后,这个单元在技术上与存储库相链接。
让我们看一个例子,更好地理解它是如何工作的。
pub struct OrderService {
order_uow: OrderUnitOfWork,
}
impl OrderService {
pub fn new(order_uow: OrderUnitOfWork) -> Self {
OrderService { order_uow }
}
pub async fn create_order(&self, order_id: i32, auditlog_message: String) -> Result<(), Error> {
// 进行验证并执行业务规则,例如检查产品可用性
// 保存订单
self.order_uow.create_order(order_id, auditlog_message).await
}
}
通常,你可能会看到一个订单存储库通过依赖注入直接在服务中使用,但现在我们引入了单元工作。
在 Rust 中实现这个单元工作抽象可能相当具有挑战性。让我们深入了解它是如何工作的:
pub struct UnitOfWork {
pool: PgPool,
}
impl UnitOfWork {
pub fn new(pool: PgPool) -> Self {
UnitOfWork { pool }
}
pub async fn execute<F>(&self, operation: F) -> Result<(), Error>
where
F: for<'c> FnOnce(&'c mut Transaction<'_, Postgres>) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'c>> + Send,
{
let mut tx: Transaction<'_, Postgres> = self.pool.begin().await?;
// 在事务中执行操作
let result = operation(&mut tx).await;
match result {
Ok(res) => {
tx.commit().await?;
Ok(res)
}
Err(e) => {
tx.rollback().await?;
Err(e)
}
}
}
}
pub struct AuditlogRepository {}
pub struct OrderRepository {}
impl AuditlogRepository {
pub async fn insert(&self, tx: &mut Transaction<'_, Postgres>, message: String) -> Result<(), Error> {
sqlx::query("INSERT INTO auditlog (message) VALUES ($1)")
.bind(&message)
.execute(&mut **tx)
.await?;
Ok(())
}
}
impl OrderRepository {
pub async fn insert(&self, tx: &mut Transaction<'_, Postgres>, order_id: i32) -> Result<(), Error> {
sqlx::query("INSERT INTO orders (order_id) VALUES ($1)")
.bind(&order_id)
.execute(&mut **tx)
.await?;
Ok(())
}
pub async fn insert_outbox(&self, tx: &mut Transaction<'_, Postgres>, order_id: i32) -> Result<(), Error> {
sqlx::query("INSERT INTO outbox (order_id) VALUES ($1)")
.bind(&order_id)
.execute(&mut **tx)
.await?;
Ok(())
}
}
pub struct OrderUnitOfWork {
uow: UnitOfWork,
auditlog_repository: Arc<AuditlogRepository>,
order_repository: Arc<OrderRepository>,
}
impl OrderUnitOfWork {
pub fn new(
uow: UnitOfWork,
auditlog_repository: Arc<AuditlogRepository>,
order_repository: Arc<OrderRepository>,
) -> Self {
OrderUnitOfWork {
uow,
auditlog_repository,
order_repository,
}
}
pub async fn create_order(&self, order_id: i32, auditlog_message: String) -> Result<(), Error> {
let order_repository = Arc::clone(&self.order_repository);
let auditlog_repository = Arc::clone(&self.auditlog_repository);
let auditlog_message = auditlog_message.clone();
self.uow
.execute(|tx| {
Box::pin(async move {
order_repository.insert(tx, order_id).await?;
auditlog_repository.insert(tx, auditlog_message).await?;
order_repository.insert_outbox(tx, order_id).await?;
Ok(())
})
})
.await
}
}
UnitOfWork
结构管理整个事务。它确保事务中的所有操作都完全执行,或者如果出现故障,则回滚。该结构中的 execute
函数特别有趣。它接受一个闭包,更准确地说是 FnOnce
,它将所有需要执行的操作分组起来。这听起来可能有点复杂,实际上,定义正确的约束以使这个设置正常工作被证明是相当困难的。
当你检查存储库中使用的方法时,你可以看到单元工作的集成。现在,每个方法都从单元工作中接收一个事务对象,突出了它们之间的紧密耦合。
在我们的示例的最后,我们实现了 OrderUnitOfWork
。这部分真正将整个概念整合在一起。它有一个方法,接受我们需要同时保存的数据——比如审计日志事件和订单本身。然后,这个方法通过调用 UnitOfWork
和必要的存储库来协调操作。
那么,为什么这里需要 Pin
呢?这是由于我们遇到的一个特定的编译器错误,这是由于 Rust 的安全保证造成的,我们将在下面讨论。
error: lifetime may not live long enough
--> src/main.rs:101:17
|
99 | .execute(|tx| {
| --- return type of closure impl Future<Output = Result<(), sqlx::Error>> contains a lifetime '2
| |
| has type &'1 mut Transaction<'_, Postgres>
100 | // Call a helper function that performs the transaction asynchronously
101 | async_execute_transaction(tx, order_repository, auditlog_repository, order_id, auditlog_message)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that '1 must outlive '2
error: lifetime may not live long enough
--> src/main.rs:101:17
|
99 | .execute(|tx| {
| --- return type of closure impl Future<Output = Result<(), sqlx::Error>> contains a lifetime '4
| |
| has type &mut Transaction<'3, Postgres>
100 | // Call a helper function that performs the transaction asynchronously
101 | async_execute_transaction(tx, order_repository, auditlog_repository, order_id, auditlog_message)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that '3 must outlive '4
为了解决生命周期问题,一种方法是将闭包传递给 execute
方法时返回的 Future 放入一个盒子里。这种技术确保异步块正确地处理它捕获的变量的生命周期。
为什么我们要克隆存储库?原因是我们在调用单元工作的 execute
函数时使用了 async move
。如果我们不移动数据,我们会遇到错误,因为异步块需要拥有它使用的数据。
error[E0597]: `auditlog_repository` does not live long enough
--> src/main.rs:103:21
|
96 | let auditlog_repository = Arc::clone(&self.auditlog_repository);
| ------------------- binding `auditlog_repository` declared here
...
100 | .execute(|tx| {
| ---- value captured here
101 | / Box::pin(async {
102 | | order_repository.insert(tx, order_id).await?;
103 | | auditlog_repository.insert(tx, auditlog_message).await?;
| | ^^^^^^^^^^^^^^^^^^^ borrowed value does not live long enough
104 | | order_repository.insert_outbox(tx, order_id).await?;
105 | |
106 | | Ok(())
107 | | })
| |__________________- returning this value requires that `auditlog_repository` is borrowed for `'static`
...
110 | }
| - `auditlog_repository` dropped here while still borrowed
实现这种模式允许管理复杂的事务,并明确地将技术问题与业务逻辑分离。例如,如果我们将数据库切换到 Cassandra,我们需要调整存储库和单元工作以适应不同的数据存储方法。
根据我的个人经验,虽然这种实现有效,但还有改进和优化的空间,特别是在减少克隆的使用方面。克隆会降低速度,虽然使用 Arc
(原子引用计数)在一定程度上缓解了这个问题,但它并不是完美的解决方案。