👉目录
1 引言
2 Kafka 宏观认知
3 Kafka 高可靠性探究
4 Kafka 高性能探究
5 其他知识探究
本文将带你探索 Kafka 的 ack 策略、数据持久化技术以及提升系统性能的关键设计,包括批量处理、压缩、PageCache 和零拷贝等技术。同时,文章还涵盖了负载均衡和集群管理,为你提供一个全面视角,理解 Kafka 如何满足大规模分布式系统中对消息队列的严苛要求。
01
异步解耦:同步调用转换成异步消息通知,实现生产者和消费者的解耦。想象一个场景,在商品交易时,在订单创建完成之后,需要触发一系列其他的操作,比如进行用户订单数据的统计、给用户发送短信、给用户发送邮件等等。如果所有操作都采用同步方式实现,将严重影响系统性能。针对此场景,我们可以利用消息中间件解耦订单创建操作和其他后续行为。 削峰填谷:利用 broker 缓冲上游生产者瞬时突发的流量,使消费者消费流量整体平滑。对于发送能力很强的上游系统,如果没有消息中间件的保护,下游系统可能会直接被压垮导致全链路服务雪崩。想象秒杀业务场景,上游业务发起下单请求,下游业务执行秒杀业务(库存检查,库存冻结,余额冻结,生成订单等等),下游业务处理的逻辑是相当复杂的,并发能力有限,如果上游服务不做限流策略,瞬时可能把下游服务压垮。针对此场景,我们可以利用 MQ 来做削峰填谷,让高峰流量填充低谷空闲资源,达到系统资源的合理利用。
02
Producer:生产者,负责消息的创建并通过一定的路由策略发送消息到合适的 Broker; Broker:服务实例,负责消息的持久化、中转等功能; Consumer:消费者,负责从 Broker 中拉取(Pull)订阅的消息并进行消费,通常多个消费者构成一个分组,消息只能被同组中的一个消费者消费; ZooKeeper:负责 broker、consumer 集群元数据的管理等;
topic:消息主题。Kafka 按 topic 对消息进行分类,我们在收发消息时只需指定 topic。 partition:分区。为了提升系统的吞吐,一个 topic 下通常有多个 partition,partition 分布在不同的 Broker 上,用于存储 topic 的消息,这使 Kafka 可以在多台机器上处理、存储消息,给 kafka 提供给了并行的消息处理能力和横向扩容能力。另外,为了提升系统的可靠性,partition 通常会分组,且每组有一个主 partition、多个副本 partition,且分布在不同的 broker 上,从而起到容灾的作用。 segment:分段。宏观上看,一个 partition 对应一个日志(Log)。由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据检索效率低下,Kafka 采取了分段和索引机制,将每个 partition 分为多个 segment,同时也便于消息的维护和清理。每个 segment 包含一个 .log 日志文件、两个索引(.index、timeindex)文件以及其他可能的文件。每个 Segment 的数据文件以该段中最小的 offset 为文件名,当查找 offset 的 Message 的时候,通过二分查找快找到 Message 所处于的 Segment 中。 offset:消息在日志中的位置,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量。offset 是消息在分区中的唯一标识,是一个单调递增且不变的值。Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
详情请访问地址:https://cloud.tencent.com/product/ckafka
03
消息从生产者可靠地发送至 Broker;-- 网络、本地丢数据。 发送到 Broker 的消息可靠持久化;-- PageCache 缓存落盘、单点崩溃、主从同步跨网络。 消费者从 Broker 消费到消息且最好只消费一次 -- 跨网络消息传输。
Producer 发送消息后,能够收到来自 Broker 的消息保存成功 ack; Producer 发送消息后,能够捕获超时、失败 ack 等异常 ack 并做处理;
Request.required.acks = 0:请求发送即认为成功,不关心有没有写成功,常用于日志进行分析场景。 Request.required.acks = 1:当 leader partition 写入成功以后,才算写入成功,有丢数据的可能。 Request.required.acks= -1:ISR 列表里面的所有副本都写完以后,这条消息才算写入成功,强可靠性保证。
异步发送
在主协程中调用异步发送 kafka 消息的时候,其本质是将消息体放进了一个 input 的 channel,只要入 channel 成功,则这个函数直接返回,不会产生任何阻塞。相反,如果入 channel 失败,则会返回错误信息。因此调用 async 写入的时候返回的错误信息是入 channel 的错误信息,至于具体最终消息有没有发送到 kafka 的 broker,我们无法从返回值得知。 当消息进入 input 的 channel 后,会有另一个 dispatcher 的协程负责遍历 input,来真正发送消息到特定 Broker 上的主 Partition 上。发送结果通过一个异步协程进行监听,循环处理 err channel 和 success channel,出现了 error 就记一个日志。因此异步写入场景时,写 kafka 的错误信息,我们暂时仅能够从这个错误日志来得知具体发生了什么错,并且也不支持我们自建函数进行兜底处理,这一点在 trpc-go 的官方也得到了承认。
同步发送
Broker 返回 Producer 成功 ack 时,消息是否已经落盘; Broker 宕机是否会导致数据丢失,容灾机制是什么; Replica 副本机制带来的多副本间数据同步一致性问题如何解决;
HW: High Watermark,高水位,表示已经提交(commit)的最大日志偏移量,Kafka 中某条日志“已提交”的意思是 ISR 中所有节点都包含了此条日志,并且消费者只能消费 HW 之前的数据; LEO: Log End Offset,表示当前 log 文件中下一条待写入消息的 offset;
Leader HW:min(所有副本 LEO),为此 Leader 副本不仅要保存自己的 HW 和 LEO,还要保存 follower 副本的 HW 和 LEO,而 follower 副本只需保存自己的 HW 和 LEO; Follower HW:min(follower 自身 LEO,leader HW)。
初始状态
HW L =0,LEO L =0,HW F=0,LEO F=0。
Follower 第一次 fetch
Leader 收到 Producer 发来的一条消息完成存储, 更新 LEO L =1; Follower从Leader fetch数据, Leader收到请求,记录follower的LEO F =0,并且尝试更新 HW L =min(全部副本 LEO)=0; leade 返回 HW L=0和 LEO L=1给 Follower,Follower 存储消息并更新LEOF =1, HW=min(LEO F ,HW L )=0。
Follower 第二次fetch:
Follower 再次从 Leader fetch 数据, Leader 收到请求,记录 follower 的 LEO F =1,并且尝试更新 HW L =min(全部副本 LEO)=1; leade 返回 HW L =1和 LEO L =1给 Follower,Leader 收到请求,更新自己的 HW=min(LEO F ,HW L )=1。
副本 B 作为 leader 收到 producer 的 m2 消息并写入本地文件,等待副本 A 拉取。 副本 A 发起消息拉取请求,请求中携带自己的最新的日志offset(LEO=1),B 收到后更新自己的 HW 为1,并将 HW=1的信息以及消息 m2 返回给 A。 A 收到拉取结果后更新本地的 HW 为1,并将 m2 写入本地文件。发起新一轮拉取请求(LEO=2),B收到A拉取请求后更新自己的 HW 为2,没有新数据只将 HW=2 的信息返回给 A,并且回复给 producer 写入成功。此处的状态就是图中第一步的状态。
A 和 B 均为 ISR 中的节点。副本 A 作为 leader,收到 producer 的消息 m2的请求后写入 PageCache 并在某个时刻刷新到本地磁盘。 副本 B 拉取到 m2 后写入 PageCage 后(尚未刷盘)再次去 A 中拉取新消息并告知 A 自己的 LEO=2,A收到更新自己的 HW 为1并回复给 producer 成功。 此时 A 和 B 同时宕机,B 的 m2 由于尚未刷盘,所以 m2 消息丢失。此时的状态就是第1步的状态。
Leader Epoch
Consumer 在消费消息的过程中需要向 Kafka 汇报自己的位移数据,只有当 Consumer 向 Kafka 汇报了消息位移,该条消息才会被 Broker 认为已经被消费。因此,Consumer 端消息的可靠性主要和 offset 提交方式有关,Kafka 消费端提供了两种消息提交方式:
04
Kafka 高性能的核心是保障系统低延迟、高吞吐地处理消息,为此,Kafaka 采用了许多精妙的设计:
异步发送。
批量发送。
压缩技术。
PageCache 机制&顺序追加落盘。
零拷贝。
稀疏索引。
broker & 数据分区。
多 reactor 多线程网络模型。
Kafka 支持批量发送消息,将多个消息打包成一个批次进行发送,从而减少网络传输的开销,提高网络传输的效率和吞吐量。Kafka 的批量发送消息是通过以下两个参数来控制的:
batch.size:控制批量发送消息的大小,默认值为16KB,可适当增加 batch.size 参数值提升吞吐。但是,需要注意的是,如果批量发送的大小设置得过大,可能会导致消息发送的延迟增加,因此需要根据实际情况进行调整。
linger.ms:控制消息在批量发送前的等待时间,默认值为0。当 linger.ms 大于0时,如果有消息发送,Kafka 会等待指定的时间,如果等待时间到达或者批量大小达到 batch.size,就会将消息打包成一个批次进行发送。可适当增加 linger.ms 参数值提升吞吐,比如10~100。
在 Kafka 的生产者客户端中,当发送消息时,如果启用了批量发送,Kafka 会将消息缓存到缓冲区中。当缓冲区中的消息大小达到 batch.size 或者等待时间到达 linger.ms 时,Kafka 会将缓冲区中的消息打包成一个批次进行发送。如果在等待时间内没有达到 batch.size,Kafka 也会将缓冲区中的消息发送出去,从而避免消息积压。
Kafka 支持压缩技术,通过将消息进行压缩后再进行传输,从而减少网络传输的开销(压缩和解压缩的过程会消耗一定的 CPU 资源,因此需要根据实际情况进行调整。),提高网络传输的效率和吞吐量。
Kafka 支持多种压缩算法,通过配置参数 compression.type(默认值为 none,表示不进行压缩)控制。在 Kafka 2.1.0版本之前,仅支持 GZIP,Snappy 和 LZ4,2.1.0后还支持 Zstandard 算法(Facebook 开源,能够提供超高压缩比)。这些压缩算法性能对比(两指标都是越高越好)如下:
吞吐量:LZ4>Snappy>zstd 和 GZIP
压缩比:zstd>LZ4>GZIP>Snappy。
在 Kafka 的生产者客户端中,当发送消息时,如果启用了压缩技术,Kafka 会将消息进行压缩后再进行传输。在消费者客户端中,如果消息进行了压缩,Kafka 会在消费消息时将其解压缩。注意:Broker 如果设置了和生产者不通的压缩算法,接收消息后会解压后重新压缩保存。Broker 如果存在消息版本兼容也会触发解压后再压缩。
transferTo/transferFrom
调用操作系统的 sendfile 实现零拷贝。总共发生 2 次内核数据拷贝、2 次上下文切换和一次系统调用,消除了 CPU 数据拷贝,如下:Kafka 的索引文件是按照稀疏索引的思想进行设计的。稀疏索引的核心是不会为每个记录都保存索引,而是写入一定的记录之后才会增加一个索引值,具体这个间隔有多大则通过 log.index.interval.bytes 参数进行控制,默认大小为 4 KB,意味着 Kafka 至少写入 4KB 消息数据之后,才会在索引文件中增加一个索引项。可见,单条消息大小会影响 Kakfa 索引的插入频率,因此 log.index.interval.bytes 也是 Kafka 调优一个重要参数值。由于索引文件也是按照消息的顺序性进行增加索引项的,因此 Kafka 可以利用二分查找算法来搜索目标索引项,把时间复杂度降到了 O(lgN),大大减少了查找的时间。
位移索引文件.index
位移索引文件的索引项结构如下:
相对位移:保存于索引文件名字上面的起始位移的差值,假设一个索引文件为:00000000000000000100.index,那么起始位移值即 100,当存储位移为 150 的消息索引时,在索引文件中的相对位移则为 150 - 100 = 50,这么做的好处是使用 4 字节保存位移即可,可以节省非常多的磁盘空间。
文件物理位置:消息在 log 文件中保存的位置,也就是说 Kafka 可根据消息位移,通过位移索引文件快速找到消息在 log 文件中的物理位置,有了该物理位置的值,我们就可以快速地从 log 文件中找到对应的消息了。下面我用图来表示 Kafka 是如何快速检索消息:
假设 Kafka 需要找出位移为 3550 的消息,那么 Kafka 首先会使用二分查找算法找到小于 3550 的最大索引项:[3528, 2310272],得到索引项之后,Kafka 会根据该索引项的文件物理位置在 log 文件中从位置 2310272 开始顺序查找,直至找到位移为 3550 的消息记录为止。
时间戳索引文件.timeindex
Kafka 在 0.10.0.0 以后的版本当中,消息中增加了时间戳信息,为了满足用户需要根据时间戳查询消息记录,Kafka 增加了时间戳索引文件,时间戳索引文件的索引项结构如下:
时间戳索引文件的检索与位移索引文件类似,如下快速检索消息示意图:
SocketServer 和 KafkaRequestHandlerPool 是其中最重要的两个组件:
SocketServer:实现 Reactor 模式,用于处理多个 Client(包括客户端和其他 broker 节点)的并发请求,并将处理结果返回给 Client。
KafkaRequestHandlerPool:Reactor 模式中的 Worker 线程池,里面定义了多个工作线程,用于处理实际的 I/O 请求逻辑。
整个服务端处理请求的流程大致分为以下几个步骤:
Acceptor 接收客户端发来的请求。
轮询分发给 Processor 线程处理。
Processor 将请求封装成 Request 对象,放到 RequestQueue 队列。
KafkaRequestHandlerPool 分配工作线程,处理 RequestQueue 中的请求。
KafkaRequestHandler 线程处理完请求后,将响应 Response 返回给 Processor 线程。
Processor 线程将响应返回给客户端。
05
Kafka 生产端的负载均衡主要指如何将消息发送到合适的分区。Kafka 生产者生产消息时,根据分区器将消息投递到指定的分区中,所以 Kafka 的负载均衡很大程度上依赖于分区器。Kafka 默认的分区器是 Kafka 提供的 DefaultPartitioner。它的分区策略是根据 Key 值进行分区分配的:
如果 key 不为 null:对 Key 值进行 Hash 计算,从所有分区中根据 Key 的 Hash 值计算出一个分区号;拥有相同 Key 值的消息被写入同一个分区,顺序消息实现的关键;
如果 key 为 null:消息将以轮询的方式,在所有可用分区中分别写入消息。如果不想使用 Kafka 默认的分区器,用户可以实现 Partitioner 接口,自行实现分区方法。
在 Kafka 中,每个分区(Partition)只能由一个消费者组中的一个消费者消费。当消费者组中有多个消费者时,Kafka 会自动进行负载均衡,将分区均匀地分配给每个消费者。在 Kafka 中,消费者负载均衡算法可以通过设置消费者组的 partition.assignment.strategy 参数来选择。目前主流的分区分配策略以下几种:
range: 在保证均衡的前提下,将连续的分区分配给消费者,对应的实现是 RangeAssignor;
round-robin:在保证均衡的前提下,轮询分配,对应的实现是 RoundRobinAssignor;
0.11.0.0版本引入了一种新的分区分配策略 StickyAssignor,其优势在于能够保证分区均衡的前提下尽量保持原有的分区分配结果,从而避免许多冗余的分区分配操作,减少分区再分配的执行时间。
欢迎校招和社招同学加入极客星球圈子,一起学习,做更好的自己:
修炼基本功:分享多年基础技术深度理解,基础概念深度解析,经典书籍推荐和读书分享,阅读经典开源软件源码等;
扩展技术和商业视野:分享海内外热门技术发展,大厂技术内幕,业界解决方案;
校招/社招就业指导:简历优化,面试指导,各类后端学习路线就业指导等;
职场普升/技术专家:分享宝贵的职场经验,如何成为核心员工, 团队合作,大厂做事经验,快速普升,技术专家等;
专属交流群:开阔眼界,分享学习心得,问题答疑解惑,讨论交流!
深入理解计算机系统
深入理解操作系统(调度,内存,网络,IO)
深入理解并发技术全景指南
深入理解编程语言
深入理解算法与数据结构
深入理解网络协议
深入理解网络编程
深入理解性能优化
深入理解分布式技术
深入理解AI技术全景图
深入理解数据库
深入理解代码设计
深入理解架构设计
详细了解:极客星球,扫描下面的优惠劵加入,一起学习,相互激励,一起进步。
坚持分享干货内容,欢迎大家关注极客重生
感谢大家在看,转发,点赞
推荐阅读;