美团面试:10Wtps,Kafka为啥那快?kafka 零复制 Zero-copy 如何实现?

文摘   科技   2024-08-30 23:06   湖北  
FSAC未来超级架构师

架构师总动员
实现架构转型,再无中年危机


尼恩说在前面

在40岁老架构师 尼恩的读者交流群(50+)中,最近有小伙伴拿到了一线互联网企业如得物、阿里、滴滴、极兔、有赞、希音、百度、网易、美团的面试资格,遇到很多很重要的零复制的问题:
  • 美团面试:Kafka为啥那么快(每秒上十万QPS) ?什么是kafka零复制?
  • 说一说Rocketmq、是如何实现每秒上百万数据的超高并发写入的?
  • 说一说Rocketmq、如何实现每秒上十万QPS的超高吞吐量的读取的?
  • 说一说 Rocketmq、的零复制(/零拷贝)原理
  • 说一说 Kafka 是如何实现每秒上百万数据的超高并发写入的?
  • 说一说 Kafka如何实现每秒上十万QPS的超高吞吐量的读取的?
  • 说一说 Kafka 的零复制(/零拷贝)原理
最近有小伙伴在面试 美团,又遇到这一个问题。小伙伴支支吾吾的说了几句, 没说清楚,挂了。
所以,尼恩给大家做一下系统化、体系化的梳理,使得大家内力猛增,可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”,然后实现”offer直提”。
当然,这道面试题,以及参考答案,也会收入咱们的 《尼恩Java面试宝典PDF》V171版本,供后面的小伙伴参考,提升大家的 3高 架构、设计、开发水平。
最新《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》的PDF,请关注本公众号【技术自由圈】获取,回复:领电子书
关于kafka的面试题,这里奉上尼恩架构团队之前写的一篇高性能的文章
网易一面:单节点2000Wtps,Kafka怎么做的?
同时,与本文配套,尼恩决定给大家写一个0复制 系列,帮助大家吊打面试官:

本文目录

尼恩说在前面

Kafka采用了两种零拷贝技术

回顾一下mmap零复制  

 - mmap (memory-map)

如何通过mmap查询索引找到具体的消息数据

 - 分区目录

 - 分段日志和索引

 - 消息日志与索引关系

 - 消息日志(.log)

 - 偏移量索引(.index)

 - 时间索引(.timeindex)

 - 位移索引

通过索引查询消息过程

 - 改进的二分查找

 - Log 类采用跳跃表(SkipList)管理 LogSegment 对象

写入索引项的方法

 - 时间戳索引

 - 写入时间戳索引的索引项

 - 位移索引和时间戳索引的区别是什么?

sendfile 是最高性能的零复制技术

 - 传统拷贝,从磁盘读取文件并发送到网络的流程

 - 零拷贝,从磁盘读取文件并发送到网络的流程

Kafka的log 文件 与日志格式的演变

 - v0版本

 - v1版本

 - v2版本

 - 消息的批量生产

消息的log 文件存储

Kafka 写入日志的步骤

 - 生产者发送消息时的 消息集

 - Kafka 日志追加方式

 - broker如何 分析和验证消息集?

 - broker如何为消息集分配绝对偏移量?

Kafak通过顺序写实现写入的高性能

Kafak通过sendfile 零拷贝实现发送的高性能

 - 消费者从broker拉取数据

Kafka为啥达到100Wtps高性能?

说在最后:有问题找老架构取经


Kafka采用了两种零拷贝技术

Kafka之所以能够快速地处理大量数据,其中一个重要原因就是它采用了零拷贝(Zero-copy)技术。
Kafka采用了两种零拷贝技术来提高性能:mmap和sendfile。
Kafka采用了两种零拷贝技术, 主要有两个大的场景:
  • Broker 读写index 文件,用了 mmap零复制
  • Broker 向Consumer发消息,用了 sendfile 零复制
当然,零拷贝并不是Kafka的专利,而是操作系统的能力 ,又比如Netty,Rocketmq 都用到了零拷贝,这个后面尼恩会详细给大家做展开的介绍。
不过,零拷贝技术可以减少不必要的数据拷贝次数,从而提高数据传输效率,所以,也是面试的绝对重点, 各位看官,尤其是要转架构的, 一定要好好掌握。

回顾一下mmap零复制

mmap可以把文件映射到进程的虚拟内存空间,实现对文件的读取和修改而不需要用传统的read和write系统调用。这样可以减少一次数据拷贝,并且不同的虚拟内存地址可以指向同一个物理内存,实现数据共享

mmap (memory-map)

mmap (memory-map) 可以把文件映射到进程的虚拟内存空间。通过对这段内存的读取和修改,可以实现对文件的读取和修改,而不需要用read和write系统调用,但是这一切都需要操作系统在幕后工作(异步处理)。如下图所示,为mmap实现原理的示意图。
可以看到,通过mmap ,用户进程空间中某一块虚拟内存与内核中的物理内存(PageCache)形成映射,而这块物理内存与目标文件的某一块形成映射。
用户进程读取文件的过程不是传统的read系统调用,而是直接访问的PageCache,如果没有数据,系统会把文件的内容读取过来缓存起来,应该说就是利用的内核中的缓存区。
Java中的mmap底层 是通过JNI调用C , C语言中的mmap函数为:
void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);

addr:指定映射的起始地址,通常设为NULL,由内核来分配
length:代表将文件中映射到内存的部分的长度。
prot:映射区域的保护方式。可以为以下几种方式的组合:
PROT_EXEC 映射区域可被执行
PROT_READ 映射区域可被读取
PROT_WRITE 映射区域可被写入
PROT_NONE 映射区域不能存取
flags:映射区的特性标志位,常用的两个选项是:
MAP_SHARD:写入映射区的数据会复制回文件,且运行其他映射文件的进程共享
MAP_PRIVATE:对映射区的写入操作会产生一个映射区的复制,对此区域的修改不会写会原文件
fd:要映射到内存中的文件描述符,有open函数打开文件时返回的值。
offset:文件映射的偏移量,通常设置为0,代表从文件最前方开始对应,offset必须是分页大小的整数倍。
函数返回值:实际分配的内存的起始地址。

与mmap函数成对使用的,是 munmap函数,它是用来解除映射的函数,
Java中的munmap底层 是通过JNI调用C , C语言中的munmap函数为:
int munmap(void *start, size_t length)

start:映射的起始地址
length:文件中映射到内存的部分的长度
返回值:解除成功返回0,失败返回-1。

mmap零拷贝技术通过内存映射文件的方式,减少了数据在用户空间和内核空间之间的拷贝次数,从而提高了数据传输效率。
以下是mmap零拷贝的一般流程:
  1. 打开文件:首先,使用open系统调用打开需要进行内存映射的文件,并获取文件描述符(file descriptor)。
  2. 创建内存映射:通过mmap系统调用将文件的部分或全部内容映射到进程的地址空间。mmap的典型调用如下:
    cvoid *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
  3. 访问映射的内存:一旦内存映射建立,进程就可以直接访问映射的内存区域,就像访问普通内存一样。对这部分内存的读写操作将直接影响到文件的内容。
  4. 同步数据到磁盘:如果需要确保对映射区域的修改被写回到文件中,可以使用msync系统调用。这步操作是可选的,取决于应用是否需要立即将数据同步到磁盘。
  5. 解除映射:当不再需要映射时,使用munmap系统调用来解除映射,释放资源:
    cint munmap(void *start, size_t length);
  • start:映射区域的起始地址。
  • length:映射的长度。
  • 关闭文件:最后,使用close系统调用关闭文件描述符。
  • mmap零拷贝的优势在于:
    • 减少了数据在用户空间和内核空间之间的拷贝次数,因为数据可以直接在内核空间的缓冲区和用户空间的映射内存之间共享。
    • 减少了系统调用的开销,因为不需要频繁地进行readwrite操作。
    • 可以提高大文件传输和处理的效率。
    尼恩特别提示: mmap技术在进行文件映射的时候,一般有大小限制,在1.5GB~2GB之间
    所以,在很多消息中间件,会限制文件的大小。
    mmap技术可以将磁盘文件映射到内存中,用户通过修改内存就能修改磁盘文件。这样可以避免将数据从内核态拷贝到用户态再拷贝回内核态的过程,从而提高数据传输效率。

    如何通过mmap查询索引找到具体的消息数据

    以kafka_2.13-2.8.0为例,分析Kafka消息在磁盘上的存储结构、配置以及如何通过索引找到具体的消息数据。
    既然是日志索引相关的问题,正好以此来分析存储模块下的索引文件:

    分区目录

    一个分区(Partition)有1到多个副本(Replica),是主从结构,
    主(Leader)负责处理读写请求,从(Follower)只负责同步数据并在主宕机的时候顶替主实现高可用。
    在Kafka数据目录下存放着各分区目录(Partition),名称格式为 topic-partitionNo,如test-0代表名为test的Topic的0号分区。分区目录下存放消息的文件。

    分段日志和索引

    Kafka的消息是分段(Segment)存储在文件里的,当达到配置指定的条件就会创建新的分段文件。
    每个分段都都对应消息日志(.log),偏移量索引(.index)和时间索引(.timeindex)三个文件,
    文件名为起始偏移量(Offset),代表这个文件第一条消息的偏移量值。
    以下是日志分段和索引创建的配置项,详情见 Apache Kafka Broker 配置
    除了log.index.interval.bytes只影响单个索引的创建时机,其他配置都会触发日志分段。
    配置项默认值单位描述
    log.roll.ms
    null
    毫秒
    新日志段滚出的最大时间。如果未设置,则使用log.roll.hours中的值
    log.roll.hours
    168
    小时
    新日志段滚出的最大时间,从属于log.roll.ms属性
    log.segment.bytes
    1073741824(1G)
    B
    单个日志文件的最大大小
    log.index.size.max.bytes
    10485760(1MB)
    B
    偏移索引的最大字节数
    log.index.interval.bytes
    4096(4 KB)
    B
    将一个项添加到偏移索引中的间隔

    消息日志与索引关系

    Kafka数据最终都会保持在磁盘上,对于消息有三个关键的文件消息日志(.log),偏移量索引(.index)和时间索引(.timeindex)。
    消息日志保存的是消息的原数据,接收到的生产者(Producer)的消息会以追加的方式顺序写到这个文件中,顺序写效率远高于随机写,减轻了磁盘寻址压力。这是Kafka使用磁盘做存储却能保证高性能的原因之一。
    每个消息都会有一个自增的偏移量值,从0开始,每条消息都递增这个值,所以偏移量代表即将到来的下一条消息的偏移量值。
    Kafka中索引有偏移量索引和时间索引两种。
    它没有为每一条消息建立索引,那样索引文件会太过于庞大,而是分段建立,所以一个索引只能指明消息所在位置的范围,最终要在这个范围遍历查找。
    时间索引指向的是偏移量索引,偏移量索引指向了消息日志二进制位置。通过时间戳或者偏移量最终都可以定位到消息的具体位置。
    可以通过配置参数 log.index.interval.bytes控制两个索引间隔的字节数,超过这个大小就建立新索引。这个值越小,索引越密集,查询快但是文件体积大。

    消息日志(.log)

    通过消息日志(.log)可以看到每条消息具体的内容。
    # 只输出消息日志描述信息
    kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.log

    # 输出消息日志完整信息
    kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.log --print-data-log
    可以看到下图这个消息日志起始偏移量(Starting offset)是23147,代表这个日志第一条消息的偏移量,这个偏移量同时也是消息日志和两个索引文件的文件名。
    每n条消息组成一批(batch),每一批消息对应有一个描述信息,记录了这批消息的大小,偏移量范围baseOffset和lastOffset,位置(position)以及大小(batchSize)等信息。
    描述信息下面就是对应这一批具体的消息。如下图:

    偏移量索引(.index)

    # 查看偏移量索引内容
    kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.index
    偏移量索引是稀疏结构,每隔一段记录一条消息的索引。
    Offset指消息的偏移量,position指这个偏移量的消息所在的一批(batch)消息在.log中的起始二进制位置。

    时间索引(.timeindex)

    # 查看时间索引内容
    kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.timeindex1.2.
    时间索引也是稀疏结构,每隔一段记录一条消息的索引。
    时间戳(timestamp)指这条消息的创建时间,Offset指这个消息的偏移量。
    上面这条指令同时会输出根据时间戳索引查找消息的结果,比如创建时间为1632390207745的消息偏移量为23388,这条消息所在那一批消息的起始偏移量(Indexed offset / baseOffset:)为23388,终止偏移量(found log offset / lastOffset:)为23390,这一批消息一起有23390 ~ 23388 = 3条消息。
    Kafka通过MappedByteBuffer将索引文件映射到内存中,来加快索引的查询速度。

    位移索引

    不同索引类型保存不同的<Key , Value>对,对OffsetIndex位移索引而言,Key就是消息的相对位移,Value保存该消息的日志段文件中该消息第一个字节的物理文件位置。
    • 偏移量索引文件:
      定义:
    对于偏移量索引文件,保存的是 <相对偏移量,物理地址> 的对应关系,文件中的相对偏移量是单调递增的。
      查找:
    查询指定偏移量对应的消息时,使用改进的二分查找算法来快速定位偏移量的位置,
    如果指定的偏移量不在索引文件中,则会返回文件中小于指定偏移量的最大偏移量及对应的物理地址,该逻辑通过OffsetIndex.lookup()方法实现。
    一个参考的 稀疏索引.index文件的内容,大致如下
    OffsetPosition
    100
    4000
    110
    8200
    120
    13000
    130
    18000
    假设 ,要寻找 offset 为115位点对应的文件position,
    因为115介于「110-120」之间,因此稀疏索引能够提供的信息就是,110 需要从 8200 的位置开始往后找,这样也就粗略定位了115的大致position
      索引项:
    偏移量索引文件的索引项结构如下图所示,每个索引项记录了相对偏移量relativeOffset和对应消息的第一个字节在日志段文件中的物理地址position,共占用8个字节。
    • relativeOffset:相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节,当前索引文件的文件名即为 baseOffset 的值;
    • position:物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节。
    尼恩提示:本质上, 消息的偏移量(offset)如果是 绝对偏移量, 那是一个long ,是要占用 8 个字节滴,那么,为啥这里是四个字节呢?
    为啥?
    索引项中没有直接使用long 类型绝对偏移量,而改为只占用 4 个字节 int 的相对偏移量(relativeOffset=offset-baseOffset),这样可以减小索引文件占用的空间。
    举个例子看一下:
    • 如果一个日志分段的 baseOffset (基础偏移量) 为 32,
    • 那么其文件名就是 00000000000000000032.log,
    • offset 为 35 的消息在索引文件中的 relativeOffset 的值为 35-32=3。
    为什么使用相对偏移量?这样可以节约存储空间。每条消息的绝对偏移量占用8个字节,而相对偏移量只占用4个字节(relativeOffset=offset-baseOffset)。
    在日志段文件滚动的条件中,有一个是:追加消息的最大偏移量和当前日志段的baseOffset的差值大于Int.MaxValue(4个字节),因为如果 相对偏移量 大于这个4个字节值,就无法存储相对偏移量了。
    所以, kafka有两个偏移量:
    • 绝对偏移量: OffsetIndex位移索引中是override def entrySize = 8,8个字节。
    • relativeOffset:相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节
    relativeOffset 相对位移是一个整型,占用4个字节,物理文件位置也是一个整型,同样占用4个字节,因此总共8个字节。
    总之,Kafka中的消息位移值是一个长整型,应该占用8个字节才对,但是,在保存OffsetIndex<Key , Value>对,Kafka做了一些优化,每个OffsetIndex对象在创建时,都已经保存了对应日志段对象的起始位移,因此保存与起始位移的差值就够了。
    1. 为了节省空间,一个索引项节省了4字节,想想那些日消息处理数万亿的公司。
    2. 因为内存资源是很宝贵的,索引项越短,内存中能存储的索引项就越多,索引项多了直接命中的概率就高了。

    通过索引查询消息过程

    偏移量索引和时间戳索引对应的类分别为:OffsetIndex 和 TimeIndex,其公共的抽象父类为AbstractIndex:
    与之相关的源码如下:
    1. AbstractIndex.scala:抽象类,封装了所有索引的公共操作
    2. OffsetIndex.scala:位移索引,保存了位移值和对应磁盘物理位置的关系
    3. TimeIndex.scala:时间戳索引,保存了时间戳和对应位移值的关系
    4. TransactionIndex.scala:事务索引,启用Kafka事务之后才会出现这个索引
    这里先介绍 OffsetIndex位移索引 文件。
    1.索引项大小定义:
    //偏移量索引文件索引项override def entrySize = 8
    //时间戳索引文件索引项override def entrySize = 12
    2.根据绝对偏移量计算相对偏移量:relativeOffset
    def relativeOffset(offset: Long): Int = {

    val relativeOffset = toRelative(offset)

    if (relativeOffset.isEmpty)

    throw new IndexOffsetOverflowException(s"Integer overflow for offset: $offset (${file.getAbsoluteFile})")

    relativeOffset.get

    }

    relativeOffset方法内部调用了toRelative方法:用给定的偏移量-日志段起始偏移量,如果结果合法则返回
    private def toRelative(offset: Long): Option[Int] = {

    val relativeOffset = offset - baseOffset

    if (relativeOffset < 0 || relativeOffset > Int.MaxValue)

    None

    else

    Some(relativeOffset.toInt)

    }

    3.将相对偏移量还原成绝对偏移量:parseEntry
    偏移量索引:
    override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {

    OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))

    }

    这个方法返回一个 OffsetPosition 类型。
    该类有两个方法,分别返回索引项的 Key 和 Value。
    这里的 parseEntry 方法,就是要构造 OffsetPosition 所需的 Key 和 Value。
    Key 是绝对偏移量,根据索引项中的相对偏移量计算,代码使用 baseOffset + relativeOffset(buffer, n) 的方式将相对偏移量还原成绝对偏移量;
    Value 是这个偏移量上消息在日志段文件中的物理位置,代码调用 physical 方法计算这个物理位置并把它作为 Value。
    最后,parseEntry 方法把 Key 和 Value 封装到一个 OffsetPosition 实例中,然后将这个实例返回。
    4.快速定位消息所在的物理文件位置
    e.g. 假设要查找偏移量为 230 的消息?
    第一步: 通过跳 表 ,找 分段的index文件
    Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,
    通过跳跃表方式,定位到在 00000000000000000217.index ,
    第二步: 通过 改进的二分查找, 找到不大于 相对偏移量的 最大索引项
    通过二分法在偏移量索引文件中找到不大于 230-217 =13 的最大索引项,即 offset 12 那栏,
    第三步:找日志文件,找到相对的目标 记录
    从日志文件物理位置456开始,继续向后查找找到相对偏移量为13的消息。
    def lookup(targetOffset: Long): OffsetPosition = {

    maybeLock(lock) {

    //复制出整个索引映射区

    val idx = mmap.duplicate

    // largestLowerBoundSlotFor 方法底层使用了改进版的二分查找算法寻找对应的槽

    val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)

    // 如果没找到,返回一个空的位置,即物理文件位置从0开始,表示从头读日志文件

    // 否则返回slot槽对应的索引项

    if(slot == -1)

    OffsetPosition(baseOffset, 0)

    else

    parseEntry(idx, slot)

    }

    }

    从上面 OffsetIndex.scala#lookup()` 的源,可以看到关键处有两点:
    • 偏移量索引使用 mmap 来映射操作索引数据,这样索引数据不需要拷贝到用户态,提高了性能
    • 调用 AbstractIndex.scala#largestLowerBoundSlotFor() 方法从索引数据中查找确定消息数据读取的起始位置
    AbstractIndex.scala#largestLowerBoundSlotFor()` 的源码如下:
    protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int =
    indexSlotRangeFor(idx, target, searchEntity)._1

    private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = {
    // check if the index is empty
    if(_entries == 0)
    return (-1, -1)

    def binarySearch(begin: Int, end: Int) : (Int, Int) = {
    // binary search for the entry
    var lo = begin
    var hi = end
    while(lo < hi) {
    val mid = (lo + hi + 1) >>> 1
    val found = parseEntry(idx, mid)
    val compareResult = compareIndexEntry(found, target, searchEntity)
    if(compareResult > 0)
    hi = mid - 1
    else if(compareResult < 0)
    lo = mid
    else
    return (mid, mid)
    }
    (lo, if (lo == _entries - 1) -1 else lo + 1)
    }

    //使用所有索引数据 entry 的总量 _entries 减去热区数据大小_warmEntries,
    // 确定一个热区索引的起始位置,这样可以保障只在索引数据的尾部进行二分查找
    val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
    // check if the target offset is in the warm section of the index
    if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
    return binarySearch(firstHotEntry, _entries - 1)
    }

    // check if the target offset is smaller than the least offset
    if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
    return (-1, 0)

    binarySearch(0, firstHotEntry)
    }

    AbstractIndex.scala#largestLowerBoundSlotFor()` 的主要逻辑是从索引数据中二分查找确定消息数据在文件中的物理起始点,这里需要注意索引文件实际进行了冷热分区,其中关键如下:
    1. 使用所有索引数据 entry 的总量 _entries 减去热区数据大小_warmEntries,确定一个热区索引的起始位置,这样可以保障只在索引数据的尾部进行二分查找
    2. 之所以这样处理,是因为 Kafka 的索引是在末尾追加写入的,并且一般写入的数据很快就会被读取,数据热点集中在尾部。索引数据一般都在页缓存中,而操作系统的内存是有限的,必然要通过类似 LRU 的机制淘汰页缓存。
    3. 如果每次二分查找都从头开始,则索引中间部分的数据所在的页缓存大概率已经被淘汰掉,从而导致缺页中断,必须重新从磁盘上读文件,影响性能
    尼恩提示:页缓存也叫文件缓冲,是文件系统数据在内存中的缓存结构,Kafka 的消息数据存储也充分利用了页缓存,如果消息写入消费速度相当,则消费时大概率直接命中缓存而不经过磁盘IO,极大提高性能。但是当某个消费者消费速度落后时,可能会导致 Kafka 节点上的页缓存频繁切换,拖累整个集群的性能
    相关视频:《尼恩Java硬核架构班第16章:RocketMQ第一部曲:葵花宝典(高性能秘籍)架构师视角解读OS底层的mmap、pagecache、zerocopy等底层的底层知识 》 (点击此链接学习)
    偏移量索引文件的查找原理:
    假设要查找偏移量为230的消息,查找过程如下:
    • 首先找到baseOffset=217的日志段文件(这里使用了跳跃表的结构来加速查找)
    • 计算相对偏移量relativeOffset=230-217=13
    • 在索引文件中查找  不大于13 的最大相对偏移量对应的索引项,即[12,456]
    • 根据12对应的物理地址456,在日志文件.log中定位到准确位置
    • 从日志文件物理位置456继续向后查找找到相对偏移量为13,即绝对偏移量为230,物理地址为468的消息
      注意:
    • 消息在log文件中是以批次存储的,而不是单条消息进行存储。索引文件中的偏移量保存的是该批次消息的最大偏移量,而不是最小的。
    • Kafka强制要求索引文件大小必须是索引项大小(8B)的整数倍,假设broker端参数log.index.size.max.bytes 设置的是67,那么Kafka内部也会将其转为64,即不大于67的8的最大整数倍。

    改进的二分查找

    就Kafka而言,索引是在文件末尾追加的写入的,并且一般写入的数据立马就会被读取。所以数据的热点集中在尾部。并且操作系统基本上都是用页为单位缓存和管理内存的,内存又是有限的,因此会通过类LRU机制淘汰内存。
    看起来LRU非常适合Kafka的场景,但是使用标准的二分查找会有缺页中断的情况,毕竟二分是跳着访问的。
    简单的来讲,假设某索引占page cache 13页,此时数据已经写到了12页。
    按照kafka访问的特性,此时访问的数据都在第12页,因此二分查找的特性,此时缓存页的访问顺序依次是0,6,9,11,12。
    因为频繁被访问,所以这几页一定存在page cache中。
    当第12页不断被填充,满了之后会申请新页第13页保存索引项,而按照二分查找的特性,此时缓存页的访问顺序依次是:0,7,10,12。
    这7和10很久没被访问到了,很可能已经不再缓存中了,然后需要从磁盘上读取数据。
    注释说:在他们的测试中,这会导致至少会产生从几毫秒跳到1秒的延迟。
    基于以上问题,Kafka使用了改进版的二分查找,改的不是二分查找的内部,而且把所有索引项分为热区和冷区 这个改进可以让查询热数据部分时,遍历的Page永远是固定的,这样能避免缺页中断。
    看到这里其实我想到了一致性hash,一致性hash相对于普通的hash不就是在node新增的时候缓存的访问固定,或者只需要迁移少部分数据。

    Log 类采用跳跃表(SkipList)管理 LogSegment 对象

    每个 topic 分区对应一个 Log 类对象(一个 broker 节点上只允许存放分区的一个副本,所以从 broker 视角来看一个分区对应一个 Log 类对象),其中包含了一系列隶属对应 topic 分区的 LogSegment 对象,Log 类采用跳跃表(SkipList)数据结构对这些 LogSegment 对象进行管理。
    上图展示了 LogSegment 在 Log 中基于 SkipList 的组织形式(其中青色小圆圈表示单个 LogSegment 对象)。

    写入索引项的方法

    偏移量索引:append
    写入索引项append方法的实现, 通过mmap 实现 idex 文件读写的 零复制,流程图如下
    写入索引项append方法的实现, 通过mmap 实现 idex 文件读写的 零复制,代码如下
    def append(offset: Long, position: Int): Unit = {
    inLock(lock) {
    // 索引文件如果已经写满,直接抛出异常
    require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
    // 要保证待写入的位移offset比当前索引文件中所存的位移值要大
    // 这主要是为了维护索引的单调性
    if (_entries == 0 || offset > _lastOffset) {
    trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
    mmap.putInt(relativeOffset(offset))//向mmap写入相对位移值
    mmap.putInt(position)//向mmap写入物理文件位置
    _entries += 1//更新索引项个数
    _lastOffset = offset//更新当前索引文件最大位移值
    // 确保写入索引项格式符合要求
    require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
    } else {
    throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
    s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
    }
    }
    }

    时间戳索引

    TimeIndex保存的是<时间戳,相对位移值>,时间戳需要长整型来保存,相对位移值使用Integer来保存。因此TimeIndex单个索引项需要占用12个字节。

    写入时间戳索引的索引项

    def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
    inLock(lock) {
    if (!skipFullCheck)
    // 索引文件如果已经写满,直接抛出异常
    require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
    // 这主要是为了维护索引的单调性
    if (_entries != 0 && offset < lastEntry.offset)
    throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
    s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
    // 这主要是为了维护索引的单调性
    if (_entries != 0 && timestamp < lastEntry.timestamp)
    throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
    s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")

    if (timestamp > lastEntry.timestamp) {
    trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
    mmap.putLong(timestamp)//向mmap写入时间戳
    mmap.putInt(relativeOffset(offset))//向mmap写入相对位移值
    _entries += 1
    _lastEntry = TimestampOffset(timestamp, offset)
    require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
    }
    }
    }

    位移索引和时间戳索引的区别是什么?

    Kafka中有三大类索引:位移索引、时间戳索引和已中止事务索引。分别对应了.index、.timeindex、.txnindex文件。
    与之相关的源码如下:
    1. AbstractIndex.scala:抽象类,封装了所有索引的公共操作
    2. OffsetIndex.scala:位移索引,保存了位移值和对应磁盘物理位置的关系
    3. TimeIndex.scala:时间戳索引,保存了时间戳和对应位移值的关系
    4. TransactionIndex.scala:事务索引,启用Kafka事务之后才会出现这个索引

    sendfile 是最高性能的零复制技术

    sendfile技术可以直接在内核完成输入和输出,不需要拷贝到用户空间再写出去。这样可以减少不必要的数据拷贝次数,提高数据传输效率。
    并且,sendfile 是最高性能的零复制技术。 具体请参见 :《尼恩Java硬核架构班第16章:RocketMQ第一部曲:葵花宝典(高性能秘籍)架构师视角解读OS底层的mmap、pagecache、zerocopy等底层的底层知识 》 (点击此链接学习)

    传统拷贝,从磁盘读取文件并发送到网络的流程

    如果您的应用程序要从磁盘读取文件并通过网络发送它,则可能会进行一堆不必要的拷贝,以及用户态/内核态的切换。
    来一个更复杂的 传统拷贝,从磁盘读取文件并发送到网络的流程图
    一些术语:
    • read buffer: 读缓冲区,操作系统的 page cache
    • socket buffer: 套接字缓冲区,OS 用于管理数据包的字节缓冲区
    • NIC buffer: 网卡中的字节缓冲区
    • DMA copy: DMA 是 Direct Memory Access 的缩写,是内存控制器的一个功能,可以避免 CPU 的干预,允许硬件(图形卡、声卡、网卡等)直接访问内存 (RAM) 里的某些数据
    在这个例子中,我们有 4 次模式切换(用户态和内核态之间的切换)和 4 次数据拷贝。
    • 应用程序(这里指 Kafka)利用 DMA copy 从磁盘 load 数据到 read buffer(用户态->内核态
    • read buffer 到应用程序的缓存区(内核态->用户态
    • 应用程序要发数据到网络上,实际是先写到 socket buffer(用户态->内核态
    • socket buffer 到 NIC buffer(响应数据写完之后,由内核态返回用户态)
    于是,为了解决这一问题,DMA 技术就出现了,每个 I/O 设备都有自己的 DMA 控制器,通过这个 DMA 控制器,CPU 只需要告诉 DMA 控制器,我们要传输什么数据,从哪里来,到哪里去,就可以放心离开了。
    后续的实际数据传输工作,都会由 DMA 控制器来完成,CPU 不需要参与数据传输的工作。

    零拷贝,从磁盘读取文件并发送到网络的流程

    来一个更复杂的 零拷贝,从磁盘读取文件并发送到网络的流程图
    为了减少拷贝,把数据从磁盘直接发向网络,那 Kafka 在存储数据的时候,就要保证存储的数据格式和将要发出的 response 格式一致。
    在传统拷贝模式下,第二步、第三步没啥意义,因为 Kafka 没有对数据做额外处理,只是简单转发。那能否从磁盘直接发向网络呢?答案是肯定的。
    通过零拷贝技术,磁盘上的数据还是要先进入 read buffer,然后不用再拷贝到应用程序的缓存区,而是直接拷贝到 NIC buffer,图上的步骤 2:Appends just file descriptors,只是把文件描述符交给了 Socket buffer,实际数据并没有拷贝给 Socket buffer。这就是所谓的 scatter-gather 操作(也称为 Vectorized I/O),scatter-gather 是仅将 read buffer 数据指针存储在 socket buffer 中,并让 DMA 直接从内存读取数据的行为。
    最终结果如何呢?
    • 4 次模式切换变成了 2 次
    • 2 次 DMA 拷贝,仍然是 2 次
    • 1 次微小的指针拷贝
    如果上面的流程看不懂,请参见尼恩 零复制 系列的第一篇:

    Kafka的log 文件 与日志格式的演变

    先看一下Kafka的log 文件。 这个关系到log文件的 外边结构。
    Kafka引入了日志分段LogSegment 的概念,将Log切分为多个LogSegment, 一个LogSegment 一个log文件。
    Log中追加消息时是顺序写入的,且只能写入最后一个LogSegment,此前的都不能写入。
    每个LogSegment 对应于磁盘上的一个日志文件和两个索引文件 和 其他文件。
    偏移量索引文件(.index)和时间戳索引文件(.timeindex)。
    每个LogSegment有基准偏移 baseOffset,表示当前LogSegment中第一条消息的offset。
    每个LogSegment还可能会包含".delete",".clean"等临时文件
    接下来,再看一下Kafka消息协议。 这个关系到log文件的 内部结构。

    v0版本

    Kafka消息格式的第一个版本通常称为v0版本,在 Kafka 0.10.0之前都采用的这个消息格式

    v1版本

    Kafka从 0.10.0 版本开始到 0.11.0 版本之前所使用的消息格式版本为v1
    v1 比 v0 版本就多了一个 timestamp 字段,表示消息的时间戳。

    v2版本

    Kafka 0.11.0 版本开始所使用的消息格式版本为v2,这个版本的消息相 vO v1版本而言改动很大,同时还参考了Protocol Buffer 引入了变长整型( Varints )和ZigZag编码。
    第一个特点:v2版本的 消息压缩
    kafka将多条消息一起压缩。v2版本中消息集称为 Record Batch,而不是先前的 MessageSet,其内部也包含了一条或多条消息 , 即一个Record Batch可能含有1-N条消息
    消息压缩 通过参数 compression.type配置。默认值为producer,表示保留生产者使用的压缩方式。
    参数还可以配置为gzip、snapp、lz4
    第二个特点:v2版本的 变长字段
    Varints 是使用一个或多个字节来序列化整数的一种方法。Record内部字段大量采用了 Varints 变长字段

    消息的批量生产

    消息生产端Producer这里没有太多需要同步的,一言蔽之就是将消息封装后发送给Broker端,不过读者这里想强调一下 Record Batch 的概念
    在默认情况下,单Batch的上限是16K,一个Batch可以存储1条或者多条消息,这个取决于Producer端的配置,如果Producer设置了黏性分区策略,linger.ms聚批时间设置足够长(例如1000ms),那么很容易将Batch填满;又或者linger.ms配置了默认值(linger.ms=0),那么聚批将不会被触发,那一个Batch上就只有一条消息。
    因此无论怎样,Record Batch是消息的载体,也是消息读取的最小单位(注意不是消息本身,这里在后文还会提及)
    上图表明了,某个 Record Batch 中可能只有一条消息,也有可能存在多条,甚至将16K全部填充满;无论哪种case,Producer 都是以 Record Batch 粒度将消息发送至Broker的

    消息的log 文件存储

    消息的存储,包括log 文件和 index文件 总体逻辑上的关系,映射到实际代码中在磁盘上的关系则是如下图所示:
    每个分区对应一个Log对象,在磁盘中就是一个子目录,子目录下面会有多组日志段即多Log Segment,每组日志段包含:消息日志文件(以log结尾)、位移索引文件(以index结尾)、时间戳索引文件(以timeindex结尾)。
    其实还有其它后缀的文件,例,例如.txnindex、.deleted等等。篇幅有限,暂不提起。
    其中log文件是用来存储消息的,而index文件则是用来存储稀疏索引的
    • log文件:通过append的方式向文件内进行追加,每个Segment对应一个log文件
    • index文件:索引文件,每隔4K存储一次offset+position,帮助快速定位指定位点的文件position用的
    注:这里为什么ndex文件 要隔4K做一次稀疏索引,而不是3K或者5K呢?
    其实这里主要是与硬件兼容,现在多数厂商的硬件,单次扫数据的大小一般都是4K对齐的,很多硬件都提升到了8K甚至16K,稀疏索引设置为4K,能保证即便是当前的 Record Batch 只有 1 个字节,后续的内容也能缓存在Page Cache中,下次扫描的时候,可以直接从缓存中读取,而不用扫描磁盘
    另外,基于V2的存储版本,消息的查询都是以 Record Batch 作为最小粒度查询的,而 Producer 设置的 Record Batch 的默认值为16K,即如果消息攒批合理的话,稀疏索引可能是每隔16K构建起来的

    Kafka 写入日志的步骤

    服务端将生产者产生的消息集存储到日志文件,要考虑对消息集进行分段存储。
    如图6-3所示,服务端将消息追加到日志文件,具体步骤如下。
    1. 每个分区对应的日志对象管理了分区的所有日志分段。
    2. 将消息集追加到当前活动的日志分段,任何时刻,都只会有一个活动的日志分段
    3. 每个日志分段对应一个数据文件和索引文件,消息内容会追加到 Log 数据文件中。
    4. 操作底层数据的接口是文件通道,消息集提供一个writeFullyTo()方法,参数是文件通道
    5. 消息集(ByteBufferMessageSet)的writeFullyTo()方法,调用文件通道的write()方法,将底层包含消息内容的字节缓冲区(ByteBuffer)写到 File文件通道中。
    6. 字节缓冲区写到File 文件通道中,消息就持久化到日志分段对应的log 分段 数据文件中了

    生产者发送消息时的 消息集

    生产者发送消息时,会在客户端将属于同一个分区的一批消息,作为一个生产请求发送给服务端。
    底层是字节缓冲区的ByteBufferMessageSet对象。
    伪代码如下:
    // Java版本的生产者客户端传递的消息内容是ByteBuffer,无需额外处理
    class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
    // Scala版本的客户端传递Message对象,要将消息集填充到字节缓冲区中
    def this(codec: CompressionCodeccounter:LongRef,messages: Message*) {
    // create()的返回值是ByteBuffer,通过this()再调用类级别的构造函数
    this(create(0ffsetAssigner(counter,messages.size),messages:_*))
    }
    }


    消息集中的每条消息(Message)都会被分配一个相对偏移量,而每一批消息的相对偏移量都是从0开始的。
    下图给出了一个示例,生产者写到分区P批匹消息:
    • 第一批消息有4条消息,对应的偏移量是[0,1,2,3];
    • 第二批消息有3条消息,对应的偏移量是[0,1,2]。
    客户端每次发送给服务端的一批消息,它的字节缓冲区只属于这一批消息,字节缓冲区不是共享的数据结构。
    消息集中的每条消息由3部分组成: 偏移量、数据大小、消息内容。
    • 每条消息的第一部分内容是偏移量。
    Kafka存储消息时,会为每条消息都指定一个唯一的偏移量。
    同一个分区的所有日志分段,它们的偏移量从0开始不断递增。不同分区的偏移量之间没有关系,所以说Kaka只保证同一个分区的消息有序性,但是不保证跨分区消息的有序性。
    • 每条消息的第二部分是当前这条消息的长度。
    消息长度通常不固定,而且在读取文件时客户端可能期望直接定位到指定的偏移量。
    记录消息长度的好处是:如果不希望读取这条消息,只需要读取出消息长度这个字段的值,然后跳过这些大小的字节,这样就可以定位到下一条数据的起始位置。
    • 第三部分是消息的具体内容,
    和消息集的第二部分类似,每条消息的键值之前也都会先记录键的长度和值的长度。
    注意:消息格式是在客户端定义的消息集在传给服务端之前,就用ByteBufferMessageSet封装好。服务端接收的每个分区消息就是ByteBufferMessageSet。
    另外,如所示,每条消息除了保存消息的键值内容外,还保存一些其他数据,比如校验值、魔数、键的长度、值的长度等。
    消息集的writeMessage()方法将每条消息(Message)填充到字节缓冲区中,缓冲区会暂存每个分区的一批消息, 这个方法实际上是在客户端调用的,填充消息 才会为这批消息设置从开始递增的偏移量,
    如下所示,在服务端调用文件通道的写方法时,才会将消息集字节缓冲区的内容刷写到文件中。

    Kafka 日志追加方式

    服务端将每个分区的消息追加到日志中,是以日志分段为单位的。
    当日志分段累加的消息达到阙值大小(文件大小达到1GB)时,会新创建一个日志分段保存新的消息,而分区的消息总是追加到最新的日志分段中。
    每个日志分段都有一个基准偏移量(segmentBaseoffset,或者叫baseoffset),这个基准偏移量是分区级别的绝对偏移量,而且这个值在日志分段中是固定的。有了这个基准偏移量,就可以计算出每条消息在分区中的绝对偏移量,最后把消息以及对应的绝对偏移量写到日志文件中。
    日志追加方法中的messages参数是客户端创建的消息集,这里面的偏移量是相对偏移量。
    在追加到日志分段时,validMessages变量已经是绝对偏移量了,具体步骤如下。
    1. 对客户端传递的消息集进行验证,确保每条消息的(相对)偏移量都是单调递增的。
    2. 删除消息集中无效的消息。如果大小一致,直接返回messages,否则会进行截断。
    3. 为有效消息集的每条消息分配(绝对)偏移量。
    4. 将更新了偏移量值的消息集追加到当前日志分段中。
    5. 更新日志的偏移量(下一个偏移量 nextOffsetMetadata )必要时调用flush()方法刷写磁盘。
    Log 类定义了 Log#append 方法,用于往 Log 对象中追加消息数据。
    需要注意的一点是,Log 对象使用 SkipList 管理多个 LogSegment,我们在执行追加消息时是不能够往 SkipList 中的任意 LogSegment 对象执行追加操作的,Kafka 设计仅允许往 activeSegment 对象中追加消息。
    方法 Log#append 实现如下:
    def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
    // 1. 解析、校验待追加的消息数据,封装成 LogAppendInfo 对象
    val appendInfo = this.analyzeAndValidateRecords(records)
    // 如果消息数据个数为 0,则直接返回
    if (appendInfo.shallowCount == 0) return appendInfo

    // 2. 剔除待追加消息中未通过验证的字节部分
    var validRecords = this.trimInvalidBytes(records, appendInfo)

    try {
    // 将待追加消息中剩余有效的字节追加到 Log 对象中
    lock synchronized {
    // 3.1 如果指定需要分配 offset
    if (assignOffsets) {
    // 获取当前 Log 对象对应的最后一个 offset 值,以此开始向后分配 offset
    val offset = new LongRef(nextOffsetMetadata.messageOffset)
    // 更新待追加消息的 firstOffset 为 Log 对象最后一个 offset 值
    appendInfo.firstOffset = offset.value
    val now = time.milliseconds
    val validateAndOffsetAssignResult = try {
    // 对消息(包括压缩后的)的 magic 值进行统一
    // 验证数据完整性,并分配 offset,同时按要求更新消息的时间戳
    LogValidator.validateMessagesAndAssignOffsets(
    validRecords,
    offset,
    now,
    appendInfo.sourceCodec,
    appendInfo.targetCodec,
    config.compact,
    config.messageFormatVersion.messageFormatVersion,
    config.messageTimestampType,
    config.messageTimestampDifferenceMaxMs)
    } catch {
    case e: IOException =>
    .....
    }
    validRecords = validateAndOffsetAssignResult.validatedRecords
    appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
    appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
    // 更新待追加消息的 lastOffset 值
    appendInfo.lastOffset = offset.value - 1
    // 如果时间戳类型为 LOG_APPEND_TIME,则修改时间戳
    if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
    appendInfo.logAppendTime = now

    // 如果在执行 validateMessagesAndAssignOffsets 操作时修改了消息的长度,
    //则需要重新验证,防止消息过长
    if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
    for (logEntry <- validRecords.shallowEntries.asScala) {
    if (logEntry.sizeInBytes > config.maxMessageSize) {
    .....
    }
    }
    }
    }
    // 3.2 不需要分配 offset
    else {
    // 如果消息的 offset 不是单调递增,或者消息的 firstOffset 小于 Log 中记录的下一条消息 offset,则说明 appendInfo 非法
    if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
    ...
    }

    // 4. 校验待追加消息的长度,保证不超过了单个 LogSegment 所允许的最大长度(对应 segment.bytes 配置)
    if (validRecords.sizeInBytes > config.segmentSize) {
    throw new RecordBatchTooLargeException(
    "Message set size is %d bytes which exceeds the maximum configured segment size of %s.".format(validRecords.sizeInBytes, config.segmentSize))
    }

    // 5. 获取 activeSegment 对象,如果需要则创建新的 activeSegment 对象
    val segment = this.maybeRoll(
    messagesSize = validRecords.sizeInBytes,
    maxTimestampInMessages = appendInfo.maxTimestamp,
    maxOffsetInMessages = appendInfo.lastOffset)


    // 6. 往 activeSegment 中追加消息
    segment.append(
    firstOffset = appendInfo.firstOffset,
    largestOffset = appendInfo.lastOffset,
    largestTimestamp = appendInfo.maxTimestamp,
    shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
    records = validRecords)

    // 7. 更新 LEO 中记录的当前 Log 最后一个 offset 值
    this.updateLogEndOffset(appendInfo.lastOffset + 1)
    ...

    // 8. 如果刷盘时间间隔达到阈值(对应 flush.messages 配置),则执行刷盘
    if (unflushedMessages >= config.flushInterval)
    this.flush() // 将 [recoveryPoint, logEndOffset) 之间的数据刷盘

    appendInfo
    }
    } catch {
    ...
    }
    }
    nextOffsetMetadata 读写操作发生在 务端处理生产请求和拉取请求时,具体步骤如下:
    1. 生产者发送消息集给服务端,服务端会将这一批消息追加到日志中。
    2. 每条消息需要指定绝对偏移量,服务端会用nextoffsetMetadata的值作为起始偏移量。
    3. 服务端将每条带有偏移量的消息写入到日志分段中。
    4. 服务端会获取这一批消息中最后一条消息的偏移量,加上一后更新nextoffsetMetadata。
    5. 消费线程(消费者或备份副本)会根据这个变量的最新值拉取消息。一旦变量值发生变化消费线程就能拉取到新写入的消息。
    nextoffsetMetadata变量是一个关于日志的偏移量元数据对象(LogoffsetMetadata)。
    日志的偏移量元数据都是从当前活动的日志分段(activeSegment)获取相关的信息:下一条消息的偏移量、当前日志分段的基准偏移量、当前日志分段的大小。
    LogSegment#append 方法的实现,该方法用于往当前 LogSegment 对应的 log 文件中追加消息数据,并在需要时更新对应的 index 和 timeindex 索引数据。
    LogSegment#append 方法实现如下:
    def append(firstOffset: Long, // 待追加消息的起始 offset
    largestOffset: Long, // 待追加消息中的最大 offset
    largestTimestamp: Long, // 待追加消息中的最大时间戳
    shallowOffsetOfMaxTimestamp: Long, // 最大时间戳消息对应的 offset
    records: MemoryRecords) { // 待追加的消息数据
    if (records.sizeInBytes > 0) {
    ...
    // 获取物理位置(当前分片的大小)
    val physicalPosition = log.sizeInBytes()
    if (physicalPosition == 0) rollingBasedTimestamp = Some(largestTimestamp)

    require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")

    // 将消息数据追加到 log 文件
    val appendedBytes = log.append(records)
    trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")

    // 更新已追加的消息对应的最大时间戳,及其 offset
    if (largestTimestamp > maxTimestampSoFar) {
    maxTimestampSoFar = largestTimestamp
    offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
    }

    // 如果当前累计追加的日志字节数超过阈值(对应 index.interval.bytes 配置)
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
    // 更新 index 和 timeindex 文件
    index.append(firstOffset, physicalPosition)
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
    bytesSinceLastIndexEntry = 0 // 重置当前累计追加的日志字节数
    }
    // 更新累计加入的日志字节数
    bytesSinceLastIndexEntry += records.sizeInBytes
    }
    }
    如果当前追加的消息数据是有效的,则 LogSegment 会调用 FileRecords#append 方法将消息数据追加到对应的 log 文件中,并更新本地记录的已追加消息的最大时间戳及其 offset。
    前面我们介绍了 Kafka 并不会对每条消息都建立索引,而是采用稀疏索引的策略间隔指定大小的字节数(对应 index.interval.bytes 配置)建立索引项,如果当前累计追加的消息字节数超过该配置值,则 Kafka 会更新对应的 index 和 timeindex 数据。

    broker如何 分析和验证消息集?

    对消息集进行分析和验证,主要利用了Kafka中“分区的消息必须有序”这个特性。
    分析和验证方法的返回值是一个日志追加信息(LogAppendInfo)对象,该对象的内容包括: 消息集第一条和最后条消息的偏移量、消息集的总字节大小、偏移量是否单调递增。
    日志追加信息表示消息集的概要信息,但并不包括消息内容。
    日志追加信息对象也是追加日志方法的最后返回值。
    服务端上层类(比如分区、副本管理器调用追加日志的方法,期望得到这一批消息的概要信息,比如第一个偏移量和最后一个偏移量。
    这样,它们就可以根据偏移量计算出一共追加了多少条消息(服务端接收的消息集和最后真正被追加的消息数量可能会不一样)。
    上层类甚至还可以做一些复杂的业务逻辑处理,比如根据最后一个偏移量判断被延迟的生产请求是否可以完成。相关代码如下:
    //对要追加的消息集进行分析和验证,消息太大或者无效会被丢弃
    def analyzeAndValidateMessageSet(messages:ByteBufferMessageSet)={
    var shallowMessageCount = 0 //消息数量
    var validBytesCount =0 //有效字节数
    //第一条消息和最后一条 (循环时表示上一条消息的偏移量)消息的偏移量
    var firstOffset,astoffset = -1L
    var monotonic =true // 是否单调递增
    for(messageAndOffset <- messages,shallowIterator) {
    // 在第一条消息中更新firstoffset
    if(firstOffset <0) firstOffset = messageAndOffset.offset
    if(lastOffset >= messageAndOffset.offset) monotonic = false
    //每循环一条消息,就更新
    lastoffsetLastOffset = messageAndOffset.offset
    val m= messageAndoffset.message
    val messageSize = MessageSet.entrySize(m)
    m.ensureValid()//检查消息是否有效
    shallowMessageCount +=1
    validBytesCount += messageSize
    }
    LogAppendInfo(firstOffset,lastOffset, sourceCodec,targetCodec,shallowMessageCount,validBytesCount,monotonic)
    }

    前面说过,消息集对象中消息的偏移量是从0开始的相对偏移量,并且它的底层是一个字节缓冲区。
    那么要获得消息集中第一条消息和最后一条消息的偏移量,只能再把字节缓冲区解析出来,读取每一条消息的偏移量。
    这里因为还要对每条消息进行分析和验证,所以读取消息是不可避免的。
    分析消息集的每条消息时,都会更新最近的偏移量(lastoffset)但只会在分析第一条消息时更新起始偏移量(firstoffset)。
    判断消息集中所有消息的偏移量是否单调递增,只需要比较最近的偏移量和当前消息的偏移量。
    如果每次处理一条消息时,当前消息的偏移量都比最近的偏移量值(上-条消息的偏移量)大,说明消息集是单调递增的。
    对消息集的每条消息都验证和分析后,下一步要为消息分配绝对偏移量,最后才能追加到日志分段

    broker如何为消息集分配绝对偏移量?

    存储到日志文件中的消息,必须是分区 的绝对偏移量。
    为消息集分配绝对偏移量时,以nextoffsetMetadata的偏移量作为起始偏移量。
    分配完成后还要更新nextoffsetMetadata的偏移量值。
    为了保证在分配过程中,获取偏移量的值并加一是一个原子操作,起始偏移量会作为原子变量传入validateMessagesAndAssignoffsets()方法。
    相关代码如下:
    // 消息集 加到 志,获取最近的偏移量作为初始佳
    class Log{
    def append(l'lessages : ByteBuffe MessageSet) {
    // nextOffsetMetadata 表示最近 一条消息的偏移量
    val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
    //offset参数作为原子变量,在分配偏移量时,先获取出值再加一
    validMessages=validMessages.validateMessagesAndAssignOffsets(offset)
    //offset的返回值是最后一条消息的偏移量再加一,那么最后一条消息就要减一
    appendInfo.lastOffset = offset.get - 1
    segment.append(appendInfo.firstoffset,validMessages) // 追加消息集
    //更新nextoffsetMetadata,用最后一条消息的偏移量加一表示最近下一条
    updateLogEndOffset(appendInfo.lastOffset +1)
    }
    }

    //字节缓冲区消息集根据指定的偏移量计数器、更新每条消息的偏移量
    class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
    def validateMessagesAndAssignOffsets(offsetCounter:AtomicLong)={
    var messagePosition =0
    buffer.mark()//先标记
    while(messagePosition < sizeInBytes - MessageSet.LogOverhead){
    buffer.position(messagePosition)// 定位到每条消息的起始位置
    //以最新的偏移量计数器为基础,每条消息的偏移量都在此基础上不断加一
    buffer.putLong(offsetCounter.getAndIncrement())
    val messageSize= buffer.getInt()
    // 消息的大小//更新消息的起始位置,为下一条消息做准备 (12+消息大小,表示一条完整的消息)
    messagePosition += MessageSet.LogOverhead + messageSize
    buffer.reset()//重置的时候,回到最开始标记的地方
    this// 还是返回字节缓冲区消息集。除了偏移量改了,其他均没有变化
    }
    }

    根据“1.消息集”中消息集的格式,为消息分配偏移量,实际上是更新每条消息的偏移量数据(offset)。消息的大小(size)和消息内容(Message)都不需要变动。
    现在的问题主要是:如何在字节缓冲区中定位到每条消息的偏移量所在位置。
    定位消息偏移量的方式有两种:
    • 一种是按照顺序完整地读取每条消息,这种方式代价比较大,我们实际上只需要更改偏移量,不需要读取每条消息的实际内容;
    • 另一种是先读取出消息大小的值,然后计算下一条消息的起始偏移量,最后直接用字节缓冲区提供的定位方法(position())直接定位到下一条消息的起始位置。
    因为底层字节缓冲区和消息集对象是一一对应的,所以消息集中第一条消息的偏移量一定是从字节缓冲区的位置0开始的。
    每条消息的长度计算方式是:8 + 4 + 消息大小。
    其中,消息大小的值可以从第二部分读取。
    如表6-2所示,第一条消息中“消息的大小”存的值是3,表示消息本身的内容长度是3,整个消息占用的大小就是:8+4+3=15。
    假设偏移量计数器初始值为10(即nextoffsetMetadata的值)第一条消息的偏移量就等于10。
    分配第一条消息的偏移量时,修改前面8字节的内容为10。
    接下来要修改第二条消息的偏移量为11,通过读取第一条消息的大小(等于3)再加上12字节,就定位到第二条消息起始位置(等于15)。
    修改第三条消息的偏移量为12也是类似的,通过读取第二条消息的大小(等于5)再加上12字节(第二条消息总共占用了17字节)就可以定位到第三条消息的起始位置(15再加上17等于32)以此类推,第四条消息的起始位置等于第三条消息占用的12字节再加上32,等于48。
    在写人每条消息的绝对偏移量后,只会读取消息的大小,不会读取这条消息的实际内容。
    消息集经过分配绝对偏移量后,才可以追加到日志分段中,日志分段接收消息集并写到文件中。

    Kafak通过顺序写实现写入的高性能

    通过上面的分析可以看到 ,kafka 写log 数据的时候,是以磁盘顺序写的方式来写的,也就是说仅仅将数据追加到文件的末尾,不是在文件的随机位置来修改数据。
    为啥kafka 要顺序写,而不是随机写,是因为硬盘速度慢,尤其 机械硬盘。
    机械硬盘的性能为啥那么慢? 看看结构就知道:
    机械磁盘上的每个磁道被等分为若干个弧段,这些弧段称之为扇区。
    如何在磁盘中读/写数据? 需要 物理动作,去移动 “磁头” 到目标 扇区
    机械磁盘的读写以扇区为基本单位。完成一次磁盘 IO,需要经过寻道旋转数据传输三个步骤。
    看经典大图:
    为什么要采用磁盘顺序写?
    正因为 完成一次磁盘 IO,需要经过寻道、旋转和数据传输三个步骤:
    1. 寻道(时间):磁头移动定位到指定磁道;
    2. 旋转延迟(时间):等待指定扇区从磁头下旋转经过;
    3. 数据传输(时间):数据在磁盘、内存与网络之间的实际传输。
    首先必须找到柱面,即磁头需要移动对准相应磁道,这个过程叫做寻道,所耗费时间叫做寻道时间,然后目标扇区旋转到磁头下,这个过程耗费的时间叫做旋转时间。
    怎么样才能提高磁盘的读写效率呢?
    即采用 顺序写,这样就不需要寻道时间,
    而且,只需很少的旋转时间,将数据追加到文件的末尾,不是在文件的随机位置来修改数据。
    因此,顺序写 省去寻道旋转可以极大地提高磁盘读写的性能。
    Kafka 采用顺序写文件的方式来提高磁盘写入性能。
    顺序写文件,基本减少了磁盘寻道旋转的次数。磁头再也不用在磁道上乱舞了,而是一路向前飞速前行。
    Kafka 中每个分区是一个有序的,不可变的消息序列,新的消息不断追加到 Partition 的末尾。
    在 Kafka 中 Partition 只是一个逻辑概念,Kafka 将 Partition 划分为多个 Segment,每个 Segment 对应一个物理文件,Kafka 对 segment 文件追加写,这就是顺序写文件。

    Kafak通过sendfile 零拷贝实现发送的高性能

    Kafak通过sendfile 零拷贝实现发送消息,从磁盘读取文件并发送到网络的流程图
    为了使用sendfile 零拷贝,把数据从磁盘直接发向网络,那 Kafka 在存储数据的时候,就要保证存储的数据格式和将要发出的 response 格式一致。
    所以, Kafka 没有对数据做额外处理,只是简单转发。
    通过零拷贝技术,磁盘上的数据还是要先进入 read buffer,然后不用再拷贝到应用程序的缓存区,而是直接拷贝到 NIC buffer,图上的步骤 2:Appends just file descriptors,只是把文件描述符交给了 Socket buffer,实际数据并没有拷贝给 Socket buffer。
    这就是所谓的 scatter-gather 操作(也称为 Vectorized I/O),scatter-gather 是仅将 read buffer 数据指针存储在 socket buffer 中,并让 DMA 直接从内存读取数据的行为。
    最终结果如何呢?
    • 4 次模式切换变成了 2 次
    • 2 次 DMA 拷贝,仍然是 2 次
    • 1 次微小的指针拷贝

    消费者从broker拉取数据

    消费者从broker拉取数据,broker把数据写入 SOCKET channel,服务端的代码如下:
    @Override
    public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
    long newSize = Math.min(channel.size(), end) - start;
    int oldSize = sizeInBytes();
    if (newSize < oldSize)
    throw new KafkaException(String.format(
    "Size of FileRecords %s has been truncated during write: old size %d, new size %d",
    file.getAbsolutePath(), oldSize, newSize));

    long position = start + offset;
    long count = Math.min(length, oldSize - offset);
    return destChannel.transferFrom(channel, position, count);
    }

    @Override
    public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
    return fileChannel.transferTo(position, count, socketChannel);
    }
    fileChannel.transferTo()方法直接将当前通道内容传输到另一个通道,没有涉及到Buffer的任何操作,NIO中的Buffer是JVM堆或者堆外内存,但不论如何他们都是操作系统内核空间的内存。
    也就是说这种方式不会有内核缓冲区到用户缓冲区的读写问题
    transferTo()的实现方式就是通过系统调用sendfile()(当然这是Linux中的系统调用)
    相当于直接把请求数据的ByteBuffer(内核态,数据还没复制到用户态)通过FileChannel不用用户态和内核态相互之间的复制,直接转到socketChannel
    Kafka 的数据传输通过 TransportLayer 来完成,其子类 PlaintextTransportLayer 通过Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法实现零拷贝

    Kafka为啥达到100Wtps高性能?

    Kafka之 重要原因就是它采用了零拷贝(Zero-copy)技术 + 顺序写。
    Kafka采用了两种零拷贝技术来提高性能:mmap 零拷贝 和sendfile 零拷贝 。
    Kafka采用了 顺序写 技术来提高性能:顺序写 log 文件。
    40岁老架构师尼恩在这里,给大家留一个遗留问题:
    遗留问题:sendfile 零拷贝 比 mmap 零拷贝 的性能更高,为啥 读写index 文件不用 sendfile 零拷贝 ?
    以上问题的答案,请来尼恩的 技术自由圈 社群交流。
    同时,与本文配套,尼恩决定给大家写一个0复制 系列,帮助大家吊打面试官:

    说在最后:有问题找老架构取经‍

    通过对kafak 零拷贝的充分介绍,可以充分展示一下大家雄厚的 “技术肌肉”,让面试官爱到 “不能自已、口水直流”,然后实现”offer直提”。
    在面试之前,建议大家系统化的刷一波 5000页《尼恩Java面试宝典PDF》,里边有大量的大厂真题、面试难题、架构难题。
    很多小伙伴刷完后, 吊打面试官, 大厂横着走。
    在刷题过程中,如果有啥问题,大家可以来 找 40岁老架构师尼恩交流。
    另外,如果没有面试机会,可以找尼恩来改简历、做帮扶。
    遇到职业难题,找老架构取经, 可以省去太多的折腾,省去太多的弯路。
    尼恩指导了大量的小伙伴上岸,前段时间,刚指导一个40岁+被裁小伙伴,拿到了一个年薪100W的offer。
    狠狠卷,实现 “offer自由” 很容易的, 前段时间一个武汉的跟着尼恩卷了2年的小伙伴, 在极度严寒/痛苦被裁的环境下, offer拿到手软, 实现真正的 “offer自由” 。

    被裁之后, 空窗1年/空窗2年, 如何  起死回生  ? 


    案例1:42岁被裁2年,天快塌了,急救1个月,拿到开发经理offer,起死回生


    案例2:35岁被裁6个月, 职业绝望,转架构急救上岸,DDD和3高项目太重要了

    案例3:失业15个月,学习40天拿offer, 绝境翻盘,如何实现?


     被裁之后,100W 年薪 到手, 如何 人生逆袭? 


    100W案例,100W年薪的底层逻辑是什么? 如何实现年薪百万? 如何远离  中年危机?

    100W案例240岁小伙被裁6个月,猛卷3月拿100W年薪 ,秘诀:首席架构/总架构

    环境太糟,如何升 P8级,年入100W?

    如何  逆天改命,包含AI、大数据、golang、Java  等      


      职业救助站

      实现职业转型,极速上岸


      关注职业救助站公众号,获取每天职业干货
      助您实现职业转型、职业升级、极速上岸
      ---------------------------------

      技术自由圈

      实现架构转型,再无中年危机


      关注技术自由圈公众号,获取每天技术千货
      一起成为牛逼的未来超级架构师

      几十篇架构笔记、5000页面试宝典、20个技术圣经
      请加尼恩个人微信 免费拿走

      暗号,请在 公众号后台 发送消息:领电子书

      如有收获,请点击底部的"在看"和"",谢谢

      技术自由圈
      疯狂创客圈(技术自由架构圈):一个 技术狂人、技术大神、高性能 发烧友 圈子。圈内一大波顶级高手、架构师、发烧友已经实现技术自由;另外一大波卷王,正在狠狠卷,奔向技术自由
       最新文章