今天咱们聊点有意思的东西——RocketMQ,关于它是如何保证消息不丢失,如何避免消息被重复消费的。我们时常得面对消息队列的各种“挑战”。特别是消息丢失和重复消费这两大难题,想必每个做过消息队列相关开发的朋友都有一肚子苦水要吐。一、RocketMQ:靠谱的消息队列
首先给大家简单介绍一下RocketMQ。RocketMQ 是一个分布式的消息队列系统,作为阿里巴巴开源的消息队列中间件,它有着非常高的可靠性和性能表现。无论是事务消息、延时消息,还是消息顺序消费,RocketMQ 都能轻松应对。但是,说到它如何保证消息不丢失,以及如何防止重复消费的问题,咱们就得更深入地了解它的机制了。别急,慢慢往下看,咱们一步步解析。二、消息不丢失的保障
在分布式系统中,消息丢失是一个非常致命的问题。为了确保消息的可靠投递,RocketMQ 提供了几种机制来保证消息不丢失。1. 消息存储的持久化
首先,最重要的一点是消息存储的持久化。RocketMQ 使用了基于文件的存储方式,所有消息都会先被存储到磁盘上,而不是仅仅存储在内存中。这意味着即使消息消费者异常退出,或者 RocketMQ 服务重启,只要磁盘上的数据没有丢失,消息就依然能够恢复。// 这是RocketMQ的消息存储模式,存储在磁盘上
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
在上述代码中,生产者将消息发送到 RocketMQ 的服务器中。生产者会将消息写入到磁盘的消息存储文件中,这样就保证了即使消息被发送后,RocketMQ 服务宕机,消息依然可以被恢复。2. 消息的确认机制
RocketMQ 提供了 同步发送、异步发送 和 单向发送 的方式来投递消息。在同步发送模式下,RocketMQ 会在消息投递到 Broker 后,等待 Broker 给出一个确认响应(即 ACK),确认消息已经成功接收并存储在磁盘上。如果 Broker 没有给出确认,消息就会被重新投递。这样就避免了消息丢失的情况。// 同步发送方式,确保消息已经被 Broker 确认
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 处理发送失败的情况,可以做重试
System.out.println("消息发送失败,进行重试...");
}
3. 消息的副本机制
为了防止磁盘损坏导致的消息丢失,RocketMQ 采用了 副本机制。每个主题的消息会在多个 Broker 之间进行副本同步。默认情况下,RocketMQ 会为每个消息维护多个副本(一般是2个),保证一个副本丢失时,其他副本可以恢复消息。// Broker 的副本机制,确保消息不丢失
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
通过副本机制,即使一个节点宕机,其他节点上仍然有备份,可以恢复消息,避免丢失。三、如何避免消息重复消费
消息重复消费是另一个常见的问题,通常会出现在以下几种场景:为了保证每条消息只被消费一次,RocketMQ 采用了以下几种方法:1. 消息的 唯一标识
RocketMQ 消息的每一条都有一个 全局唯一的消息 ID。这个消息 ID 是生产者生成的,消费者可以通过它来判断是否已经消费过该条消息。// 消息ID,用于避免重复消费
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
msg.setKeys("msgId_12345"); // 自定义消息ID,确保唯一
SendResult sendResult = producer.send(msg);
2. 消费进度的存储
消费者每次消费完一条消息,都会记录消费进度(通常是消息的偏移量),存储在 RocketMQ 的 消费进度存储 中。消费者恢复时,会从上次消费的进度继续消费,避免重复消费。// 消费者提交消费进度
MessageListenerConcurrently messageListener = (messageExtList, context) -> {
for (MessageExt messageExt : messageExtList) {
System.out.println("消费消息: " + new String(messageExt.getBody()));
// 提交消费进度
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
};
消费者消费完每条消息后,会通过 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
来标记该消息已成功消费,并提交消费进度。这样即使在消费者故障的情况下,也能避免重复消费。3. 消费的 幂等性 设计
在实际开发中,确保消费者端具有幂等性非常重要。即使同一条消息被消费者多次消费,最终的效果也应该是一样的。你可以通过在消费者端记录消费过的消息 ID 或其他标识来实现幂等性。例如,如果在数据库中插入数据时,采用唯一索引来避免重复插入。// 在数据库中插入数据时,通过唯一索引保证幂等性
public void consumeMessage(Message message) {
String messageId = message.getKeys(); // 获取消息ID
if (!messageAlreadyConsumed(messageId)) {
// 消费消息,插入数据库
insertData(message);
} else {
// 已经消费过该消息,跳过
System.out.println("消息已被消费,跳过");
}
}
四、总结一下
说到保证消息不丢失和避免重复消费,RocketMQ 做得确实很到位:- 消息持久化存储:通过磁盘存储,消息即使在系统崩溃后也能恢复。
- 消息确认机制:保证消息已经成功存储,减少了丢失的风险。
- 幂等性设计:通过消费者端的幂等性处理,即使重复消费,也能保证业务的正确性。
至于如何让你的消息系统更稳健,大家可以根据自己的业务需求适当调节 RocketMQ 的配置,比如消费模式、消息重试机制等。对编程、职场感兴趣的同学,可以链接我,微信:coder301 拉你进入“程序员交流群”。