一、RocketMQ:靠谱的消息队列
二、消息不丢失的保障
1. 消息存储的持久化
// 这是RocketMQ的消息存储模式,存储在磁盘上
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
2. 消息的确认机制
// 同步发送方式,确保消息已经被 Broker 确认
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
// 处理发送失败的情况,可以做重试
System.out.println("消息发送失败,进行重试...");
}
3. 消息的副本机制
// Broker 的副本机制,确保消息不丢失
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
三、如何避免消息重复消费
消息被消费者多次消费 消息被重新投递到消费者(网络异常等原因)
1. 消息的 唯一标识
// 消息ID,用于避免重复消费
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
msg.setKeys("msgId_12345"); // 自定义消息ID,确保唯一
SendResult sendResult = producer.send(msg);
2. 消费进度的存储
// 消费者提交消费进度
MessageListenerConcurrently messageListener = (messageExtList, context) -> {
for (MessageExt messageExt : messageExtList) {
System.out.println("消费消息: " + new String(messageExt.getBody()));
// 提交消费进度
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
};
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
来标记该消息已成功消费,并提交消费进度。这样即使在消费者故障的情况下,也能避免重复消费。3. 消费的 幂等性 设计
// 在数据库中插入数据时,通过唯一索引保证幂等性
public void consumeMessage(Message message) {
String messageId = message.getKeys(); // 获取消息ID
if (!messageAlreadyConsumed(messageId)) {
// 消费消息,插入数据库
insertData(message);
} else {
// 已经消费过该消息,跳过
System.out.println("消息已被消费,跳过");
}
}
四、总结一下
消息持久化存储:通过磁盘存储,消息即使在系统崩溃后也能恢复。 消息确认机制:保证消息已经成功存储,减少了丢失的风险。 副本机制:多个副本保证消息的高可用性。 幂等性设计:通过消费者端的幂等性处理,即使重复消费,也能保证业务的正确性。
-END-
以上,就是今天的分享了,看完文章记得右下角给何老师点赞,也欢迎在评论区写下你的留言。