etcd存储引擎之存储设计

文摘   科技   2024-03-03 13:16   北京  

0 前言

本期我们继续延续 etcd 存储引擎系列的话题. 在该系列中,我们以 boltdb 作为 b+树 工程化落地的学习案例,该项目开源地址为:https://github.com/etcd-io/bbolt,go 语言纯度接近 100%. (本系列涉及走读的 boltdb 源码版本为 v1.3.8

 

下面是本专题的分享节奏,本文是其中的第二篇——存储设计篇:

  • • etcd存储引擎之主干框架(已完成):偏宏观视角下介绍 boltdb 的定位、架构、特性,通过几个核心流程浅探 boltdb 实现源码

  • • etcd存储引擎之存储设计(本篇):介绍 boltdb 存储模型、机制的设计实现,包含磁盘、内存两部分

  • • etcd存储引擎之b+树实现(待填坑):介绍 b+ 树理论模型及 boltdb 实现案例,包括模型定义及 crud 流程梳理

  • • etcd存储引擎之事务实现(待填坑):介绍 boltdb 事务的执行模式及实现原理

 

1 整体架构

1.1 宏观模型

 

借助上面这张存储架构图,我们总结一下 boltdb 的宏观存储模型设计:

  • • 单文件存储:基于本地单个磁盘文件进行数据持久存储,存储数据均为 kv 形式

  • • mmap:基于 mmap(memory-mapping file)技术,将 db 文件内容映射到内存指定区域(我们称之为 page buffer),于读流程中屏蔽磁盘 io 细节

  • • pwrite+fdatasync:基于 file writeAt + fdatasync 技术,支持于文件指定 offset 写入数据,并保证同步等待设备 io 结果

  • • 存储分 page:借鉴局部性原理,以 page 作为存储交换的最小单位,向操作系统看齐

  • • page 分类:page 分为 meta、freelist、branch 和 leaf 四类,前两者偏向全局维度的元信息记录;后两者与实际数据存储相关

  • • b+ tree:数据存储采用 b+ 树模型. branch page 和 leaf page 分别对应 b+ 树的枝干和叶子节点. 其在需要修改时,会基于懒加载机制在内存中反序列化为 node 实例模型

  • • copy-on-write: 采取以空间换时间策略,全程基于写时复制技术,实现脏数据副本拷贝处理,直到落盘完成才对原数据进行覆盖,以此保证数据安全性

  • • bucket:定位为相互隔离的数据组,可以简单类比于表. 每个 bucket 对应一棵独立 b+ 树,bucket 之间也通过 b+ 树模型建立父子层级拓扑关系

(这些概念内容已在前文——etcd存储引擎之主干框架 中完成一轮铺垫. 大家如果看完觉得恍惚,可以在完成本文后续章节学习后再回顾一遍,温故而知新. )

 

1.2 研究路线

下面介绍一下本文的学习路线:

  • • 第二章——page:包含 page 核心概念,源码定义、正反序列化流程、四类 page 实现细节等;

  • • 第三章——缓存&持久化:介绍读、写流程采用的 mmap、pwrite+fdatasync 的原理,并结合几个主流程向大家揭示 boltdb 如何采用 copy-on-write 机制

  • • 第四章——b+ tree:介绍内存中基于懒加载机制反序列化的 b+ 树模块. (有关 b+ 树的内容,本篇只是点到为止. 这部分会在下一篇——etcd存储引擎之b+树实现 中,以更聚焦的视角和大家一起展开进一步探讨)

  • • 第五章——bucket:介绍有关 bucket 的概念及具体的源码实现

 

2 page

2.1 概念

基于局部性原理,也为了契合操作系统存储交换模型,boltdb 内的数据也是也 page 为单位组织的.

 

每个 page 分为 header 和 body 两部分:

  • • header:格式统一,记录 page 的元信息,如页号、数据量、类型等

  • • body:格式与页类型有关,记录 page 中的具体内容,共分为 meta、freelist、branch、leaf 四类:

  • • meta page:存储 boltdb 的元数据,如版本号、全局递增的事务 id 记录

  • • freelist page:存储空闲 page 信息, (定位可类比于 go 语言中的 heap)

  • • branch element page:存储数据索引, 逻辑意义上属于 b+树中的枝干节点

  • • leaf element page:存储kv数据, 逻辑意义上属于b+树中的叶子节点

 

 

每当 boltdb 实例在初始化时,需要率先完成 4 个 page 的初始化和持久化:

  • • meta page * 2: 记录好数据库的元信息;meta page 数量是两个,和 boltdb 采用的 copy-on-write 机制有关(本文 3.3 小节详细展开)

  • • freelist: 全局维度的空闲 page 管理模块

  • • leaf element page: 作为一棵空白 b+ 树的根节点,由于没有数据,因此会属于叶子节点

 

2.2 header

前面提到 page 由 header 和 body 两部分组成. 下面我们首先来看一下各类 page 通用的 header 结构的详细定义.

page header 核心属性包括:

  • • id: page 的全局 id

  • • flags: page 类型,分为 meta、freelist、branch、leaf 四类

  • • count: page 中数据量. 针对 freelist 为空闲页数量;针对 branch、leaf 为 key 数量;meta 不使用此字段

  • • overflow: 溢出页数量. 存在过大的数据条目时,会找到地址连续的多个 page body 拼接成一个逻辑意义上的“big page” ,此时会通过 overflow 记录溢出页数量,body 总数则为 overflow + 1,且共享同一个 header

  • • ptr: page body 起始位置

 

对应源码如下,尽管命名为 page,但大家需要知道其含义其实为 page header:

// page header
type page struct {
    id       pgid    // page 的全局唯一 id. 借由 page id 和 page size 可以推算出其在 boltdb 中的位置
    flags    uint16  // page 类型,包含:meta、freelist、branch、leaf 四类
    count    uint16  // page 中的数据条数. 针对 freelist,标识空闲页数量;针对 branch 和 leaf,代表 key 的数量
    overflow uint32  // 溢出页个数,倘若存在过大的数据,则需要使用到溢出页. 溢出页地址连续,共享相同的 page header
    ptr      uintptr // page body 的起始位置
}


type pgid uint64 // 页 id


const (
    // 四种 page header flag
    branchPageFlag   = 0x01  // b+ 树枝干节点
    leafPageFlag     = 0x02  // b+ 树叶子节点
    metaPageFlag     = 0x04  // meta 
    freelistPageFlag = 0x10  // freelist
)

 

2.3 meta

2.3.1 类定义

下面介绍的是 meta page 的 body.

 

meta 中主要记录是 boltdb 的元数据,其核心属性包括:

  • • magic: boltdb 定义的魔数. 用于校验 db 的合法性

  • • version: boltdb 版本号

  • • pageSize: 每个 page 大小,通常和操作系统 page 大小保持一致

  • • root: 默认创建的始祖 bucket. 该 bucket 不直接存储 kv 数据,其下 b+ 树的 kv 标识的都是用户创建的子 bucket(表)

  • • freelist: freelist page 的 id

  • • pgid:标识了已执行 mmap 映射的边界,需要扩容时则需要推进 mmap 范围,并递增此字段

  • • txid: 标识下一个待分配的事务 id. 事务 id 是全局递增的. 当有读写事务启动时,会 copy 一份 meta 副本,并在其中递增字段. 在获取 meta 时,也会优先选用 txid 较大的那一例,因为其数据实时性更高

  • • checksum: 校验和,用于验证数据合法性

具体代码展示如下:

// meta 类型 page 的 page body
type meta struct {
    magic    uint32 // 魔数,与 bolt db 版本相关
    version  uint32 // bolt db 版本
    pageSize uint32 // 采用的页大小,单位 byte
    flags    uint32 // 保留字段,暂未启用
    root     bucket // 数据库的始祖 bucket
    freelist pgid   //  freelist page 的 id
    pgid     pgid   // 下一个分配的 page id,mmap 扩容时使用
    txid     txid   // 下一个事务 id,递增
    checksum uint64 // meta 页校验和
}

 

2.3.2 副本机制

在 boldtb 的 page buffer 中,index = 0 和 1 的两个 page 一定是 2 个 meta page. 之所以要创建双份,则和 boldtb 采用的 copy-on-write 策略相关:

  • • 前提:boltdb 中,全局同时只能存在一个读写事务

  • • step1:启动读写事务: 从两个 meta page 中获取合法且 tx id 较大的那个 meta page,拷贝一份其副本到内存,并将对应 tx id 加 1

  • • step2:执行读写事务:执行数据更新,此时更新内容都会基于写时复制策略,生成在一系列 page 副本中(成为脏页)

  • • step3:提交读写事务:一系列脏页持久化落盘,成功后将读写事务手中持有的 meta page 副本也进行持久化,覆盖回到 index = 0 或者 1 的其中一个 meta page(根据 tx id % 2 的规则映射)

至此,一次读写事务完成了,对应的 meta page 也实现了更新迭代.

值得一提的是:在 step3 中,倘若 meta page 在溢写落盘过程中发生错误,则代表事务执行失败,此时磁盘上的 meta page 内容可能也发生损坏,但是由于有双份 meta page 兜底,因此 boltdb 还能基于另一份完好的 meta page 正常提供服务,数据也基于此会正常回溯到事务提交前的版本,这个点和 step1 中取 meta page 的策略形成了呼应.

(这个点大家第一次看大概率会觉得晦涩,没关系,可以结合本文 3.3 小节的内容进行呼应)

 

下面是 db 类定义的代码示例,可见其中确实持有双份 meta page 的引用:

type DB struct {
    // ...
    meta0    *meta // 两个轮换使用的 meta page 之一
    meta1    *meta // 两个轮换使用的 meta page 之一
    // ...
}

 

下面是 boltdb 中 transaction 类的定义,其中持有的 meta 属性正是在 step1 中拷贝生成的 meta 副本:

// 事务
type Tx struct {
    // ...
    db             *DB            // 事务从属的 db 实例
    meta           *meta          // db meta page
    root           Bucket         // db 根 bucket
    pages          map[pgid]*page // 事务中反序列化过的 page
    // ...
}

 

2.3.3 序列化

接下来我们串一下 meta page 的正反序列化过程.

读写事务提交时,会需要将内存中 meta 副本转成 page 实例,并最终进行溢写落盘.

func (tx *Tx) Commit() error {
    // ...脏页溢写落盘
    // ...


    // 事务持有的 meta page 副本溢写落盘
    if err := tx.writeMeta(); err != nil {
        tx.rollback()
        return err
    }
    // ...


    return nil
}

 

// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
    // 序列化 meta 副本
    tx.meta.write(p)


    // ... meta 落盘持久化


    return nil
}

 

meta 序列化方法为 meta.write,核心步骤包括:

  • • 传入 page 实例: page 实际是一个 page header 实例

  • • 设置 page id: id 根据 meta 中记录的事务 id % 2 得到,因此结果必然为 0 或者 1,呼应了 meta page 在 db file 中的位置

  • • 设置 page 类型:flag 取为 metaPageFlag,对应为 meta page 类型

  • • 设置校验和:通过 sum64 算法生成 checksum

  • • 深拷贝 meta:本质上将 page header 的 ptr 指针指向 meta 在 page buffer 中的起始位置

// meta 内容写入到 page
func (*meta) write(*page) {
    // 前置校验 ...


    // meta page 的 id 只能是 0 或者 1. 根据 tx id 的单双数可以得知
    p.id = pgid(m.txid % 2)
    // 标识 page 类型为 meta
    p.flags |= metaPageFlag


    // 校验和
    m.checksum = m.sum64()
    // 通过深拷贝方式,令 page ptr 指向 meta 副本的起始位置
    m.copy(p.meta())
}

 

2.3.4 反序列化

在 boltdb 每次执行 mmap 映射时(初始化或者扩容),都需要更新 db 实例持有的两个 meta 引用,此时需要对 meta 进行反序列化:

// mmap
func (db *DB) mmap(minsz int) (err error) {
    // 1 加锁 ...


    // 2 mmap 系统调用 ...


    // 3 更新 db 实例持有的 meta 引用实例
    db.meta0 = db.page(0).meta()
    db.meta1 = db.page(1).meta()


    // ...


    return nil
}

 

可以看到,获取 meta 实例的方式是直接从 page buffer 取出 0 号和 1 号 page,并通过其 ptr 引用直接获取到 meta body:

// 获取 meta 类型的 page body
func (*page) meta() *meta {
    return (*meta)(unsafe.Pointer(&p.ptr))
}

 

2.3.4 校验

每当启动事务时,需要从 2 个 meta page 中获取合法且 tx id 较大的一个作为拷贝的对象. 此时会涉及到对 meta page 的校验:

// 初始化事务
func (tx *Tx) init(db *DB) {
    // ...
    // 1 创建 meta 副本
    tx.meta = &meta{}
    // 2 获取合法且实时性更高的 meta 进行深拷贝
    db.meta().copy(tx.meta)


    // 3 设置 bucket


    // 4 读写事务需要递增 meta 副本中的 tx id
    if tx.writable {
        tx.pages = make(map[pgid]*page)
        tx.meta.txid += txid(1)
    }
}

 

在 db.meta 方法中,会在优先获取 tx id 更大的 meta 的同时,对 meta 合法性校验,保证返回的 meta 内容是合法的:

// 从 db 中获取合法且实时性更高的 meta
func (db *DB) meta() *meta {
    // 2 个轮换使用的 meta 实例
    metaA := db.meta0
    metaB := db.meta1
    // 优先取 tx id 较大的那个
    if db.meta1.txid > db.meta0.txid {
        metaA = db.meta1
        metaB = db.meta0
    }


    // 校验 meta 合法性,保证返回的 meta 是合法的
    if err := metaA.validate(); err == nil {
        return metaA
    } else if err := metaB.validate(); err == nil {
        return metaB
    }


    // 两个 meta 都有问题,panic
    panic("bolt.DB.meta(): invalid meta pages")
}

 

校验 meta page 是否合法,包括对magic、version和checksum的校验:(主要是checksum)

// 校验 meta page
func (*meta) validate() error {
    if m.magic != magic {
        return ErrInvalid
    } else if m.version != version {
        return ErrVersionMismatch
    } else if m.checksum != 0 && m.checksum != m.sum64() {
        return ErrChecksum
    }
    return nil
}


// The data file format version.
const version = 2


// Represents a marker value to indicate that a file is a Bolt DB.
const magic uint32 = 0xED0CDAED

 

2.4 freelist

2.4.1 “只借不还”

在介绍 freelist 之前,需要先聊到 boltdb 中一个很重要的机制:boltdb 采取 mmap 操作时,只会扩容,不会缩容. 即便在读写事务中存在 page 的释放,也只会将空闲的 page 缓存到 freelist 中统一管理,而不会将其归还给操作系统,这样设计的原因在于:

  • • 释放的 page 可能位于 page buffer 内部而非尾端,倘若归还操作系统,会打破 page buffer 的连续性

  • • 通过实践经验得知,在数据量达到某个阈值后,即便发生删除操作导致某个时刻暂时低于阈值,最终往往也会再次超过该阈值

 

基于以上,freelist 应运而生,它用于存储空闲 page 信息,定位类似于 go 语言中 heap,采取以空间换时间的策略,缓存并管理空闲的 page 以供后续复用,降低与操作系统的交互频率.

 

2.4.2 类定义

freelist 的类定义如上所示,核心属性包括:

  • • ids:可直接使用的已释放 page 列表

  • • pending:等待被释放的空闲 page 列表

  • • cache: ids + pending 的全集

  • • allocate:

对应代码如下:

// 空闲页 
type freelist struct {
    ids     []pgid          // 已经可用的释放页
    pending map[txid][]pgid // 等待被释放的空闲页. txid 为仍在使用这些页的最大 txid. 每当有事务被分配时,可以找到所有已经小于当前最小 txid 的 tx 使用的 pending page 进行批量释放
    cache   map[pgid]bool   // 判断某个页是否是空闲的或者待释放的
    allocate       func(txid txid, n int) pgid // 分配 page 的入口函数,有着不同的算法实现. n 为要求的连续 page 数量.
}

 

如何理解上述提到的待释放 pending 列表呢?

这部分内容和 boltdb 事务隔离性挂钩,为了保证事务的【可重复读】语义,在一些读写事务中,哪怕已经归还了某些 page,但只要这些 page 还在被一些版本更早的事务所“阅读”,那么其内容就不能被覆盖,需要保证阅读视角的一致性.

(这里算是做个简单剧透,有关这部分内容,会在本系列第四篇——etcd存储引擎之事务实现 中进一步展开介绍)

 

2.4.3 free page count

freelist 中的 ids 属于 page body 部分,结合 page header 中的 count 字段,能够推到得出 freelist 中的空闲 page 数量:

// page header
type page struct {
    // ...
    count    uint16  // page 中的数据条数. 针对 freelist,为标识空闲页数量;
    // ...
}

 

由于 count 字段为 uint16,因此能表示的数值范围有限,最大值即为 0xFFFF,但在实践场景中,free page 数可能大于该值,因此 freelist 在读取 count 值时采取了一种特殊的协议:

  • • 倘若 free page 数 < 0xFFFF,则使用 count 记录真实的 free page 数量

  • • 倘若 free page 数 >= 0xFFFF,则将 count 置为 0xFFFF,并通过 freelist page body 中首个 pgid 的位置(uint64)记录真实的 free page 数量

 

上述策略对应的实现示意图如下:

 

 

2.4.4 free 空闲页

在读写事务提交时,倘若有些 page 已经弃置不用,则通过 free 方法将其追加到 freelist 的 pending 列表:

// free 空闲页
func (*freelist) free(txid txid, p *page) {
    // meta page 不允许被 free
    if p.id <= 1 {
        panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id))
    }


    // 追加到 pending 中,key 为释放这些 page 的 tx id
    txp := f.pending[txid]
    if txp == nil {
        txp = &txPending{}
        f.pending[txid] = txp
    }
    
    // ...
    for id := p.id; id <= p.id+pgid(p.overflow); id++ {
        // ...
        // 分别追加到 pending 和 cache 中
        txp.ids = append(txp.ids, id)
        f.cache[id] = struct{}{}
    }
}

freelist 中的 pending 是一个 map,key 是事务 id,value 是该事务 free 的空闲页列表. 之所以要通过 tx id 作为键标识,是因为 tx id 是全局递增的,只要某个时刻,所有小于该 tx id 的事务都结束了,就代表这些 old page 已经不再存在“读者”,也就可以真正释放这部分 page,进而得以保全【可重复读】的语义.

 

2.4.5 release 空闲页

当所有小于 txid 的事务都结束时,该 txid 下的空闲页都能够真正得到 release,只需要将其从 pending 中移除,并追加到 freelist 的 ids 中即可:

// release 所有 <= txid 的事务的 pending 列表
func (*freelist) release(txid txid) {
    m := make(pgids, 0)
    for tid, txp := range f.pending {
        if tid <= txid {
            // pending 中的页追加到 ids 中
            m = append(m, txp.ids...)
            // 从 pending 中移除
            delete(f.pending, tid)
        }
    }
    // ...
}

 

2.4.6 allocate page

读写事务提交时,会基于 copy-on-write 机制,针对所有涉及更改的节点分配新的 page 副本,此时会需要从 db 中分配可用的 page:

  • • 第一步:尝试从 freelist 中分配 page

  • • 第二步:倘若 freelist 余量不足,则需要进行扩容,此时会进行新一轮 mmap 操作拓宽容量范围

// 提交事务
func (tx *Tx) Commit() error {
    // 1 rebalance b+ tree...
    // ...


    opgid := tx.meta.pgid
    // 2 spill b+ tree
    if err := tx.root.spill(); err != nil {
        tx.rollback()
        return err
    }
    
    // 3 free 不用的 page 到 freelist ...
    
    // 4 脏数据 page 溢写落盘
     
    // 5 meta page 副本溢写落盘


    // ...
}

 

func (*node) spill() error {
    // ...
    // 针对所有涉及变更的 node,free 老的 page,申请分配新的 page
    for _, node := range nodes {
        // free 老的 page
        if node.pgid > 0 {
            tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid))
            node.pgid = 0
        }


        // 分配新的 page
        p, err := tx.allocate((node.size() + tx.db.pageSize - 1) / tx.db.pageSize)
        if err != nil {
            return err
        }


        // ...
    }


    // ...
    return nil
}

 

func (tx *Tx) allocate(count int) (*page, error) {
    p, err := tx.db.allocate(tx.meta.txid, count)
    // ...
}

 

此时会使用到 db 中的对象池 pagePool,尝试复用单个 page 对应的字节数组,并且尝试从 freelist 中获取可用的 page,如果没有找到目标,则通过 mmap 进行扩容:

func (db *DB) allocate(txid txid, count int) (*page, error) {
    // 1 倘若需要获取的 page 数量为 1,通过 pagePool 复用字节数组
    var buf []byte
    if count == 1 {
        buf = db.pagePool.Get().([]byte)
    } else {
        buf = make([]byte, count*db.pageSize)
    }
    p := (*page)(unsafe.Pointer(&buf[0]))
    p.overflow = uint32(count - 1)


    // 2 尝试从 freelist 获取可用的 page
    if p.id = db.freelist.allocate(txid, count); p.id != 0 {
        return p, nil
    }


    // 3 freelist 资源不足,则通过 mmap 扩容...


    // ...
    return p, nil
}

 

至此,我们走到 freelist 分配可用 page 的入口方法,该方法是 freelist 类中的一个成员属性——allocate

type freelist struct {
    freelistType   FreelistType                // freelist 类型,默认使用 array 形式
    ids            []pgid                      // 所有已经释放可用的 page
    // ...
    cache          map[pgid]struct{}           // 所有空闲 page
    // ...
    allocate       func(txid txid, n int) pgid // 分配可用 page 的入口函数
    // ...
}

 

默认情况下,allocate 方法的实现是 freelist.arrayAllocate 方法,其实现思路就是平铺直叙地对可用页列表 ids 从左往右进行遍历,直到首次找到满足要求的 n 个连续可用 page,则返回首个 page 对应 id

// 为某个事务分配连续的 n 个 可用 page
func (*freelist) arrayAllocate(txid txid, n int) pgid {
    if len(f.ids) == 0 {
        return 0
    }


    // 按照正序在 freelist 的 ids 列表中遍历,直到找到连续的 n 个可用 page 时,返回首个 page id
    // 倘若找不到目标,则返回 0
    var initial, previd pgid
    for i, id := range f.ids {
        // ...


        // 新的可用 page 和上一个不连续,则放弃上一个,从新的 page 开始尝试
        if previd == 0 || id-previd != 1 {
            initial = id
        }


        // 找到连续可用的 n 个 page 了
        if (id-initial)+1 == pgid(n) {
            // 刚好是末尾,则直接从 ids 截断最后被取走的部分
            if (+ 1) == n {
                f.ids = f.ids[i+1:]
            // 否则从 ids 中部截去被取走的部分
            } else {
                copy(f.ids[i-n+1:], f.ids[i+1:])
                f.ids = f.ids[:len(f.ids)-n]
            }


            // 被取用的 page 从 freelist 的 cache 中移除
            for i := pgid(0); i < pgid(n); i++ {
                delete(f.cache, initial+i)
            }
            // 返回首个 page id
            return initial
        }


        previd = id
    }
    return 0
}

 

2.4.7 序列化

在读写事务提交时,会将最新版本的 freelist 副本序列化成 page 的形式,并通过 meta 副本持有新版本 freelist 的引用:

// 提交读写事务
func (tx *Tx) Commit() error {
    // ...
    // 1 调整 b+ 树
    
    // 2 序列化 freelist,并让 meta 副本指向它
    if !tx.db.NoFreelistSync {
        err := tx.commitFreelist()
        if err != nil {
            return err
        }
    } 
    
    // 3 脏数据页落盘
    
    // 4 meta page 落盘,freelist 随之被落盘更新
    if err := tx.writeMeta(); err != nil {
        tx.rollback()
        return err
    }
    
    // ...


    return nil
}

 

func (tx *Tx) commitFreelist() error {
    // ...
    
    // freelist 序列化到 page 中
    if err := tx.db.freelist.write(p); err != nil {
        tx.rollback()
        return err
    }
    
    // meta 副本指向新的 freelist page
    tx.meta.freelist = p.id


    return nil
}

 

freelist.write 是将 freelist 内容序列化成 page 的核心方法:

  • • 设置 page flag 标识为 freelist

  • • 获取 freelist 中的空闲 page 总数,遵循 0xFFFF 的特殊协议,将 page 数量设置在 header 的 count 字段或者 body 的首个 pgid 的位置

  • • 将所有空闲 page 追加写入到 header ptr 对应 body 所在位置

此处值得一提的是,不管是已释放 ids 还是待释放的 pending 中的空闲 page,都会被序列化到 page body 中. 这是因为 boltdb 正常运行过程中,会统一使用到内存中的 freelist 实例,此时能够通过 pending 区分出哪些 page 是待释放的. 倘若未来需要用到 freelist page 反序列化成内存中的 page 实例,则必然是面临 boltdb 重启的场景,此时由于不存在运行的事务,因此不存在【可重复读】的诉求,所有 pending page 都能被视作可用 page 进行复用.

// 将 freelist 的内容序列化的 page 中
func (*freelist) write(*page) error {
    // 通过 flag 标识该 page 类型 freelist
    p.flags |= freelistPageFlag


    // 获取总的空闲页数量
    lenids := f.count()
    // 如果为 0
    if lenids == 0 {
        p.count = uint16(lenids)
        // 如果为小于 0xFFFF 的值
    } else if lenids < 0xFFFF {
        p.count = uint16(lenids)
        // 将所有空闲页有序的 copy 到 page ptr 的位置中
        f.copyall(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[:])
    } else {
        // 特殊协议
        p.count = 0xFFFF
        // 0 号位置记录真实的空闲页数量
        ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0] = pgid(lenids)
        // 将所有空闲页有序的 copy 到 page ptr 的 [1:] 位置中
        f.copyall(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[1:])
    }


    return nil
}

 

2.4.8 反序列化

在 db 启动时,倘若 db 文件已存在,则需要读取其中 freelist 部分内容,将其反序列化成 freelist 实例:

func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
    // ...
    // 加载 freelist
    db.loadFreelist()
    // ...
}

 

func (db *DB) loadFreelist() {
    db.freelistLoad.Do(func() {
        // 构造 freelist 实例
        db.freelist = newFreelist(db.FreelistType)
        // 反序列化 freelist
        db.freelist.read(db.page(db.meta().freelist))
        // ...
    })
}

 

freelist.read 是反序列化入口方法:

  • • 遵循 0xFFFF 协议,读取空闲 page 数量

  • • 深拷贝 freelist ids 列表

  • • 针对 freelist ids 列表进行排序

// 从 page 中读取 freelist 的内容,将其反序列化到内存中的 freelist 实例
func (*freelist) read(*page) {
    // 读取 page 的 count 值
    idx, count := 0, int(p.count)
    // 如果 count 值为 65535,则触发特殊协议,第 0 页记录的才是真实的 free page 数量
    if count == 0xFFFF {
        idx = 1 // 真正的数据往后偏移一个 uint64 的位置
        count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
    }


    // count = 0,代表没有空闲页
    if count == 0 {
        f.ids = nil
    } else {
        // 从真正的起始位置开始读取空闲页列表
        ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
        // 将内容深拷贝的 freelist 实例的 ids 中
        f.ids = make([]pgid, len(ids))
        copy(f.ids, ids)


        // 对空闲页进行排序
        sort.Sort(pgids(f.ids))
    }


    // ...
}

 

 

2.5 branch&leaf

2.5.1 branch

branch page 用于存储数据索引, 对应的是 b+ 树中的枝干节点. 由于 boltdb 中,key 数据是不定长的,因此在 branch page 中采取了 shadow paging 技术,通过在 page body 中放入一系列定长的 branchPageElement,在其中分别标注好对应 key 数据的地址和长度,以及该 key 所映射的子节点的 page id.

branchPageElement 的示意图如下:

 

对应 branchPageElement 的实现代码如下:

  • • pos:key 的起始位置

  • • ksize:key 的长度,单位 byte

  • • pgid:key 映射的子节点 page id

// b+树中的枝干节点中的一个元素
type branchPageElement struct {
    pos   uint32 // 对应 key 的起始位置
    ksize uint32 // 对应 key 的大小
    pgid  pgid   // key 对应子节点的 page id
}

 

倘若某个 page 是 branch 类型,则通过 page header 获取指定 index 的 branchPageElement 的方法如下:

// 获取 index 对应的 branch page element
func (*page) branchPageElement(index uint16) *branchPageElement {
    return &((*[0x7FFFFFF]branchPageElement)(unsafe.Pointer(&p.ptr)))[index]
}

 

获取到 branchPageElement 后,可以结合 ksize 和 pos 信息,通过地址偏移的方式获取其对应的 key 值

func (*branchPageElement) key() []byte {
    buf := (*[maxAllocSize]byte)(unsafe.Pointer(b))
    return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[b.pos]))[:b.ksize]
}

 

2.5.2 leaf

leaf page 用于存储 kv 数据, 对应的是 b+ 树中的叶子节点. 和 branch page 情况相同,由于 key 和 value 不定长,因此此处同样采取了 shadow paging 技术,通过在 page body 中放入一系列定长的 leafPageElement,在其中标注好对应 key 数据的地址、长度以及 value 数据的长度.

elementPageElement 的示意图如下:

 

leafPageElement 对应实现代码如下:

  • • flags:标识 value 是普通数据还是一个子 bucket

  • • pos:key 的起始位置

  • • ksize:key 的长度,单位 byte

  • • vsize:value 的长度,单位 byte. 需要知道,value 和 key 是相邻的,因此其起始位置为 pos + ksize

// b+树中的叶节点中的一个元素
type leafPageElement struct {
    flags uint32 // 标识 kv 对标识内容是普通数据还是 bucket
    pos   uint32 // key 的起始位置
    ksize uint32 // key 的大小
    vsize uint32 // value 的大小
}

 

倘若某个 page 类型为 leaf,则通过 page header 获取指定 index 的 leafPageElement 的方法如下:

// 获取 index 对应的 leaf page element
func (*page) leafPageElement(index uint16) *leafPageElement {
    n := &((*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr)))[index]
    return n
}

 

获取到 leafPageElement 后,结合 pos、ksize、vsize 等信息,通过地址偏移的方式可以读取对应的 key 和 value 内容:

func (*leafPageElement) key() []byte {
    buf := (*[maxAllocSize]byte)(unsafe.Pointer(l))
    return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[l.pos]))[:l.ksize]
}

 

func (*leafPageElement) value() []byte {
    buf := (*[maxAllocSize]byte)(unsafe.Pointer(l))
    return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[l.pos+l.ksize]))[:l.vsize]
}

 

3 缓存&持久化

从本章开始,我们从读、写流程的视角,来梳理 boltdb 中有关数据缓存与持久化的机制.

 

3.1 mmap

3.1.1 核心概念

 

boltdb 的读流程基于 mmap 技术实现.

所谓 mmap,全称 memory mapping,这种技术能够实现磁盘文件与内存空间的映射,屏蔽磁盘 io 的细节,使得使用方能够像访问内存中的字节数组一样去访问磁盘文件内容.

 

3.1.2 流程源码

boltdb 中 mmap 的大小上限为 256TB,mmap 映射后,对应的内容通过 db.data 字段获取,并且通过一把读写锁 mmaplock 保证其并发安全性

const maxMapSize = 0xFFFFFFFFFFFF // 256TB


type DB struct {
    // ...
    path     string   // db 文件路径
    openFile func(string, int, os.FileMode) (*os.File, error) // 打开 db 文件方法
    file     *os.File  // db 文件
    // ...
    dataref  []byte // mmap 原始字节数组,为只读模式,不可写
    data     *[maxMapSize]byte  // mmap 映射 db 文件得到的字节数组
    datasz   int  // mmap size
    // ...
    mmaplock sync.RWMutex // 保护 mmap 操作并发安全的读写锁
    // ...
}

 

在 db 启动或者提交读写事务发现可用 page 不足时,都会执行 mmap 操作. 前者是初始化,后者是扩容:

// 启动 boltdb
func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
    // 1 构造 db 实例 ...


    // 2 读取配置 ...
    
    // 3 打开 db 文件 ...
    
    // 4 初始化 db 文件头 4 个 page ...


    // 5 执行 mmap 操作
    if err := db.mmap(options.InitialMmapSize); err != nil {
        _ = db.close()
        return nil, err
    }


    // 6 返回 db 实例 ...
}

 

// 提交读写事务时,需要为事务分为指定数量的 page
func (db *DB) allocate(txid txid, count int) (*page, error) {
    // 1 尝试取 freelist ...
    
    // 2 freelist 空间不足,则通过 mmap 扩容
    p.id = db.rwtx.meta.pgid
    var minsz = int((p.id+pgid(count))+1) * db.pageSize
    if minsz >= db.datasz {
        if err := db.mmap(minsz); err != nil {
            return nil, fmt.Errorf("mmap allocate error: %s", err)
        }
    }


    // 3 更新 meta 副本中的 pgid 范围...
    return p, nil
}

 

mmap 操作的入口是 db.mmap 方法,核心步骤为:

  • • 加锁——mmaplock

  • • 调整合适的 mmap size(小于 1GB 倍增,大于 1GB 向上取整)

  • • 如果有读写事务在运行,需要执行 dereference 操作(将所有 b+ 树节点反序列化到内存,这样在持久化时会有机会更新 page 信息)

  • • 如果之前有过 mmap 操作,需要执行 unmmap 系统调用

  • • 进行新一轮 mmap 系统调用

  • • 校验 meta page 合法性

// 执行 mmap 操作
func (db *DB) mmap(minsz int) (err error) {
    // 1 加锁保证并发安全
    db.mmaplock.Lock()
    defer db.mmaplock.Unlock()


    // ...
    // 2 调整合适的 mmap size
    size, err = db.mmapSize(size)
    
    // ...
    // 3 如果有读写事务在运行,需要更新其引用的 b+ 树节点(即将新一轮 mmap,节点对应 page 可能需要调整)
    // 做法就是将这些节点都反序列化到内存,这样在溢写落盘时,就会有更新的机会
    if db.rwtx != nil {
        db.rwtx.root.dereference()
    }


    // 4 如果之前进行过 mmap,需要解除映射
    if err = db.munmap(); err != nil {
        return err
    }


    // 5 建立新的 mmap 映射
    if err = mmap(db, size); err != nil {
        return err
    }


    // ...
    
    // 6 校验 meta page 合法性
    db.meta0 = db.page(0).meta()
    db.meta1 = db.page(1).meta()
    
    err0 := db.meta0.validate()
    err1 := db.meta1.validate()
    if err0 != nil && err1 != nil {
        return err0
    }


    return nil
}

 

mmap 系统调用的实现与操作系统版本有关,这里我简单给出 unix 系统的实现:

// mmap memory maps a DB's data file.
func mmap(db *DB, sz int) error {
    // Map the data file to memory.
    b, err := unix.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|db.MmapFlags)
    if err != nil {
        return err
    }


    // Advise the kernel that the mmap is accessed randomly.
    err = unix.Madvise(b, syscall.MADV_RANDOM)
    if err != nil && err != syscall.ENOSYS {
        // Ignore not implemented error in kernel because it still works.
        return fmt.Errorf("madvise: %s", err)
    }


    // Save the original byte slice and convert to a byte array pointer.
    db.dataref = b
    db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
    db.datasz = sz
    return nil
}

 

3.1.3 mmap size

由于 mmap 系统调用的成本较高,为了减少执行频率,boltdb 在每轮执行 mmap 时会预留一部分空间:在新容量小于 1GB 时,则从 32KB 基础上持续倍增找到首个满足条件容量;在新容量超过 1GB 时,则基于 GB 向上取整.

// 获取合适的 mmap 大小
func (db *DB) mmapSize(size int) (int, error) {
    // 32KB - 1GB 范围内采取倍增策略
    for i := uint(15); i <= 30; i++ {
        if size <= 1<<{
            return 1 << i, nil
        }
    }


    // ...


    // 大于 1GB,则基于 GB 向上取整
    sz := int64(size)
    if remainder := sz % int64(maxMmapStep); remainder > 0 {
        sz += int64(maxMmapStep) - remainder
    }


    // ...
}


const maxMmapStep = 1 << 30 // 1GB

 

3.2 pwrite+fdatasync

3.2.1 核心概念

boltdb 写流程采用 pwrite + fdatasync 技术实现:

  • • pwrite: 该操作本质上是文件的 writeAt 方法,能实现将数据写入到文件指定 offset 的能力. 需要注意的是,在 linux 系统中,出于性能优化的目的,pwrite 系统调用不会等待磁盘 io 完成,而是仅在将任务提交到设备 io 队列后就返回,因此在严格意义上无法保证持久化操作的稳定性

  • • fdatasync: 该操作会在确保设备 io 执行完成后才返回,以此来保证数据持久化操作的稳定性,与 pwrite 操作形成一套严丝合缝的组合拳

 

3.2.2 流程源码

pwrite 操作默认为文件的 writeAt 方法,入口是 db 的 ops 成员属性:

type DB struct {
    // ...


    ops struct { // 将数据写到 db 文件指定位置的 pwrite 操作
        writeAt func([]byte, off int64) (int, err error)
    }


    // ...
}

 

 

在读写事务提交过程中,不论是脏数据页还是 meta 副本的溢写落盘流程,都会应用到 pwrite + fdatasync 指令:

func (tx *Tx) Commit() error {
    // 1 调整各个 bucket 下的 b+ 树,保证其平衡性


    // ...
    // 2 倘若 b+ 树平衡性调整后,page 空间不足,则需要进行扩容...


    // 3 脏数据溢写落盘
    if err := tx.write(); err != nil {
        tx.rollback()
        return err
    }


    // ...
    // 4 meta page 溢写落盘
    if err := tx.writeMeta(); err != nil {
        tx.rollback()
        return err
    }
    
    // 5 关闭事务...
    // 6 执行 commit 回调函数...


    return nil
}

 

// 脏页溢写落盘
func (tx *Tx) write() error {
    // 1 获取脏页并排序...


    // 遍历每个脏页,依次写入到 db 文件中
    for _, p := range pages {
        // ...
        // 如果 page 过大,则需要拆分多个批次写入
        for {
            // ...


            // pwrite 操作
            if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
                return err
            }


            // ...         
        }
    }


    // 执行 fdatasync 操作,保证脏页成功落盘
    if !tx.db.NoSync || IgnoreNoSync {
        if err := fdatasync(tx.db); err != nil {
            return err
        }
    }


    // ...
    return nil
}

 

func (tx *Tx) writeMeta() error {
    // ...
    // pwrite
    if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
        return err
    }
    
    // fdatasync
    if !tx.db.NoSync || IgnoreNoSync {
        if err := fdatasync(tx.db); err != nil {
            return err
        }
    }


    // ...
    return nil
}

 

pwrite 操作为 os/file 标准库下的 WriteAt方法:

package os/file


func (*File) WriteAt([]byte, off int64) (int, err error) {
    // ...
    for len(b) > 0 {
        m, e := f.pwrite(b, off)
        if e != nil {
            err = f.wrapErr("write", e)
            break
        }
        n += m
        b = b[m:]
        off += int64(m)
    }
    return
}

 

unix 系统下,fdatasync 的实现为 file.sync 方法:

// fdatasync flushes written data to a file descriptor.
func fdatasync(db *DB) error {
    return db.file.Sync()
}


func (*File) Sync() error {
    // ...
    if e := f.pfd.Fsync(); e != nil {
        return f.wrapErr("sync", e)
    }
    return nil
}

 

3.3 copy on write

纵观整个 boltdb 的交互流程,都贯彻了对 copy-on-write 策略的应用,下面我们就以一个事务开启和提交的流程,来总结一下其中哪些环节提现了写时复制的思路.

3.3.1 meta&bucket 副本

在一个新事务初始化时,会拷贝出一份 meta page 和 bucket 副本后续涉及到对这部分内容的修改时,都在副本的基础上执行,再通过事务提交环节一次性实现修改内容的覆盖生效.

func (tx *Tx) init(db *DB) {
    // ...
    // 为当前事务复制一份 meta 副本
    tx.meta = &meta{}
    db.meta().copy(tx.meta)


    // 深拷贝复制出一份 bucket 实例
    tx.root = newBucket(tx)
    tx.root.bucket = &bucket{}
    *tx.root.bucket = tx.meta.root
    // ...
}

 

// 深拷贝一份 meta 副本
func (*meta) copy(dest *meta) {
    *dest = *m
}

 

3.3.2 脏数据page副本

在提交读写事务过程中,会基于 copy on write 机制,将所有涉及变更的节点序列化成一份新的 page 副本,而不是在原 page 上直接更新

// 提交事务
func (tx *Tx) Commit() error {
    // 1 rebalance ...


    // 2 记录原本已分配的 page 范围
    // ...
    // 3 spill spill 时,还会针对改动的部分,基于 copy on write 机制生成一系列脏数据的 page 副本
    if err := tx.root.spill(); err != nil {
        tx.rollback()
        return err
    }
    
    // ...
    // ...


    // 4 脏页落盘持久化
    if err := tx.write(); err != nil {
        tx.rollback()
        return err
    }


    // ...


    // 5 meta page 落盘持久化
    if err := tx.writeMeta(); err != nil {
        tx.rollback()
        return err
    }
    
    // 至此,代表事务已经提交成功了,数据已经成功更新!
    // ...


    return nil
}

 

// 针对当前 node 进行拆分
func (*node) spill() error {
    // ...
    // 遍历所有涉及变更的节点
    for _, node := range nodes {
        // ...
        
        // 分配对应数量的 page 副本
        p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)
        if err != nil {
            return err
        }


        // ...
        // 将变更后的 node 内容写入到 page 副本中中
        node.pgid = p.id
        node.write(p)
        node.spilled = true


        // ...
    }


    // ...
}

 

3.3.3 副本覆盖生效

在事务提交的持久化环节,随着一些脏数据 page 副本的溢写落盘,其中的内容已经得到了持久化保证:

func (tx *Tx) write() error {
    // ...
    // 脏数据 page pwrite
    for _, p := range pages {
        // ...
        for {
            // ...
            // pwrite
            if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
                return err
            }
            // ...
        }
    }


    // fdatasync
    if !tx.db.NoSync || IgnoreNoSync {
        if err := fdatasync(tx.db); err != nil {
            return err
        }
    }


    // ...
}

但大家需要注意,截止到此时,这部分数据还不会被 boltdb 所认可,因为 db 中的 meta 还是老版本,其中的 b+ 树也还是原本的引用,接下来就是真正“画龙点睛”的一步——读写事务 meta 副本落盘:

// 读写事务 meta 副本落盘
func (tx *Tx) writeMeta() error {
    // pwrite
    if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
        return err
    }
    
    // fdatasync
    if !tx.db.NoSync || IgnoreNoSync {
        if err := fdatasync(tx.db); err != nil {
            return err
        }
    }


    // ...
    return nil
}

随着新版本 meta 的落盘持久化,由其所指向的新版 bucket 以及其中一系列修改后的 b+ 树节点也就真正得到了“正名”.

只要该 meta 落盘成功,那么其中的 tx id 就是全局最大的,未来在新的读写流程中,都会以该版本 meta 为准,一轮 copy-on-write 策略的实施至此完美收官!

 

3.4 扩容

在读写事务提交时,倘若发现可用 page 不足,则会推动 boltdb 进行扩容:

func (tx *Tx) Commit() error {
    // 1 调整各个 bucket 下的 b+ 树,保证其平衡性
    // 1 rebalance ...
    
    // 2 记录原本已分配的 page 范围
    opgid := tx.meta.pgid


    // ...
    // 3 spill spill 时,还会针对改动的部分,基于 copy on write 机制生成一系列脏数据的 page 副本
    if err := tx.root.spill(); err != nil {
        tx.rollback()
        return err
    }
    // ...
    // 4 倘若 b+ 树平衡性调整后,page 范围右扩,需要对文件对应调整
    if tx.meta.pgid > opgid {
        if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
            tx.rollback()
            return err
        }
    }


    // ...
    return nil
}

前文已经针对上述的 spill 流程做过一些介绍,在申请可用 page 作为脏数据 page 副本时,可能存在 freelist 资源不足的情况,此时则会推动 boltdb 发动新一轮 mmap 操作,并且更新其 meta 副本中的 pgid 值.

于是,db.grow 方法得以执行,其中会通过 truncate 方法实现文件截断操作,实现“扩容”语义,并通过 fdatasync 确保其生效:

// db 扩容
func (db *DB) grow(sz int) error {
    // 1 调整合适的扩容大小 ...    
    if db.datasz <= db.AllocSize { // AllocSize 默认值为 16M
        sz = db.datasz
    } else {
        // 超过 16M,则每次增长 16M
        sz += db.AllocSize
    }    


    // 2 通过 truncate + fsync 操作,确保扩容完成
    if !db.NoGrowSync && !db.readOnly {
        if runtime.GOOS != "windows" {
            if err := db.file.Truncate(int64(sz)); err != nil {
                return fmt.Errorf("file resize error: %s", err)
            }
        }
        // fdatasync
        if err := db.file.Sync(); err != nil {
            return fmt.Errorf("file sync error: %s", err)
        }
        // ...
    }


    // 3 新的文件大小
    db.filesz = sz
    return nil
}

 

4 b+ tree

本章我们简单点一些有关 b+ 树部分的内容(聚焦内存部分),详细内容我们在下一篇——etcd存储引擎之b+树实现 中展开.

4.1 设定

在 boltdb 中,采用 b+ 树模型进行数据的存储. 实际上,依附于不同的存储介质,我们也同样可以从两个视角出发来看待其针对 b+ 树的实现.

  • • page:基于 mmap 技术实现的 page 底层本质上和磁盘文件内容关联. 因此如本文 2.5 小节介绍的 branch& leaf page,可以理解为“磁盘上”的 b+ 树节点,通过 branchPageElement、leafPageElement,以及其中针对 key、value 和子节点的指向,形成一颗逻辑上的 b+ 树拓扑结构

  • • node:内存中反序列化得到的 b+ 树节点,是基于懒加载机制实现的,因此只有在需要修改某个节点时,才会从对对应的 branch&leaf page 中将其反序列化成 node 实例,并最终基于 copy-on-write 机制将其序列化成新的 page 副本,并通过 pwrite + fdatasync 实现持久化

 

 

4.2 node

node 是内存中反序列化得到的 b+ 树节点类,其核心成员属性定义为:

  • • bucket:从属的 bucket. 每棵 b+ 树必然从属于某个 bucket(表)

  • • isLeaf: 标识该节点是叶子节点还是枝干节点

  • • unbalanced: 倘若节点执行过删除 key 操作,则 unbalanced 会被置为 true. 该节点在持久化前需要先执行 rebalance 操作

  • • spilled: 标识该节点是否已经执行过 spill 操作

  • • key: 该节点中最小的一个 key

  • • pgid: 该节点对应的 page id,即其反序列化的来源. 值得一提的是,node 在持久化时会被写入到一个新的 page 副本中,正是所谓的 copy on write 机制

  • • parent: 父节点

  • • children: 一系列子节点,在 dereference、rebalance 和 spill 流程中使用

  • • inodes:该节点下的一系列 kv 数据(广义的 kv,如为 branch 类型,则此处的 “v” 指子节点)

// 内存中的 b+ tree 节点,对应磁盘上的一个 branch/leaf element page
type node struct {
    bucket     *Bucket // 该节点从属的 bucket
    isLeaf     bool    // 是否为 b+ tree leaf 节点,为 false 则为 branch 节点
    unbalanced bool    // 该节点是否删除过数据,是的话需要在落盘前进行 rebalance 操作
    spilled    bool    // 该节点是否执行过 spill 操作
    key        []byte  // 该节点下的首个 key (最小的 key)
    pgid       pgid    // 该节点对应 page  id
    parent     *node   // 父节点
    children   nodes   // 子节点列表,该节点为分支节点时使用
    inodes     inodes  // kv 数据列表,该节点为叶子节点时使用
}

 

inode 标识某个 node 中的一笔 kv 数据(广义的 kv,如为 branch 类型,则此处的 “v” 指子节点):

  • • flags: 标识 value 是否标识一个子 bucket

  • • key: 标识键 key

  • • pgid:如果 node 为 branch 类型,则 “v” 取 pgid 字段,标识子节点的 pgid

  • • value:如果 node 为 leaf 类型,则直接通过 value 取值

// b+ tree 叶子节点中的一个 kv 对
type inode struct {
    flags uint32 // 标识其是否是 bucket
    pgid  pgid   // 从属的 page id
    key   []byte // key
    value []byte // value
}

 

4.3 序列化

在读写事务提交时,需要将所有 node 序列化到 page 副本中(会被反序列化成 node 的节点往往都发生过变更),对应方法为 node.write:

func (*node) write(*page) {
    // ...
    // 设置对应的 page flag,标识是分支还是叶子节点
    if n.isLeaf {
        p.flags = leafPageFlag
    } else {
        p.flags = branchPageFlag
    }


    // 记录节点的 key 数量
    p.count = uint16(len(n.inodes))


    // 没有 key 数据,直接返回
    if p.count == 0 {
        return
    }


    // 下面针对每笔 kv (inode) 进行序列化
    // 记录 offset 偏移量
    off := unsafe.Sizeof(*p) + n.pageElementSize()*uintptr(len(n.inodes))
    for i, item := range n.inodes {
        // ...
        // key + value 的大小
        sz := len(item.key) + len(item.value)
        // 用于存储 key + value 内容的容器
        b := unsafeByteSlice(unsafe.Pointer(p), off, 0, sz)
        // 累加偏移量
        off += uintptr(sz)


        // 针对叶子节点的 key-value 数据,对 leafPageElement 进行赋值,标识数据元信息
        if n.isLeaf {
            elem := p.leafPageElement(uint16(i))
            elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
            elem.flags = item.flags
            elem.ksize = uint32(len(item.key))
            elem.vsize = uint32(len(item.value))
        // 针对分支节点的 key-child node 数据,对 branchPageElement 进行赋值,标识数据元信息
        } else {
            elem := p.branchPageElement(uint16(i))
            elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
            elem.ksize = uint32(len(item.key))
            elem.pgid = item.pgid
            // ...
        }


        // 深拷贝 key value 数据到对应的 offset 位置
        l := copy(b, item.key)
        copy(b[l:], item.value)
    }
}

 

4.4 反序列化

读写事务运行过程中,倘若涉及到对某个 b+ 树节点的调整,则需要将其反序列化为 node 实例,在内存中完成修改:

// 从 page 中读取节点内容,将其反序列化到 node 实例中
func (*node) read(*page) {
    // 传承 page 的 id
    n.pgid = p.id
    // 标识节点是否为叶子节点
    n.isLeaf = ((p.flags & leafPageFlag) != 0)
    // 反序列化对应数量的 inode
    n.inodes = make(inodes, int(p.count))


    // 针对每个 inode 进行反序列化
    for i := 0; i < int(p.count); i++ {
        inode := &n.inodes[i]
        if n.isLeaf {
            elem := p.leafPageElement(uint16(i))
            inode.flags = elem.flags
            inode.key = elem.key()
            inode.value = elem.value()
        } else {
            elem := p.branchPageElement(uint16(i))
            inode.pgid = elem.pgid
            inode.key = elem.key()
        }
    }


    // 冗余记录最小的 key
    if len(n.inodes) > 0 {
        n.key = n.inodes[0].key
    } else {
        n.key = nil
    }
}

 

4.5 cursor

 

游标 cursor 是 boltdb 中针对 b+ 树实现的一个改造点,它会通过一个栈结构记录某次操作在 b+ 树上的移动路径,以此来辅助完成一些复杂的回溯检索操作.

 

 

cursor 实现代码如下:

type Cursor struct {
    bucket *Bucket
    stack  []elemRef // 通过栈记录移动路径. 最后一个元素代表此时所在位置
}

 

elemRef 对应为移动路径涉足的一个 b+ 树节点,由于内存中的 b+ 树 node 是懒加载机制,因此倘若移动路径涉足到一些未反序列化的节点(没有数据变更),则可能会通过 page 字段来标识

  • • node: 倘若某个节点反序列化过了,则指向对应 node

  • • page: 倘若某个节点未反序列化,则指向对应 page

  • • index:检索目标在节点中的 key 的 index

// 由于 b+ 树采用懒加载机制,因此树中的节点可能是 page,也可能是 node
type elemRef struct {
    page  *page
    node  *node
    index int
}

 

cursor 一定是依附于 bucket 存在的,其构造方法如下:

// 返回对应的游标卡尺
func (*Bucket) Cursor() *Cursor {
    // ...
    // 返回游标实例
    return &Cursor{
        bucket: b,
        // 游标对应的栈
        stack: make([]elemRef, 0),
    }
}

 

5 bucket

5.1 设定

在 boltdb 中引入了桶 bucket 的设定,作用是实现业务数据的隔离, 可以简单把 bucket 类比于数据库中的表,只不过 bucket 的形式会更加灵活一些,还能支持嵌套式的拓扑关系,形如上图,school 和 school-class 是两个合法的 bucket,且彼此为父子关系.

在逻辑意义上,每个 bucket 会有一棵独立的 b+ 树,用于存放当前 bucket 范围内的 kv 数据.

 

5.2 类定义

Bucket 类定义代码如下:

  • • *bucket:Bucket header 部分. 这部分是需要被持久化的,其余字段只存在于内存副本中

  • • tx: 当前 Bucket 实例所从属的事务. 因为这是内存中的 Bucket 实例,因此一定是在开启了某个事务的前提下才会被创建出来

  • • buckets: 当前 Bucket 下缓存的子 Bucket map. 注意这里非全量,也是采取懒加载机制,被用到过的子 Bucket 才会缓存在这里

  • • page: 如果当前 Bucket 是一个内联 Bucket,则用该字段表示其虚拟 page(5.3小节介绍何谓内联 Bucket)

  • • rootNode: Bucket 下的 b+ 树根节点

  • • nodes:缓存的 b+ 树节点. 同样非全量,涉及变更的节点才会被反序列化成 node,才会缓存于此 map

// 内存中的 Bucket 实例,一定是基于某个 tx 创建的
type Bucket struct {
    *bucket                     // bucket header,记录元信息
    tx       *Tx                // 创建当前桶实例的事务
    buckets  map[string]*Bucket // 子桶
    page     *page              // 如果为 inline bucket,则为对应 page 的引用
    rootNode *node              // 当前桶下 b+ 树的根节点
    nodes    map[pgid]*node     // 当前桶下已反序列化后缓存的 node 节点


    // ...
}


// bucket header. 需要序列化部分的元信息
type bucket struct {
    root     pgid   // bucket 中根节点对应的 page id
    sequence uint64 // bucket 的序列号,全局单调递增
}

 

5.3 内联

正如上文所述,每个 bucket 都有一颗独立的 b+ 树,而每个 b+ 树至少有一个节点,即 boltdb 至少要为每个 bucket 分配一个 page.

然而在实际使用场景中,很多 bucket 存储的数据量可能远小于一个 page,为避免造成空间浪费,boltdb 针对数据量小于 pageSize/4 且只需要单个 page 的 bucket 采取 inline bucket 策略——借鉴虚拟内存实现思路,在紧挨着 Bucket header 的位置取出一块小于 page 但仍按照 page 格式组织的空间,组成一个逻辑意义的“page”进行数据存储.

 

判断一个 Bucket 是否需要采取 inline 模式的方法如下:

// 判断 bucket 是否为 inline 类型
func (*Bucket) inlineable() bool {
    var n = b.rootNode


    // 只有某个 bucket 仅包含一个叶子类型的根节点时,才有可能是 inline bucket
    if n == nil || !n.isLeaf {
        return false
    }


    // 统计一下 bucket 的总大小
    var size = pageHeaderSize
    for _, inode := range n.inodes {
        // 累加每个 inode 的大小
        size += leafPageElementSize + len(inode.key) + len(inode.value)
        // 如果存在 inode 不是 leaf element,则当前 node 不是 inline bucket
        if inode.flags&bucketLeafFlag != 0 {
            return false
            // 判断总大小是否超过 inline bucket 限制(pageSize/4)
        } else if size > b.maxInlineBucketSize() {
            return false
        }
    }


    return true
}


func (*Bucket) maxInlineBucketSize() uintptr {
    return uintptr(b.tx.db.pageSize / 4)
}

 

 

5.4 序列化

每个 Bucket 在刚被创建出来时,都是一个 inline bucket,此时需要对其进行序列化,将其数据组装成 kv 对的形式写入到其父 Bucket 的 b+ 树中. inline bucket 序列化方法是 bucket.write

func (*Bucket) CreateBucket(key []byte) (*Bucket, error) {
    // ...


    // 创建 bucket 实例(此时为 inline bucket)
    var bucket = Bucket{
        bucket:      &bucket{},
        rootNode:    &node{isLeaf: true},
        FillPercent: DefaultFillPercent,
    }
    
    // 序列化 bucket
    var value = bucket.write()


    // 组装成 kv 对形式写入到父 bucket b+ 树中
    key = cloneBytes(key)
    c.node().put(key, key, value, 0, bucketLeafFlag)


    // ...
}

 

此外,在读写事务提交时,也会涉及对 Bucket 的序列化操作,此时针对常规 bucket 会直接对 Bucket header 部分进行深拷贝,针对 inline bucket 则同样会调用 bucket.write 方法作序列化:

// spill writes all the nodes for this bucket to dirty pages.
func (*Bucket) spill() error {
    // 处理每个子 bucket
    for name, child := range b.buckets {
        // 记录子 bucket 序列化数据
        var value []byte
        // inline bucket 序列化
        if child.inlineable() {
            // ...
            value = child.write()
        // 普通 bucket 序列化
        } else {
            // ...
            // 直接进行 bucket 深拷贝
            value = make([]byte, unsafe.Sizeof(bucket{}))
            var bucket = (*bucket)(unsafe.Pointer(&value[0]))
            *bucket = *child.bucket
        }


        // ...
    }
    // ...
    return nil
}

 

inline bucket 的序列化方法如下:

func (*Bucket) write() []byte {
    // 获取数据大小
    var n = b.rootNode
    var value = make([]byte, bucketHeaderSize+n.size())


    // 深拷贝出 bucket header 部分内容
    var bucket = (*bucket)(unsafe.Pointer(&value[0]))
    *bucket = *b.bucket


    // 将数据部分写入到紧挨着 bucket header 的后继部分
    var p = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
    n.write(p)


    // 返回序列化结果
    return value
}

 

5.5 反序列化

当用户需要获取某个 Bucket 时,需要对其进行反序列化:

func (*Bucket) Bucket(name []byte) *Bucket {
    // ... 如果子 bucket 已反序列化过,进行复用


    // 反序列化对应的子 bucket
    var child = b.openBucket(v)
    
    // ... 缓存反序列化后的子 bucket
    return child
}

 

反序列化核心方法为 Bucket.openBucket,其中主语的 Bucket 是拟获取 Bucket 的 parent

  • • 构造一个空白 Bucket 实例

  • • 针对读写事务,基于 copy-on-write 机制,深拷贝一份 bucket header;针对只读事务,则直接传递 header 引用

  • • 针对 inline bucket,需要读取虚拟 page 内的数据

// 反序列化子 bucket
func (*Bucket) openBucket(value []byte) *Bucket {
    创建一个空白的 bucket 实例
    var child = newBucket(b.tx)


    // ...
    // 针对读写事务,需要对 bucket 副本拷贝 copy-on-write
    if b.tx.writable && !unaligned {
        child.bucket = &bucket{}
        *child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
    } else { // 针对只读事务,直接复用 bucket
        child.bucket = (*bucket)(unsafe.Pointer(&value[0]))
    }


    // 针对内联 bucket,读取对应的 page
    if child.root == 0 {
        child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
    }


    return &child
}

 

func newBucket(tx *Tx) Bucket {
    var b = Bucket{tx: tx, FillPercent: DefaultFillPercent}
    // 针对读写事务反序列化的 bucket,需要初始化对应的缓存 map
    if tx.writable {
        b.buckets = make(map[string]*Bucket)
        b.nodes = make(map[pgid]*node)
    }
    return b
}

至此正文结束. 祝贺走到这里各位朋友,我们又一起打赢了一场硬仗!

6 展望

本文是 etcd 存储引擎系列的第二篇,带着大家一起深入了解了 boltdb 的存储设计原理. 在此回顾整个系列的研究进程并对后续内容进行展望:

  • • etcd存储引擎之主干框架(已完成):偏宏观视角下介绍 boltdb 的定位、架构、特性,通过几个核心流程浅探 boltdb 实现源码

  • • etcd存储引擎之存储设计(本篇):介绍 boltdb 存储模型、机制的设计实现,包含磁盘、内存两部分

  • • etcd存储引擎之b+树实现(待填坑):介绍 b+ 树理论模型及 boltdb 实现案例,包括模型定义及 crud 流程梳理

  • • etcd存储引擎之事务实现(待填坑):介绍 boltdb 事务的执行模式及实现原理



小徐先生的编程世界
在学钢琴,主业码农
 最新文章