0 前言
年前和大家一起完成了有关 lsm tree 话题的探讨,并在系列完结时立下一个了 flag——下个系列,剑指 b+ 树专题~
于是乎,择日不如撞日,今天咱就肝上一把,开启新专题,推动填坑之旅!
本系列我们将以 etcd 存储引擎 boltdb 作为 b+树 工程实践案例进行学习,该项目开源地址为:https://github.com/etcd-io/bbolt,go 语言纯度接近 100%. (本系列涉及走读的 boltdb 源码版本为 v1.3.8)
此外,在这里补充插入一个致敬环节. 在关于本专题内容的学习过程中,我借助了叉鸽 boltdb 系列博客以及滴滴出行魏猛老师的分享,大大降低了学习的阻力,特此致敬一下.
顺带附上叉鸽系列博客传送门,讲得不错,需要者自取:
https://blog.mrcroxx.com/posts/code-reading/boltdb-made-simple/0-introduction/
下面是本专题的分享节奏,计划分为四篇:
• etcd存储引擎之主干框架(本篇):偏宏观视角下介绍 boltdb 的定位、架构、特性,通过几个核心流程浅探 boltdb 实现源码
• etcd存储引擎之存储设计(待填坑):介绍 boltdb 存储模型、机制的设计实现,包含磁盘、内存两部分
• etcd存储引擎之b+树实现(待填坑):介绍 b+ 树理论模型及 boltdb 实现案例,包括模型定义及 crud 流程梳理
• etcd存储引擎之事务实现(待填坑):介绍 boltdb 事务的执行模式及实现原理
1 核心概念
1.1 bolt 之于 etcd
etcd 是一个具有强一致性的分布式协调服务,基于 golang 实现,底层基于 raft 算法保证数据的强一致和高可用,对应开源地址: https://github.com/etcd-io/etcd
常看我文章的同学应该会有印象,此前针对 etcd 其他方向,我也陆续做过一些技术分享,这里简单附上话题对应传送门,感兴趣的同学可以拓展学习:
言归正传,本系列我们聚焦 etcd 存储层引擎 boltdb 的实现原理. boltdb 在 etcd 整体架构中所属的层次定位示意如上图,它是由 go 语言实现的单机 kv 数据磁盘存储系统:
• 单机运行: 无需考虑分布式共识相关内容(简单)
• 磁盘存储: kv 数据存储于磁盘(可靠)
• 本地读写: 读写时直接与本地文件交互,没有客户端与服务端的通信环节(简单、粗暴、高效)
1.2 存储设计
下面我们拓展聊聊 boltdb 存储技术实现. 本文作为系列开篇,整体内容偏宏观,本小节内容讲解力度偏小、点到即止,更多细节内容在未来的存储设计篇和b+树实现篇中进一步展开.
1.2.1 读写
boltdb 存储依赖于磁盘,针对于存储数据的交互,分为读、写流程:
• 在读流程上:基于 mmap(memory-mapping) 技术实现,隐藏与磁盘交互的细节,使用方能够像访问内存中的字节数组一样读取磁盘文件中的内容.
• 在写流程上:基于 pwrite + fdatasync,实现数据落盘, 兼顾效率与稳定
1.2.2 page
基于局部性原理,操作系统下,内存与磁盘间数据的交换以页 page 为单位. 与之类似,boltdb 也是通过 page 为单位,完成数据库文件内容的组织.
在 boltdb 中,page 可以分为以下四类:
• meta page:存储 boltdb 的元数据,例如版本号、校验和等,还包括全局递增的事务 id 记录(属于全局维度的内容)
• freelist page:存储空闲 page 信息,例如哪些 page 空闲可用,哪些 page 将被事务释放(属于全局维度的内容. 个人觉得其定位可类比于 go 语言中的 heap,采取以空间换时间的策略,缓存并管理空闲 page 以供复用,减少与操作系统的交互频率)
• branch element page:存储索引的节点, 对应为 b+树中的分支节点(较细的粒度,与具体数据挂钩)
• leaf element page:存储数据的节点, 对应为 b+树中的叶子节点(较细的粒度,与具体数据挂钩)
每个 db 在初始化时,会先完成 4 个 page 的初始化和持久化:
• meta page * 2: 数据库的元数据自然需要初始化完成;之所以数量是两个,与 boltdb 为兼顾效率与稳定性采用的 copy-on-write 机制有关(留待下篇,按下不表)
• freelist: 全局维度的空闲 page 管理模块,自然需要初始化完成
• leaf element page: 作为一棵空白 b+ 树的根节点,同时也是叶子节点
1.2.3 b+树
boltdb 中,基于 b+ 树实现数据的存储. 这也是我将其作为学习案例的初衷.
有关于 b+ 树的详细设定,我将在 b+ 树实现篇中详细展开,这里仅一笔带过:
b+ 树是 b 树的升级版本,本质上是一颗扁平化的 n 叉树,叶子节点存储真实数据,非叶子节点仅存储索引信息,其拓扑结构形如下图:
b+ 树本身是偏理论性的定义,在落地实践时可能会出现一定的差异化改造. 以 boltdb 的实现为例,其中存在但不仅限的两个较大改造点包括:
• 引入游标工具: 底层叶子节点未通过链表串联,范围检索会借助一个压栈记录了移动路径的游标指针来完成
• 降低调整频率: 为兼顾操作效率与b+树的平衡性,boltdb 仅在数据溢写落盘前,才一次性完成 b+树的平衡性调整
1.2.4 bucket
在 boltdb 中引入了桶 bucket 的设定. bucket 的作用是实现业务数据的隔离, 可以简单把 bucket 类比于数据库中的表,只不过 bucket 的形式会更加灵活一些,还能支持嵌套式的拓扑关系,形如上图,school 和 school-class 是两个合法的 bucket,且彼此为父子关系.
从每个 db 会有个默认的 root bucket,以此为起点可以衍生出一个 bucket 多叉树,本身也是通过 b+ 树的模型实现.
在逻辑意义上,每个 bucket 会有一棵独立的 b+ 树,用于存放当前 bucket 范围内的 kv 数据.
1.3 事务执行
最后是关于事务 transaction 的部分.
boltdb 中的事务分为只读事务 read-only tx 和读写事务 read-write tx 两类:
• 读写事务: 顾名思义,事务中可以存在非幂等的写操作. 同一时刻只能有一个读写事务执行,但可以和多个只读事务并行执行
• 只读事务:事务内只存在查询操作. 多个只读事务可以并行执行,也可以和读写事务并行执行
有关 boltdb 事务的更多设定、acid 性质的保证机制及事务的实现细节等内容,我们放在事务实现篇中详细讲解.
2 使用示例
本章通过一个单测示例向大家展示 boltdb 的基本用法.
2.1 启动
启动 boltdb 时,需要指定数据库文件的路径,并将文件权限设置为可读写:
import (
"testing"
"go.etcd.io/bbolt"
)
func Test_boltDB(t *testing.T) {
// 1 启动数据库
db, err := bbolt.Open("./test_b.db", 0600, nil)
if err != nil {
t.Error(err)
return
}
defer db.Close()
// ...
}
2.2 建bucket(表)
• 通过 db.Update 方法,启动隐式读写事务(方法结束时,boltdb 会自动帮忙 commit 事务)
• 在其中通过 tx.CreateBucketIfNotExists 方法完成一个名为 "test" 的 bucket 的创建:
import (
"testing"
"go.etcd.io/bbolt"
)
func Test_boltDB(t *testing.T) {
// 1 启动数据库
// ...
// 2 建表
if err = db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte("test"))
return err
}); err != nil {
t.Error(err)
return
}
// ...
}
2.3 增改
• 通过 tx.Bucket 方法获取到 bucket
• 调用 bucket.Put 方法,完成 kv 数据的写入
import (
"testing"
"go.etcd.io/bbolt"
)
func Test_boltDB(t *testing.T) {
// 1 启动数据库
// ...
// 2 建表
// ...
// 3 增、改
if err = db.Update(func(tx *bbolt.Tx) error {
table := tx.Bucket([]byte("test"))
if err := table.Put([]byte("a"), []byte("b")); err != nil {
return err
}
return table.Put([]byte("c"), []byte("d"))
}); err != nil {
t.Error(err)
return
}
// ...
}
2.4 删
通过 bucket.Delete 方法,完成 key 的删除:
import (
"testing"
"go.etcd.io/bbolt"
)
func Test_boltDB(t *testing.T) {
// 1 启动数据库
// ...
// 2 建表
// ...
// 3 增、改
// ...
// 4 删
if err = db.Update(func(tx *bbolt.Tx) error {
table := tx.Bucket([]byte("test"))
return table.Delete([]byte("a"))
}); err != nil {
t.Error(err)
return
}
// ...
}
2.5 查
• 通过 db.View 方法,启动隐式只读事务(方法结束时,boltdb 会自动帮忙 commit 事务)
• 通过 bucket.Get 方法,完成 key value 数据的查询
import (
"testing"
"go.etcd.io/bbolt"
)
func Test_boltDB(t *testing.T) {
// 1 启动数据库
// ...
// 2 建表
// ...
// 3 增、改
// ...
// 4 删
// ...
// 5 查
if err = db.View(func(tx *bbolt.Tx) error {
table := tx.Bucket([]byte("test"))
v1 := table.Get([]byte("c"))
t.Logf("v of key c: %s", v1)
v2 := table.Get([]byte("a"))
t.Logf("v of key a: %s", v2)
return nil
}); err != nil {
t.Error(err)
return
}
}
3 主流程走读
在本章中,我们将以第 2 章示例代码为入口,进行几个核心操作流程的源码流程走读,但涉足的源码深度相对较浅:
3.1 db定义
首先介绍 boltdb 中的一个核心类——DB. 对应为一个数据库实例的代码抽象,其中包含的核心成员属性,已通过图解和源码注释的方式给出:
// boltdb 抽象的数据库
type DB struct {
// ...
// 数据库文件名称
path string
// 打开文件方法
openFile func(string, int, os.FileMode) (*os.File, error)
// 数据库文件,所有数据存储于此
file *os.File
// 基于 mmap 技术映射的数据库文件内容
data *[maxMapSize]byte
// ...
// 两个轮换使用的 meta page
meta0 *meta
meta1 *meta
// 数据库单个 page 的大小,单位 byte
pageSize int
// 数据库是否已启动
opened bool
// 全局唯一的读写事务
rwtx *Tx
// 一系列只读事务
txs []*Tx
// freelist,管理空闲的 page
freelist *freelist
freelistLoad sync.Once
// 提高 page 字节数组复用率的对象池
pagePool sync.Pool
// ...
// 互斥锁,保证读写事务全局唯一
rwlock sync.Mutex
// 保护 meta page 的互斥锁
metalock sync.Mutex
// 保护 mmap 的读写锁
mmaplock sync.RWMutex
// 数据落盘持久化时使用的操作方法,对应为 pwrite 操作
ops struct {
writeAt func(b []byte, off int64) (n int, err error)
}
// 是否已只读模式启动数据库
readOnly bool
}
3.2 启动
3.2.1 主流程
通过 Open 方法可以启动 db,核心流程包括:
• 构造 db 实例,并读取各项 option 完成配置
• 通过传入的 path,打开对应的数据库文件(如果文件之前不存在,则会进行全新文件的创建)
• 倘若在创建新的数据库文件,则需要完成 2 个 meta page、1 个 freelist page 和 1 个 leaf element page 的初始化
• 构造 pagePool 对象池,后续可复用 page 的字节数组
• 执行 mmap 操作,完成数据库文件和内存空间的映射
• 返回构造好的 db 实例
func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
// 构造 db 实例
db := &DB{
opened: true,
}
// 启用默认配置
if options == nil {
options = DefaultOptions
}
// ...
// 默认不启用只读模式
if options.ReadOnly {
flag = os.O_RDONLY
db.readOnly = true
} else {
// always load free pages in write mode
db.PreLoadFreelist = true
}
// 打开数据库文件的操作方法
db.openFile = options.OpenFile
if db.openFile == nil {
db.openFile = os.OpenFile
}
// 打开数据库文件
var err error
if db.file, err = db.openFile(path, flag|os.O_CREATE, mode); err != nil {
_ = db.close()
return nil, err
}
// 数据库文件名称赋值
db.path = db.file.Name()
// ...
// 数据落盘操作
db.ops.writeAt = db.file.WriteAt
// 数据 page 大小
if db.pageSize = options.PageSize; db.pageSize == 0 {
// 默认等于操作系统 page 大小
db.pageSize = defaultPageSize
}
// 倘若从零到一创建一个新的 db 文件,则需要进行初始化
if info, err := db.file.Stat(); err != nil {
_ = db.close()
return nil, err
} else if info.Size() == 0 {
// 初始化 db
if err := db.init(); err != nil {
// ...
_ = db.close()
return nil, err
}
}
// ...
// 对象池,用于复用 page 的字节数组
db.pagePool = sync.Pool{
New: func() interface{} {
return make([]byte, db.pageSize)
},
}
// 基于 mmap 建立数据库文件和内存空间的映射
if err := db.mmap(options.InitialMmapSize); err != nil {
_ = db.close()
return nil, err
}
// 预加载 freelist
if db.PreLoadFreelist {
db.loadFreelist()
}
// ...
return db, nil
}
3.2.2 初始化
下面是启用一个全新数据库文件时,需要执行的初始化方法:
// 初始化一个全新的数据库文件
func (db *DB) init() error {
// 初始化数据库的 4 个 page:meta page * 2 + freelist page + leaf page
buf := make([]byte, db.pageSize*4)
// 初始化 mata page
for i := 0; i < 2; i++ {
p := db.pageInBuffer(buf, pgid(i))
p.id = pgid(i)
p.flags = metaPageFlag
// Initialize the meta page.
m := p.meta()
m.magic = magic
m.version = version
m.pageSize = uint32(db.pageSize)
m.freelist = 2
m.root = bucket{root: 3}
m.pgid = 4
m.txid = txid(i)
m.checksum = m.sum64()
}
// 初始化 freelist page
p := db.pageInBuffer(buf, pgid(2))
p.id = pgid(2)
p.flags = freelistPageFlag
p.count = 0
// 初始化空的 leaf page
p = db.pageInBuffer(buf, pgid(3))
p.id = pgid(3)
p.flags = leafPageFlag
p.count = 0
// 将初始化的 4 个 page 落盘,基于 pwrite + fdatasync 操作
if _, err := db.ops.writeAt(buf, 0); err != nil {
return err
}
if err := fdatasync(db); err != nil {
return err
}
db.filesz = len(buf)
return nil
}
3.2.3 mmap
下面是通过 mmap 实现数据文件与内存映射的源码,核心步骤包括:
• 加锁保证 mmap 操作并发安全
• 设置合适的 mmap 空间大小
• 倘若之前已经执行过 mmap,则需要善后处理
• 执行新一轮 mmap 操作
func (db *DB) mmap(minsz int) (err error) {
// 互斥锁,保护 mmap 并发安全
db.mmaplock.Lock()
defer db.mmaplock.Unlock()
info, err := db.file.Stat()
// ...
// 调整合适的 mmap 容量
fileSize := int(info.Size())
var size = fileSize
if size < minsz {
size = minsz
}
size, err = db.mmapSize(size)
if err != nil {
return err
}
// ...
// 倘若此前已经有读写事务在运行,此时因为要执行 mmap 操作,则需要对 bucket 内容进行重塑
if db.rwtx != nil {
db.rwtx.root.dereference()
}
// 解除之前建立的 mmap 映射
if err = db.munmap(); err != nil {
return err
}
// 建立新的 mmap 映射
if err = mmap(db, size); err != nil {
return err
}
// ...
return nil
}
mmap 底层通过系统调用实现,不同的操作系统会有不同的实现细节. 以我当前所用的 mac 为例,对应的 unix 系统版本实现源码如下:
// mmap memory maps a DB's data file.
func mmap(db *DB, sz int) error {
// Map the data file to memory.
b, err := unix.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|db.MmapFlags)
if err != nil {
return err
}
// Advise the kernel that the mmap is accessed randomly.
err = unix.Madvise(b, syscall.MADV_RANDOM)
if err != nil && err != syscall.ENOSYS {
// Ignore not implemented error in kernel because it still works.
return fmt.Errorf("madvise: %s", err)
}
// Save the original byte slice and convert to a byte array pointer.
db.dataref = b
db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
db.datasz = sz
return nil
}
3.3 建bucket(表)
一个 bucket 本质上是从属于其父 bucket b+ 树中的一笔特殊的 kv 对数据. 因此创建 bucket 的过程会和写入 kv 数据的流程相类似:
• 借助游标,找到 bucket key 所应当从属的父 bucket b+ 树的位置
• 创建子 bucket实例,并取得序列化后的结果
• 将 bucket 名称作为 key,bucket 序列化结果作为 value,以一组 kv 对的形式插入到父 bucket b+ 树中
func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
// ...
// 获取游标
c := b.Cursor()
// 借助游标找到桶名 key 对应的位置
k, _, flags := c.seek(key)
// 桶已存在
if bytes.Equal(key, k) {
if (flags & bucketLeafFlag) != 0 {
return nil, ErrBucketExists
}
return nil, ErrIncompatibleValue
}
// 创建新的桶实例
var bucket = Bucket{
bucket: &bucket{},
rootNode: &node{isLeaf: true},
FillPercent: DefaultFillPercent,
}
// 取得桶的序列化结果
var value = bucket.write()
// 将这个新桶对应的 kv 对数据写入到 b+ 树中
key = cloneBytes(key)
c.node().put(key, key, value, 0, bucketLeafFlag)
// ...
// 返回创建好的新桶
return b.Bucket(key), nil
}
3.4 查bucket(表)
通过名称检索 bucket 的流程,一定程度上和数据的查询流程相类似:
• 查看父 bucket 的缓存 map,如果子 bucket 已反序列化过,则直接复用
• 通过游标 cursor 检索父 bucket 的 b+ 树,找到对应子 bucket 的 kv 对数据
• 根据 kv 数据反序列化生成子 bucket 实例
• 将子 bucket 添加到父 bucket 的缓存 map 中
• 返回检索得到的子 bucket
func (b *Bucket) Bucket(name []byte) *Bucket {
// 如果 map 中已经缓存了对应的桶,直接返回
if b.buckets != nil {
if child := b.buckets[string(name)]; child != nil {
return child
}
}
// 借助游标在 b+ 树中检索 kv 对
c := b.Cursor()
k, v, flags := c.seek(name)
// ...
// 找到桶后,对其反序列化
var child = b.openBucket(v)
// 缓存到 map 中
if b.buckets != nil {
b.buckets[string(name)] = child
}
// 返回桶
return child
}
3.5 数据crud
数据的 crud 过程同样是借助在 b+ 树上游走的游标 cursor 加以完成,下面分别示意增改、删、查操作的源码主流程:
• 增改
func (b *Bucket) Put(key []byte, value []byte) error {
// 前置校验
// ...
// 借助游标检索到 k v 对所在的位置
c := b.Cursor()
k, _, flags := c.seek(key)
// ...
// 在对应位置中插入 kv 对内容
key = cloneBytes(key)
c.node().put(key, key, value, 0, 0)
return nil
}
• 删
// 在表中删除 key
func (b *Bucket) Delete(key []byte) error {
// ...
// 借助游标移动到 key 对应位置
c := b.Cursor()
k, _, flags := c.seek(key)
// 倘若 key 不存在
if !bytes.Equal(key, k) {
return nil
}
// ...
// 在 b+ 树节点中删除对应的 key
c.node().del(key)
return nil
}
• 查
func (b *Bucket) Get(key []byte) []byte {
// 借助游标检索到 kv 对所在位置
k, v, flags := b.Cursor().seek(key)
// ...
// key 不存在,则返回空
if !bytes.Equal(key, k) {
return nil
}
// 返回对应的 value
return v
}
3.6 数据落盘
在 boltdb 提交读写事务时,会一次性将更新的脏数据溢写落盘:
• 通过 rebalance 和 spill 操作,保证 b+ 树的平衡性满足要求
• 执行 pwrite+fdatasync 操作,完成脏数据的 page 的一些落盘
• 通过 pagePool 回收用于指向这部分 page 对应的字节数组
• 由于更新了事务进度,meta page 也需要溢写落盘
• 关闭读写事务
更多细节参见下方的源码注释:
func (tx *Tx) Commit() error {
// ...
// 数据溢写磁盘前,需要调整一轮 b+ 树,保证其平衡性
// rebalance 是为了避免因为 delete 操作,导致某些节点 kv 对数量太少,不满足 b+ 树平衡性要求
tx.root.rebalance()
// ...
// spill 是为了避免因为 put 操作,导致某些节点 kv 对数量太多,不满足 b+ 树平衡性要求
if err := tx.root.spill(); err != nil {
tx.rollback()
return err
}
// 事务更新到的脏数据溢写落盘
if err := tx.write(); err != nil {
tx.rollback()
return err
}
// ...
// meta page 溢写落盘
if err := tx.writeMeta(); err != nil {
tx.rollback()
return err
}
// ...
// 关闭事务
tx.close()
// ...
return nil
}
// 事务脏页溢写落盘
func (tx *Tx) write() error {
// 事务缓存的脏页
pages := make(pages, 0, len(tx.pages))
for _, p := range tx.pages {
pages = append(pages, p)
}
// 清空缓存
tx.pages = make(map[pgid]*page)
// 对脏页进行排序
sort.Sort(pages)
// 按照顺序,将脏页溢写落盘
for _, p := range pages {
// page 总大小,包含 overflow 不分
rem := (uint64(p.overflow) + 1) * uint64(tx.db.pageSize)
// page 的 offset,可以根据 page id 推算得到
offset := int64(p.id) * int64(tx.db.pageSize)
var written uintptr
// Write out page in "max allocation" sized chunks.
for {
sz := rem
if sz > maxAllocSize-1 {
sz = maxAllocSize - 1
}
buf := unsafeByteSlice(unsafe.Pointer(p), written, 0, int(sz))
// 将 page 溢写到文件对应 offset 的位置
if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
return err
}
rem -= sz
// 一次性写完了
if rem == 0 {
break
}
// 如果没有一次性写完,下一轮接着写
offset += int64(sz)
written += uintptr(sz)
}
}
// fdatasync 操作,确保数据溢写落盘完成
if !tx.db.NoSync || IgnoreNoSync {
if err := fdatasync(tx.db); err != nil {
return err
}
}
// 释放这部分已落盘 page,倘若其不存在 overflow,说明是标准规格的字节数组,则清空内容,然后添加到对象池中进行复用
for _, p := range pages {
// Ignore page sizes over 1 page.
// These are allocated using make() instead of the page pool.
if int(p.overflow) != 0 {
continue
}
buf := unsafeByteSlice(unsafe.Pointer(p), 0, 0, tx.db.pageSize)
// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
for i := range buf {
buf[i] = 0
}
tx.db.pagePool.Put(buf) //nolint:staticcheck
}
return nil
}
至此,本篇结束.
4 展望
本文作为 etcd 存储引擎系列的开篇,带着大家一起以一个相关宏观的视角总览了 boltdb 的架构设计与核心概念. 本文内容相对停滞于浅层,针对几个核心方向的挖深力度有所不足,主要通过后续几个篇章持续发力展开,在此做个展望:
• etcd存储引擎之主干框架(本篇):偏宏观视角下介绍 boltdb 的定位、架构、特性,通过几个核心流程浅探 boltdb 实现源码
• etcd存储引擎之存储设计(待填坑):介绍 boltdb 存储模型、机制的设计实现,包含磁盘、内存两部分
• etcd存储引擎之b+树实现(待填坑):介绍 b+ 树理论模型及 boltdb 实现案例,包括模型定义及 crud 流程梳理
• etcd存储引擎之事务实现(待填坑):介绍 boltdb 事务的执行模式及实现原理