基于go实现lsm tree之sstable结构

文摘   科技   2024-01-26 20:58   北京  

0 前言

本期咱们延续专题——基于 go 语言从零到一实现 lsm tree 的分享内容,本专题共分为下述四篇,本文是其中的第三篇 sstable 结构篇:

关于以上内容的源码均已上传到我的个人开源项目中,大家根据需要自取,如果觉得我的代码写得还凑合,可以留个 star,不胜感激.

开源项目 golsm 传送门 >> https://github.com/xiaoxuxiansheng/golsm

 

1 原理概述

1.1 数据块 block

在 sstable 中会基于 block 的维度对数据进行逻辑分组以便进行索引 index 和过滤器 filter 的组织.

在 block 中,由于 kv 数据的长度是不确定的,为实现不同 kv 数据之间的分隔,采用的方式是通过 header 额外标识每组 kv 对的数据长度

具体而言,此处会采用固定的 bit 位作为 header 标识,形式如下:

【key 长度】 -> 【val 长度】-> 【key】->【val】

此外,又因为 sstable 中数据 key 是有序的,因此相邻 key 之间有很大概率可以复用共同的前缀,因而在实现形式上可以进一步优化:

【与前一个 key 共享前缀长度】 -> 【key 剩余部分长度】->【val 长度】-> 【key 剩余部分】->【val】

综上所述,最后 kv 对数据在 block 内的存储组织形式如下图所示:

 

1.2 索引 index

sstable 与 block 是一对多关系,在 sstable 会基于 block 的维度进行索引桩点 index 的插入,用以提高读流程的数据检索效率.

如下图所示,每个 sstable 内的 key 是全局升序的,在不同 block 块之间插入分隔的 index key(上一个 block 最大 key =< index key < 下一个 block 最小 key),并通过 index key 映射到前一个 block 的信息,包括其偏移量和大小.

这样在读流程中,可以先通过 index 快速定位到数据可能存在的 block,再基于 block 的粒度展开详细的数据检索流程.

 

1.3 过滤器 filter

block 的完整数据存储在文件中,为了尽可能减少不必要的磁盘 io 操作,此处采取的思路是针对每个 block 在内存中建立一份数据 key 的缓存在检索时先通过缓存校验 key 是否存在 block 中,倘若不存在则减少一次磁盘 io 操作.

因此,这就对这份缓存数据有了明确要求:

  • • 足够准确: 针对 key 的存在性校验不能出现误判,或者在出现误判时需要有合理的兜底策略

  • • 足够省空间: 内存资源有限,以尽可能少的空间展示 key 是否存在的概要信息,至于 kv 对数据详情,可以在判定 key 存在后,通过读磁盘文件的 block 内容获取

 

综上,在 sstable filter 的技术选型上,使用到布隆过滤器(Bloom Filter) 这一数据结构.

有关布隆过滤器的完整实现原理,可参见我之前发表的文章——布隆过滤器技术原理及应用实战.

这里简要对布隆过滤器的特性进行概述:

  • • 布隆过滤器基于 bitmap 实现存储,能够极致节省空间,具备标识某个 key 是否存在于集合的能力

  • • 布隆过滤器具有一定的假阳性误判率,即倘若数据不存在,也有一定概率被误判为数据存在

  • • 布隆过滤器假阳性误判率可以通过合理设置 m(bitmap 长度)和 k(hash 函数个数)来进行控制

 

在 sstable 的 filter 模块设计中,采用布隆过滤器的契合度是很高的(省空间且高性能),其中存在的假阳性 badcase,无外乎是针对少量本身不存在的 key 额外多了一次读 block 的 io 操作,会有一定的性能损耗,但对流程不会有负面影响.

 

在 bloom filter 中需要使用到多个线性无关的 hash 函数进行 bitmap 与数据 key 的映射. 这里考虑到 hash 运算的性能,选用 murmur3 算法. 并且在构造独立的 hash 时,采用如下思路:

  • • 提供基准函数1:h1 = murmur3(key)

  • • 提供基准函数2:h2 = h1 >> 17 | h2 << 15

实际使用到的 hash 函数通过构造两个基准函数的线性无关组合获得:

  • • hash 函数 g0 = h1 + 0 * h2

  • • hash 函数 g1 = h1 + 1 * h2

  • • ...

  • • hash 函数 gk = h1 + k * h2

 

此处使用到的 murmur3 算法开源地址:https://github.com/spaolacci/murmur3

 

1.4 sstable 文件

至此,我们已经集齐 sstable 模块中的核心要素,

  • • data block:所有 kv 对数据以 block 的形式进行分组,其内每条 kv 数据的存储形式为 【与前一个 key 共享前缀长度】 -> 【key 剩余部分长度】->【val 长度】-> 【key 剩余部分】->【val】

  • • filter block: 为减少不必要的磁盘 io ,针对每个 block 在内存维护一个 filter 快速判定 key 是否存在 block 中. 每个 data block 有一个独立的 fiter bit map,持久化时将这些内容以 kv 对的形式统一整合在 filter block 中,其中 key 为某个 block 的 offset,value 为该 block 对应的 fitler bitmap

  • • index block:为加速数据检索流程,在各 datablock 之间设定 index 桩点. 持久化时通过 kv 对的形式将这部分内容统一整合在 index block 中,其中 key 为作为桩点分隔键的 index key,value 为 index 桩点前一个 data block 的 offset 和 size

 

一份完整的 sstable 组织形式如下所示:

至此,我们聊完 sstable 的理论部分,从下一章开始我们介绍一下 lsm tree 中有关于 sstable 部分的具体实现.

 

2 block 实现

我们采用自底向上的介绍顺序,首先介绍关于 sstable 中 block 的实现细节.

2.1 数据结构

Block 的核心成员属性包括:

  • • record:存储 block 数据的缓冲区

  • • entriesCnt:当前 block 内已有的 kv 对数量

  • • prevKey:上一笔写入 block 的数据 key

// sst 文件中的数据块,和索引、过滤器为一一对应关系
type Block struct {
    conf       *Config       // lsm tree 配置文件
    buffer     [30]byte      // 用于辅助转移数据的临时缓冲区
    record     *bytes.Buffer // 用于复制溢写数据的缓冲区
    entriesCnt int           // kv 对数量
    prevKey    []byte        // 最晚一笔写入的数据的 key
}

 

// 数据块构造器
func NewBlock(conf *Config) *Block {
    return &Block{
        conf:   conf,
        record: bytes.NewBuffer([]byte{}),
    }
}

 

2.2 追加数据

 

往 block 中追加一笔 kv 对数据的流程如上图所示,理解该过程的核心是要摘清楚 key 的共享前缀长度部分和剩余长度部分.

该流程具体的实现代码如下:

// 追加一组kv对到数据块中
func (b *Block) Append(key, value []byte) {
    // 兜底执行:设置 prevKey 为当前写入的 key;累加 entriesCnt 数量
    defer func() {
        b.prevKey = append(b.prevKey[:0], key...)
        b.entriesCnt++
    }()


    // 获取和之前 key 的共享key前缀长度
    sharedPrefixLen := util.SharedPrefixLen(b.prevKey, key)


    // 分别设置共享key长度||剩余key长度||值长度
    n := binary.PutUvarint(b.buffer[0:], uint64(sharedPrefixLen))
    n += binary.PutUvarint(b.buffer[n:], uint64(len(key)-sharedPrefixLen))
    n += binary.PutUvarint(b.buffer[n:], uint64(len(value)))


    // 将 共享key长度||剩余key长度||值长度 写入 record buffer
    _, _ = b.record.Write(b.buffer[:n])
    // 将 剩余key || value 写入 record buffer
    b.record.Write(key[sharedPrefixLen:])
    b.record.Write(value)
}

 

2.3 导出数据

当 block 内数据大小达到阈值或者写流程结束时,block 内的数据会被整合到 sstable 的全局 data buffer 中,对应实现代码如下:

// 把块中的数据溢写到 dest writer 中
func (b *Block) FlushTo(dest io.Writer) (uint64, error) {
    defer b.clear()
    n, err := dest.Write(b.ToBytes())
    return uint64(n), err
}

 

// 清理数据块中的数据
func (b *Block) clear() {
    b.entriesCnt = 0
    b.prevKey = b.prevKey[:0]
    b.record.Reset()
}

 

3 filter 实现

接下来是关于 sstable 中过滤器 filter 的具体实现.

3.1 抽象 interface

在 golsm 中,将 filter 定义成一个 interface,读者可以提供任意版本的过滤器实现类进行注入,当然,也可以使用 golsm 中默认实现的布隆过滤器.

// 过滤器. 用于辅助 sstable 快速判定一个 key 是否存在于某个 block 中
type Filter interface {
    Add(key []byte)                // 添加 key 到过滤器
    Exist(bitmap, key []byte) bool // 是否存在 key
    Hash() []byte                  // 生成过滤器对应的 bitmap
    Reset()                        // 重置过滤器
    KeyLen() int                   // 存在多少个 key
}

 

3.2 bloom filter 数据结构

下面是关于布隆过滤器的类定义,需要注意一个 bloom filter 实例对应的不是 sstable 的维度,而是 sstable 中的一个 data block.

布隆过滤器类中的成员属性为:

  • • m:bitmap 的长度

  • • hashedKeys:临时缓存一系列添加到 bloom filter 的 key(首轮 hash 值)

// 布隆过滤器
type BloomFilter struct {
    m          int      // bitmap 的长度,单位 bit
    hashedKeys []uint32 // 添加到布隆过滤器的一系列 key 的 hash 值
}


// 布隆过滤器构造器
func NewBloomFilter(m int) (*BloomFilter, error) {
    if m <= 0 {
        return nil, errors.New("m must be postive")
    }
    return &BloomFilter{
        m: m,
    }, nil
}

 

3.3 添加 key

每添加一个 key 到 bloom filter 中,会使用 murmur3 hash 算法将首轮 hash 值追加到 hashedKeys slice 中:

// 添加一个 key 到布隆过滤器
func (bf *BloomFilter) Add(key []byte) {
    bf.hashedKeys = append(bf.hashedKeys, murmur3.Sum32(key))
}

 

3.4 构造位图

 

当所有 key 都完成添加后,可以调用 Hash 方法生成对应的 filter bitmap:

  • • 根据 m(预期的 bitmap 长度)和 n(当前已有的数据 key 数量)推导出合适的 k(hash 函数个数)【这里最佳公式为 k = ln2 * m / n,具体参见——布隆过滤器原理篇

  • • 根据 m 根据出一个空白的 bitmap,并将 k 值设置在最后一个 byte 位置

  • • 基于基准函数 h1 和 h2 依次构造出 k 个线性无关的 hash 函数

  • • 当前 key 经 k 个 hash 函数映射出 k 个 bit 位,分别将其置为 1

// 生成过滤器对应的 bitmap. 最后一个 byte 标识 k 的数值
func (bf *BloomFilter) Hash() []byte {
    // k: 根据 m 和 n 推导出最佳 hash 函数个数
    k := bf.bestK()
    // 获取出一个空的 bitmap,最后一个 byte 位值设置为 k
    bitmap := bf.bitmap(k)


    // 第一个基准 hash 函数 h1 = murmur3.Sum32
    // 第二个基准 hash 函数 h2 = h1 >> 17 | h2 << 15
    // 之后所有使用的 hash 函数均通过 h1 和 h2 线性无关的组合生成
    // 第 i 个 hash 函数 gi = h1 + i * h2
    for _, hashedKey := range bf.hashedKeys {
        // hashedKey 为 h1
        // delta 为 h2
        delta := (hashedKey >> 17) | (hashedKey << 15)
        for i := uint32(0); i < uint32(k); i++ {
            // 第 i 个 hash 函数 gi = h1 + i * h2
            // 需要标记为 1 的 bit 位
            targetBit := (hashedKey + i*delta) % uint32(len(bitmap)<<3)
            bitmap[targetBit>>3] |= (1 << (targetBit & 7))
        }
    }


    return bitmap
}

 

// 生成一个空的 bitmap
func (bf *BloomFilter) bitmap(k uint8) []byte {
    // bytes = bits / 8 (向上取整)
    bitmapLen := (bf.m + 7) >> 3
    bitmap := make([]byte, bitmapLen+1)
    // 最后一位标识 k 的信息
    bitmap[bitmapLen] = k
    return bitmap
}

 

// 根据 m 和 n 推算出最佳的 k
func (bf *BloomFilter) bestK() uint8 {
    // k 最佳计算公式:k = ln2 * m / n m——bitmap 长度 n——key个数
    k := uint8(69 * bf.m / 100 / len(bf.hashedKeys))
    // k ∈ [1,30]
    if k < 1 {
        k = 1
    }
    if k > 30 {
        k = 30
    }
    return k
}

 

3.5 判断 key

判断 key 是否存在于一个 filter bitmap 中,采取如下步骤:

  • • 从 bitmap 最后一个 byte 取出 k 值

  • • 基于基准函数 h1 和 h2 依次构造出 k 个线性无关的 hash 函数

  • • key 基于 k 个 hash 函数映射到 k 个 bit 位上

  • • 依次校验每个 bit 位,但凡一个不为 1,则代表 key 一定不存在;如果所有 bit 位均为 1,则判断 key 存在(可能有假阳性误判)

// 判断过滤器中是否存在 key(注意,可能存在假阳性误判问题)
func (bf *BloomFilter) Exist(bitmap, key []byte) bool {
    // 生成 bitmap 时,需要把哈希函数个数 k 的值设置在 bitmap 的最后一个 byte 上
    if bitmap == nil {
        bitmap = bf.Hash()
    }
    // 获取hash 函数的个数 k
    k := bitmap[len(bitmap)-1]


    // 第一个基准 hash 函数 h1 = murmur3.Sum32
    // 第二个基准 hash 函数 h2 = h1 >> 17 | h2 << 15
    // 之后所有使用的 hash 函数均通过 h1 和 h2 线性无关的组合生成
    // 第 i 个 hash 函数 gi = h1 + i * h2


    // h1
    hashedKey := murmur3.Sum32(key)
    // h2
    delta := (hashedKey >> 17) | (hashedKey << 15)
    for i := uint32(0); i < uint32(k); i++ {
        // gi = h1 + i * h2
        targetBit := (hashedKey + i*delta) % uint32(len(bitmap)<<3)
        // 找到对应的 bit 位,如果值为 1,则继续判断;如果值为 0,则 key 肯定不存在
        if bitmap[targetBit>>3]&(1<<(targetBit&7)) == 0 {
            return false
        }
    }


    // key 映射的所有 bit 位均为 1,则认为 key 存在(存在误判概率)
    return true
}

 

4 sstWriter 实现

与 sstable 文件交互的流程,可以分为写入和读取两个流程. sstWriter 部分以写入流程的视角展开.

4.1 数据结构

SSTWriter 是 sst 文件的写入口,其中涉及到的核心成员属性如下:

  • • dest:对应的 sst 文件

  • • dataBuf:将 kv 数据写入 sst 文件前使用的缓冲区

  • • filterBuf:将 filter 数据写入 sst 文件前使用的缓冲区

  • • indexBuf:将 index 数据写入 sst 文件前使用的缓冲区

  • • blockToFilter:在内存中建立的 block 与 filter bitmap 的映射关系

  • • index:在内存中建立的所有内存桩点的 list

  • • dataBlock:存储 kv 数据的数据块

  • • filterBlock:存储 filter 数据的过滤器块

  • • indexBlock:存储 index 数据的索引块

  • • prevKey:上一笔写入数据的 key

  • • prevBlockOffset:前一个数据块的 offset

  • • prevBlockSize:前一个数据块的 size

// 对应于 lsm tree 中的一个 sstable. 这是写入流程的视角
type SSTWriter struct {
    conf          *Config           // 配置文件
    dest          *os.File          // sstable 对应的磁盘文件
    dataBuf       *bytes.Buffer     // 数据块缓冲区 key -> val
    filterBuf     *bytes.Buffer     // 过滤器块缓冲区 prev block offset -> filter bit map
    indexBuf      *bytes.Buffer     // 索引块缓冲区 index key -> prev block offset, prev block size
    blockToFilter map[uint64][]byte // prev block offset -> filter bit map
    index         []*Index          // index key -> prev block offset, prev block size


    dataBlock     *Block   // 数据块
    filterBlock   *Block   // 过滤器块
    indexBlock    *Block   // 索引块
    assistScratch [20]byte // 用于在写索引块时临时使用的辅助缓冲区


    prevKey         []byte // 前一笔数据的 key
    prevBlockOffset uint64 // 前一个数据块的起始偏移位置
    prevBlockSize   uint64 // 前一个数据块的大小
}

 

Index 是不同 dataBlock 之间设立的索引桩点:

// sstable 中用于快速检索 block 的索引
type Index struct {
    Key             []byte // 索引的 key. 保证其 >= 前一个 block 最大 key; < 后一个 block 的最小 key
    PrevBlockOffset uint64 // 索引前一个 block 起始位置在 sstable 中对应的 offset
    PrevBlockSize   uint64 // 索引前一个 block 的大小,单位 byte
}

 

// sstWriter 构造器
func NewSSTWriter(file string, conf *Config) (*SSTWriter, error) {
    dest, err := os.OpenFile(path.Join(conf.Dir, file), os.O_CREATE|os.O_WRONLY, 0644)
    if err != nil {
        return nil, err
    }


    return &SSTWriter{
        conf:          conf,
        dest:          dest,
        dataBuf:       bytes.NewBuffer([]byte{}),
        filterBuf:     bytes.NewBuffer([]byte{}),
        indexBuf:      bytes.NewBuffer([]byte{}),
        blockToFilter: make(map[uint64][]byte),
        dataBlock:     NewBlock(conf),
        filterBlock:   NewBlock(conf),
        indexBlock:    NewBlock(conf),
        prevKey:       []byte{},
    }, nil
}

 

4.2 追加数据

下面是将一组 kv 数据写入 sstWriter 的流程:

  • • 每启用一个空白的 dataBlock,需要添加新的索引桩点

  • • 将 kv 数据追加到 dataBlock 中

  • • 将 key 添加到 dataBlock 对应的 fitler 中

  • • 记录一下 prevKey 为最新的 key

  • • 倘若 dataBlock 大小达到阈值,则需要将 dataBlock 中的内容溢写到 sstWriter 的 dataBuf 中,并对 dataBlock 进行重置

// 追加一笔数据到 sstable 中
func (s *SSTWriter) Append(key, value []byte) {
    // 倘若开启一个新的数据块,需要添加索引
    if s.dataBlock.entriesCnt == 0 {
        s.insertIndex(key)
    }


    // 将数据写入到数据块中
    s.dataBlock.Append(key, value)
    // 将 key 添加到块的布隆过滤器中
    s.conf.Filter.Add(key)
    // 记录一下最新的 key
    s.prevKey = key


    // 倘若数据块大小超限,则需要将其添加到 dataBuffer,并重置块
    if s.dataBlock.Size() >= s.conf.SSTDataBlockSize {
        s.refreshBlock()
    }
}

 

func (s *SSTWriter) insertIndex(key []byte) {
    // 获取索引的 key
    indexKey := util.GetSeparatorBetween(s.prevKey, key)
    n := binary.PutUvarint(s.assistScratch[0:], s.prevBlockOffset)
    n += binary.PutUvarint(s.assistScratch[n:], s.prevBlockSize)


    s.indexBlock.Append(indexKey, s.assistScratch[:n])
    s.index = append(s.index, &Index{
        // 索引 key
        Key:             indexKey,
        // 前一个 block 的 offset
        PrevBlockOffset: s.prevBlockOffset,
        // 前一个 block 的 size
        PrevBlockSize:   s.prevBlockSize,
    })
}

 

func (s *SSTWriter) refreshBlock() {
    if s.conf.Filter.KeyLen() == 0 {
        return
    }


    s.prevBlockOffset = uint64(s.dataBuf.Len())
    // 获取 block 对应的 filter bitmap
    filterBitmap := s.conf.Filter.Hash()
    s.blockToFilter[s.prevBlockOffset] = filterBitmap
    n := binary.PutUvarint(s.assistScratch[0:], s.prevBlockOffset)
    // 将布隆过滤器数据写入到 filter bitmap
    s.filterBlock.Append(s.assistScratch[:n], filterBitmap)
    // 重置布隆过滤器
    s.conf.Filter.Reset()


    // 将 block 的数据添加到缓冲区
    s.prevBlockSize, _ = s.dataBlock.FlushTo(s.dataBuf)
}

 

4.3 溢写落盘

 

在完成 sstWriter 的所有写入操作后,可以通过 Finish 方法实现 sst 文件的落盘:

  • • 完成最后一个 dataBlock 的溢写操作,写入到 data buffer(准备写入 sst 文件)

  • • 补齐最后一个索引桩点

  • • 将 filterBlock 数据写入到 filter buffer(准备写入 sst 文件)

  • • 将 indexBlock 数据写入到 index buffer (准备写入 sst 文件)

  • • 将 data buffer、filter buffer、index buffer 数据分别写入 sst 文件

  • • 将记录 filter 部分偏移量、大小和 index 部分偏移量、大小的 footer 写入 sst 文件

// 完成 sstable 的全部处理流程,包括将其中的数据溢写到磁盘,并返回信息供上层的 lsm 获取缓存
func (s *SSTWriter) Finish() (size uint64, blockToFilter map[uint64][]byte, index []*Index) {
    // 完成最后一个块的处理
    s.refreshBlock()
    // 补齐最后一个 index
    s.insertIndex(s.prevKey)


    // 将布隆过滤器块写入缓冲区
    _, _ = s.filterBlock.FlushTo(s.filterBuf)
    // 将索引块写入缓冲区
    _, _ = s.indexBlock.FlushTo(s.indexBuf)


    // 处理 footer,记录布隆过滤器块起始、大小、索引块起始、大小
    footer := make([]byte, s.conf.SSTFooterSize)
    size = uint64(s.dataBuf.Len())
    n := binary.PutUvarint(footer[0:], size)
    filterBufLen := uint64(s.filterBuf.Len())
    n += binary.PutUvarint(footer[n:], filterBufLen)
    size += filterBufLen
    n += binary.PutUvarint(footer[n:], size)
    indexBufLen := uint64(s.indexBuf.Len())
    n += binary.PutUvarint(footer[n:], indexBufLen)
    size += indexBufLen


    // 依次写入文件
    _, _ = s.dest.Write(s.dataBuf.Bytes())
    _, _ = s.dest.Write(s.filterBuf.Bytes())
    _, _ = s.dest.Write(s.indexBuf.Bytes())
    _, _ = s.dest.Write(footer)


    blockToFilter = s.blockToFilter
    index = s.index
    return
}

 

5 sstReader 实现

接下来通过 sstReader 部分以读取流程的视角展开:

5.1 数据结构

SSTReader 是 sst 文件的读出口,其中涉及到的核心成员属性如下:

  • • src:对应的 sst 文件

  • • reader:封装了 sst 文件的读取器

  • • filterOffset:过滤器部分数据的起始偏移量

  • • filterSize:过滤器数据的大小

  • • indexOffset:索引部分数据的起始偏移量

  • • indexSize:索引部分数据的大小

// 对应于 lsm tree 中的一个 sstable. 这是读取流程的视角
type SSTReader struct {
    conf         *Config       // 配置文件
    src          *os.File      // 对应的文件
    reader       *bufio.Reader // 读取文件的 reader
    filterOffset uint64        // 过滤器块起始位置在 sstable 的 offset
    filterSize   uint64        // 过滤器块的大小,单位 byte
    indexOffset  uint64        // 索引块起始位置在 sstable 的 offset
    indexSize    uint64        // 索引块的大小,单位 byte
}


// sstReader 构造器
func NewSSTReader(file string, conf *Config) (*SSTReader, error) {
    src, err := os.OpenFile(path.Join(conf.Dir, file), os.O_RDONLY, 0644)
    if err != nil {
        return nil, err
    }


    return &SSTReader{
        conf:   conf,
        src:    src,
        reader: bufio.NewReader(src),
    }, nil
}

 

5.2 读取 footer

 

在 sstReader 读取数据前,需要先读取 footer 信息,获取到过滤器和索引部分的元数据信息:

// 读取 sstable footer 信息,赋给 sstreader 的成员属性
func (s *SSTReader) ReadFooter() error {
    // 从尾部开始倒退 sst footer size 大小的偏移量
    if _, err := s.src.Seek(-int64(s.conf.SSTFooterSize), io.SeekEnd); err != nil {
        return err
    }


    s.reader.Reset(s.src)


    var err error
    if s.filterOffset, err = binary.ReadUvarint(s.reader); err != nil {
        return err
    }


    if s.filterSize, err = binary.ReadUvarint(s.reader); err != nil {
        return err
    }


    if s.indexOffset, err = binary.ReadUvarint(s.reader); err != nil {
        return err
    }


    if s.indexSize, err = binary.ReadUvarint(s.reader); err != nil {
        return err
    }


    return nil
}

 

5.3 读取 index

 

通过 ReadIndex 方法,可以从 sst 文件中读取到索引部分信息:

  • • 通过 index 部分的偏移量和大小,精准读出 index block 部分数据

  • • 按照 index block 协议对数据进行解析,获取到 index 信息

// 读取索引块
func (s *SSTReader) ReadIndex() ([]*Index, error) {
    // 如果 footer 信息还没读取,则先完成 footer 信息加载
    if s.indexOffset == 0 || s.indexSize == 0 {
        if err := s.ReadFooter(); err != nil {
            return nil, err
        }
    }


    // 读取 index block 块的内容
    indexBlock, err := s.ReadBlock(s.indexOffset, s.indexSize)
    if err != nil {
        return nil, err
    }


    // 对 index block 块的内容进行解析
    return s.readIndex(indexBlock)
}

 

// 读取一个 block 块的内容
func (s *SSTReader) ReadBlock(offset, size uint64) ([]byte, error) {
    // 根据起始偏移量,设置文件的 offset
    if _, err := s.src.Seek(int64(offset), io.SeekStart); err != nil {
        return nil, err
    }
    s.reader.Reset(s.src)


    // 读取指定 size 的内容
    buf := make([]byte, size)
    _, err := io.ReadFull(s.reader, buf)
    return buf, err
}

 

// 解析 index block 块的内容
func (s *SSTReader) readIndex(block []byte) ([]*Index, error) {
    var (
        index   []*Index
        prevKey []byte
    )


    // 将 index block 块内容封装成一个 buffer
    buf := bytes.NewBuffer(block)
    for {
        // 每次读取一条 index 记录,key 为 block 之间的分隔键,value 为前一个 block 的 offset 和 size
        key, value, err := s.ReadRecord(prevKey, buf)
        if errors.Is(err, io.EOF) {
            break
        }
        if err != nil {
            return nil, err
        }


        blockOffset, n := binary.Uvarint(value)
        blockSize, _ := binary.Uvarint(value[n:])
        index = append(index, &Index{
            Key:             key,
            PrevBlockOffset: blockOffset,
            PrevBlockSize:   blockSize,
        })


        prevKey = key
    }
    return index, nil
}

 

// 读取一条 kv 对数据
func (s *SSTReader) ReadRecord(prevKey []byte, buf *bytes.Buffer) (key, value []byte, err error) {
    // 获取当前 key 和 prevKey 的共享前缀长度
    sharedPrexLen, err := binary.ReadUvarint(buf)
    if err != nil {
        return nil, nil, err
    }


    // 获取当前 key 剩余部分长度
    keyLen, err := binary.ReadUvarint(buf)
    if err != nil {
        return nil, nil, err
    }


    // 获取 val 长度
    valLen, err := binary.ReadUvarint(buf)
    if err != nil {
        return nil, nil, err
    }


    // 读取 key 剩余部分
    key = make([]byte, keyLen)
    if _, err = io.ReadFull(buf, key); err != nil {
        return nil, nil, err
    }


    // 读取 val
    value = make([]byte, valLen)
    if _, err = io.ReadFull(buf, value); err != nil {
        return nil, nil, err
    }


    // 拼接 key 共享前缀 + 剩余部分
    sharedPrefix := make([]byte, sharedPrexLen)
    copy(sharedPrefix, prevKey[:sharedPrexLen])
    key = append(sharedPrefix, key...)
    // 返回完整的 key、val
    return
}

 

5.4 读取 filter

 

通过 ReadFilter 方法,可以从 sst 文件中读取到过滤器部分信息:

  • • 通过 filter 部分的偏移量和大小,精准读出 filter block 部分数据

  • • 按照 filter block 协议对数据进行解析,获取到 filter 信息

// 读取过滤器
func (s *SSTReader) ReadFilter() (map[uint64][]byte, error) {
    // 如果 footer 信息还没读取,则先完成 footer 信息加载
    if s.filterOffset == 0 || s.filterSize == 0 {
        if err := s.ReadFooter(); err != nil {
            return nil, err
        }
    }


    // 读取 filter block 块的内容
    filterBlock, err := s.ReadBlock(s.filterOffset, s.filterSize)
    if err != nil {
        return nil, err
    }


    // 对 filter block 块的内容进行解析
    return s.readFilter(filterBlock)
}

 

// 解析 filter block 块的内容
func (s *SSTReader) readFilter(block []byte) (map[uint64][]byte, error) {
    blockToFilter := make(map[uint64][]byte)
    // 将 filter block 块内容封装成一个 buffer
    buf := bytes.NewBuffer(block)
    var prevKey []byte
    for {
        // 每次读取一条 block filter 记录,key 为 block 的 offset,value 为过滤器 bitmap
        key, value, err := s.ReadRecord(prevKey, buf)
        if errors.Is(err, io.EOF) {
            break
        }
        if err != nil {
            return nil, err
        }


        blockOffset, _ := binary.Uvarint(key)
        blockToFilter[blockOffset] = value
        prevKey = key
    }


    return blockToFilter, nil
}

 

5.5 读取 kv 数据

 

通过 ReadBlockData 方法,可以从 sst 文件中读取指定 block 中的 kv 对数据:

// 读取某个 block 的数据
func (s *SSTReader) ReadBlockData(block []byte) ([]*KV, error) {
    // 需要临时记录前一个 key 的内容
    var prevKey []byte
    // block 数据封装成 buffer
    buf := bytes.NewBuffer(block)
    var data []*KV


    for {
        // 每次读取一条 kv 对
        key, value, err := s.ReadRecord(prevKey, buf)
        if errors.Is(err, io.EOF) {
            break
        }
        if err != nil {
            return nil, err
        }


        data = append(data, &KV{
            Key:   key,
            Value: value,
        })
        // 对 prevKey 进行更新
        prevKey = key
    }
    return data, nil
}

 

通过 ReadData 方法,可以从 sst 文件中读取到全量 data block 中的 kv 对数据:

  • • 通过 filter 模块起始偏移量,推导得到全量 data block 的起始和终止位置

  • • 读取出全量 data block 的数据

  • • 解析全量 data block 数据转换成一系列 kv 对

// 读取 sstable 下的全量 kv 数据
func (s *SSTReader) ReadData() ([]*KV, error) {
    // 如果 footer 信息还没读取,则先完成 footer 信息加载
    if s.indexOffset == 0 || s.indexSize == 0 || s.filterOffset == 0 || s.filterSize == 0 {
        if err := s.ReadFooter(); err != nil {
            return nil, err
        }
    }


    // 读取所有 data block 的内容
    dataBlock, err := s.ReadBlock(0, s.filterOffset)
    if err != nil {
        return nil, err
    }


    // 解析所有 data block 的内容
    return s.ReadBlockData(dataBlock)
}

 

6 主流程串联

最后,我们梳理一下,在整个 lsm tree 的宏观视角下,有哪些流程涉及到与 sstable 模块的使用交互.

6.1 读流程

 

在 lsm tree 读取数据的流程中,会涉及到与 sstable 交互的部分:

  • • 读取内存中的 memtable 仍取得 kv 对后,会针对 sst 文件展开读取操作

  • • 接下来会针对 level0 层和 level1 层的 sst 文件展开检索

// 根据 key 读取数据
func (t *Tree) Get(key []byte) ([]byte, bool, error) {
    t.dataLock.RLock()
    // 1 首先读 active memtable.
    // ...


    // 2 读 readOnly memtable.  按照 index 倒序遍历,因为 index 越大,数据越晚写入,实时性越强
    // ...


    // 3 读 sstable level0 层. 按照 index 倒序遍历,因为 index 越大,数据越晚写入,实时性越强
    var err error
    // ...
    for i := len(t.nodes[0]) - 1; i >= 0; i-- {
        // ...
        t.nodes[0][i].Get(key)      
        // ...
    }
    // ...


    // 4 依次读 sstable level 1 ~ i 层,每层至多只需要和一个 sstable 交互. 因为这些 level 层中的 sstable 都是无重复数据且全局有序的
    for level := 1; level < len(t.nodes); level++ {
        // ...
        node.Get(key)
        // ...
    }


    // 5 至此都没有读到数据,则返回 key 不存在.
    return nil, false, nil
}

 

每个 sstable 在 lsm tree 中会被封装成一个 lsm node,通过 Get 方法在 node 中读取 kv 对:

  • • 首先通过 index 快速检索定位到 key 可能存在的 block

  • • 通过 block 块的 filter bitmap,快速判定 key 是否可能存在于 block 中

  • • 读取 block 中的全量 kv 对,依次遍历检索,若读到 key,则返回对应 val

// 查看是否在节点中
func (n *Node) Get(key []byte) ([]byte, bool, error) {
    // 通过索引定位到具体的块
    index, ok := n.binarySearchIndex(key, 0, len(n.index)-1)
    if !ok {
        return nil, false, nil
    }


    // 布隆过滤器辅助判断 key 是否存在
    bitmap := n.blockToFilter[index.PrevBlockOffset]
    if ok = n.conf.Filter.Exist(bitmap, key); !ok {
        return nil, false, nil
    }


    // 读取对应的块
    block, err := n.sstReader.ReadBlock(index.PrevBlockOffset, index.PrevBlockSize)
    if err != nil {
        return nil, false, err
    }


    // 将块数据转为对应的 kv 对
    kvs, err := n.sstReader.ReadBlockData(block)
    if err != nil {
        return nil, false, err
    }


    for _, kv := range kvs {
        if bytes.Equal(kv.Key, key) {
            return kv.Value, true, nil
        }
    }


    return nil, false, nil
}

 

// 二分查找,key 可能从属的 block index
func (n *Node) binarySearchIndex(key []byte, start, end int) (*Index, bool) {
    if start == end {
        return n.index[start], bytes.Compare(n.index[start].Key, key) >= 0
    }


    // 目标块,保证 key <= index[i].key && key > index[i-1].key
    mid := start + (end-start)>>1
    if bytes.Compare(n.index[mid].Key, key) < 0 {
        return n.binarySearchIndex(key, mid+1, end)
    }


    return n.binarySearchIndex(key, start, mid)
}

 

6.2 memtable 落盘

 

在 memtable 溢写落盘时,会在 level0 层生成对应的 sstable:

  • • 构造 sstWriter,写入全量 memtable 数据

  • • 将 sstWriter 生成的 sst 文件落盘

  • • 将 sstable 封装成 lsm node 节点插入到 lsm tree 拓扑结构中

// 将 memtable 的数据溢写落盘到 level0 层成为一个新的 sst 文件
func (t *Tree) flushMemTable(memTable memtable.MemTable) {
    // memtable 写到 level 0 层 sstable 中
    seq := t.levelToSeq[0].Load() + 1


    // 创建 sst writer
    sstWriter, _ := NewSSTWriter(t.sstFile(0, seq), t.conf)
    defer sstWriter.Close()


    // 遍历 memtable 写入数据到 sst writer
    for _, kv := range memTable.All() {
        sstWriter.Append(kv.Key, kv.Value)
    }


    // sstable 落盘
    size, blockToFilter, index := sstWriter.Finish()


    // 构造节点添加到 tree 的 node 中
    t.insertNode(0, seq, size, blockToFilter, index)
    // 尝试引发一轮 compact 操作
    t.tryTriggerCompact(0)
}

 

func (t *Tree) insertNode(level int, seq int32, size uint64, blockToFilter map[uint64][]byte, index []*Index) {
    file := t.sstFile(level, seq)
    sstReader, _ := NewSSTReader(file, t.conf)


    t.insertNodeWithReader(sstReader, level, seq, size, blockToFilter, index)
}

 

// 插入一个 node 到指定 level 层
func (t *Tree) insertNodeWithReader(sstReader *SSTReader, level int, seq int32, size uint64, blockToFilter map[uint64][]byte, index []*Index) {
    file := t.sstFile(level, seq)
    // 记录当前 level 层对应的 seq 号(单调递增)
    t.levelToSeq[level].Store(seq)


    // 创建一个 lsm node
    newNode := NewNode(t.conf, file, sstReader, level, seq, size, blockToFilter, index)
    // 对于 level0 而言,只需要 append 插入 node 即可
    if level == 0 {
        t.levelLocks[0].Lock()
        t.nodes[level] = append(t.nodes[level], newNode)
        t.levelLocks[0].Unlock()
        return
    }


    // 对于 level1~levelk 层,需要根据 node 中 key 的大小,遵循顺序插入
    for i := 0; i < len(t.nodes[level])-1; i++ {
        // 遵循从小到大的遍历顺序,找到首个最小 key 比 newNode 最大 key 还大的 node,将 newNode 插入在其之前
        if bytes.Compare(newNode.End(), t.nodes[level][i+1].Start()) < 0 {
            t.levelLocks[level].Lock()
            t.nodes[level] = append(t.nodes[level][:i+1], t.nodes[level][i:]...)
            t.nodes[level][i+1] = newNode
            t.levelLocks[level].Unlock()
            return
        }
    }


    // 遍历完 level 层所有节点都还没插入 newNode,说明 newNode 是该层 key 值最大的节点,则 append 到最后即可
    t.levelLocks[level].Lock()
    t.nodes[level] = append(t.nodes[level], newNode)
    t.levelLocks[level].Unlock()
}

 

6.3 sstable 排序归并

 

当 lsm tree 某个 level 层数据大小达到阈值时,会执行 level sorted merge 流程:

  • • 获取到本轮流程涉及到的所有 lsm node

  • • 归并生成本轮流程涉及到的所有 kv 对数据

  • • 依次生成 sst 文件,并将对应的 lsm node 插入到 lsm tree 拓扑结构中

  • • 删除本轮被 compact 的 lsm node 以及对应的 sst 文件

// 针对 level 层进行排序归并操作
func (t *Tree) compactLevel(level int) {
    // 获取到 level 和 level + 1 层内需要进行本次归并的节点
    pickedNodes := t.pickCompactNodes(level)


    // 插入到 level + 1 层对应的目标 sstWriter
    seq := t.levelToSeq[level+1].Load() + 1
    sstWriter, _ := NewSSTWriter(t.sstFile(level+1, seq), t.conf)
    defer sstWriter.Close()


    // 获取 level + 1 层每个 sst 文件的大小阈值
    sstLimit := t.conf.SSTSize * uint64(math.Pow10(level+1))
    // 获取本次排序归并的节点涉及到的所有 kv 数据
    pickedKVs := t.pickedNodesToKVs(pickedNodes)
    // 遍历每笔需要归并的 kv 数据
    for i := 0; i < len(pickedKVs); i++ {
        // 倘若新生成的 level + 1 层 sst 文件大小已经超限
        if sstWriter.Size() > sstLimit {
            // 将 sst 文件溢写落盘
            size, blockToFilter, index := sstWriter.Finish()
            // 将 sst 文件对应 node 插入到 lsm tree 内存结构中
            t.insertNode(level+1, seq, size, blockToFilter, index)
            // 构造一个新的 level + 1 层 sstWriter
            seq = t.levelToSeq[level+1].Load() + 1
            sstWriter, _ = NewSSTWriter(t.sstFile(level+1, seq), t.conf)
            defer sstWriter.Close()
        }


        // 将 kv 数据追加到 sstWriter
        sstWriter.Append(pickedKVs[i].Key, pickedKVs[i].Value)
        // 倘若这是最后一笔 kv 数据,需要负责把 sstWriter 溢写落盘并把对应 node 插入到 lsm tree 内存结构中
        if i == len(pickedKVs)-1 {
            size, blockToFilter, index := sstWriter.Finish()
            t.insertNode(level+1, seq, size, blockToFilter, index)
        }
    }


    // 移除这部分被合并的节点
    t.removeNodes(level, pickedNodes)


    // 尝试触发下一层的 compact 操作
    t.tryTriggerCompact(level + 1)
}

 

6.4 lsm tree 复原

 

在 lsm tree 复原流程中,会依次读取所有的 sst 文件,一一构造出对应的 sstReader,读取对应的 footer、filter、index 等信息,在内存中还原出对应的 lsm node,并插入到 lsm tree 中,形成对应的拓扑结构:

// 读取 sst 文件,还原出整棵树
func (t *Tree) constructTree() error {
    // 读取 sst 文件目录下的 sst 文件列表
    sstEntries, err := t.getSortedSSTEntries()
    if err != nil {
        return err
    }


    // 遍历每个 sst 文件,将其加载为 node 添加 lsm tree 的 nodes 内存切片中
    for _, sstEntry := range sstEntries {
        if err = t.loadNode(sstEntry); err != nil {
            return err
        }
    }


    return nil
}

 

// 将一个 sst 文件作为一个 node 加载进入 lsm tree 的拓扑结构中
func (t *Tree) loadNode(sstEntry fs.DirEntry) error {
    // 创建 sst 文件对应的 reader
    sstReader, err := NewSSTReader(sstEntry.Name(), t.conf)
    if err != nil {
        return err
    }


    // 读取各 block 块对应的 filter 信息
    blockToFilter, err := sstReader.ReadFilter()
    if err != nil {
        return err
    }


    // 读取 index 信息
    index, err := sstReader.ReadIndex()
    if err != nil {
        return err
    }


    // 获取 sst 文件的大小,单位 byte
    size, err := sstReader.Size()
    if err != nil {
        return err
    }


    // 解析 sst 文件名,得知 sst 文件对应的 level 以及 seq 号
    level, seq := getLevelSeqFromSSTFile(sstEntry.Name())
    // 将 sst 文件作为一个 node 插入到 lsm tree 中
    t.insertNodeWithReader(sstReader, level, seq, size, blockToFilter, index)
    return nil
}

 

7 小结

至此,本文内容结束. 本文和大家展开探讨了 lsm tree 中 sstable 结构的实现细节. 在此展望未来几周继续和大家展开讨论的内容:

  • • 基于go实现lsm tree 之主干框架(已完成)

  • • 基于go实现lsm tree之memtable结构(已完成)

  • • 基于go实现lsm tree之sstable结构(本篇)

  • • 基于go实现lsm tree之level sorted merge流程(待完成)


小徐先生的编程世界
在学钢琴,主业码农
 最新文章