尼恩说在前面
美团面试:Kafka为啥那么快(每秒上十万QPS) ?什么是kafka零复制? 说一说Rocketmq、是如何实现每秒上百万数据的超高并发写入的? 说一说Rocketmq、如何实现每秒上十万QPS的超高吞吐量的读取的? 说一说 Rocketmq、的零复制(/零拷贝)原理 说一说 Kafka 是如何实现每秒上百万数据的超高并发写入的? 说一说 Kafka如何实现每秒上十万QPS的超高吞吐量的读取的? 说一说 Kafka 的零复制(/零拷贝)原理
最新《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》的PDF,请关注本公众号【技术自由圈】获取,回复:领电子书
第一篇:零基础 ,穿透 零复制 底层原理 (文章阅读量破W,点击此链接学习) 第2篇:从0到1: 穿透Kafka 0复制的底层原理和实操 (本文) 第3篇:从0到1: 穿透Netty 0复制的底层原理和实操 (规划中,具体参见尼恩的公号) 第4篇:从0到1: 穿透Rocketmq 0复制的底层原理和实操 (规划中,具体参见尼恩的公号) 相关视频:《尼恩Java硬核架构班第16章:RocketMQ第一部曲:葵花宝典(高性能秘籍)架构师视角解读OS底层的mmap、pagecache、zerocopy等底层的底层知识 》 (点击此链接学习)
本文目录
- 尼恩说在前面
- Kafka采用了两种零拷贝技术
- 回顾一下mmap零复制
- mmap (memory-map)
- 如何通过mmap查询索引找到具体的消息数据
- 分区目录
- 分段日志和索引
- 消息日志与索引关系
- 消息日志(.log)
- 偏移量索引(.index)
- 时间索引(.timeindex)
- 位移索引
- 通过索引查询消息过程
- 改进的二分查找
- Log 类采用跳跃表(SkipList)管理 LogSegment 对象
- 写入索引项的方法
- 时间戳索引
- 写入时间戳索引的索引项
- 位移索引和时间戳索引的区别是什么?
- sendfile 是最高性能的零复制技术
- 传统拷贝,从磁盘读取文件并发送到网络的流程
- 零拷贝,从磁盘读取文件并发送到网络的流程
- Kafka的log 文件 与日志格式的演变
- v0版本
- v1版本
- v2版本
- 消息的批量生产
- 消息的log 文件存储
- Kafka 写入日志的步骤
- 生产者发送消息时的 消息集
- Kafka 日志追加方式
- broker如何 分析和验证消息集?
- broker如何为消息集分配绝对偏移量?
- Kafak通过顺序写实现写入的高性能
- Kafak通过sendfile 零拷贝实现发送的高性能
- 消费者从broker拉取数据
- Kafka为啥达到100Wtps高性能?
- 说在最后:有问题找老架构取经
Kafka采用了两种零拷贝技术
Broker 读写index 文件,用了 mmap零复制 Broker 向Consumer发消息,用了 sendfile 零复制
当然,零拷贝并不是Kafka的专利,而是操作系统的能力 ,又比如Netty,Rocketmq 都用到了零拷贝,这个后面尼恩会详细给大家做展开的介绍。 不过,零拷贝技术可以减少不必要的数据拷贝次数,从而提高数据传输效率,所以,也是面试的绝对重点, 各位看官,尤其是要转架构的, 一定要好好掌握。
回顾一下mmap零复制
mmap (memory-map)
void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);
addr:指定映射的起始地址,通常设为NULL,由内核来分配
length:代表将文件中映射到内存的部分的长度。
prot:映射区域的保护方式。可以为以下几种方式的组合:
PROT_EXEC 映射区域可被执行
PROT_READ 映射区域可被读取
PROT_WRITE 映射区域可被写入
PROT_NONE 映射区域不能存取
flags:映射区的特性标志位,常用的两个选项是:
MAP_SHARD:写入映射区的数据会复制回文件,且运行其他映射文件的进程共享
MAP_PRIVATE:对映射区的写入操作会产生一个映射区的复制,对此区域的修改不会写会原文件
fd:要映射到内存中的文件描述符,有open函数打开文件时返回的值。
offset:文件映射的偏移量,通常设置为0,代表从文件最前方开始对应,offset必须是分页大小的整数倍。
函数返回值:实际分配的内存的起始地址。
int munmap(void *start, size_t length)
start:映射的起始地址
length:文件中映射到内存的部分的长度
返回值:解除成功返回0,失败返回-1。
打开文件:首先,使用 open
系统调用打开需要进行内存映射的文件,并获取文件描述符(file descriptor)。创建内存映射:通过 mmap
系统调用将文件的部分或全部内容映射到进程的地址空间。mmap
的典型调用如下:cvoid *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset); 访问映射的内存:一旦内存映射建立,进程就可以直接访问映射的内存区域,就像访问普通内存一样。对这部分内存的读写操作将直接影响到文件的内容。 同步数据到磁盘:如果需要确保对映射区域的修改被写回到文件中,可以使用 msync
系统调用。这步操作是可选的,取决于应用是否需要立即将数据同步到磁盘。解除映射:当不再需要映射时,使用 munmap
系统调用来解除映射,释放资源:cint munmap(void *start, size_t length);
start
:映射区域的起始地址。length
:映射的长度。
close
系统调用关闭文件描述符。减少了数据在用户空间和内核空间之间的拷贝次数,因为数据可以直接在内核空间的缓冲区和用户空间的映射内存之间共享。 减少了系统调用的开销,因为不需要频繁地进行 read
和write
操作。可以提高大文件传输和处理的效率。
如何通过mmap查询索引找到具体的消息数据
分区目录
分段日志和索引
配置项 | 默认值 | 单位 | 描述 |
消息日志与索引关系
消息日志(.log)
# 只输出消息日志描述信息
kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.log
# 输出消息日志完整信息
kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.log --print-data-log
偏移量索引(.index)
# 查看偏移量索引内容
kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.index
时间索引(.timeindex)
# 查看时间索引内容
kafka-dump-log.sh --files /var/kafka-logs/test-0/00000000000000023147.timeindex1.2.
位移索引
偏移量索引文件:
.index
文件的内容,大致如下Offset | Position |
relativeOffset:相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节,当前索引文件的文件名即为 baseOffset 的值; position
:物理地址,也就是消息在日志分段文件中对应的物理位置,占用 4 个字节。
尼恩提示:本质上, 消息的偏移量(offset)如果是 绝对偏移量, 那是一个long ,是要占用 8 个字节滴,那么,为啥这里是四个字节呢?
如果一个日志分段的 baseOffset (基础偏移量) 为 32, 那么其文件名就是 00000000000000000032.log, offset 为 35 的消息在索引文件中的 relativeOffset 的值为 35-32=3。
绝对偏移量: OffsetIndex位移索引中是override def entrySize = 8,8个字节。 relativeOffset:相对偏移量,表示消息相对于 baseOffset 的偏移量,占用 4 个字节
为了节省空间,一个索引项节省了4字节,想想那些日消息处理数万亿的公司。 因为内存资源是很宝贵的,索引项越短,内存中能存储的索引项就越多,索引项多了直接命中的概率就高了。
通过索引查询消息过程
AbstractIndex.scala:抽象类,封装了所有索引的公共操作 OffsetIndex.scala:位移索引,保存了位移值和对应磁盘物理位置的关系 TimeIndex.scala:时间戳索引,保存了时间戳和对应位移值的关系 TransactionIndex.scala:事务索引,启用Kafka事务之后才会出现这个索引
//偏移量索引文件索引项override def entrySize = 8
//时间戳索引文件索引项override def entrySize = 12
def relativeOffset(offset: Long): Int = {
val relativeOffset = toRelative(offset)
if (relativeOffset.isEmpty)
throw new IndexOffsetOverflowException(s"Integer overflow for offset: $offset (${file.getAbsoluteFile})")
relativeOffset.get
}
private def toRelative(offset: Long): Option[Int] = {
val relativeOffset = offset - baseOffset
if (relativeOffset < 0 || relativeOffset > Int.MaxValue)
None
else
Some(relativeOffset.toInt)
}
override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {
OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}
ConcurrentSkipListMap
来保存在每个日志分段,00000000000000000217.index
,def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {
//复制出整个索引映射区
val idx = mmap.duplicate
// largestLowerBoundSlotFor 方法底层使用了改进版的二分查找算法寻找对应的槽
val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
// 如果没找到,返回一个空的位置,即物理文件位置从0开始,表示从头读日志文件
// 否则返回slot槽对应的索引项
if(slot == -1)
OffsetPosition(baseOffset, 0)
else
parseEntry(idx, slot)
}
}
偏移量索引使用 mmap 来映射操作索引数据,这样索引数据不需要拷贝到用户态,提高了性能 调用 AbstractIndex.scala#largestLowerBoundSlotFor()
方法从索引数据中查找确定消息数据读取的起始位置
protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): Int =
indexSlotRangeFor(idx, target, searchEntity)._1
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = {
// check if the index is empty
if(_entries == 0)
return (-1, -1)
def binarySearch(begin: Int, end: Int) : (Int, Int) = {
// binary search for the entry
var lo = begin
var hi = end
while(lo < hi) {
val mid = (lo + hi + 1) >>> 1
val found = parseEntry(idx, mid)
val compareResult = compareIndexEntry(found, target, searchEntity)
if(compareResult > 0)
hi = mid - 1
else if(compareResult < 0)
lo = mid
else
return (mid, mid)
}
(lo, if (lo == _entries - 1) -1 else lo + 1)
}
//使用所有索引数据 entry 的总量 _entries 减去热区数据大小_warmEntries,
// 确定一个热区索引的起始位置,这样可以保障只在索引数据的尾部进行二分查找
val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
// check if the target offset is in the warm section of the index
if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
return binarySearch(firstHotEntry, _entries - 1)
}
// check if the target offset is smaller than the least offset
if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
return (-1, 0)
binarySearch(0, firstHotEntry)
}
使用所有索引数据 entry 的总量 _entries
减去热区数据大小_warmEntries
,确定一个热区索引的起始位置,这样可以保障只在索引数据的尾部进行二分查找之所以这样处理,是因为 Kafka 的索引是在末尾追加写入的,并且一般写入的数据很快就会被读取,数据热点集中在尾部。索引数据一般都在页缓存中,而操作系统的内存是有限的,必然要通过类似 LRU 的机制淘汰页缓存。 如果每次二分查找都从头开始,则索引中间部分的数据所在的页缓存大概率已经被淘汰掉,从而导致缺页中断,必须重新从磁盘上读文件,影响性能
首先找到baseOffset=217的日志段文件(这里使用了跳跃表的结构来加速查找) 计算相对偏移量relativeOffset=230-217=13 在索引文件中查找 不大于13 的最大相对偏移量对应的索引项,即[12,456] 根据12对应的物理地址456,在日志文件.log中定位到准确位置 从日志文件物理位置456继续向后查找找到相对偏移量为13,即绝对偏移量为230,物理地址为468的消息
消息在log文件中是以批次存储的,而不是单条消息进行存储。索引文件中的偏移量保存的是该批次消息的最大偏移量,而不是最小的。 Kafka强制要求索引文件大小必须是索引项大小(8B)的整数倍,假设broker端参数log.index.size.max.bytes 设置的是67,那么Kafka内部也会将其转为64,即不大于67的8的最大整数倍。
改进的二分查找
Log 类采用跳跃表(SkipList)管理 LogSegment 对象
写入索引项的方法
def append(offset: Long, position: Int): Unit = {
inLock(lock) {
// 索引文件如果已经写满,直接抛出异常
require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
// 要保证待写入的位移offset比当前索引文件中所存的位移值要大
// 这主要是为了维护索引的单调性
if (_entries == 0 || offset > _lastOffset) {
trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
mmap.putInt(relativeOffset(offset))//向mmap写入相对位移值
mmap.putInt(position)//向mmap写入物理文件位置
_entries += 1//更新索引项个数
_lastOffset = offset//更新当前索引文件最大位移值
// 确保写入索引项格式符合要求
require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
} else {
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
}
}
}
时间戳索引
写入时间戳索引的索引项
def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
inLock(lock) {
if (!skipFullCheck)
// 索引文件如果已经写满,直接抛出异常
require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
// 这主要是为了维护索引的单调性
if (_entries != 0 && offset < lastEntry.offset)
throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
// 这主要是为了维护索引的单调性
if (_entries != 0 && timestamp < lastEntry.timestamp)
throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")
if (timestamp > lastEntry.timestamp) {
trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
mmap.putLong(timestamp)//向mmap写入时间戳
mmap.putInt(relativeOffset(offset))//向mmap写入相对位移值
_entries += 1
_lastEntry = TimestampOffset(timestamp, offset)
require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
}
}
}
位移索引和时间戳索引的区别是什么?
AbstractIndex.scala:抽象类,封装了所有索引的公共操作 OffsetIndex.scala:位移索引,保存了位移值和对应磁盘物理位置的关系 TimeIndex.scala:时间戳索引,保存了时间戳和对应位移值的关系 TransactionIndex.scala:事务索引,启用Kafka事务之后才会出现这个索引
sendfile 是最高性能的零复制技术
传统拷贝,从磁盘读取文件并发送到网络的流程
read buffer: 读缓冲区,操作系统的 page cache socket buffer: 套接字缓冲区,OS 用于管理数据包的字节缓冲区 NIC buffer: 网卡中的字节缓冲区 DMA copy: DMA 是 Direct Memory Access 的缩写,是内存控制器的一个功能,可以避免 CPU 的干预,允许硬件(图形卡、声卡、网卡等)直接访问内存 (RAM) 里的某些数据
应用程序(这里指 Kafka)利用 DMA copy 从磁盘 load 数据到 read buffer( 用户态->内核态
)read buffer 到应用程序的缓存区( 内核态->用户态
)应用程序要发数据到网络上,实际是先写到 socket buffer( 用户态->内核态
)socket buffer 到 NIC buffer(响应数据写完之后,由内核态返回用户态)
零拷贝,从磁盘读取文件并发送到网络的流程
4 次模式切换变成了 2 次 2 次 DMA 拷贝,仍然是 2 次 1 次微小的指针拷贝
第一篇:零基础 ,穿透 零复制 底层原理 (文章阅读量破W,点击此链接学习) 或者,参见尼恩的超基础的硬核架构视频:《尼恩Java硬核架构班第16章:RocketMQ第一部曲:葵花宝典(高性能秘籍)架构师视角解读OS底层的mmap、pagecache、zerocopy等底层的底层知识 》 (点击此链接学习)
Kafka的log 文件 与日志格式的演变
v0版本
v1版本
v2版本
消息的批量生产
消息的log 文件存储
log文件:通过append的方式向文件内进行追加,每个Segment对应一个log文件 index文件:索引文件,每隔4K存储一次offset+position,帮助快速定位指定位点的文件position用的
Kafka 写入日志的步骤
每个分区对应的日志对象管理了分区的所有日志分段。 将消息集追加到当前活动的日志分段,任何时刻,都只会有一个活动的日志分段 每个日志分段对应一个数据文件和索引文件,消息内容会追加到 Log 数据文件中。 操作底层数据的接口是文件通道,消息集提供一个writeFullyTo()方法,参数是文件通道 消息集(ByteBufferMessageSet)的writeFullyTo()方法,调用文件通道的write()方法,将底层包含消息内容的字节缓冲区(ByteBuffer)写到 File文件通道中。 字节缓冲区写到File 文件通道中,消息就持久化到日志分段对应的log 分段 数据文件中了
生产者发送消息时的 消息集
// Java版本的生产者客户端传递的消息内容是ByteBuffer,无需额外处理
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
// Scala版本的客户端传递Message对象,要将消息集填充到字节缓冲区中
def this(codec: CompressionCodeccounter:LongRef,messages: Message*) {
// create()的返回值是ByteBuffer,通过this()再调用类级别的构造函数
this(create(0ffsetAssigner(counter,messages.size),messages:_*))
}
}
第一批消息有4条消息,对应的偏移量是[0,1,2,3]; 第二批消息有3条消息,对应的偏移量是[0,1,2]。
每条消息的第一部分内容是偏移量。
每条消息的第二部分是当前这条消息的长度。
第三部分是消息的具体内容,
Kafka 日志追加方式
对客户端传递的消息集进行验证,确保每条消息的(相对)偏移量都是单调递增的。 删除消息集中无效的消息。如果大小一致,直接返回messages,否则会进行截断。 为有效消息集的每条消息分配(绝对)偏移量。 将更新了偏移量值的消息集追加到当前日志分段中。 更新日志的偏移量(下一个偏移量 nextOffsetMetadata )必要时调用flush()方法刷写磁盘。
Log#append
方法,用于往 Log 对象中追加消息数据。Log#append
实现如下:def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
// 1. 解析、校验待追加的消息数据,封装成 LogAppendInfo 对象
val appendInfo = this.analyzeAndValidateRecords(records)
// 如果消息数据个数为 0,则直接返回
if (appendInfo.shallowCount == 0) return appendInfo
// 2. 剔除待追加消息中未通过验证的字节部分
var validRecords = this.trimInvalidBytes(records, appendInfo)
try {
// 将待追加消息中剩余有效的字节追加到 Log 对象中
lock synchronized {
// 3.1 如果指定需要分配 offset
if (assignOffsets) {
// 获取当前 Log 对象对应的最后一个 offset 值,以此开始向后分配 offset
val offset = new LongRef(nextOffsetMetadata.messageOffset)
// 更新待追加消息的 firstOffset 为 Log 对象最后一个 offset 值
appendInfo.firstOffset = offset.value
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
// 对消息(包括压缩后的)的 magic 值进行统一
// 验证数据完整性,并分配 offset,同时按要求更新消息的时间戳
LogValidator.validateMessagesAndAssignOffsets(
validRecords,
offset,
now,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
config.messageFormatVersion.messageFormatVersion,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs)
} catch {
case e: IOException =>
.....
}
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
// 更新待追加消息的 lastOffset 值
appendInfo.lastOffset = offset.value - 1
// 如果时间戳类型为 LOG_APPEND_TIME,则修改时间戳
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
// 如果在执行 validateMessagesAndAssignOffsets 操作时修改了消息的长度,
//则需要重新验证,防止消息过长
if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
for (logEntry <- validRecords.shallowEntries.asScala) {
if (logEntry.sizeInBytes > config.maxMessageSize) {
.....
}
}
}
}
// 3.2 不需要分配 offset
else {
// 如果消息的 offset 不是单调递增,或者消息的 firstOffset 小于 Log 中记录的下一条消息 offset,则说明 appendInfo 非法
if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
...
}
// 4. 校验待追加消息的长度,保证不超过了单个 LogSegment 所允许的最大长度(对应 segment.bytes 配置)
if (validRecords.sizeInBytes > config.segmentSize) {
throw new RecordBatchTooLargeException(
"Message set size is %d bytes which exceeds the maximum configured segment size of %s.".format(validRecords.sizeInBytes, config.segmentSize))
}
// 5. 获取 activeSegment 对象,如果需要则创建新的 activeSegment 对象
val segment = this.maybeRoll(
messagesSize = validRecords.sizeInBytes,
maxTimestampInMessages = appendInfo.maxTimestamp,
maxOffsetInMessages = appendInfo.lastOffset)
// 6. 往 activeSegment 中追加消息
segment.append(
firstOffset = appendInfo.firstOffset,
largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
// 7. 更新 LEO 中记录的当前 Log 最后一个 offset 值
this.updateLogEndOffset(appendInfo.lastOffset + 1)
...
// 8. 如果刷盘时间间隔达到阈值(对应 flush.messages 配置),则执行刷盘
if (unflushedMessages >= config.flushInterval)
this.flush() // 将 [recoveryPoint, logEndOffset) 之间的数据刷盘
appendInfo
}
} catch {
...
}
}
生产者发送消息集给服务端,服务端会将这一批消息追加到日志中。 每条消息需要指定绝对偏移量,服务端会用nextoffsetMetadata的值作为起始偏移量。 服务端将每条带有偏移量的消息写入到日志分段中。 服务端会获取这一批消息中最后一条消息的偏移量,加上一后更新nextoffsetMetadata。 消费线程(消费者或备份副本)会根据这个变量的最新值拉取消息。一旦变量值发生变化消费线程就能拉取到新写入的消息。
LogSegment#append
方法的实现,该方法用于往当前 LogSegment 对应的 log 文件中追加消息数据,并在需要时更新对应的 index 和 timeindex 索引数据。LogSegment#append
方法实现如下:def append(firstOffset: Long, // 待追加消息的起始 offset
largestOffset: Long, // 待追加消息中的最大 offset
largestTimestamp: Long, // 待追加消息中的最大时间戳
shallowOffsetOfMaxTimestamp: Long, // 最大时间戳消息对应的 offset
records: MemoryRecords) { // 待追加的消息数据
if (records.sizeInBytes > 0) {
...
// 获取物理位置(当前分片的大小)
val physicalPosition = log.sizeInBytes()
if (physicalPosition == 0) rollingBasedTimestamp = Some(largestTimestamp)
require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
// 将消息数据追加到 log 文件
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
// 更新已追加的消息对应的最大时间戳,及其 offset
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampSoFar = largestTimestamp
offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
}
// 如果当前累计追加的日志字节数超过阈值(对应 index.interval.bytes 配置)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
// 更新 index 和 timeindex 文件
index.append(firstOffset, physicalPosition)
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
bytesSinceLastIndexEntry = 0 // 重置当前累计追加的日志字节数
}
// 更新累计加入的日志字节数
bytesSinceLastIndexEntry += records.sizeInBytes
}
}
FileRecords#append
方法将消息数据追加到对应的 log 文件中,并更新本地记录的已追加消息的最大时间戳及其 offset。index.interval.bytes
配置)建立索引项,如果当前累计追加的消息字节数超过该配置值,则 Kafka 会更新对应的 index 和 timeindex 数据。broker如何 分析和验证消息集?
//对要追加的消息集进行分析和验证,消息太大或者无效会被丢弃
def analyzeAndValidateMessageSet(messages:ByteBufferMessageSet)={
var shallowMessageCount = 0 //消息数量
var validBytesCount =0 //有效字节数
//第一条消息和最后一条 (循环时表示上一条消息的偏移量)消息的偏移量
var firstOffset,astoffset = -1L
var monotonic =true // 是否单调递增
for(messageAndOffset <- messages,shallowIterator) {
// 在第一条消息中更新firstoffset
if(firstOffset <0) firstOffset = messageAndOffset.offset
if(lastOffset >= messageAndOffset.offset) monotonic = false
//每循环一条消息,就更新
lastoffsetLastOffset = messageAndOffset.offset
val m= messageAndoffset.message
val messageSize = MessageSet.entrySize(m)
m.ensureValid()//检查消息是否有效
shallowMessageCount +=1
validBytesCount += messageSize
}
LogAppendInfo(firstOffset,lastOffset, sourceCodec,targetCodec,shallowMessageCount,validBytesCount,monotonic)
}
broker如何为消息集分配绝对偏移量?
// 消息集 加到 志,获取最近的偏移量作为初始佳
class Log{
def append(l'lessages : ByteBuffe MessageSet) {
// nextOffsetMetadata 表示最近 一条消息的偏移量
val offset = new AtomicLong(nextOffsetMetadata.messageOffset)
//offset参数作为原子变量,在分配偏移量时,先获取出值再加一
validMessages=validMessages.validateMessagesAndAssignOffsets(offset)
//offset的返回值是最后一条消息的偏移量再加一,那么最后一条消息就要减一
appendInfo.lastOffset = offset.get - 1
segment.append(appendInfo.firstoffset,validMessages) // 追加消息集
//更新nextoffsetMetadata,用最后一条消息的偏移量加一表示最近下一条
updateLogEndOffset(appendInfo.lastOffset +1)
}
}
//字节缓冲区消息集根据指定的偏移量计数器、更新每条消息的偏移量
class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet {
def validateMessagesAndAssignOffsets(offsetCounter:AtomicLong)={
var messagePosition =0
buffer.mark()//先标记
while(messagePosition < sizeInBytes - MessageSet.LogOverhead){
buffer.position(messagePosition)// 定位到每条消息的起始位置
//以最新的偏移量计数器为基础,每条消息的偏移量都在此基础上不断加一
buffer.putLong(offsetCounter.getAndIncrement())
val messageSize= buffer.getInt()
// 消息的大小//更新消息的起始位置,为下一条消息做准备 (12+消息大小,表示一条完整的消息)
messagePosition += MessageSet.LogOverhead + messageSize
buffer.reset()//重置的时候,回到最开始标记的地方
this// 还是返回字节缓冲区消息集。除了偏移量改了,其他均没有变化
}
}
一种是按照顺序完整地读取每条消息,这种方式代价比较大,我们实际上只需要更改偏移量,不需要读取每条消息的实际内容; 另一种是先读取出消息大小的值,然后计算下一条消息的起始偏移量,最后直接用字节缓冲区提供的定位方法(position())直接定位到下一条消息的起始位置。
Kafak通过顺序写实现写入的高性能
寻道
、旋转
和数据传输
三个步骤。寻道(时间):磁头移动定位到指定磁道; 旋转延迟(时间):等待指定扇区从磁头下旋转经过; 数据传输(时间):数据在磁盘、内存与网络之间的实际传输。
寻道
、旋转
可以极大地提高磁盘读写的性能。顺序写
文件,基本减少了磁盘寻道
和旋转
的次数。磁头再也不用在磁道上乱舞了,而是一路向前飞速前行。Kafak通过sendfile 零拷贝实现发送的高性能
4 次模式切换变成了 2 次 2 次 DMA 拷贝,仍然是 2 次 1 次微小的指针拷贝
消费者从broker拉取数据
@Override
public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
int oldSize = sizeInBytes();
if (newSize < oldSize)
throw new KafkaException(String.format(
"Size of FileRecords %s has been truncated during write: old size %d, new size %d",
file.getAbsolutePath(), oldSize, newSize));
long position = start + offset;
long count = Math.min(length, oldSize - offset);
return destChannel.transferFrom(channel, position, count);
}
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}
Kafka为啥达到100Wtps高性能?
第一篇:零基础 ,穿透 零复制 底层原理 (文章阅读量破W,点击此链接学习) 第2篇:从0到1: 穿透Kafka 0复制的底层原理和实操 (本文) 第3篇:从0到1: 穿透Netty 0复制的底层原理和实操 (规划中,具体参见尼恩的公号) 第4篇:从0到1: 穿透Rocketmq 0复制的底层原理和实操 (规划中,具体参见尼恩的公号) 相关视频:《尼恩Java硬核架构班第16章:RocketMQ第一部曲:葵花宝典(高性能秘籍)架构师视角解读OS底层的mmap、pagecache、zerocopy等底层的底层知识 》 (点击此链接学习)
说在最后:有问题找老架构取经
被裁之后, 空窗1年/空窗2年, 如何 起死回生 ?
案例1:42岁被裁2年,天快塌了,急救1个月,拿到开发经理offer,起死回生
案例2:35岁被裁6个月, 职业绝望,转架构急救上岸,DDD和3高项目太重要了
案例3:失业15个月,学习40天拿offer, 绝境翻盘,如何实现?
被裁之后,100W 年薪 到手, 如何 人生逆袭?
100W案例,100W年薪的底层逻辑是什么? 如何实现年薪百万? 如何远离 中年危机?
如何 逆天改命,包含AI、大数据、golang、Java 等
实现职业转型,极速上岸
关注职业救助站公众号,获取每天职业干货
助您实现职业转型、职业升级、极速上岸
---------------------------------
实现架构转型,再无中年危机
关注技术自由圈公众号,获取每天技术千货
一起成为牛逼的未来超级架构师
几十篇架构笔记、5000页面试宝典、20个技术圣经
请加尼恩个人微信 免费拿走
暗号,请在 公众号后台 发送消息:领电子书
如有收获,请点击底部的"在看"和"赞",谢谢