RocketMQ出Bug了,消息疯狂堆积!

文摘   2024-11-03 12:06   山西  

今天来聊聊在 RocketMQ 上遇到的坑,这次碰到的是消息堆积问题!消息疯狂堆积,一开始还挺淡定,心想不就是“几条消息”吗?

结果生产者那边嗖嗖地往里塞,消费者这边像个蜗牛,压根追不上节奏,愣是给挤成了“数据沙丁鱼罐头”。这就不简单了,得好好分析一波。

一、初探消息堆积问题 🕵️

首先,消息堆积这种情况一般就两种原因:要么是消费者速度慢,要么是生产者速度太快

这一检查发现,生产者和消费者的应用和服务器状态都挺正常,网络也没啥延迟。但既然都正常,凭什么消息还堵在这儿呢?

我立马开启排查模式,观察 RocketMQ 消费者端的各项指标,发现有个奇怪的现象:某几个消费实例的 ClientId 竟然相同!这种情况放在别的场景里可能没啥,但在 RocketMQ 中,这可绝对是个灾难源头。

二、问题深挖:ClientId 是如何引发灾难的?💥

接着往下分析,我怀疑是 ClientId 重复 导致了问题的爆发。为啥这么想?因为 RocketMQ 的消费者在做负载均衡时 是根据 ClientId 分配消费任务的。

多个消费者要是 ID 重复,就相当于“共享”了同一个身份,等于是几个人都拿同一张身份证,那任务分配不出问题才怪呢!

1. Docker 的网络模式暗藏玄机

在查明原因的过程中,我还发现,这个 ClientId 重复的锅,竟然得 Docker 来背。RocketMQ 消费者跑在 Docker 的 Host 模式下,导致每个消费者的 IP 地址都变成了宿主机的 IP。这一来,RocketMQ 默认生成的 ClientId 便重复了,因为它的生成规则是 IP 地址 + 进程号。同一个 IP 配合不同的进程,生成出来的 ClientId 就重复,RocketMQ 的负载均衡立马乱套了。

2. 源码里找答案 📜

毕竟技术功底摆在这儿,自然是要从 RocketMQ 源码里找答案。打开源码,仔细分析一波,果然验证了 ClientId 的生成规则是 ip:port。这解释了为何同一个宿主机的 IP 导致多个消费者 ClientId 相同。

public String buildClientId() {
    return localAddress.getHostAddress() + "@" + pid;
}

RocketMQ 负载均衡机制的确依赖于 ClientId,而这段代码生成的 ClientId 就是 ip:port 形式。多个容器在 Host 网络模式下启动,IP 地址一致,加上默认端口和进程号,导致了不同容器 ClientId 的重叠。

三、分析 RocketMQ 负载均衡机制 🤹‍♂️

RocketMQ 的负载均衡策略在消费端主要通过分区均衡实现。简单来说,RocketMQ 会把所有消息队列分给每个 ClientId。如果多个 ClientId 相同的消费者存在,队列分配的机制就会把一个队列的消息分发给多个消费端,而这些消费者因为“身份重叠”就只能接收到部分消息,无法发挥全部消费能力。

代码上简单展示一下负载均衡的逻辑:

public void doRebalance() {
    // 基于当前消费者的 ClientId 列表计算消息分配
    for (MessageQueue mq : mqAll) {
        String clientId = selectClientForQueue(mq);
        allocateQueue(clientId, mq);
    }
}

这里的 selectClientForQueue(mq) 就是根据 ClientId 来进行分配,一旦 ClientId 重复,这个逻辑就会出错,分配的队列资源都乱了套,导致了消费者端的消费性能大大下降。


四、解决方案出炉 🧰

既然找到原因了,解决问题的思路也就清晰了。立马采取了两项措施:

  1. 自定义设置唯一的 ClientId:在代码中为每个消费者实例添加独立的标识,确保每个 ClientId 唯一不重复,RocketMQ 的队列分配终于不再迷糊,负载均衡正常运转了。

  2. 提高消费者的消费速度:通过优化消费者端的配置,让消费处理速度提升,比如调整消息批量消费的数量参数等。这样一来,消息堆积的问题逐步得到缓解。

五、新问题:MongoDB 写入压力大报警 🏃‍♂️

事情往往不是一件一件地来,而是一连串连锁反应。消息堆积问题刚解决,东哥那边的 MongoDB 又开始写入压力报警。这是因为消费速度提高了,写入操作一下子涌向 MongoDB 造成的。

解决 MongoDB 的写入压力

为了应对 MongoDB 的写入压力,东哥采取了重置消费点位的方式,先暂停部分消费,再清理掉一部分历史数据。这样 MongoDB 的写入压力瞬间减少,消费系统得以正常运转。

六、总结分析 🧠

通过这次的“堆积灾难处理大作战”,我们也能得出一些经验:

  1. ClientId 的生成规则:RocketMQ 默认是 客户端 IP + 进程号 的组合。在 Docker 的 Host 模式下,同一宿主机的 IP 会导致 ClientId 重复问题,影响负载均衡机制。

  2. RocketMQ 负载均衡机制的特点:消费者负载均衡高度依赖 ClientId,ClientId 一旦重复会引发队列分配错乱,直接导致消息堆积。

  3. 消息堆积的应对方法:在确保 ClientId 唯一性后,还可以适当提高消费者的消费速度,通过批量消费和其他参数调整来减轻消息堆积的情况。

  4. 系统联动性影响:消息队列堆积处理时,要谨防对后续业务系统产生连锁影响,比如这里 MongoDB 的写入压力问题,处理上也得小心翼翼。

好了,这次总算是“战胜了”消息堆积,但显然每个系统环节都有可能牵一发而动全身。

以后再遇到消息堆积问题,大家不妨优先检查 ClientId 是否唯一,接着再优化消费速度,避免重蹈覆辙了。

-END-

ok,今天先说到这,老规矩,看完文章记得右下角给何老师点赞,

最后送给大家一个福利,我这里有一份搞副业的教程,这份教程里有100+个搞钱小项目:

网盘拉新核心玩法、公众号运营变现、小红书虚拟资料引流等,现在扫码加我微信,即可领取这份副业教程。

添加时备注:副业

程序媛山楂
5年+程序媛,专注于AI编程普及。本号主要分享AI编程、Chat账号、Chat教程、Sora教程、Suno AI、Sora账号、Sora提示词、AI换脸、AI绘画等,帮助解决各种AI疑难杂症。
 最新文章