太强了! 这才是Apache Kafka 消息清理正确方式!

文摘   2024-10-12 00:00   重庆  

01

   前言   

消息的清理是 MQ 中间件的基本能力,可以避免 MQ 的存储占用空间无序增长。与其他消息产品不同,Apache Kafka(以下简称 Kafka) 中 topic 上的消息被消费后不会被马上清除,而是由 topic 级别的清理策略来控制。本文将简要介绍 Kafka 中的两种消息清理策略:deletion 和 compaction,探讨他们的应用场景、配置参数以及一些技术细节。

AutoMQ[1] 是与 Apache Kafka 100% 兼容的新一代云原生 Kafka,对 Kafkfa 的存储层进行了重新设计和实现,使得其可以构建在像S3这样的对象存储之上。得益于 AutoMQ 对 Apache Kafka 的完全兼容,本文中提到的原理和参数对于 AutoMQ 也同样适用。

02

名词定义

  • 消息:Kafka 官方一般称为 event 或 record。一个 event 包含一个 key(可选)和一个 value(消息本体);

  • 消息 batch:Kafka 会将多个消息聚合为一个 batch。具体来说,client 以 batch 形式向服务端生产或消费消息,服务端也按照 batch 进行消息的存储;

  • topic partition: topic 的一个分区。Kafka 中一个 topic 会被划分为多个 topic partition,以支持消费端和服务端的负载均衡;

  • segment: Kafka 中消息存储的基本单位。一个 topic partition 会被划分为多个 segment。它也是消息清理的基本单位;

03

Deletion or Compaction

我们可以在 Kafka 中为 topic 配置“cleanup.policy”参数,以指定它的清理策略。可选项包括:

  • delete默认策略,当 segment 的大小或者时间达到阈值后直接删除;

  • compact基于 key 的压缩策略,绑定同一个 key 的多个消息将仅保留最新的那个消息,其他消息将被删除。Kafka 的内部 topic "__consumer_offsets" 就是 compact 策略。

  • delete + compact混合策略,老的 segment 会因为大小或时间被删除,同时 topic partition 也会被 compact。

一般来说,如果你的业务关注的是 key 的终态 value(也就是 KV 之类的场景),例如记录用户每日的行走步数,或者账户的余额,那么 compact 比较适合[2]。此外,compact 策略下,业务的 key 最好是可枚举的少数值。key 取值过于分散将会导致 compaction 效果大打折扣,这种场景可以考虑采用 delete + compact 的策略。如果没有明显的 KV 特征,一般采用 delete 策略即可。

注:Kafka 支持修改 topic 的清理策略,无需重启。

04

 清理涉及的线程

Kafka 中以下线程会执行清理逻辑:

  • Scheduler 线程: 执行 "kafka-log-retention" 任务,定时检查纯 delete 策略的 topic 是否需要清理;

  • LogCleaner 持有的 N 个 CleanerThread: 执行 log compaction,同时执行 “delete + compact”混合策略下相关 topic 的 deletion;

其中 LogCleaner 需要将 server 侧的“log.cleaner.enable”配置为 true 才会开启(从 0.9.0.1 版本开始默认即为 true)。

以下将分别讨论这两类清理动作的细节。


4.1 Scheduler 触发的清理

Scheduler 定期执行执行 "kafka-log-retention" 任务,在该任务中将基于时间或者大小触发相关 segment 的删除。

4.1.1 涉及参数

除了上文的“log.cleaner.enable”,还包括:

  • log.retention.hours:Kafka 中消息保留的时间,默认取值为 168,也就是保留一个星期。前文提到过,清理的基本粒度是 segment,因此,只有在 segment 中最新的消息都超过了保留时间时,整个 segment 才会被删除;类似的其他时间粒度的参数还有 log.retention.minutes、log.retention.ms;

  • log.retention.bytes:topic partition 中最多保留的消息大小,默认取值 -1,也就是不做大小限制;

  • log.retention.check.interval.msv"kafka-log-retention" 任务的执行间隔,默认取值 300000,即 5 min;

4.1.2 清理流程

过滤纯 delete 策略的 topic partition;

调用 kafka.log.UnifiedLog#deleteOldSegments(),清理三类 segment:

  • deleteLogStartOffsetBreachedSegments: 删除 baseOffset <= logStartOffset 的 segment;

  • deleteRetentionSizeBreachedSegments: 基于大小删除多余的 segment;

  • deleteRetentionMsBreachedSegments: 基于时间删除过期的 segment;


4.2 LogCleaner 触发的清理

LogCleaner 是为了支持 compaction 而引入的组件,由“log.cleaner.enable”控制开启。LogCleaner 持有多个 CleanerThread,每个线程相对独立,基于 key 清理过时的消息。

compaction 策略下消息也可以被“删除”。如果 key 最新的 value 为 null,则 server 侧会将其视为删除的“声明”,并将在墓碑过期(见后文)后彻底删除该 key。

关于 compaction,Kafka 可以提供如下保障[3]:

  1. 追尾读的 consumer 可以消费到所有生产者写入的消息,这些消息有着连续的 offset;

  2. 消息的顺序不会变更,compaction 只是移除部分消息;

  3. 消息的 offset 不会变化,offset 一旦产生就是持久化的;

  4. 从头开始消费的 consumer 至少可以消费到终态的 value;

最后一条代表两个含义:

  • 最新的 value 一定会保存(除非是 null 值);

  • 有可能消费到 key 对应的早期的 value,原因可能是还没达到 compaction 的条件,也可能是相关消息带有墓碑标记(见后文)暂未删除;

4.2.1 涉及参数

除了上文的“log.cleaner.enable”,较为重要的配置还有:

  • log.cleaner.min.compaction.lag.ms: 参与 compaction 的消息需要满足的最小生存时间,可以避免较新的消息参与 compaction。默认值 0;

  • log.cleaner.max.compaction.lag.ms: 触发 compaction 满足的生存时间阈值,主要为了让不活跃的 topic partition 也能参与到 compaction。默认值 9223372036854775807,即不开启本特性;

  • log.cleaner.min.cleanable.ratio: topic partition 的 dirty 比例,超过阈值才有可能参与 compaction。更低的值意味着更高的清理频率。默认值 0.5;

  • delete.retention.ms: 带有墓碑标记的消息的“死缓”时间。默认值 86400000,即一天;

  • log.cleaner.threads: LogCleaner 持有的 CleanerThread 数目。默认值为 1;

  • log.cleaner.backoff.ms: CleanerThread 没有扫描到可 compaction 的 topic partition 后的静默时间。默认值 15000(15s);

4.2.2 清理流程

在开始之前,先介绍几个概念:

  • active segment: 当前活跃的 segment,也是最新的 segment,可以接受新消息的写入;

  • cleaned segments: 此前 compaction 的产物,这些 segments 中的所有消息,不存在重复的 key;

  • dirty segments: cleaned  segments 之后,active segment 之前的 segment。这些 segment 没有经历过 compaction。

大体可以分为:

  1. 待 compact 的 topic partition 的筛选;

  2. topic partition 的 compaction;

  3. 配置了“delete + compact”混合策略的 topic partition 执行 deletion;

Topic Partition 的筛选

本阶段,将从 server 端筛选出最“脏”的 topic partition,本质上,Kafka 希望过滤出以下 topic partition:

  • 存在 dirty 消息,但是不活跃的 topic partition,避免它们长期不被清理;

  • 活跃的,并超过了清理阈值的 topic partition;

大体逻辑如下:

其中:

  • inProgress lock: 用于线程之间的锁定,防止一个 topic partition 被多个 CleanerThread 选中,同时防止 topic partition 在清理策略变更时被 Scheduler 线程和 CleanerThread 同时选中;

  • partition uncleanable: 表示线程清理过程中发生过预期外的异常(非 ThreadShutdownException 或 ControlThrowable),将这些 partition “拉黑”;

  • firstDirtyOffset: 一般从 checkpoint 文件中读取,值为上次清理的最后位点 + 1;

  • firstUncleanalbeDirtyOffset: 不可清理的起始位点,取值为 min{log.lastStableOffset, log.activeSegment.baseOffset, 不满足 minCompactionLagMs 的 segment.baseOffset};

  • needCompactionNow: 取值为 (now - min(dirty segment 的 FirstBatchTimestamp)) > maxCompactionLagMs。为 true 往往意味着这个 topic partition 太久没有被命中了;

  • 取 max 值: 实际上就是取 dirty ratio 最高的 topic partition,一个 CleanerThread 一次仅对一个 topic partition 做 compaction;

  • dirty ratio = ([firstDirtyOffset, firstUncleanalbeDirtyOffset) 之间消息的大小)/([startOffset, firstUncleanalbeDirtyOffset) 之间消息的大小);

Partition 的 compaction

本阶段将构建 offsetMap,其中 key 为消息的 key,value 为绑定该 key 的最新消息的 offset。并根据该 map,分批构建新 segment。大致流程如下:

首先,在 [firstDirtyOffset, firstUncleanalbeDirtyOffset) 区间内构建 offsetMap。注意,此 map 不包含任何控制信息以及中断事务的消息。为了避免区间过长导致 map 无限膨胀,offsetMap 大小是受限的(所有 CleanerThread 的 offsetMap 总共的内存占用不可超过 128 MB)。因此,最终 offsetMap 的 latestOffset < firstUncleanalbeDirtyOffset。

接着,在[0, offsetMap.latestOffset] 区间内,将所有待清理的 segment 进行分组,每个 group 中的总的 logSize、indexSize、timeIndexSize 不可以超过相应的 topic config。预期每个 group 对应一个清理后的 segment。

之后,基于 offsetMap,为每个 group 中的 segment 中所有的 batch 进行过滤,写入到新 segment 中,不同 batch 中 record 的删除规则为:

  1. ControlBatch: 空 batch 或者空事务对应的 ControlBatch(事务数据在此前已经删除完毕),且墓碑已超时;

  2. dataBatch: 满足以下条件中的一个:

  • 中断事务中的消息;

  • record.offset() < offsetMap.get(key) 或者 (value 为 null,且墓碑超时);

上面提到的墓碑是一种两阶段删除手段,是 Kafka 为了让下游的消费者能够有机会完整地获取所有消息引入的一种机制。也就是说,如果消费者必须在 "delete.retention.ms" 时间内从头消费到最新位点,才可以完整“回放”所有消息。从效果上来说,墓碑相当于给事务 marker 和 null value 消息判了“死缓”。

需要注意的是,对于 magic 值 >= 2 的新版本 batch,墓碑时间会在第一次 compaction 时打入 batch。对于 magic 值 < 2 的老版本 batch,是依据 segment 的最后修改时间来近似推断是否墓碑超时的。

最后,过滤完毕的 record 写入新 segment。新 segment 上线,老 segment 被删除。将 offsetMap.latestOffset + 1 存入 clean offset checkpoint 文件。

整体来看,compaction 后,log startOffset 会小幅前进,同时原有的消息会进行一定“压缩”:

Segment deletion

与 Scheduler 触发的删除类似,配置了“delete + compact”混合策略后的 topic partition 也会删除旧的 segment,只是删除由 CleanerThread 完成。删除逻辑不再赘述。

05

总结

本文介绍了 Kafka 中消息的两种清理策略。首先从整体上介绍了 Kafka 中的两种消息清理策略,讨论了业务 topic 清理策略的选择问题。接着简要介绍了清理涉及的线程。最后,分别讨论了 Scheduler 和 LogCleaner 触发的清理的参数和技术细节。对于 delete 策略,会基于 partition size 或者消息过期时间进行 segement 的直接删除;对于 compact 策略,会构建 offset map,并基于该 map 保留同一个 key 对应的最新 value。

参考资料

[1] AutoMQ: https://www.automq.com

[2] Kafka Topic Configuration: Log Compaction: https://www.conduktor.io/kafka/kafka-topic-configuration-log-compaction/

[3] Compaction guarantees https://docs.confluent.io/kafka/design/log_compaction.html#compaction-guarantees




   往期推荐   


AWS 弹性伸缩特性介绍


2024-07-29

AutoMQ 开源可观测性方案:夜莺 Flashcat


2024-07-26

如何通过 CloudCanal 实现从 Kafka 到 AutoMQ 的数据迁移

2024-07-25

百行代码实现 Kafka 运行在 S3 之上


2024-07-23


END


关于我们

我们是来自 Apache RocketMQ 和 Linux LVS 项目的核心团队,曾经见证并应对过消息队列基础设施在大型互联网公司和云计算公司的挑战。现在我们基于对象存储优先、存算分离、多云原生等技术理念,重新设计并实现了 Apache Kafka 和 Apache RocketMQ,带来高达 10 倍的成本优势和百倍的弹性效率提升。


🌟 GitHub 地址:https://github.com/AutoMQ/automq

💻 官网https://www.automq.com

👀 B站:AutoMQ官方账号

🔍 视频号:AutoMQ



👉🏻 扫二维码  

加入我们的社区群



关注我们,一起学习更多云原生技术干货!

👇🏻点击下方阅读原文,前往 GitHub 了解体验!



大数据技能圈
分享大数据前沿技术,实战代码,详细文档
 最新文章