0 前言
去年年底,和大家分享了一个个人项目的实现方案——基于协程池架构实现分布式定时器 XTimer.
在这次分享中我有提到, xtimer 前身是基于消息队列架构实现的 workflow timer. 之所以在演进过程中把实现方式改为协程池,主要是考虑到消息队列组件存在比较高的使用和维护成本. 然而从技术流程本身出发,基于消息队列的实现方式实际上对于分布式定时器的核心流程解耦以及纵向架构扩展都是有利的.
基于这一点,最近针对使用 redis 这种轻量级缓存组件实现消息队列的可行性方案进行了调研,为 xtimer 未来的重构之路做好铺垫. 这里特别把相关内容摘出来和大家一起分享探讨一下.
下面是本期分享内容的目录大纲:
1 消息队列
要想把本期分享话题聊到位,首先我们需要理清楚,一个合格的消息队列(MQ,message queue)应该具备哪些核心能力.
1.1 核心能力
谈到消息队列,其最重要的两项能力就是解耦和削峰.
针对于 mq 的解耦功能而言,这里举个生活中的例子来帮助大家进一步理解这个概念.
假设我们在网上购买了一些商品. 在上班开会的时候,快递小哥打通了我们的电话,告知物品已经送达到我们的家门口.
这个时候,倘若这个快递小哥是个比较耿直的人,强烈要求我们必须立刻过去当面签收,这个流程才能结束. 那此时的我们就尬住了. 我们得和领导打个招呼,赶回家去签收快递. 在我们到家之前,快递小哥也必须一直守在原地等我们回去签收交接. 整个流程是比较僵硬的,在这一次交接动作完成之前,双方都没办法再灵活处理其他事项了.
上述例子就类似于我们在业务流程中基于 http/rpc 发起的一次同步请求,上游(快递小哥)在发出请求后(打电话),会阻塞等待下游(作为签收方的我们)给到反馈(完成签收操作),否则整个流程会一直阻塞住.
然而在实际场景中,我们知道还有一个叫作“快递超市”的存在. 当快递到达时,快递小哥可以将我们的物品先存放在快递超市中,登记好接收方的个人信息后,并给接收方发完通知短信后,快递小哥就可以先撤离现场,去忙活其他事情了. 接下来,快递超市会为接收方承担起托管快递的职责,接收方只需要选择在合适的时间去快递超市收取物品即可.
这个流程相比之下就显得灵活很多,由于有快递超市这个缓冲区的存在,使得我们和快递小哥之间的交互流程能够实现解耦. 在这个流程中,快递小哥就类似于生产者 producer,我们作为接收方,类似于消费者 consumer,而负责承上启下、托管快递的快递超市则类似于消息队列 mq.
聊完了这个生活场景,我们再从技术视角出发,对 mq 所带来的解耦能力进行一轮阐述:
• 在有了 mq 后,producer 不需要过分关心 consumer 的身份信息,只需要把消息按照指定的协议投递到对应的 topic 即可
• producer 在处理请求时,只需要把消息投递到 mq 即可认为流程处理结束,相比于同步请求下游,整个流程会更加轻便灵活,拥有更高的吞吐量
• 因为有 mq 作为缓冲层. 下游 consumer 可以设定好合适的消费限流参数,按照指定的速率进行消费,能够在很大程度上对 consumer 起到保护作用
下面我们再用同样的例子说明一下消息队列的另一项核心功能——削峰.
假设现在正值双十一时期,我们剁手一通买买买,导致同时有大量的快递在同一个时段到达. 这时候,快递超市就为我们起到“削峰”的效果. 不论快递数量的多少,我们不用第一时间立刻进行响应处理,而是能够选择在合适的时间到达快递超市进行取件. 如果快递数量很大,我们一次拿不完的话,我们也可以量力而行,每次只收取一部分,分成多个批次处理.
这个流程就类似于 mq 所带来的消息削峰的能力. 在实际的生产环境中,倘若上游请求量很大,而下游都需要第一时间进行同步响应的话,这对于下游系统可能产生很大的负荷. 此时如果能把同步流程转为异步,把消息放到 mq 组件中进行一轮缓冲,让下游可以根据自身的处理能力,按照自己的节奏消化这部分积攒的流量,这对于下游系统来说能起到很好的保护作用.
前面是从宏观功能的视角出发,聊到了 mq 对应的异步流程所具备的优势. 下面我们再谈一谈,作为 mq 组件需要具备哪些基础能力:
• 消息不丢失
mq 最基本的一项能力是,要确保整个交互流程不出现消息丢失的问题. 这里我们可以从三个环节去看待这个问题:
• producer 将 msg 投递到 mq 时不出现丢失
• msg 存放在 mq 时不出现丢失
• consumer 从 mq 消费 msg 时不出现丢失
针对于上述第二点,各 mq 组件在实现上大抵上是基于数据落盘+数据备份的方式保证的.
而针对于上述的一、三点,则是通过两个交互环节中的 ack 机制保证的. 以 producer 投递 msg 到 mq 的环节为例,只要 mq 没有给到投递成功的 ack 反馈,那么 producer 就应该把本次投递流程视为失败,执行重新投递的操作. consumer 的消费流程同样如此.
因此,mq 交互流程主要通过 ack 机制保证消息投递以及消费环节做到 at least once(至少一次)的语义,然而无法保证消息不重复的问题. 因此,处于最下游的消费者 consumer 需要能够具备消息幂等去重的能力,避免流程被重复处理.
• 支持消息存储
另一项能力是支持消息的存储. 以我们前面提到的取快递的例子来说,快递超市需要有一个实体店面,店面具有着一定的容量能够存放一定数量的快递. 这样当下游 consumer 没来得及第一时间消费消息时,消息能缓存在 mq 组件中一段时间,让消费方自由选择合适的时间过来进行消费.
1.2 流程类型
我们定义 mq 类型时,可以从多个维度出发. 这里我主要根据 consumer 消费的流程,将 mq 分为 push 型和 pull 型.
• push 型:
push 型指的是当 producer 将消息投递到 mq 时,由 mq 负责将消息以推送的形式主动发送给各个建立了订阅关系的消费方.
• pull 型:
pull 型指的是当 mq 中存在消息时,由 consumer 主动执行拉取消息的操作.
关于以上两种 mq 类型,在这里个人有一些比较浅显的认知:
对于 push 型,存在的优势是:
• 流程实时性比较强,消息来了就执行推送
• 比较契合发布/订阅的模型
劣势:
• 对下游 consumer 的保护力度不够. mq 的核心功能是解耦、削峰,本质上是提供了一个缓冲的空间,让 consumer 能根据自己的消费能力在合适的时机进行消息处理. 所以 push 型在这方面体现的优势不够明显,消息到达后就需要向各个 consumer 发起推送. 不过这个问题可以在一定程度上通过消费限流的方式加以弥补.
对于 pull 型则刚好相反:
• 优势是:下游握有消费操作的主动权,能选择在合适的时机执行消费操作
• 劣势是:实时性会弱一些,和主动 pull 的轮询机制有关
1.3 redis 实现 mq 的问题
理清楚 mq 的核心功能和诉求后,下面我们先明确一下基于 redis 实现 mq 存在的一类通用问题:
• 存储昂贵
redis 本身是基于内存实现的缓存组件,因此在存储消息时总容量相对有限.
• 数据丢失
此外,redis 存储消息时会不可避免地存在数据丢失的风险,可以从两个方面出发考虑:
• 内存是易失性存储. 即便 redis 中有 rdb/aof 之类的持久化机制加以弥补,但这个持久化流程是异步执行的,无法提供百分百的保证力度
• redis 走的是 ap 高可用流派,数据的主从复制流程是异步执行的,主从切换时数据存在弱一致问题
以上问题,不论是在 redis 缓存数据还是实现 mq 的流程中都是存在的,这个问题我们在选型使用 redis 时需要做到了然于心,这一点在后续 4.3 小节中会进一步展开说明.
2 redis list
基于 redis 实现 mq 的方式之一是使用 redis 中的 list 结构.
redis 中的 list 是一个双向链表,天然契合 mq 中的 queue 队列模型. 我们在使用 redis list 实现 mq 时,可以生产消息的流程具象化为一次将数据追加到 list 尾部的操作;同时,我们可以把消费消息的流程具象化为一次从 list 头部摘取数据的操作. 如此这般,一个简易版的消息队列就实现了.
2.1 操作指令
下面我们对涉及到的几个指令展开介绍:
首先,在使用 list 充当消息队列时,list 对应的 key 则对应为消息的 topic 名称.
producer 在投递消息时,可以使用 lpush 指令,对应的时间复杂度为 O(1). 指令文档链接:https://redis.io/commands/lpush/
127.0.0.1:6379> lpush my_list_topic msg
(integer) 1
• my_list_topic: topic 名称
• msg:投递的消息内容
consumer 消费消息时,使用 rpop 指令,对应的时间复杂度 O(1). 指令文档链接:https://redis.io/commands/rpop/
127.0.0.1:6379> rpop my_list_topic
"msg"
• my_list_topic:topic 名称
• msg:获取到的消息
2.2 消费流程分析
在上述流程中,存在的第一个问题是,consumer 的轮询消费流程应该如何组织.
这种基于 list 实现的 mq 是属于 pull 类型. 消费方自行组织流程,并在合适的时机通过 rpop 执行进行消息的主动拉取.
首先,consumer 在消费时,一定是一个类似于 loop thread 的自旋模型,每一轮循环中,通过 rpop 指令尝试从 list 中读取消息,如果成功读取到了消息,则进行相应的逻辑处理.
然而在此处,需要注意的是,redis 的 rpop 指令是非阻塞型的,即在 list 没有数据时,也会即时返回一个结果为 nil 的响应,这样我们在组织这段自旋程序的时候就显得有些尴尬了:
• 倘若我们在 rpop 捕捉到 nil 时,立即开启下一轮循环,则这个轮询行为可能是没有意义的,因为 list 中可能仍然不存在数据. 这样的高频率自旋,对于 cpu 资源是一种无谓的损耗
• 倘若我们选择让 consumer 休眠一段时间进行循环,这个休眠的时长又具有一定的人为误判性. 倘若我们把时长设得太短,仍然会存在 cpu 浪费的问题;倘若设得太长,则可能会导致消息处理不及时的问题
在这个过程中,最理想的实现方案是,在 list 中有数据到达时,我们令 consumer 即时获取到对应的结果;倘若 list 数据为空,则令 consumer 陷入阻塞等待的状态,直到有数据抵达时程序才被唤醒.
所幸,这个诉求在 redis list 中是可以满足的. 我们可以使用 redis 中的 brpop 指令替代 rpop 指令,做到在有数据时才返回响应,否则令当前程序陷入阻塞. brpop 指令文档:https://redis.io/commands/brpop/
127.0.0.1:6379> brpop my_list_topic 0
1) "my_list_topic"
2) "msg"
• my_list_topic: topic 名称
• 0:阻塞等待的超时时长,达到此阈值仍未获取数据时会返回 nil. 如果设置为 0 ,则代表没有这个超时限制.
2.3 局限性分析
即便我们采用 brpop 解决了 consumer 合理阻塞消费数据的问题,这种基于 redis list 实现的 mq 仍然不能称为一个成熟的实现方案,其中主要存在着以下几项缺陷:
• 无法支持发布/订阅模式
list 中的数据是独一份的,被 pop 出去后就不复存在了.
因此 redis 中的 list 是无法支持 mq 中的发布/订阅模式的,即下游倘若存在多个独立的消费者组 consumer group,各自都需要独立获取一份完整的数据,那么此时通过 redis list 是无法满足这个诉求的.
• 无法支持消费端 ack 机制
consumer 通过 brpop 获取到数据后,倘若发生宕机或者其他意外错误,没有一种有效的手段能给予 mq 一个消息处理失败的反馈. 这条消息一旦从 list 中被取走,就不再有机会被重新获取了,因此在这个场景下,消息就真的丢失了.
3 redis pub/sub
为解决 redis list 存在的无法支持发布/订阅模式的问题,redis 提供了 pub/sub 机制,能够有效地弥补这方面的缺陷.
pub/sub 全称为 publish/subscribe,顾名思义,指的正是消息队列中的发布/订阅模式.
为了贴合 pub/sub 的语义,在本章中,我们统一把生产者 producer 称为 publisher,消费者 consumer 称为 subscriber,大家留意一下.
在实现上,pub/sub 会在 publisher 和 subscriber 之间建立一个用于实时通讯的信道——channel. 在传递消息时,会根据 channel 查找到所有建立过订阅关系的 subscriber,一一将消息送达到它们手中.
3.1 操作指令
下面我们对使用 pub/sub 实现 mq 流程涉及到的几个核心指令进行介绍.
首先,消费方 subscriber 通过 subscribe 指令建立对某个 channel 的订阅关系. 指令文档:https://redis.io/commands/subscribe/
127.0.0.1:6379> subscribe my_channel_topic
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "my_channel_topic"
3) (integer) 1
• my_channel_topic: topic 名称
每个通过 subscribe 指令建立 channel 订阅关系的使用方都会被视为一个独立的 subscriber,后续 channel 中有消息到达时,会被复制成多份,一一推送到各个 subscriber 手中.
生产方 publisher 通过 publish 指令往对应的 channel 中执行消息投递操作. 指令文档:https://redis.io/commands/publish/
127.0.0.1:6379> publish my_channel_topic msg
(integer) 1
此时,之前对这个 channel 执行过 subscribe 操作的 subscriber 都会接收到这则消息:
1) "message"
2) "my_channel_topic"
3) "msg"
此外,值得一提的是,消费者通过 subscribe 指令会对 channel 采用阻塞模式进行监听,只有在有消息到来时,才会从阻塞状态中被唤醒.
3.2 实现原理
下面我们对 redis pub/sub 模式的实现原理进行介绍. 在理清原理后,我们才能对这个流程中存在的问题进行更为通透的认识和分析.
• 首先,消费方 subscriber 通过 subscribe 指令建立和指定 channel 之间的订阅关系. 这时在 redis 中会维护好 channel 和对应 subscriber 列表的映射关系,并在内存中为每个在线活跃的 subscriber 分配好一个缓冲区 buffer,用以承载后续到来的消息数据
• 接下来随着 publisher 执行 publish 指令,往对应 channel 中投递消息后,此时 redis 会实时查看 channel 对应 subscriber 名单,往每个 subscriber 的缓冲区 buffer 中推送这条数据
• 各执行了 subscribe 指令的 subscriber 会处于阻塞监听缓冲区 buffer 的状态,随着新数据到达,subscriber 会获取到这笔数据
基于这个流程,我们能看出来,pub/sub 对于 channel 以及 subscribers 之间的实时映射关系存在强依赖. 因此在操作的执行顺序上,我们需要保证先执行 subscribe 指令,再执行 publish 执行,否则前几笔 publish 投递的数据就会因为不存在 subscriber 而被直接丢弃.
3.3 优缺点分析
理完了 pub/sub 的实现原理,下面我们可以对其存在的优劣势进行分析了.
首先,pub/sub 模式最大的优势就是能够支持发布/订阅能力,同一份消息会被推送给所有通过 subscribe 操作订阅了该 channel 的 subscriber.
然而,pub/sub 存在的问题是很显著的,就是丢消息问题. 这个问题可以从多个维度展开:
• 缺乏 ack 机制
这个问题和 redis list 相同, subscriber 在获取到消息后,没有对应的 ack 机制,因此倘若处理失败,想要执行消息的重放操作是无法做到的
• 缺乏消息存储能力
我觉得 redis pub/sub 机制就有点类似于 golang 中的无缓冲型 channel. 它相当于只维护了channel 和 subscribers 的映射关系,但是每条被投递的消息都是即来即走,并不会停留在 channel 中,于是在以下几个场景中,都会发生消息丢失问题:
• subscriber 宕机:倘若某个 subscriber 中途宕机,则会被踢出名单,在恢复前的这段时间内,到达的消息都会彻底与这个 subscriber 无缘
• redis 宕机:每条 publish 的消息都会第一时间分发到 subscriber 对应的内存缓冲区中,而这个缓冲区是完全基于内存实现的易失性存储. 一旦 redis 服务端宕机,缓冲区中的数据就完全丢失且不可恢复了. 此外,pub/sub 模式下的消息数据不属于 redis 中的基本数据类型,因此 redis 中的持久化机制 rdb 和 aof 对于 pub/sub 中的数据是完全不生效的,数据丢失的可能性大幅度提高
• subscriber 消息积压:由于消息数据会被放在 redis 侧各 subscriber 的缓冲区 buffer 中,这部分空间是相对有限的,一旦某个 subscriber 因为消费能力弱,导致 buffer 中的的数据发生积压,此时 redis 很可能会自动把 subscriber 踢除下线,于是这部分数据也丢失了
针对最后这一点,subscriber 对应的缓冲区容量阈值可以在 redis.conf 文件中进行配置,其默认值为:
client-output-buffer-limit pubsub 32mb 8mb 60
对应的含义是,倘若某个 subscriber 的缓冲区 buffer 大小达到 32MB,则 subscriber 会被踢下线;倘若缓冲区内数据量在连续 60s 内达到 8MB 大小,subscriber 也会踢下线.
聊到这里,我们发现不论是 redis 中的 list 还是 pub/sub 功能,各自都存在着比较明显的功能缺陷,都是无法被当作一个成熟的 mq 组件来使用的.
不过大家不用气馁,真正的重头戏马上就来. 接下来我们聊到的第三种实现方案 redis streams 才是真正意义上趋近于成熟的 mq 实现方案.
4 redis streams
从 redis 5.0 中,一个新的数据类型——streams 被推出了. 这种数据类型的目标正是直奔实现 mq 组件的功能而去的.
4.1 操作指令
首先,我们理一下,使用 redis streams 时涉及到的几个核心操作指令.
• 生产消息
首先是用于生产消息的指令,通过 XADD 指令往 topic 中投入一组 kv 对消息. 指令文档:https://redis.io/commands/xadd/
XADD my_streams_topic * key1 val1
"1638515664470-0"
XADD topic1 * key2 val2
"1638515672769-0"
• my_streams_topic:topic 名称
• *:消息自动生成唯一标识 id,基于时间戳+自增序号生成
• key1/val1、key2/val2:消息数据 kv 对
• 消费消息
接下来是用于消费消息的指令,通过 XREAD 指令从对应 topic 中获取消息. 指令文档:https://redis.io/commands/xread/
XREAD STREAMS my_streams_topic 0-0
1) 1) "my_streams_topic"
2) 1) 1) "1638515664470-0"
2) 1) "key1"
2) "val1"
2) 1) "1638515672769-0"
2) 1) "key2"
2) "val2"
• my_streams_topic: topic 名称
• 0-0:从头开始消费. 倘若这里填为某条消息 id,则代表从这条消息之后(不包含这条消息)开始消费
• 阻塞模式消费消息:
streams 支持在消费时,采用阻塞模式进行消费. 倘若存在数据则即时返回处理,否则会阻塞消费流程.
# BLOCK 0 表示阻塞等待时没有超时时间上限
XREAD BLOCK 0 STREAMS my_streams_topic 1638515672769-0
(nil)
• BLOCK:阻塞消费模式
• 0:阻塞等待超时时间,超过这个时长会返回 nil. 设置为 0 则表示不设置超时阈值
• 创建消费者组
streams 也支持发布订阅模式,能保证消息被多个消费者组 consumer group 同时消费到.
首先需要进行消费者组的创建. 指令文档:https://redis.io/commands/xgroup-create/
XGROUP CREATE my_streams_topic my_group 0-0
OK
• my_streams_topic:topic 名称
• my_group:消费者组名称
• 0-0:从头开始消费
• 基于消费者组消费消息
同一份数据在同一个消费者组下只会被消费到一次. 不同消费者组各自能获取到独立完整的消息数据.
通过 XReadGroup 指令,以消费者组的身份进行消费. 指令文档:https://redis.io/commands/xreadgroup/
XREADGROUP GROUP my_group consumer BLOCK 0 STREAMS my_streams_topic >
1) 1) "topic1"
2) 1) 1) "1638515664470-0"
2) 1) "key1"
2) "val1"
2) 1) "1638515672769-0"
2) 1) "key2"
2) "val2"
• consumer:消费者名称
• my_streams_topic:topic 名称
• block 0: 采用阻塞等待的模式,0 代表没有超时上限
• >: 读最新的消息 (尚未分配给某个 consumer 的消息)
还有另一种消费模式,读取的是已分配给当前消费者,但是还未经确认的老消息:
XREADGROUP GROUP my_group consumer STREAMS my_streams_topic 0-0
1) 1) "topic1"
2) 1) 1) "1638515664470-0"
2) 1) "key1"
2) "val1"
2) 1) "1638515672769-0"
2) 1) "key2"
2) "val2"
• 0-0:标识读取已分配给当前 consumer ,但是还没经过 xack 指令确认的消息
• 确认消息:
通过 xack 指令,携带上消费者组、topic 名称以及消息 id,能够完成对某条消息的确认操作. 文档链接:https://redis.io/commands/xack/
127.0.0.1:6379> XACK my_streams_topic my_group 1638515664470-0
(integer) 1
• my_streams_topic:topic 名称
• my_group:消费者组名称
• 1638515664470-0:消息 id
4.2 优缺点分析
下面针对基于 redis streams 实现 mq 的优劣势进行分析.
• 支持发布/订阅模式
redis streams 引入了消费者组 group 的概念,因此是能够保证各个消费者组 consumer group 均能够获取到一份独立而完整的消息数据.
• 数据可持久化
redis 中的 streams 和 string、list 等数据类型一样,都能够通过 rdb( redis database)、aof( append only file) 的持久化机制进行落盘存储,能够在很大程度上降低数据丢失的概率.
• 支持消费端 ack 机制
redis streams 中另一项非常重要的改进,是支持 consumer 的 ack 能力. consumer 在处理好某条消息后,能通过 xack 指令对该消息进行确认. 这样对于没经过 ack 确认的消息,redis streams 还是为 consumer 保留了重新消费的能力.
• 支持消息缓存
和 pub/sub 模式不同的是,redis streams 中会实际开辟内存空间用于存储 streams 中的数据. 因此哪怕某个 consumer group 是在消息生产之后才完成注册操作,也能够进行消息溯源,从 topic 起点开始执行消息的消费操作.
不过这里需要考虑的问题是,redis 基于内存实现消息数据的存储,倘若大量的消息数据被堆积在内存中,在资源使用上会存在很大的压力和很高的成本,严重时甚至可能发生 OOM 问题.
基于此,redis streams 支持在每次投递消息时,显式设定一个 topic 中能缓存的数据长度,来人为限制这个缓存空间的容量.
这里可以通过在 XADD 指令中加上 maxlen 参数,用于指定 topic 中能缓存的数据长度:
XADD topic1 MAXLEN 10000 * key1 val1
• MAXLEN 10000:最多缓存 10000 条数据
这样倘若 topic 数据容量超限,则新消息的到达会把最老的消息挤出队列,意味着也可能存在数据丢失的风险,因此大家在使用时需要合理设置 maxlen 参数.
4.3 整体对比分析
到这里为止,最后一个实现方案 redis streams 也介绍完毕,最后我们回过头对今天介绍过的几类 redis 实现 mq 的方案进行一轮总结.
mq 实现方案 | 发布/订阅能力 | 消费端ACK机制 | 消息缓存能力 | 数据丢失风险 |
list | 不支持 | 不支持 | 支持 | 低 |
pub/sub | 支持 | 不支持 | 不支持 | 高 |
streams | 支持 | 支持 | 支持 | 低 |
可以看到,在各项能力上 list 和 pub/sub 互有千秋,而 streams 可以说是兼具了各方面的优势,称得上是已经趋近于成熟的 mq 实现方案.
然而,大家应该能注意到,我在此处评价 streams 方案的数据丢失风险时,仅仅是评价为“低”,而不是“无”,这一点我接下来会继续加以说明.
下面我们再进一步拿 redis streams 和业界专业的 mq 组件进行对比,就以我比较熟悉的 kafka 组件为例,来看看 redis stream 存在哪些方面的优劣:
mq 组件 | 消息存储介质 | 消息分区/并发能力 | 数据丢失风险 | 运维成本 |
redis streams | 内存 | 不支持 | 低 | 低 |
kafka | 磁盘 | 支持 | 理论上不存在 | 偏高 |
可以看到,redis streams 在存储介质上需要使用内存,因此消息存储容量相对有限;且同一个 topic 的数据由于对应为同一个 key,因此会被分发到相同节点,无法实现数据的纵向分治,因此不具备类似于 kafka 纵向分区以提高并发度的能力.
此外,很重要的一个点是,基于 redis 实现的 mq 一定是存在消息丢失的风险的. 尽管在生产端和消费端,producer/consumer 在和 mq 交互时可以通过 ack 机制保证在交互环节不出现消息的丢失,然而在 redis 本身存储消息数据的环节就可能存在数据丢失问题,原因在于:
• redis 数据基于内存存储:哪怕通过最严格 aof 等级设置,由于持久化流程本身是异步执行的,也无法保证数据绝对不丢失
• redis 走的是 ap 高可用流派:为保证可用性,redis 会在一定程度上牺牲数据一致性. 在主从复制时,采用的是异步流程,倘若主节点宕机,从节点的数据可能存在滞后,这样在主从切换时消息就可能丢失
与之相对的,kafka 只要合理设置好 ISR(In Sync Replica) 有关参数,理论上在集群存在多数节点仍能正常运作的情况下,对应的消息数据是不会出现丢失的.
前面我们谈到了 redis 相比于传统 mq 组件的一些劣势,现在我们再来聊聊它具备的一些优势:就是相对轻量化,相比于传统 mq 组件有着更低的使用和运维成本.
因此,在实际的选型过程中,我们可以根据业务诉求进行抉择. 倘若业务流程对于数据的精度没有特别严格的要求,那此时使用 redis streams 这样一种轻量化的 mq 实现方案未尝不是一种好的选择和尝试.
5 redmq——纯 redis 实现的消息队列
从第 5 章开始,我们正式进入实战篇的部分.
遵循本文第 4 章基于 redis stream 实现 mq 的思路,我日前使用 go 语言,开源实现了一款 基于 redis 实现消息队列的客户端 sdk:redmq,该项目的开源地址为:https://github.com/xiaoxuxiansheng/redmq
5.1 实现架构
内容可以分为三个核心模块:
• redis client:封装了 redis streams 相关指令,包括 XADD、XREADGROUP、XACK
• producer:生产者,内置了 redis 客户端,通过 XADD 指令实现消息的生产投递
• consumer:消费者,内置了 redis 客户端,通过 XREADGROUP 指令实现消息的消费,通过 XACK 指令实现消息的确认
5.2 redis 客户端
5.2.1 redigo
本项目中,redis 客户端底层是基于开源的 golang redis 客户端 sdk:redigo 实现的,使用的源码版本为 v1.8.9,该项目的开源地址为:https://github.com/gomodule/redigo
5.2.2 客户端类
下面是关于客户端的实现,核心步骤通过代码注释加以说明:
// Client Redis 客户端.
type Client struct {
// 用户自定义配置
opts *ClientOptions
// redis 连接池
pool *redis.Pool
}
// 其中网络协议、redis 地址、redis 密码为必填项
func NewClient(network, address, password string, opts ...ClientOption) *Client {
c := Client{
opts: &ClientOptions{
network: network,
address: address,
password: password,
},
}
// 注入用户自定义的配置参数
for _, opt := range opts {
opt(c.opts)
}
// 对非法的配置参数进行修复
repairClient(c.opts)
// 创建 redis 连接池
pool := c.getRedisPool()
// 返回 redis 客户端实例
return &Client{
pool: pool,
}
}
// 获取 redis 连接池
func (c *Client) getRedisPool() *redis.Pool {
return &redis.Pool{
// 最大空闲连接数
MaxIdle: c.opts.maxIdle,
// 连接最长空闲时间
IdleTimeout: time.Duration(c.opts.idleTimeoutSeconds) * time.Second,
// 创建连接的方法
Dial: func() (redis.Conn, error) {
c, err := c.getRedisConn()
if err != nil {
return nil, err
}
return c, nil
},
// 最大活跃连接数
MaxActive: c.opts.maxActive,
// 当连接不够时,是阻塞等待还是立即返回错误
Wait: c.opts.wait,
// 测试方法
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
// 获取连接
func (c *Client) GetConn(ctx context.Context) (redis.Conn, error) {
return c.pool.GetContext(ctx)
}
// 获取 redis 配置
func (c *Client) getRedisConn() (redis.Conn, error) {
if c.opts.address == "" {
panic("Cannot get redis address from config")
}
// 注入密码
var dialOpts []redis.DialOption
if len(c.opts.password) > 0 {
dialOpts = append(dialOpts, redis.DialPassword(c.opts.password))
}
// 创建新的连接
return redis.DialContext(context.Background(),
c.opts.network, c.opts.address, dialOpts...)
}
5.2.3 投递消息
下面是用于投递消息的 XADD 指令的实现源码:
func (c *Client) XADD(ctx context.Context, topic string, maxLen int, key, val string) (string, error) {
// topic 名称不能为空
if topic == "" {
return "", errors.New("redis XADD topic can't be empty")
}
// 从 redis 连接池中获取连接
conn, err := c.pool.GetContext(ctx)
if err != nil {
return "", err
}
// 使用完毕后把连接放回连接池
defer conn.Close()
// 执行 XADD 指令,并返回生成的消息 id
return redis.String(conn.Do("XADD", topic, "MAXLEN", maxLen, "*", key, val))
}
5.2.4 消费新消息
消费消息时使用的是 redis streams 的 XREADGROUP 指令,其中又可以分为消费新消息(尚未分配给任何 consumer 的消息)以及消费未确认的老消息(分配给当前 consumer,但是还没经过 XACK 确认的消息).
func (c *Client) XReadGroup(ctx context.Context, groupID, consumerID, topic string, timeoutMiliSeconds int) ([]*MsgEntity, error) {
return c.xReadGroup(ctx, groupID, consumerID, topic, timeoutMiliSeconds, false)
}
在 xReadGroup 方法中,会根据用户传入的 pending 表示是否为 true,代表当前是消费处理新消息还是未经确认的老消息:
func (c *Client) xReadGroup(ctx context.Context, groupID, consumerID, topic string, timeoutMiliSeconds int, pending bool) ([]*MsgEntity, error) {
// 消费者组 id、消费者 id、topic 名称缺一不可
if groupID == "" || consumerID == "" || topic == "" {
return nil, errors.New("redis XREADGROUP groupID/consumerID/topic can't be empty")
}
// 从 redis 连接池中获取新连接
conn, err := c.pool.GetContext(ctx)
if err != nil {
return nil, err
}
// 使用完毕后返还连接
defer conn.Close()
var rawReply interface{}
// 倘若 pending 为 true,代表需要消费的是已分配给当前 consumer 但是还未经 xack 确认的老消息. 此时采用非阻塞模式进行处理
if pending {
rawReply, err = conn.Do("XREADGROUP", "GROUP", groupID, consumerID, "STREAMS", topic, "0-0")
} else {
// 倘若 pending 为 false,代表需要消费的是尚未分配给任何 consumer 的新消息,此时会才用阻塞模式执行操作
rawReply, err = conn.Do("XREADGROUP", "GROUP", groupID, consumerID, "BLOCK", timeoutMiliSeconds, "STREAMS", topic, ">")
}
if err != nil {
return nil, err
}
reply, _ := rawReply.([]interface{})
if len(reply) == 0 {
return nil, ErrNoMsg
}
replyElement, _ := reply[0].([]interface{})
if len(replyElement) != 2 {
return nil, errors.New("invalid msg format")
}
// 对消费到的数据进行格式化
var msgs []*MsgEntity
rawMsgs, _ := replyElement[1].([]interface{})
for _, rawMsg := range rawMsgs {
_msg, _ := rawMsg.([]interface{})
if len(_msg) != 2 {
return nil, errors.New("invalid msg format")
}
msgID := gocast.ToString(_msg[0])
msgBody, _ := _msg[1].([]interface{})
if len(msgBody) != 2 {
return nil, errors.New("invalid msg format")
}
msgKey := gocast.ToString(msgBody[0])
msgVal := gocast.ToString(msgBody[1])
msgs = append(msgs, &MsgEntity{
MsgID: msgID,
Key: msgKey,
Val: msgVal,
})
}
return msgs, nil
}
5.2.5 重复消费未确认消息
XReadGroupPending 方法用于消费未经 XACK 确认的老消息,内部也会调用 xReadGroup 方法,但是会把 pending 标识置为 true.
func (c *Client) XReadGroupPending(ctx context.Context, groupID, consumerID, topic string) ([]*MsgEntity, error) {
return c.xReadGroup(ctx, groupID, consumerID, topic, 0, true)
}
5.3 生产者
5.3.1 类定义
下面是关于生产者 producer 模块的实现:
// 生产者 producer 类定义
type Producer struct {
// 内置的 redis 客户端
client *redis.Client
// 用户自定义的生产者配置参数
opts *ProducerOptions
}
// 生产者 producer 的构造器函数
func NewProducer(client *redis.Client, opts ...ProducerOption) *Producer {
p := Producer{
client: client,
opts: &ProducerOptions{},
}
// 注入用户自定义的配置参数
for _, opt := range opts {
opt(p.opts)
}
// 对非法的配置参数进行修复
repairProducer(p.opts)
// 返回 producer 实例
return &p
}
有关于 producer 的一些配置项:
type ProducerOptions struct {
// topic 可以缓存的消息长度,单位:条. 当消息条数超过此数值时,会把老消息踢出队列
msgQueueLen int
}
type ProducerOption func(opts *ProducerOptions)
func WithMsgQueueLen(len int) ProducerOption {
return func(opts *ProducerOptions) {
opts.msgQueueLen = len
}
}
// 默认为每个 topic 保留 500 条消息
func repairProducer(opts *ProducerOptions) {
if opts.msgQueueLen <= 0 {
opts.msgQueueLen = 500
}
}
5.3.2 投递消息
producer 投递消息时,会使用到 redis 客户端中的 XADD 方法:
// 生产一条消息
func (p *Producer) SendMsg(ctx context.Context, topic, key, val string) (string, error) {
return p.client.XADD(ctx, topic, p.opts.msgQueueLen, key, val)
}
5.3.3 使用示例
下面展示一个使用 producer 进行消息投递的单测代码示例:
import (
"context"
"testing"
"github.com/xiaoxuxiansheng/redmq"
"github.com/xiaoxuxiansheng/redmq/redis"
)
const (
// 连接 redis 的传输层协议,默认为 tcp
network = "tcp"
// redis 的地址(ip:port)
address = "请输入 redis 地址"
// redis 服务器的密码,如果没设密码可以传为空串
password = "请输入 redis 密码"
// 投递到哪个 topic
topic = "请输入 topic 名称"
)
func Test_Producer(t *testing.T) {
// 创建一个 redis 客户端
client := redis.NewClient(network, address, password)
// 构造生产者 producer 实例,并限制一个 topic 最多保留 10 条消息
producer := redmq.NewProducer(client, redmq.WithMsgQueueLen(10))
ctx := context.Background()
// 投递消息
msgID, err := producer.SendMsg(ctx, topic, "test_k", "test_v")
if err != nil {
t.Error(err)
return
}
// 打印这条消息对应的 msg id
t.Log(msgID)
}
5.4 消费者
5.4.1 类定义
下面是关于消费者 consumer 的类定义:
// 消费者 consumer 类定义
type Consumer struct {
// consumer 生命周期管理
ctx context.Context
// 停止 consumer 的控制器
stop context.CancelFunc
// 接收到 msg 时执行的回调函数,由使用方定义
callbackFunc MsgCallback
// redis 客户端,基于 redis 实现 message queue
client *redis.Client
// 消费的 topic
topic string
// 所属的消费者组
groupID string
// 当前节点的消费者 id
consumerID string
// 各消息累计失败次数
failureCnts map[redis.MsgEntity]int
// 一些用户自定义的配置
opts *ConsumerOptions
}
// 消费者 consumer 构造器函数
func NewConsumer(client *redis.Client, topic, groupID, consumerID string, callbackFunc MsgCallback, opts ...ConsumerOption) (*Consumer, error) {
// cancel context,用于提供停止 consumer 的控制器
ctx, stop := context.WithCancel(context.Background())
// 构造 consumer 实例
c := Consumer{
client: client,
ctx: ctx,
stop: stop,
callbackFunc: callbackFunc,
topic: topic,
groupID: groupID,
consumerID: consumerID,
opts: &ConsumerOptions{},
failureCnts: make(map[redis.MsgEntity]int),
}
// 校验 consumer 中的参数,包括 topic、groupID、consumerID 都不能为空,且 callbackFunc 不能为空
if err := c.checkParam(); err != nil {
return nil, err
}
// 注入用户自定义的配置项
for _, opt := range opts {
opt(c.opts)
}
// 修复非法的配置参数
repairConsumer(c.opts)
// 启动 consumer 守护 goroutine,负责轮询消费消息
go c.run()
// 返回 consumer 实例
return &c, nil
}
关于 consumer 的一些自定义配置项:
type ConsumerOptions struct {
// 每轮阻塞消费新消息时等待超时时长
receiveTimeout time.Duration
// 处理消息的最大重试次数,超过此次数时,消息会被投递到死信队列
maxRetryLimit int
// 死信队列,可以由使用方自定义实现
deadLetterMailbox DeadLetterMailbox
// 投递死信流程超时阈值
deadLetterDeliverTimeout time.Duration
// 处理消息流程超时阈值
handleMsgsTimeout time.Duration
}
type ConsumerOption func(opts *ConsumerOptions)
func WithReceiveTimeout(timeout time.Duration) ConsumerOption {
return func(opts *ConsumerOptions) {
opts.receiveTimeout = timeout
}
}
func WithMaxRetryLimit(maxRetryLimit int) ConsumerOption {
return func(opts *ConsumerOptions) {
opts.maxRetryLimit = maxRetryLimit
}
}
func WithDeadLetterMailbox(mailbox DeadLetterMailbox) ConsumerOption {
return func(opts *ConsumerOptions) {
opts.deadLetterMailbox = mailbox
}
}
func WithDeadLetterDeliverTimeout(timeout time.Duration) ConsumerOption {
return func(opts *ConsumerOptions) {
opts.deadLetterDeliverTimeout = timeout
}
}
func WithHandleMsgsTimeout(timeout time.Duration) ConsumerOption {
return func(opts *ConsumerOptions) {
opts.handleMsgsTimeout = timeout
}
}
// 修复非法的配置参数
func repairConsumer(opts *ConsumerOptions) {
// 默认阻塞消费的超时时长为 2 s
if opts.receiveTimeout < 0 {
opts.receiveTimeout = 2 * time.Second
}
// 默认同一条消息最多处理 3 次,超过此次数,则进入死信
if opts.maxRetryLimit < 0 {
opts.maxRetryLimit = 3
}
// 如果用户没传入死信队列,则使用默认的死信队列,只会打印一下消息
if opts.deadLetterMailbox == nil {
opts.deadLetterMailbox = NewDeadLetterLogger()
}
// 投递消息进入死信队列的流程的超时时间
if opts.deadLetterDeliverTimeout <= 0 {
opts.deadLetterDeliverTimeout = time.Second
}
// 处理消息执行 callback 回调的超时时间
if opts.handleMsgsTimeout <= 0 {
opts.handleMsgsTimeout = time.Second
}
}
关于死信队列的定义:
// 死信队列,当消息处理失败达到指定次数时,会被投递到此处
type DeadLetterMailbox interface {
Deliver(ctx context.Context, msg *redis.MsgEntity) error
}
// 默认使用的死信队列,仅仅对消息失败的信息进行日志打印
type DeadLetterLogger struct{}
func NewDeadLetterLogger() *DeadLetterLogger {
return &DeadLetterLogger{}
}
func (d *DeadLetterLogger) Deliver(ctx context.Context, msg *redis.MsgEntity) error {
log.ErrorContextf(ctx, "msg fail execeed retry limit, msg id: %s", msg.MsgID)
return nil
}
接下来是 consumer 在消费消息后执行的回调函数:
// 接收到消息后执行的回调函数
type MsgCallback func(ctx context.Context, msg *redis.MsgEntity) error
5.4.2 消费消息
接下来是 consumer goroutine 轮询消费消息的流程,主要包括三个步骤:
• 消费新消息
• 投递失败次数超限的老消息进入死信
• 消费未确认老消息
对应流程示意如下:
下面是源码部分:
// 该方法会在 consumer 构造器函数中被异步启动
func (c *Consumer) run() {
// 通过 for 循环实现自旋模型
for {
// select 多路复用,保证在 Consumer stop 方法被执行时,该 goroutine 能及时退出
select {
case <-c.ctx.Done():
return
default:
}
// 接收处理新消息
msgs, err := c.receive()
if err != nil {
log.ErrorContextf(c.ctx, "receive msg failed, err: %v", err)
continue
}
// 接收到新消息后,执行对应的 callback 回调方法
tctx, _ := context.WithTimeout(c.ctx, c.opts.handleMsgsTimeout)
c.handlerMsgs(tctx, msgs)
// 把失败次数超限的老消息投递到死信队列
tctx, _ = context.WithTimeout(c.ctx, c.opts.deadLetterDeliverTimeout)
c.deliverDeadLetter(tctx)
// 接收之前就已分配给当前 consumer,但是还未得到 xack 确认的老消息
pendingMsgs, err := c.receivePending()
if err != nil {
log.ErrorContextf(c.ctx, "pending msg received failed, err: %v", err)
continue
}
// 接收到老消息后,执行对应的 callback 回调方法
tctx, _ = context.WithTimeout(c.ctx, c.opts.handleMsgsTimeout)
c.handlerMsgs(tctx, pendingMsgs)
}
}
consumer 在消费新消息时,使用到 redis 客户端的 XReadGroup 方法:
func (c *Consumer) receive() ([]*redis.MsgEntity, error) {
msgs, err := c.client.XReadGroup(c.ctx, c.groupID, c.consumerID, c.topic, int(c.opts.receiveTimeout.Milliseconds()))
if err != nil && !errors.Is(err, redis.ErrNoMsg) {
return nil, err
}
return msgs, nil
}
consumer 在消费未经确认的老消息时,使用到 redis 客户端的 XReadGroupPending 方法:
func (c *Consumer) receivePending() ([]*redis.MsgEntity, error) {
pendingMsgs, err := c.client.XReadGroupPending(c.ctx, c.groupID, c.consumerID, c.topic)
if err != nil && !errors.Is(err, redis.ErrNoMsg) {
return nil, err
}
return pendingMsgs, nil
}
5.4.3 执行回调
在 consumer 消费到消息后,需要执行对应的 callback 函数,此时倘若 callback 执行成功了,则需要调用 xack 方法进行确认答复;倘若 callback 执行失败,则需要对失败次数进行累加,倘若失败次数达到上限,则这条消息最终会被投递到死信队列中:
func (c *Consumer) handlerMsgs(ctx context.Context, msgs []*redis.MsgEntity) {
for _, msg := range msgs {
if err := c.callbackFunc(ctx, msg); err != nil {
// 失败计数器累加
c.failureCnts[*msg]++
continue
}
// callback 执行成功,进行 ack
if err := c.client.XACK(ctx, c.topic, c.groupID, msg.MsgID); err != nil {
// ack 失败的情况需要关注
log.ErrorContextf(ctx, "msg ack failed, msg id: %s, err: %v", msg.MsgID, err)
continue
}
// ack 成功了,从 map中清零对应消息的失败次数
delete(c.failureCnts, *msg)
}
}
5.4.4 投递死信
倘若某条消息失败次数达到上限,则会被投递到死信队列中,对应源码如下:
func (c *Consumer) deliverDeadLetter(ctx context.Context) {
// 对于失败达到指定次数的消息,投递到死信中,然后执行 ack
for msg, failureCnt := range c.failureCnts {
if failureCnt < c.opts.maxRetryLimit {
continue
}
// 投递死信队列
if err := c.opts.deadLetterMailbox.Deliver(ctx, &msg); err != nil {
log.ErrorContextf(c.ctx, "dead letter deliver failed, msg id: %s, err: %v", msg.MsgID, err)
}
// 对于被投递到死信队列的消息,需要执行 ack 操作,后续不需要再通过常规流程重复处理了
if err := c.client.XACK(ctx, c.topic, c.groupID, msg.MsgID); err != nil {
log.ErrorContextf(c.ctx, "msg ack failed, msg id: %s, err: %v", msg.MsgID, err)
continue
}
// 对于 ack 成功的消息,将其从 failure map 中删除
delete(c.failureCnts, msg)
}
}
5.4.5 停止消费
倘若使用方需要提前终止 consumer 的消费流程,则可以调用 Consumer.Stop 方法. 方法内部会执行 consumer 中内置的 stop 函数,本质上是 context 中的 cancel 函数,会通过停止 consumer 中 context 的方式,保证 consumer 的运行 goroutine 能正常退出.
// 停止 consumer
func (c *Consumer) Stop() {
c.stop()
}
5.4.6 使用示例
在进行 consumer 消费之前,有两个操作是需要前置执行的:
• 对应的 topic 需要提前创建好. topic 创建时机是通过首次执行 XADD 指令完成的
• 对应的 group 需要提前创建好. 可以通过 XGROUP CREATE 指令完成创建:
XGROUP CREATE my_streams_topic my_group 0-0
OK
下面给出使用 consumer 进行消费的单测示例代码.
import (
"context"
"testing"
"time"
"github.com/xiaoxuxiansheng/redmq"
"github.com/xiaoxuxiansheng/redmq/redis"
)
const (
// redis 传输层协议
network = "tcp"
// redis 服务器地址:ip:port
address = "请输入 redis 地址"
// redis 服务器密码,没有密码则设为空串
password = "请输入 redis 密码"
// topic 名称
topic = "请输入 topic 名称"
// 消费者组 id
consumerGroup = "请输入消费者组名称"
// 消费者 id
consumerID = "请输入消费者名称"
)
// 用户自定义实现的死信队列
type DemoDeadLetterMailbox struct {
do func(msg *redis.MsgEntity)
}
func NewDemoDeadLetterMailbox(do func(msg *redis.MsgEntity)) *DemoDeadLetterMailbox {
return &DemoDeadLetterMailbox{
do: do,
}
}
// 死信队列接收消息的处理方法
func (d *DemoDeadLetterMailbox) Deliver(ctx context.Context, msg *redis.MsgEntity) error {
d.do(msg)
return nil
}
func Test_Consumer(t *testing.T) {
// 创建 redis 客户端
client := redis.NewClient(network, address, password)
// consumer 接收到消息后的执行的 callback 回调处理函数
callbackFunc := func(ctx context.Context, msg *redis.MsgEntity) error {
t.Logf("receive msg, msg id: %s, msg key: %s, msg val: %s", msg.MsgID, msg.Key, msg.Val)
return nil
}
// 创建自定义实现的死信队列实例
demoDeadLetterMailbox := NewDemoDeadLetterMailbox(func(msg *redis.MsgEntity) {
t.Logf("receive dead letter, msg id: %s, msg key: %s, msg val: %s", msg.MsgID, msg.Key, msg.Val)
})
// 构造并启动消费者 consumer 实例
consumer, err := redmq.NewConsumer(client, topic, consumerGroup, consumerID, callbackFunc,
// 每条消息最多重试处理 2 次
redmq.WithMaxRetryLimit(2),
// 每轮接收消息的阻塞等待超时时间为 2 s
redmq.WithReceiveTimeout(2*time.Second),
// 注入自定义实现的死信队列
redmq.WithDeadLetterMailbox(demoDeadLetterMailbox))
if err != nil {
t.Error(err)
return
}
// 程序退出前停止 consumer
defer consumer.Stop()
// 十秒后退出单测程序
<-time.After(10 * time.Second)
}
至此,正文内容全部结束.
6 总结
本期和大家一起探讨了如何基于 redis 实现消息队列,其中实现方案包括三类:
• redis list:最简单粗暴的实现,存在问题包括:不支持发布/订阅模式、消费端缺少 ack 机制
• redis pub/sub:支持发布/订阅模式,有较高的丢数据风险,消费端同样不支持 ack 机制
• redis streams:趋近于成熟的 mq 实现方式. 支持发布/订阅模式,消费端能支持 ack 机制. 但是受限于 redis 自身的特性,仍无法杜绝丢失数据的可能性
最后,本人使用 go 语言,基于 redis streams 开源实现了一款 mq 客户端 sdk,开源地址:https://github.com/xiaoxuxiansheng/redmq,欢迎大家批评指正.