0 前言
今天我们终于迎来专题——基于 go 语言从零到一实现 lsm tree 的完结篇——level sorted merge 流程篇. 在此先回顾一下本系列的全部内容:
• 基于go实现lsm tree之level sorted merge流程(本篇)
关于以上内容的源码均已上传到我的个人开源项目中,大家根据需要自取,如果觉得我的代码写得还凑合,可以留个 star,不胜感激.
开源项目 golsm 传送门 >> https://github.com/xiaoxuxiansheng/golsm
1 流程介绍
1.1 lsm compact 机制
首先,关于 lsm tree 的 level sorted merge 流程(简称 compact 流程),我们在此再针对这部分理论进行铺垫介绍:
• lsm tree 采取磁盘分层架构,将存储数据的 sst 文件分为 level0-levelk 共计 k+1 个层级
• level(i+1) 中 sst 文件的容量大小约为 level(i) 的 T 倍,T 为常量,通常取值为 10 左右
• 数据流向由浅入深,层层递进,即由 level(i) -> level(i+1)
• 内存有序表 memtable 的数据溢写生成 level0 的 sst 文件,因此 level0 层文件局部有序,文件间可能重复且无序
• 当某个 level 层数据容量达到阈值时,会发起 level(i) -> level(i+1) 的 compact 操作
• 通过数据流向可以看出,实时性较好的热数据位于浅层,实时性较差的冷数据位于深层. 倘若出现重复 kv 对,则最浅层一笔数据为有效数据,其余底层数据均为脏数据
• 数据从 level(i) 流向 level(i+1) 过程中,通过 compact 操作,数据去重和排序,保证汇入 level(i+1) 层后数据无重复且全局有序
• compact 操作可能产生连锁反应,即 level(i) -> level(i+1) 的 compact 流程完成后,可能导致 level(i+1) 数据容量达到阈值,进一步触发 level(i+1) -> level(i+2) 的 compact 流程
1.2 lsm compact 示例
下面通过一个示例,对 compact 流程进一步加以说明.
背景:某时刻,lsm tree 中 level1 层的数据容量达到阈值,接下来要发起 level(1) -> level(2) 的归并操作:
• 从 level1 中选取一个本轮 comact 的数据范围 [0-30] (此处选取范围的规则比较灵活)
• 查找到 level2 层与 [0-30] 有重叠的 sst 文件——[0-10] 和 [11-32]
• 按照 [0-10]、[11-32]、[0-30] 的顺序有序归并其中的 kv 对数据(如出现重复 key,level1 文件更晚写入,因此可以覆盖 level2 的脏数据)
• 将归并后的 kv 对数据写入 level2,根据 level2 文件容量阈值,可能被拆成多个文件,如 [0-16] 和 [17-32]
• 删除完成 compact 操作的旧文件 [0-10]、[11-32]、[0-30]
倘若因为这一轮 compact 操作,导致 level2 的数据容量又超出阈值,则进一步执行 level2 -> level3 的 compact 流程,以此类推.
2 核心模块
下面我们梳理具体的实现细节. 和 compact 流程有关的核心类包括 lsm node 和 lsm tree.
2.1 Node
Node 是 lsm tree 拓扑结构中对应于一个 sst 文件的节点. Node 中一方面记录了其在 lsm tree 中的拓扑信息,如层级以及序号,另一方面通过持有 sstReader、文件索引 index、块过滤器 filter 等成员属性,能够很好地支持 sst 文件的数据检索操作.
// lsm tree 中的一个节点. 对应一个 sstables
type Node struct {
conf *Config // 配置文件
file string // sstable 对应的文件名,不含目录路径
level int // sstable 所在 level 层级
seq int32 // sstable 的 seq 序列号. 对应为文件名中的 level_seq.sst 中的 seq
size uint64 // sstable 的大小,单位 byte
blockToFilter map[uint64][]byte // 各 block 对应的 filter bitmap
index []*Index // 各 block 对应的索引
startKey []byte // sstable 中最小的 key
endKey []byte // sstable 中最大的 key
sstReader *SSTReader // 读取 sst 文件的 reader 入口
}
2.2 Tree
Tree 对应的就是一棵 lsm tree 实例,其中维护了内存读写有序表 memtable 以及一系列内存只读有序表 rOnlyMemtable,并通过一个二维数组 nodes 实现了磁盘下多层级 sst 文件的拓扑结构.
// 一棵 lsm tree
type Tree struct {
conf *Config
// 读写数据时使用的锁
dataLock sync.RWMutex
// 每层 node 节点使用的读写锁
levelLocks []sync.RWMutex
// 读写 memtable
memTable memtable.MemTable
// 只读 memtable
rOnlyMemTable []*memTableCompactItem
// 预写日志写入口
walWriter *wal.WALWriter
// lsm树状数据结构
nodes [][]*Node
// memtable 达到阈值时,通过该 chan 传递信号,进行溢写工作
memCompactC chan *memTableCompactItem
// 某层 sst 文件大小达到阈值时,通过该 chan 传递信号,进行溢写工作
levelCompactC chan int
// lsm tree 停止时通过该 chan 传递信号
stopc chan struct{}
// memtable index,需要与 wal 文件一一对应
memTableIndex int
// 各层 sstable 文件 seq. sstable 文件命名为 level_seq.sst
levelToSeq []atomic.Int32
}
2.3 compact 协程
在一棵 lsm tree 实例启动时,会异步启动一个 compact goroutine,持续负责为只读 memtable 完成溢写落盘操作,以及为各 level 层完成 compact 操作.
// 构建出一棵 lsm tree
func NewTree(conf *Config) (*Tree, error) {
// 1 构造 lsm tree 实例
t := Tree{
// ...
// compact 协程接收待溢写落盘到 level0 层的只读 memtable
memCompactC: make(chan *memTableCompactItem),
// compact 协程接收某层需要执行 compact 操作的信号
levelCompactC: make(chan int),
// compact 协程退出信号
stopc: make(chan struct{}),
// 各 level 层级 sst 文件的最大序号,单调递增
levelToSeq: make([]atomic.Int32, conf.MaxLevel),
nodes: make([][]*Node, conf.MaxLevel),
// lsm tree 的 level 层级锁,保证在某个 level 层内修改操作 node 时并发安全
levelLocks: make([]sync.RWMutex, conf.MaxLevel),
}
// 2 读取 sst 文件,还原出整棵树
// ...
// 3 运行 lsm tree 压缩调整协程
go t.compact()
// 4 读取 wal 还原出 memtable
// ...
// 5 返回 lsm tree 实例
return &t, nil
}
compact 协程通过 for 循环 + select 多路复用的模式运行,持续监听 memCompactC 和 levelCompactC,接收到信号后执行对应的 flushMemtable 和 compactLevel 流程.
// 运行 compact 协程.
func (t *Tree) compact() {
for {
select {
// 接收到 lsm tree 终止信号,退出协程.
case <-t.stopc:
// log
return
// 接收到 read-only memtable,需要将其溢写到磁盘成为 level0 层 sstable 文件.
case memCompactItem := <-t.memCompactC:
t.compactMemTable(memCompactItem)
// 接收到 level 层 compact 指令,需要执行 level~level+1 之间的 level sorted merge 流程.
case level := <-t.levelCompactC:
t.compactLevel(level)
}
}
}
3 memtable 落盘
3.1 主流程
compact 协程接收到只读 memtable 后,会进入 compactMemTable 方法:
• 将 memtable 通过 sstWriter 溢写落盘到 level0 层,生成 sst 文件
• 从新生成的 sstable 包装成 lsm node,插入到 lsm tree nodes 数组的 level0 层
• 从 lsm tree 的 rOnlyMemTable 数组中移除已经完成落盘的只读 memtable
• 移除已完成落盘的 memtable 对应的 wal 文件,因为这部分数据已经安全了
// 将只读 memtable 溢写落盘成为 level0 层 sstable 文件
func (t *Tree) compactMemTable(memCompactItem *memTableCompactItem) {
// 处理 memtable 溢写工作:
// 1 memtable 溢写到 0 层 sstable 中
t.flushMemTable(memCompactItem.memTable)
// 2 从 rOnly slice 中回收对应的 table
t.dataLock.Lock()
for i := 0; i < len(t.rOnlyMemTable); i++ {
if t.rOnlyMemTable[i].memTable != memCompactItem.memTable {
continue
}
t.rOnlyMemTable = t.rOnlyMemTable[i+1:]
}
t.dataLock.Unlock()
// 3 删除相应的预写日志. 因为 memtable 落盘后数据已经安全,不存在丢失风险
_ = os.Remove(memCompactItem.walFile)
}
3.2 溢写落盘
将 memtable 溢写落盘到 level0 层的详细方法为 flushMemTable.
需要注意的是,在完成 level0 层 sst 文件溢写后,需要判断 level0 层文件容量是否达到阈值,如果达到的话,需要驱动一轮 level0 -> level1 的 compact 流程.
// 将 memtable 的数据溢写落盘到 level0 层成为一个新的 sst 文件
func (t *Tree) flushMemTable(memTable memtable.MemTable) {
// memtable 写到 level 0 层 sstable 中
seq := t.levelToSeq[0].Load() + 1
// 创建 sst writer
sstWriter, _ := NewSSTWriter(t.sstFile(0, seq), t.conf)
defer sstWriter.Close()
// 遍历 memtable 写入数据到 sst writer
for _, kv := range memTable.All() {
sstWriter.Append(kv.Key, kv.Value)
}
// sstable 落盘
size, blockToFilter, index := sstWriter.Finish()
// 构造节点添加到 tree 的 node 中
t.insertNode(0, seq, size, blockToFilter, index)
// 尝试引发一轮 compact 操作
t.tryTriggerCompact(0)
}
3.3 插入 lsm tree
通过 insertNode 方法,实现 sstable 对应 lsm node 实例的创建,并将其插入到 lsm tree nodes 数组的指定 level 层级中:
• 倘若插入到 level0 层,直接在当前层末尾追加即可(level0 层文件不保证全局去重和有序)
• 倘若插入到 level1 ~ levelk 层,则需要根据 sstable 数据 key 范围,按照大小顺序寻找到合适的位置插入(level1 ~ levelk 层文件保证全局去重和有序)
func (t *Tree) insertNode(level int, seq int32, size uint64, blockToFilter map[uint64][]byte, index []*Index) {
file := t.sstFile(level, seq)
sstReader, _ := NewSSTReader(file, t.conf)
t.insertNodeWithReader(sstReader, level, seq, size, blockToFilter, index)
}
// 插入一个 node 到指定 level 层
func (t *Tree) insertNodeWithReader(sstReader *SSTReader, level int, seq int32, size uint64, blockToFilter map[uint64][]byte, index []*Index) {
file := t.sstFile(level, seq)
// 记录当前 level 层对应的 seq 号(单调递增)
t.levelToSeq[level].Store(seq)
// 创建一个 lsm node
newNode := NewNode(t.conf, file, sstReader, level, seq, size, blockToFilter, index)
// 对于 level0 而言,只需要 append 插入 node 即可
if level == 0 {
t.levelLocks[0].Lock()
t.nodes[level] = append(t.nodes[level], newNode)
t.levelLocks[0].Unlock()
return
}
// 对于 level1~levelk 层,需要根据 node 中 key 的大小,遵循顺序插入
for i := 0; i < len(t.nodes[level])-1; i++ {
// 遵循从小到大的遍历顺序,找到首个最小 key 比 newNode 最大 key 还大的 node,将 newNode 插入在其之前
if bytes.Compare(newNode.End(), t.nodes[level][i+1].Start()) < 0 {
t.levelLocks[level].Lock()
t.nodes[level] = append(t.nodes[level][:i+1], t.nodes[level][i:]...)
t.nodes[level][i+1] = newNode
t.levelLocks[level].Unlock()
return
}
}
// 遍历完 level 层所有节点都还没插入 newNode,说明 newNode 是该层 key 值最大的节点,则 append 到最后即可
t.levelLocks[level].Lock()
t.nodes[level] = append(t.nodes[level], newNode)
t.levelLocks[level].Unlock()
}
3.4 驱动 compact
每当在 level 层中创建 sst 文件后,都需要校验当前 level 层是否有必要开启 compact 流程:
• 如果当前层已经是最深的一层,则不会执行 compact 流程
• 如果当前层 sst 文件总大小达到阈值,则往 levelCompactC 传递信号,驱动 compact 协程执行 compact 流程
func (t *Tree) tryTriggerCompact(level int) {
// 最后一层不执行 compact 操作
if level == len(t.nodes)-1 {
return
}
var size uint64
for _, node := range t.nodes[level] {
size += node.size
}
if size <= t.conf.SSTSize*uint64(math.Pow10(level))*uint64(t.conf.SSTNumPerLevel) {
return
}
go func() {
t.levelCompactC <- level
}()
}
4 level compact 流程
4.1 主流程
当 compact 协程接收到 compact 信号后,通过 compactLevel 方法驱动一轮 compact 流程:
• 首先获取到本轮 compact 所影响到的所有 lsm node(包括 level 层和 level + 1 层)
• 针对这些 lsm node 的 kv 对进行有序归并操作,出现重复 key 时,需要保留实时性更强的数据
• 通过 sstWriter,将 kv 对数据写入到 level + 1 层的 sst 文件中. 遵循 level + 1 层的文件容量阈值,必要的时候进行 sst 文件拆分
• 把新生成的 sstable 都封装成 lsm node,插入到 lsm tree 的 nodes 数组中
• 把完成 compact 操作的旧 sstable 对应 lsm node 从 lsm tree 的 nodes 数组中移除,并把这部分 sst 文件删除
• 检查 level + 1 层文件容量是否达到阈值,如有必要,驱动下一轮 compact 流程
// 针对 level 层进行排序归并操作
func (t *Tree) compactLevel(level int) {
// 获取到 level 和 level + 1 层内需要进行本次归并的节点
pickedNodes := t.pickCompactNodes(level)
// 插入到 level + 1 层对应的目标 sstWriter
seq := t.levelToSeq[level+1].Load() + 1
sstWriter, _ := NewSSTWriter(t.sstFile(level+1, seq), t.conf)
defer sstWriter.Close()
// 获取 level + 1 层每个 sst 文件的大小阈值
sstLimit := t.conf.SSTSize * uint64(math.Pow10(level+1))
// 获取本次排序归并的节点涉及到的所有 kv 数据
pickedKVs := t.pickedNodesToKVs(pickedNodes)
// 遍历每笔需要归并的 kv 数据
for i := 0; i < len(pickedKVs); i++ {
// 倘若新生成的 level + 1 层 sst 文件大小已经超限
if sstWriter.Size() > sstLimit {
// 将 sst 文件溢写落盘
size, blockToFilter, index := sstWriter.Finish()
// 将 sst 文件对应 node 插入到 lsm tree 内存结构中
t.insertNode(level+1, seq, size, blockToFilter, index)
// 构造一个新的 level + 1 层 sstWriter
seq = t.levelToSeq[level+1].Load() + 1
sstWriter, _ = NewSSTWriter(t.sstFile(level+1, seq), t.conf)
defer sstWriter.Close()
}
// 将 kv 数据追加到 sstWriter
sstWriter.Append(pickedKVs[i].Key, pickedKVs[i].Value)
// 倘若这是最后一笔 kv 数据,需要负责把 sstWriter 溢写落盘并把对应 node 插入到 lsm tree 内存结构中
if i == len(pickedKVs)-1 {
size, blockToFilter, index := sstWriter.Finish()
t.insertNode(level+1, seq, size, blockToFilter, index)
}
}
// 移除这部分被合并的节点
t.removeNodes(level, pickedNodes)
// 尝试触发下一层的 compact 操作
t.tryTriggerCompact(level + 1)
}
4.2 有序归并
通过 pickCompactNodes 方法,能够获取到本轮 compact 所涉及到的所有 lsm node:
• 以 level 层前一半 node 的最小 key 作为 startKey,最大 key 作为 endKey
• 遍历 level + 1 和 level 层所有 node,所有和 [startKey, endKey] 有重叠部分的 node 都追加到 list 进行归并
• node 进入 list 遵循 level 从大到小、index 从小到大的顺序,保证 list 中越靠后的 node 数据实时性越高
// 获取本轮 compact 流程涉及到的所有节点,范围涵盖 level 和 level+1 层
func (t *Tree) pickCompactNodes(level int) []*Node {
// 每次合并范围为当前层前一半节点
startKey := t.nodes[level][0].Start()
endKey := t.nodes[level][0].End()
mid := len(t.nodes[level]) >> 1
if bytes.Compare(t.nodes[level][mid].Start(), startKey) < 0 {
startKey = t.nodes[level][mid].Start()
}
if bytes.Compare(t.nodes[level][mid].End(), endKey) > 0 {
endKey = t.nodes[level][mid].End()
}
var pickedNodes []*Node
// 将 level 层和 level + 1 层 和 [start,end] 范围有重叠的节点进行合并
for i := level + 1; i >= level; i-- {
for j := 0; j < len(t.nodes[i]); j++ {
if bytes.Compare(endKey, t.nodes[i][j].Start()) < 0 || bytes.Compare(startKey, t.nodes[i][j].End()) > 0 {
continue
}
// 所有范围有重叠的节点都追加到 list
pickedNodes = append(pickedNodes, t.nodes[i][j])
}
}
return pickedNodes
}
通过 pickedNodesToKVs 方法,进行一系列 node 中 kv 对数据的有序归并:
• 借助 memtable,实现 kv 数据的有序排列
• 出现重复 key 时,用 node list 中靠后的 kv 对覆盖靠前的 kv 对,保证保留下来的是实时性更高的数据
// 获取本轮 compact 流程涉及到的所有 kv 对. 这个过程中可能存在重复 k,保证只保留最新的 v
func (t *Tree) pickedNodesToKVs(pickedNodes []*Node) []*KV {
// index 越小,数据越老. index 越大,数据越新
// 所以使用大 index 的数据覆盖小 index 数据,以新覆久
memtable := t.conf.MemTableConstructor()
for _, node := range pickedNodes {
kvs, _ := node.GetAll()
for _, kv := range kvs {
memtable.Put(kv.Key, kv.Value)
}
}
// 借助 memtable 实现有序排列
_kvs := memtable.All()
kvs := make([]*KV, 0, len(_kvs))
for _, kv := range _kvs {
kvs = append(kvs, &KV{
Key: kv.Key,
Value: kv.Value,
})
}
return kvs
}
4.3 移除旧节点
最后是移除一系列 lsm node 的 removeNodes 方法:
• 从 lsm tree nodes 数组中移除对应的 node
• 调用 node.Destroy 方法,删除其对应的 sst 文件. (这个流程可以异步完成,以提高 compact 协程的性能)
// 移除所有完成 compact 流程的老节点
func (t *Tree) removeNodes(level int, nodes []*Node) {
// 从 lsm tree 的 nodes 中移除老节点
outer:
for k := 0; k < len(nodes); k++ {
node := nodes[k]
for i := level + 1; i >= level; i-- {
for j := 0; j < len(t.nodes[i]); j++ {
if node != t.nodes[i][j] {
continue
}
t.levelLocks[i].Lock()
t.nodes[i] = append(t.nodes[i][:j], t.nodes[i][j+1:]...)
t.levelLocks[i].Unlock()
continue outer
}
}
}
go func() {
// 销毁老节点,包括关闭 sst reader,并且删除节点对应 sst 磁盘文件
for _, node := range nodes {
node.Destroy()
}
}()
}
func (n *Node) Destroy() {
n.sstReader.Close()
_ = os.Remove(path.Join(n.conf.Dir, n.file))
}
5 小结
祝贺!至此,我们已完成【基于go实现lsm tree】 系列的全部内容:
• 基于go实现lsm tree 之主干框架(已完成)
• 基于go实现lsm tree之memtable结构(已完成)
• 基于go实现lsm tree之sstable结构(已完成)
• 基于go实现lsm tree之level sorted merge流程(已完成)
在此做个展望,未来一段时间内会和大家一起探讨一个新的话题——如何基于 Golang 从零到一实现 B+ Tree