使 API 具有弹性:使用发件箱模式提高 .NET 微服务的可靠性

科技   2024-12-11 05:54   上海  


在微服务的世界里,我们都遇到过事情未按计划进行的情况。想象一下这样的场景:你有一个微服务,它会将新订单保存到数据库中,然后发布一条消息来通知其他服务。一切原本都很顺利……直到消息发布失败,导致你系统的部分环节失去同步。好消息是,这正是“发件箱模式”(Outbox Pattern)大显身手的地方。

今天,我们来深入探讨一下这种模式如何提高你的API的可靠性,以及它为何非常适用于.NET项目。

为何采用发件箱模式?

发件箱模式背后的主要理念是避免那些令人头疼的不一致性问题。该模式通过在单个数据库事务中同时记录数据更新和消息,来确保它们保持同步。然后,我们可以异步处理这些已记录的消息,并将它们推送到消息系统中。

设置:一个结合.NET、实体框架(Entity Framework)和RabbitMQ的实际示例

在这个示例中,假设我们的服务用于处理订单。每当创建一个新订单时,我们会将其保存到数据库中,并通过发布消息来通知其他服务(例如库存、配送服务)。

让我们开始吧。

步骤1:定义发件箱表

我们首先要创建一个“OutboxMessages”表来存储待处理的消息。以下是实体框架中的模型:

public class OutboxMessage
{
public Guid Id { get; set; } = Guid.NewGuid();
public string MessageType { get; set; }
public string Payload { get; set; }
public bool Processed { get; set; } = false;
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime? ProcessedAt { get; set; }
}

“Processed”字段用于让我们知晓消息是否已经发送,而“MessageType”字段有助于识别消息的用途。

步骤2:在事务内写入发件箱

神奇的地方就在这里。在我们的订单处理方法中,在保存订单本身之后,会立即向“OutboxMessages”表中添加一条消息。由于这两个操作处于同一个事务中,我们就能确保它们会一起成功(或一起失败)。

public class OrderService
{
private readonly AppDbContext _context;

public OrderService(AppDbContext context)
{
_context = context;
}

public async Task CreateOrderAsync(Order order)
{
using var transaction = await _context.Database.BeginTransactionAsync();

try
{
// 将订单保存到数据库
_context.Orders.Add(order);
await _context.SaveChangesAsync();

// 准备发件箱消息
var outboxMessage = new OutboxMessage
{
MessageType = "OrderCreated",
Payload = JsonConvert.SerializeObject(new { order.Id, order.CustomerId, order.TotalAmount }),
Processed = false
};

// 将消息保存在发件箱中
_context.OutboxMessages.Add(outboxMessage);
await _context.SaveChangesAsync();

// 提交事务
await transaction.CommitAsync();
}
catch (Exception)
{
await transaction.RollbackAsync();
throw;
}
}
}

这样一来,现在订单和发件箱消息都被存储在数据库中了,并且都被妥善地包裹在一个整齐的事务里。

步骤3:异步处理发件箱

现在该使用我们的“发件箱处理器”(OutboxProcessor)了,它会定期检查未处理的消息,并将它们发布到RabbitMQ中。让我们深入看看代码:

public class OutboxProcessor
{
private readonly AppDbContext _context;
private readonly IModel _rabbitMqChannel;

public OutboxProcessor(AppDbContext context, IConnection rabbitMqConnection)
{
_context = context;
_rabbitMqChannel = rabbitMqConnection.CreateModel();
}

public async Task ProcessOutboxMessagesAsync()
{
var messages = await _context.OutboxMessages
.Where(m =>!m.Processed)
.ToListAsync();

foreach (var message in messages)
{
var payload = Encoding.UTF8.GetBytes(message.Payload);
_rabbitMqChannel.BasicPublish(exchange: "", routingKey: "OrderQueue", basicProperties: null, body: payload);

// 将消息标记为已处理
message.Processed = true;
message.ProcessedAt = DateTime.UtcNow;
await _context.SaveChangesAsync();
}
}
}

在这段代码中:

  • “发件箱处理器”会获取所有未处理的消息。

  • 每条消息都会被发布到RabbitMQ。

  • 在成功发布之后,消息会被标记为已处理,这样它就不会再次被发送了。

步骤4:通过后台服务自动处理发件箱

我们希望发件箱处理能够定期运行,所以让我们将它添加为一个“后台服务”(BackgroundService):

public class OutboxWorker : BackgroundService
{
private readonly OutboxProcessor _outboxProcessor;

public OutboxWorker(OutboxProcessor outboxProcessor)
{
_outboxProcessor = outboxProcessor;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await _outboxProcessor.ProcessOutboxMessagesAsync();
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
}
}
}

这个“发件箱工作器”(OutboxWorker)确保我们的处理器每30秒运行一次,按需检查消息并进行发布。

通过发件箱模式,我们的.NET微服务能够可靠地处理订单创建和消息发布。我们避免了状态不一致的情况,并且确保了即使RabbitMQ暂时宕机,我们也不会丢失任何消息。

所以,下次你在.NET中构建微服务时,考虑使用发件箱模式来让你的API坚如磐石吧。你会庆幸自己这么做的!

如果你喜欢我的文章,请给我一个赞!谢谢


架构师老卢
资深软件架构师, 分享编程、软件设计经验, 教授前沿技术, 分享技术资源(每天发布电子书),每天进步一点点...
 最新文章