揭秘eBay Kafka跨数据中心高可用方案

文摘   2023-08-25 10:58   新加坡  




01



Forewords

前 言



伴随着互联网业务的快速迭代和日趋成熟,数据的快速流动和实时分析处理一直是推动业务创新的重要驱动力。快速捕捉业务数据,敏捷分析数据特征,实时获取业务洞见并反馈到业务运营,这个闭环的运行效率决定了业务竞争力。Kafka作为数据流动的枢纽引擎,因其出色的性能表现和丰富的开发生态,逐渐成为数据接入和数据分发的不二选择。


Kafka作为分布式数据引擎,单数据中心部署的性能和稳定已得到了生产验证。但如何在多数据中心之间保证服务和数据的高可靠性和连续性,业界还没有统一便捷的方案。为了搭建高可用的数据链路,Kafka开发者往往需要在应用端构建复杂的探活能力和切换机制,以确保当Kafka集群甚至数据中心不可用时,数据链路仍然能连续运行且数据完备。这种复杂的应用逻辑和维护成本,给应用开发带来了负担,同时,故障发生时的切换流程又经常不可避免的引入人为操作,导致数据链路中断。因此,提供应用无感的Kafka跨数据中心高可用方案极具业务价值。



02



Backgrounds

背景介绍



Rheos作为eBay的实时大数据平台,提供了Kafka、Flink和Rheos SQL三种业务产品。在eBay,Kafka支撑了公司内部准实时(Near-Real-Time)数据链路的各大业务组,如广告、支付及风控、市场营销、AI和搜索等等。发展至今,eBay内部共有Kafka生产集群超百个,数据日接入超万亿条。

 

eBay的数据中心主要分布在US三个区域:LVS (Las Vegas)SLC (Salt Lake City)RNO (Reno)。Kafka集群的部署方式主要有三种:多地独立 (multi-standalone)、两地互备 (active-active with mirroring) 和多地互备 (local-aggregation with mirroring)。每种部署方式中各Kafka集群逻辑上组成了一个Stream,同一个Stream内各Kafka集群复用相同的配置和元数据 (Topic、ACL)。各部署方式的拓扑结构和上下游的接入方式分别描述如下。



多 地 独


图1. 多地独立Stream的Kafka集群拓扑



在上图所示部署方式中,三个数据中心的Kafka集群互相独立,上游producer application和下游consumer application协调一致的接入点,以确保数据端到端流通。如果上游应用需要切换接入点,下游应用需同步切换。这种部署方式一般适用于部门内各业务线的数据架构解耦。




两 地 互 备


图2. 两地互备Stream的Kafka集群拓扑



在上图所示的部署方式中,两个数据中心的Kafka集群互为备份。针对每个topic,指定其中一个数据中心作为主,另一中心为备,中间由Mirrormaker组件负责从主往备实时复制数据。上游应用只能向topic的主数据中心发数据,而下游应用可以自行从任一数据中心读该topic。当上游应用需要切换数据中心写入时,需同时更换写入topic,下游应用也要适应性的更换topic以继续读(下游应用也可同时订阅两个topic,即上图中的topicN.slc和topicN.lvs,来避免这种更换)。



多 地 互 备


图3. 多地互备Stream的Kafka集群拓扑



在上图所示的部署方式中,一个Stream的各Kafka集群分为两层:local和aggregation。local层Kafka集群的数据由Mirrormaker组件实时往aggregation层Kafka集群复制,因此,aggregation层的每个集群包含了local层各集群的数据总和。上游应用往local层的Kafka集群写入,下游应用从aggregation层的Kafka集群接收。上下游应用可以灵活选取同数据中心的接入点访问以实现更低延迟,且上下游应用切换接入点完全解耦。



03



Architecture

架构设计


设 计 理 念


在实现跨数据中心高可用这个挑战上,多地独立模式相对简单,但是各中心数据互相孤立,下游应用切换中心时无法回溯历史数据;两地互备模式两中心都包含全量数据,但是上游应用切换接入点以及元数据管理比较复杂;多地互备模式各中心都包含全量数据,且客户接口简单直观,灵活扩展多中心部署以实现更高级别的灾备场景。

 

在设计应用无感的跨数据中心高可用方案时,我们尽量保持客户接口跟原生Kafka的接口相似,这样熟悉Kafka应用开发的客户在实施上线时便能一目了然,极大程度降低学习成本。


综合考虑,我们采用多地互备模式来实现该方案。



架 构 概 览


方案的总体架构表示如下图。Dsicovery Service负责各Kafka集群的健康监测,并上报结果到数据库。PubSubLib定期调用Discovery Service来感知当前客户端所连集群是否健康,以决定是否需要切换集群。我们基于开源的Mirrormaker2组件来实时跨集群数据拷贝,并定制了模块定期快照源端和目标端的offset映射关系,Offset Management Service基于此计算aggregation层各数据中心的Kafka集群间offset的寻址关系,以便consumer切数据中心时无缝实现断点续读。接下来我们将深入剖析各组件的设计和实现细节。


图4. 跨数据中心高可用方案总体架构



组 件 剖 析

元数据探测服务(Discovery Service)


该组件的核心功能在于定期探测平台内各集群的健康状况,同时,为注册的producer和consumer选择合适的目标集群。为了尽可能真实地反映集群健康状态,探测服务基于Kafka原生协议来探测Kafka服务是否健康响应,以避免漏报场景比如Kafka服务的端口可达但是服务不可用。探测服务的架构概览可表述如下图。


图5. 元数据探测服务架构图


探测服务本身设计为类无状态服务,服务实例部署于各数据中心,以避免单点故障。为实现更好的扩展性,每个服务实例只负责一部分集群的探活任务。各实例中竞选一个作为leader,负责为每个实例分配探活任务。服务的总体工作流程可描述为:


1)检测服务组当前是否已经选出leader,若无则竞选leader,根据竞选结果决定自己为leader或follower;

2)如果是leader,则检查当前服务组在册的服务实例并收集需要探活的所有Kafka集群,为每个服务实例分配相应的探活任务;

3)每个服务实例接收分配到的任务,并发向任务中指定的各Kafka集群节点发送Kafka metadata request以探活,如果能在指定时间内收到response,则标记该节点为可用,当一个集群超过半数节点可用且可用数量不少于3时,声明该集群为健康;

4)服务实例将探活结果存入数据库,并根据集群健康状态按需更新注册的consumer和producer的目标集群以供客户端查询。



每个服务实例都以此流程周期性地执行任务,当前配置了周期间隔为1分钟。服务组前置了Loadbalancer供客户端调用,服务实例在收到客户端请求后读库查询该客户端关联的集群状态,结合客户端属性(如producer的co-locality配置)返回相应的目标集群。同时,服务实例将本次探测结果存入本地缓存,以应对来自同一个应用里的各客户端瞬时高并发探测请求。



状态管理服务(Offset Management Service)


在Kafka跨数据中心高可用的架构中,我们采用的是local-aggregation多地互备的Kafka部署方式。应用将数据写入local层集群,经由Kafka Mirrormaker复制到aggregation层各集群,下游应用从aggregation层消费kafka数据。虽然aggregation层各集群都有全量的数据,但是由于数据从local层各集群并行复制交叉汇入以及复制过程间或消息重发,每个aggregation集群相同数据所在的消息位移(offset)都是不同的。因此当consumer在不同的aggregation集群之间做切换时,如果直接从之前集群的offset继续消费则会有数据漏消费的风险。在我们的方案中,实现了Offset Management Service,用来保证在consumer切换aggregation集群时不会丢失数据,达到at-least-once语义的目标。


Offset Management Service的工作流程

图6. Offset Management Service工作流程



Offset Management Service的工作流程如图6所示,主要分为如下几个步骤:


1)Offset mapping fetcher模块用于获取每个Local集群与Aggregation集群对于同一条数据的消息位移值 (offset),组成一组上下游的消息位移映射 (offset mapping),我们将所有的消息位移映射存储到offset mapping store,作为消息位移计算的依据。

2)Group offset fetcher模块用于定时获取每个消费组(consumer group)正在消费的offset,以上图为例,图中应用正在消费slc的数据,模块将定期跟踪该消费组在slc aggregation集群上的offset。

3)Group offset calculator模块根据应用在当前aggregation集群上的offset,计算出在其他aggregation集群上最大且不会丢数据的offset,并提交到相应集群上。



整个行为从全局视角来看,就是应用在消费某一个aggregation集群数据时,Offset Management Service将监护该应用消费组(consumer group)并向其他aggregation集群提交一个最大的不会丢失数据的消息位移,这样当消费组切换集群时,就会从之前Offset Management Service已提交的消息位移开始消费,实现无缝断点续读。


应用在实施该方案时,使用的是平台定制的PubSubLib,该Lib结合Discovery Service确保同一个消费组里的各实例在同一时段都从同一个aggregation集群消费。由此确保Offset Management Service从相应的aggregation集群上获取消费组最新的offset且准确提交到剩余的各集群。



上下游消息位移映射的获取和存储


Kafka local层(上游)集群到aggregation层(下游)集群的数据复制我们采用的是Mirrormaker组件,并且对Mirrormaker做了一些定制,使其能够定期快照记录local和aggregation集群的offset映射关系,并写入local Kafka集群的internal topic。Offset Management Service通过监听这些内部主题获得所有数据中心local和aggregation的消息位移关系,并存入offset mapping store。每条offset mapping的数据内容如下图所示,额外记录的各上游集群当前最大offset (log end offset, LEO)的用途将在下一小节讨论offset计算时详细展开。


图7. 消息位移映射表内容



对于offset mapping 的存储我们有两种实现:

  • 基于数据库的实现

  • 基于本地内存 + Kafka持久化的实现


在项目前期我们将offset mapping存储在MySQL中,这样的好处是实现简单且方便debug,利用组合索引也能达到不错的查询性能。但是随着topic和partition数增加,offset mapping表的数据量急剧膨胀,轻易就能达到千万级别,使得MySQL的插入和查询性能下降。


考虑到offset mapping内容天然的时效属性以及只读特征,我们将offset mapping的存储改为本地内存 + Kafka持久化的实现方案,以提高系统的可伸缩性,满足更大数据量的场景。当Offset Management Service收到了offset mapping之后会将数据缓存到本地内存并同时发送到用于存储offset mapping的Kafka集群。当服务重启后,会从该Kafka集群加载未过期的offset mapping进内存。所有offset的查询都是直接在内存中查询返回结果,保证了查询性能。Offset Management Service在我们的设计中是可以灵活扩展的,并且根据stream做数据分片。在服务端我们有一套offset mapping的过期清理机制,以避免内存无限膨胀。



跨集群消息位移的计算


正如文章此前所述,aggregation层各集群分别从local层各集群复制数据交叉汇入,local层的某条消息经复制后在各aggregation层集群的offset是不一致的。借助offset mapping表,Offset Management Service可以把aggregation集群的某个offset换算成各local集群的offset。由此,根据aggregation集群C的offset1计算aggregation集群D的offset2时,只要保证集群D offset2之前的消息对应各local集群的offset都小于等于集群C offset1之前的消息对应各local集群的offset,就可以保证consumer group从集群C offset1切到集群D offset2继续读数据时,在C集群上所有未读消息在D集群上都能读全。


图8. 一个topic partition跨集群消息位移的换算


图8是一个双DC集群计算offset的例子。集群A/B是两个local集群,数据复制到C/D两个aggregation集群,右边是一张offset mapping表。由于记录每条数据offset mapping的数据量巨大,我们采取的是定期记录。下面我们将分步骤讲解计算过程,根据图8集群C的offset 8计算集群D的offset为例:


1)根据集群C offset ‘8’(下称C8)找到对应各local集群的offset:查表发现,C8的offset mapping不存在,继续在offset mapping表中向前寻找C中小于offset ‘8’的映射记录,找到了 B3 -> C5 和 A4 -> C7 这两个offset mapping。

2)根据1)得到的各local集群的offset分别找集群D的offset:分别查找A4和B3在表中对应到D的offset,能找到 A4 -> D8,B集群则向前寻找查到了 B2 -> D5。

3)从2)拿到的offset候选值中取最小值:D8 和 D5 之间取小,故对于C8计算得到D的offset为5。


以上的计算方法在所有local集群都有流量的时候可以很好的工作,但是当某个local集群无流量时,该算法会得到一个相当小的offset,使得消费组进行aggregation集群切换时会重复消费大量数据。为了减少重复消费,我们对算法进行了增强,在计算的过程中排除了数据流量已经停止的local集群


在记录每条offset mapping的时候,我们会获取各local集群此时对于同一个分区的最大offset (log end offset, LEO),并一同记录在offset mapping表中。当拿到一个offset mapping时,如果上游offset和该DC该分区的LEO相同,则表明这条offset mapping记录是数据流量停止前最后一条数据,可以作为流量停止的依据。


图9. 引入LEO后一个topic partition跨集群消息位移计算


图9是加入了LEO之后的offset计算例子,右边offset mapping表多了一列用于记录所有上游集群的LEO,下面还是以图9集群C的offset 8计算集群D的offset为例,讲解新的计算过程:


1) 根据C8找到对应各local集群的offset:根据C8向前能够找到A4 -> C7和B2 -> C6这两个offset mapping。

2) 根据1)得到的各local集群offset分别找对应D的offset:分别查到的映射记录为A4->D6 (A:5,B:2)和B2->D3 (A:3,B:2)。

3) 根据2)得到的结果判断是否有流量停止的local集群以进行结果剪枝:对比可见,存入这两条映射记录之间local B无新数据汇入(LEO无变化),且local B的最后一条记录已汇入到D3 (LEO==B2),此时可以安全的忽略B2->D3这条搜索结果。

4) 根据3)剪枝后的结果中取最小值:摒弃掉D3之后仅剩D6,即为找到的最终值。


对于上述例子,C8计算到的D集群offset为6。如果不做剪枝,结果即为D3,且后续为C中更大offset值换算D的offset时,得到的结果都将是D3,势必引入大量不必要的重复消息。



客户端SDK (Pub-Sub Library)


有了元数据探测服务(Discovery Service),客户端就能根据选定的健康集群自动连接。我们只需要在客户端加入一个定时的探测线程来查询元数据探测服务,便可以让应用中所有的producer或consumer实例得知当前各个Kafka集群的可达性状况。每次收到元数据探测服务的返回结果后,HA Client会对比本地存储的上一轮探测的结果,发现两者有变化时便会触发一次集群切换的动作。


Rheos提供了三种接口来使用Rheos HA Client,分别是Generic Kafka HA ClientFlink HA Connector以及Rheos SQL



Generic Kafka HA Client


为了实现快速切换,我们在每个HA Producer实例中预初始化了到各健康local集群的KafkaProducer,并指定其中一个作为主producer,其余作为备用producer。当检测到需要切换Kafka集群时,从备用producer列表里选定一个健康的producer迅速升级为主producer。


HA Consumer的切换逻辑类似于HA Producer,不同点在于,HA Consumer中只存在一个Kafka Consumer,即主Kafka的consumer。这么做的原因在于,各aggregation集群都是全量数据,HA consumer只能从其中一个读取。此外,状态管理服务(Offset Management Service)会定期向备用Kafka同步主Kafka的consumer group offset,这就要求在备用Kafka中不能有该consumer group的消费实例,否则consumer group offset便无法写入目标Kafka中。


图10. Generic Kafka HA Client 切换示意



针对producer,为了使发送消息更可靠,我们额外提供一种机制,当消息在发往主Kafka失败时,HA Producer会重试发往另一个备用Kafka集群,以应对集群健康因素之外的其他间歇失败。背后的机制是,当主producer发送数据时,HA Client会创建一个回调函数。当收到了异常返回时,这个回调函数会调用备用producer来做一次数据的重发。要启用这个功能,只需要在创建HA Producer的时候开启跨集群重试(Cross DC Retry)这个选项即可。跨集群重试能够带来更可靠的消息传递,但有一个副作用,那就是它会破坏数据在同一个partition中的顺序。因为当数据被备用producer转发到另一个备用的Kafka之后,在MirrorMaker中它们被视为来自不同数据源,会由不同的读写任务来mirror数据,所以也就无法保证数据原有的发送顺序。



Flink Kafka HA Connector 与 Rheos SQL


Flink Kafka HA Connector基于Flink-1.13开发。其中Kafka HA Sink Connector实现相对简单,基于SinkFunction的Sink Connector,我们将社区的KafkaSinkFunction中的client替换为了上文所述HA client,这样便使得Kafka Sink Connector具有了主备Kafka集群自动切换的能力。Source Connector的实现则相对复杂。自Flink-1.12,社区引入了新的Source接口(FLIP-27),其中引入了Enumerator,Split,SourceReader等概念。Enumerator会负责从数据源产生Split并把它们分配给各个SourceReader。基于分配到的Split,SourceReader会去读取相应的数据。在我们Kafka HA Source Connector的实现中,除了基本的Split创建和分配,Enumerator还充当了元数据探测线程的角色。在运行过程中,Enumerator会定期查询元数据探测服务来检测主Kafka是否发生切换,一旦发现切换,Enumerator会生成一个集群切换通知(SwitchClusterEvent),并发送给所有的SourceReader。SourceReader在接收到集群切换通知后,会先停止当前KafkaConsumer(此时consumer可能已经遇到了问题停止消费了),并根据所分配topic-partition的当前offset去状态管理服务查询其在新的主Kafka中相应的offset,之后便会启动一个新的KafkaConsumer从新的主Kafka消费数据。


图11. Flink Kafka Ha Source Connect 架构示意


在Flink job创建checkpoint(savepoint同理)的过程中,Kafka HA Source Connector会将Split的分配结果、当前主Kafka的信息和对应topic-partition的offset都写入checkpoint中,以便在job异常重启时恢复数据消费。为了防止主Kafka切换后,之前生成的checkpoint失效。当Flink Job从checkpoint恢复时,如果Enumerator发现checkpoint中存储的主Kafka与当前探测到的主Kafka不同,Enumerator会向SourceReader发送集群切换通知,在收到集群切换通知后,SourceReader会根据存储在checkpoint中的offset向状态管理服务请求新的主Kafka中相应的offset,并从转换后的offset恢复数据消费。


对于Rheos SQL Connector,SQL API是对于Flink API的高层次封装,底层的实现一致。



04



Summary

总 结


本文完整讨论了基于local-aggregation集群拓扑, 设计Kafka跨数据中心高可用方案的思路,同时支撑了上下游数据和服务的高可用和连续性。跨区域高可用一直是Kafka社区热门讨论的话题,目前业界尚未形成统一标准方案。笔者期望本文内容能抛砖引玉,能给同行一些借鉴和参考。


eBay技术荟
eBay技术荟,与你分享最卓越的技术,最前沿的讯息,最多元的文化。
 最新文章