阿里面试官:RocketMQ 如何保证消息不丢失,如何保证消息不被重复消费?

文摘   2024-12-16 14:31   陕西  
今天咱们聊点有意思的东西——RocketMQ,关于它是如何保证消息不丢失,如何避免消息被重复消费的。
我们时常得面对消息队列的各种“挑战”。特别是消息丢失和重复消费这两大难题,想必每个做过消息队列相关开发的朋友都有一肚子苦水要吐。
放心,今天我们就来好好掰扯掰扯这些问题。

一、RocketMQ:靠谱的消息队列

首先给大家简单介绍一下RocketMQ。RocketMQ 是一个分布式的消息队列系统,作为阿里巴巴开源的消息队列中间件,它有着非常高的可靠性和性能表现。无论是事务消息、延时消息,还是消息顺序消费,RocketMQ 都能轻松应对。
但是,说到它如何保证消息不丢失,以及如何防止重复消费的问题,咱们就得更深入地了解它的机制了。别急,慢慢往下看,咱们一步步解析。

二、消息不丢失的保障

在分布式系统中,消息丢失是一个非常致命的问题。为了确保消息的可靠投递,RocketMQ 提供了几种机制来保证消息不丢失。

1. 消息存储的持久化

首先,最重要的一点是消息存储的持久化。RocketMQ 使用了基于文件的存储方式,所有消息都会先被存储到磁盘上,而不是仅仅存储在内存中。这意味着即使消息消费者异常退出,或者 RocketMQ 服务重启,只要磁盘上的数据没有丢失,消息就依然能够恢复。
// 这是RocketMQ的消息存储模式,存储在磁盘上
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message("TopicTest""TagA""Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
在上述代码中,生产者将消息发送到 RocketMQ 的服务器中。生产者会将消息写入到磁盘的消息存储文件中,这样就保证了即使消息被发送后,RocketMQ 服务宕机,消息依然可以被恢复。

2. 消息的确认机制

RocketMQ 提供了 同步发送异步发送单向发送 的方式来投递消息。在同步发送模式下,RocketMQ 会在消息投递到 Broker 后,等待 Broker 给出一个确认响应(即 ACK),确认消息已经成功接收并存储在磁盘上。如果 Broker 没有给出确认,消息就会被重新投递。这样就避免了消息丢失的情况。
// 同步发送方式,确保消息已经被 Broker 确认
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
    // 处理发送失败的情况,可以做重试
    System.out.println("消息发送失败,进行重试...");
}

3. 消息的副本机制

为了防止磁盘损坏导致的消息丢失,RocketMQ 采用了 副本机制。每个主题的消息会在多个 Broker 之间进行副本同步。默认情况下,RocketMQ 会为每个消息维护多个副本(一般是2个),保证一个副本丢失时,其他副本可以恢复消息。
// Broker 的副本机制,确保消息不丢失
Message msg = new Message("TopicTest""TagA""Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
通过副本机制,即使一个节点宕机,其他节点上仍然有备份,可以恢复消息,避免丢失。

三、如何避免消息重复消费

消息重复消费是另一个常见的问题,通常会出现在以下几种场景:
  • 消息被消费者多次消费
  • 消息被重新投递到消费者(网络异常等原因)
为了保证每条消息只被消费一次,RocketMQ 采用了以下几种方法:

1. 消息的 唯一标识

RocketMQ 消息的每一条都有一个 全局唯一的消息 ID。这个消息 ID 是生产者生成的,消费者可以通过它来判断是否已经消费过该条消息。
// 消息ID,用于避免重复消费
Message msg = new Message("TopicTest""TagA""Hello RocketMQ".getBytes());
msg.setKeys("msgId_12345"); // 自定义消息ID,确保唯一
SendResult sendResult = producer.send(msg);

2. 消费进度的存储

消费者每次消费完一条消息,都会记录消费进度(通常是消息的偏移量),存储在 RocketMQ 的 消费进度存储 中。消费者恢复时,会从上次消费的进度继续消费,避免重复消费。
// 消费者提交消费进度
MessageListenerConcurrently messageListener = (messageExtList, context) -> {
    for (MessageExt messageExt : messageExtList) {
        System.out.println("消费消息: " + new String(messageExt.getBody()));
        // 提交消费进度
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
};
消费者消费完每条消息后,会通过 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 来标记该消息已成功消费,并提交消费进度。这样即使在消费者故障的情况下,也能避免重复消费。

3. 消费的 幂等性 设计

在实际开发中,确保消费者端具有幂等性非常重要。即使同一条消息被消费者多次消费,最终的效果也应该是一样的。你可以通过在消费者端记录消费过的消息 ID 或其他标识来实现幂等性。例如,如果在数据库中插入数据时,采用唯一索引来避免重复插入。
// 在数据库中插入数据时,通过唯一索引保证幂等性
public void consumeMessage(Message message) {
    String messageId = message.getKeys();  // 获取消息ID
    if (!messageAlreadyConsumed(messageId)) {
        // 消费消息,插入数据库
        insertData(message);
    } else {
        // 已经消费过该消息,跳过
        System.out.println("消息已被消费,跳过");
    }
}

四、总结一下

说到保证消息不丢失和避免重复消费,RocketMQ 做得确实很到位:
  1. 消息持久化存储:通过磁盘存储,消息即使在系统崩溃后也能恢复。
  2. 消息确认机制:保证消息已经成功存储,减少了丢失的风险。
  3. 副本机制:多个副本保证消息的高可用性。
  4. 幂等性设计:通过消费者端的幂等性处理,即使重复消费,也能保证业务的正确性。
至于如何让你的消息系统更稳健,大家可以根据自己的业务需求适当调节 RocketMQ 的配置,比如消费模式、消息重试机制等。

-END-


ok,今天先说到这,老规矩,给大家分享一份不错的副业资料,感兴趣的同学找我领取。

以上,就是今天的分享了,看完文章记得右下角给何老师点赞,也欢迎在评论区写下你的留言

程序员老鬼
10年+老程序员,专注于AI知识普及,已打造多门AI课程,本号主要分享国内AI工具、AI绘画提示词、Chat教程、AI换脸、Chat中文指令、Sora教程等,帮助读者解决AI工具使用疑难问题。
 最新文章