kafka底层原理浅析

文摘   2023-11-10 11:00   江苏  
  1. 背景

    最近做一个业务,大致逻辑为从OSS下载文件(很大,单个上G),将文件保存的绝对路径(和其他信息)写入一个input.json文件后调用算法部门同学编写的脚本,然后解析脚本的输出文件output.json并入库。

    日常工作中和算法交互其实蛮频繁的,但这次因为涉及到()文件下载,定时任务每次处理完一批数据整个过程显得比较长(业务部门等不了),且当前我们Java这边启动的实例有3台,如果只在某一台触发,也没有很好的利用其他机器。


  2. 前置

    关于Java如何调用C或Python脚本,可移步到《Mybatis批量更新SQL如何写》篇幅底部查看,想搭建kafka集群环境的可参考《ZooKeeper 集群环境搭建与底层原理》和一定要看的《Kafka 集群环境搭建踩过的坑》两篇文章,包会的那种哦!


  3. 类比

    nginx可以做请求的负载均衡,那如果不是外部请求,内部每天触发的job能不能也有一个东东能根据当前生产部署的机器做到类似nginx的功能的,答案是肯定的。如上我的逻辑就可以优化为,定时触发Job运行,运行过程中先将数据分组作为消息发到kafka,然后自产自消,多个实例(包含自己)就可以对这类消息进行消费,这样既缩短了跑批时间又充分利用了资源(业务大拆小——解耦)

    注:这里说的负载均衡指Job已经确定在某台机器跑起来了,并非Job运行前,因为运行前xxl_job已经可以实现针对集群的各种路由策略;另外,消息一定要分组,要考虑幂等性等问题。


  4. 主题

    上面铺垫了很多,开始进入本篇的主题——kafka,结合另一篇环境搭建,整理下当前大致的架构模型如下图:

    名词解释:

    broker:一台kafka就是一个broker,多个可以组成一个集群,一个broker可包含多个topic.

    topic:根据业务区分划定的类别(主题),可以拆分成多个patition分布在单个或多个broker上.

    patition:具体存储数据的物理分区且数据有序,kafka保证一个patition中的数据按顺序下发给consumer(若需保证整个topic的消费有序由客户端考虑或设置分区数为1).

    leader:负责某个partition的读写,(针对)每个分区都有一个leader.

    follower:接收leader指令拉取消息并更新自身数据,作为patition的数据副本以防数据丢失,冗余容错().

    producer:生产并发布消息到broker的客户端.

    consumer:读取kafka消息的客户端.

    consumer group一个组包含1个或多个消费(一个消费者只能属于一个消费组),组内消费者共享一个topic的所有分区消息。


    来个栗子:

    创建一个名为hello的topic,指定分区数和本数都为3,通过查看topic详情得出以下结论:

    有几个分区展示几行信息。

    Leader和Follower都是针对partition的概念,如partition 0 ,leader为broker.id=1的节点,其它同理。

    Replicas显示了针对Patition而言对应的副本节点列表(无论节点是否为leader)

    Isr(In-Sync Replicas)展示与leader已同步副本且状态是存活的节点列表。


  5. kafka工作流程

  6. 上图例子topic hello 有3个partition,换个角度来看更好理解:

     之所以要分区是为了支持:

    高扩展:partition根据机器调整,一个topic由N个partition组成,集群扩展就可以适应任意大小的数据量。

    高并发:以partition为单位读写,意味着可并发向一个topic的不同partition进行读写操作。

    高可用:数据以多个副本存在可保证故障发生时(数据还在),leader不能处理请求时,新的leader将会从follower中重新产生然后继续处理逻辑。


    生产者往topic中发送消息,数据真正存储在patition中,通过源码可以看见,消息发送到哪个分区大概有3种分区策略:

    指定partition参数,消息直接存储到对应的partition。

    没有partition但有key,则用key的hash值与topic的partition数量取模确定存放消息的partition。

    即无partition又无key,则使用轮询方式(首次生成一个随机数与partition数量取模,后续依次自增)


    每个partition类似一个数组,消息永远(追加)写在最后一个offset下,offset无论是生产者发送消息还是消费者处理消息都是从小到大的变化,用于唯一标记一条消息。(消费组中)每个消费者都会与kafka建立长连接,不断拉取消息进行处理并记录自己已消费的offset,以便故障恢复后继续消费;在kafka v0.9之前,消费者默认将offset保存在zk中,在v0.9之前消费者默认将offset保存在kafka内置的topic中,当kafka收到应答(ack),则将offset修改为新值,并同步在zk中更新它。


  7. kafka底层文件

    一个topic可以拆成N个partition用来负载均衡,append到partition中的消息以顺序写磁盘方式(比随机写效率高)保存在硬盘中,kafka进一步将partition以segment为单位的粒度进行细分:



    如图所示(hello-0,hello-1,hello-2里面都一样),segment文件分:

    .index:offset索引文件,用来记录.log文件中消息的相对偏移量和物理位置;跟clickhouse一样,文件使用了稀疏索引来减少文件大小(每隔log.index.interval.bytes=4kb大小的数据量记录一个索引)

    .log:生产者写入的消息体本身。

    .timeindex:与.index功能一样,只不过以时间作为索引用来快速定位数据。


    也可以用命令查看数据:

    .\bin\windows\kafka-run-class.bat kafka.tools.DumpLogSegments --files .\kafka_logs\kfk\hello-1\00000000000000000000.log --print-data-log

    index文件只有两列:offset、position

    offset:表示这条数据在当前segment文件中的位置,是这个文件的第几条。Position:这条数据在当前segment 文件中的物理偏移量。


    segment划分规则只要满足时间log.roll.hours=168(7 x 24)或文件大小log.segment.bytes=1073741842(1G)就会构建新的segment。

    文件命名规则为:partition全局第一个segment从0开始,后续每个segment文件(长度为20个数字字符)为上一个segment文件最后一条消息的offset值。

    假如我有大量的消息,那么上面partition目录下就可能成对的出现很多.index和.log文件。


    消息定位:每个partition下消费者在消费后记录了最新的offset,继续消费会提交offset给kafka,然后根据segment下的文件名包含offset,用二分查找定位到对应的index文件,再根据index文件中的元数据定位到具体消息偏移量后从.log文件读取。

    若发送的最后一条消息(在内存中)的offset为119,查找流程大致如下:



  8. ACK机制

    无论是生产端还是消费端,ack是防止消息丢失的重要手段,kafka提供了三种级别供用户在可靠性和延迟间选择:

    request.required.acks=0,生产者不等kafka(完成leader与follower间的副本同步) act,不停发送消息,所有消息只会被发送一次(at most once),该模式效率高,但可靠性低。所有

    request.required.acks=1(default),生产者接收leader完成本地日志后的ack,但暂未与ISR(in-sync replicas)列表中的节点同步数据副本,该模式效率和可靠性都适中。

    request.required.acks=-1,ISR列表中的所有节点已完成副本同步后向生产者回复ack,如果一直收不到会被认为失败重发消息(at least once),有可能导致数据重复,该模式效率低可靠性高。

    min.insync.replicas=n,当且仅当acks=-1时该配置有效,用于表明最少需要完成n个副本同步再ack,否则当ISR中的副本小于n时会触发异常。


  9. ISR如何选举

    topic创建时通过--replication-factor指定副本数量集合,统称为AR(Assigned Replicas),leader维护着ISR列表,follower与leader同步数据超过(replica.lag.time.max.ms)配置会被移出ISR列表存入OSR(Outof-Sync Replicas)所以:AR-副本=ISR-可用副本+OSR-备用副本;ISR列表包含leader+与leader成功保持同步的followers,当leader发生故障时,从ISR中选举新的leader,ISR始终动态保持稳定。


  10. 消费者消费方式

    消息被投递有2种方式:推-push或拉-pull,强制推有可能因为消费端处理的能力不一样,导致在有的节点来不及处理,有可能造成网络拥堵或拒绝服务;拉会根据自身能力出发以一定频率请求,不足之处为:如果没有消息,则产生空轮询,浪费资源。

    consumer端有以下2种方式解决上述问题

    控制消息方式:可单条消费也可以批量消费(当批量消费时,offset默认是自动提交,需要变更为:false)

    超时返回机制:消费者消费同时传入时间,当前如果没有消息则在等待指定时间后再返回。

    拉模式需要注意2点:如果消费线程数大于partition数量,则有的消费者收不到消息;如果partition数大于消费者数量,则有的消费者会收到多个partition消息,无法保证topic内消息的顺序(单个partition中消息有序)


  11. 消费者分区策略

    一个消费组中有多个消费者,一个topic有多个partition,二者如何对应?kafka提供了三种策略:RoundRobin、Range(default)、Sticky,用户可通过partition.assignment.strategy指定。

    Range策略:用topic的总分区数除以消费者线程,得出每个消费者要处理的partition(除不尽时前面的线程处理余出来的partition)。

    RoundRobin策略:先把所有topic的partition和消费者线程排序,通过轮询的方式一一匹配;如果一个组内的某个消费者没有订阅某个topic,则对应的topic partition不会轮询分配给该消费者,导致分配不平衡,应该很少有同学这样配置吧

    Sticky(粘黏)策略:该模式在v0.11中引入,目的为:a.保证分区尽可能均匀,消费者间的分区数相差最多1个;b.分区尽可能与上次分配相同。所有消费者状态都正常时分配结果与RoundRobin类似,当某个消费者故障时与其对应的partition会再平衡到其它消费者,对于原消费者处理的分区尽量保证不变,避免消费者切换导致处理一半的分区会重新处理,浪费系统资源。

  12. kafka思想

    个人理解:kafka在大数据下的高效、可靠的表现,在于不停的分(段)治,从开始发送消息到最细粒度的存储,都在想办法尽量的让消息(顺序——读写操作时间复杂度都为O(1))存储在一个尽量小“区域”中,将生成的索引常驻内存配合上一定的算法,使得消息得以快速处理。这种思想值得每一个同学在面对(实际)问题要解决方案时借鉴


    “无论在什时候,永远不要以为自己已知道了一切”,期待看到的同学指点一二本文参考链接如下:

    https://zhuanlan.zhihu.com/p/366141522

    https://zhuanlan.zhihu.com/p/444798458

    如果看到的同学觉得有用,点赞、关注、转发搞一个阔以不


晚霞程序员
一位需要不断学习的30+程序员……