etcd存储引擎之事务实现

文摘   科技   2024-03-14 23:22   北京  

0 前言

本期我们继续延续 etcd 存储引擎系列的话题. 在该系列中,我们以 boltdb 作为 b+树 工程化落地的学习案例,该项目开源地址为:https://github.com/etcd-io/bbolt,go 语言纯度接近 100%. (本系列涉及走读的 boltdb 源码版本为 v1.3.8

下面是本专题的分享节奏,本文是其中的最后一篇——事务实现篇:

 

1 事务核心概念

1.1 定义

在计算机术语中,事务指的是一个具有原子性的逻辑执行单元,它内部可能由一个或多个更细粒度的操作组成,但在技术层面保证事务中的所有动作在执行时就像是完成单个动作一样不可拆解,以此确保操作目标的完整性的一致性.

 

1.2 性质

谈到事务,我们知道其有着著名的 acid(atomicity-consistency-isolation-durability) 四大特性:

  • • atomicity 原子性:事务是最小的执行单元,不可拆解, 不存在中间态,要么全部成功,要么全部失败

  • • consistency 一致性:数据变更是否满足约束,查询结果是否正确、符合预期. 个人认为,一致性是事务追求的最终目标,由其他三项性质共同保证

  • • isolation 隔离性:事务之间相互隔离,不产生干扰. 事务的隔离等级由低到高可进一步划分为:

隔离等级描述存在问题
读未提交会读到其他事务未提交的脏数据脏读
读已提交读取其他事务已提交的最新数据,因此可能存在前后查询结果不一致的问题不可重复读
可重复读事务执行期间查询结果一致,但范围查询时数据条数可能变化幻读
串行化读写事务串行化执行,隔离程度好,但性能可能成为瓶颈并发度(性能)低
  • • durability 持久性:事务提交后,变更内容持久存储,稳定生效.

 

boltdb 能够很好地满足上述有关事务的各项核心性质,下面我们就很对 boltdb 中的具体实现,逐一论证上述各项性质都是如何得到保证.

 

2 事务机制实现

2.1 类定义

boltdb 中针对事务有个对应的类定义,核心属性包括:

  • • writable:标识事务是否为读写模式

  • • managed:标识是否为隐式事务. 即生命周期托管于 boltdb 的事务,不可人为操作显式提交或者回滚

  • • db:事务从属的 db 实例

  • • meta: 事务启动时,深拷贝生成的 meta page 副本

  • • root: 事务启动时,深拷贝生成的 bucket 副本

  • • pages:事务执行过程中,生成的脏数据 page

// boltdb 中的事务实例
type Tx struct {
    writable       bool // 是否为读写事务
    managed        bool // 是否为隐式事务,提交和回滚行为由系统控制,不可人为操控
    db             *DB // 从属的 db 实例
    meta           *meta // 事务启动时,拷贝的 meta page 副本
    root           Bucket // 事务启动时,拷贝的 bucket 副本
    pages          map[pgid]*page // 当前事务使用到的脏数据 page
    // ...
    commitHandlers []func() // 事务提交成功后,执行的回调函数
    // ...    
}

 

在 db 类定义中,也有与事务相关的部分:

  • • meta0、meta1:轮换使用的 meta page

  • • rwtx:全局至多只能同时存在一个读写事务

  • • txs:并行的一系列只读事务

  • • rwlock:保证读写事务单一串行的互斥锁

// 一个运行的 boltdb 实例
type DB struct {
    // ...
    meta0    *meta  // 轮换使用的 meta page,可以一个理解为一个版本的数据源头
    meta1    *meta  // 轮换使用的 meta page,可以一个理解为一个版本的数据源头
    // ...
    rwtx     *Tx // 全局唯一的读写事务
    txs      []*Tx // 一系列并行的只读事务
    // ...
    rwlock   sync.Mutex   // 保证读写事务串行的互斥锁
    // ...
}

 

2.2 原子性

首先针对事务原子性语义的保证,boltdb 采取的策略就是采用写时复制 copy-on-write 机制,使得所有产生中间态的数据都临时缓存于一份副本中,最终随着事务提交使得所有变更内容一次性生效. 针对这个过程的核心步骤作具体拆解:

  • • 针对数据的变更操作,统一通过读写事务包装

  • • 在执行过程中统一通过深拷贝产生副本,包括 meta 和 bucket 副本

  • • 修改数据时,将节点对应 page 反序列化成内存中的 node 副本

  • • 提交事务时,为所有涉及改动的 node 分配新的 page(脏数据 page)

  • • 修改 meta page 副本下的 b+树拓扑结构,使得脏数据 page 嵌入其中

  • • 将脏数据 page 持久化落盘

  • • 将 meta page 副本持久化落盘(此步骤是成败关键,因为 meta page 可以视为一个数据版本的起点,因此一个 meta 副本的生效与否,将决定了一整个数据版本的变更内容是生效还是作废,也是原子性结果的核心保证)

 

下面我们走进源码当中,串联一下各个环节与之相关的一些技术设计:

  • • 首先,在事务初始化时,针对最新版本(txid 较大)的 meta page以及对应的 bucket,深拷贝出一份副本

func (tx *Tx) init(db *DB) {
    // 基于 copy-on-write 机制,深拷贝出一份 meta page 副本
    tx.meta = &meta{}
    db.meta().copy(tx.meta)


    // 基于 copy-on-writer 机制,深拷贝出一份 bucket 副本. 并通过 meta page 副本持有其引用. 这会作为一个版本的数据源
    tx.root = newBucket(tx)
    tx.root.bucket = &bucket{}
    *tx.root.bucket = tx.meta.root


    // 针对读写事务,需要在 meta page 副本中对 tx id 递增
    if tx.writable {
        tx.pages = make(map[pgid]*page)
        tx.meta.txid += txid(1)
    }
}

 

  • • 事务执行过程中,每当需要修改数据时,通过 page 反序列出对应的内存 node 副本,再进行修改:

// 修改节点数据前,先获取节点反序列后的 node 副本
func (*Bucket) node(pgId pgid, parent *node) *node {
    // ...
    // 构造 node 副本实例
    n := &node{bucket: b, parent: parent}
    // ...
    // 根据 page id 从 mmap page buffer 中获取到对应的 page
    var p = b.page
    if p == nil {
        p = b.tx.page(pgId)
    }


    // 将 page 内容反序列 node 中
    n.read(p)
    // node 副本缓存到 bucket 中
    b.nodes[pgId] = n


    // ...
    return n
}

 

  • • 事务提交时,为所有 node 副本序列化分配新的 page (脏数据 page),并将其嵌入到 meta page 副本下的 b+ 树副本中,不直接影响原 page 中上一个版本的数据:

// 提交事务
func (tx *Tx) Commit() error {
    // rebalance ...
    
    // spill,过程中针对有修改的节点,pending 其 old page,为其分配新 page 副本
    if err := tx.root.spill(); err != nil {
        // ...
    }


    // ...
}


// spill 一个 bucket
func (*Bucket) spill() error {
    // ...
    if err := b.rootNode.spill(); err != nil {
        return err
    }
    // ...
}


// spill 一个 node
// 所有涉及变更节点部分,pending 老 page,分配新 page 副本(copy-on-write):
func (*node) spill() error {
    // ...
    // 修改被反序列化过的 node 副本(涉及变更)
    for _, node := range nodes {
        // 为其分配的新的 page
        p, err := tx.allocate((node.size() + tx.db.pageSize - 1) / tx.db.pageSize)
        // ...
        // node 序列化到 page
        node.pgid = p.id
        node.write(p)
        // ...


        // node 与新的 page 映射关系,以 kv 对形式写入到 parent 中
        if node.parent != nil {
            // ...
            node.parent.put(key, node.inodes[0].key, nil, node.pgid, 0)
            // ...
        }


        // ...
    }


    // ...
    return nil
}

 

  • • 接下来,将所有脏数据 page 通过 pwrite + fdatasync 操作持久化落盘. 但大家需要清楚,此时新版本数据还没生效,因为正如我一再强调的,一个数据版本的起源来自于 meta page 的 root 指针,所以最终实现一锤定音效果的是 meta page 副本落盘生效的环节,这个步骤同样通过 pwrite + fdatasync 操作实现.

// 提交事务
func (tx *Tx) Commit() error {
    // rebalance... 
    
    // spill...
    // 脏数据 page 副本溢写落盘
    if err := tx.write(); err != nil {
        // ...
    }


    // meta page 副本溢写落盘
    if err := tx.writeMeta(); err != nil {
        tx.rollback()
        return err
    }
    // 至此,副本成功覆写,走完 copy-on-write 完整循环
    return nil
}

如果 meta page 副本溢写落盘成功,则事务执行成功,否则,事务执行失败,进行回滚.

 

2.3 隔离性

boltdb 的事务隔离级别是介于可重复读和串行化之间的:

  • • 其能够完美满足【可重复读】 的语义

  • • 由于 boltdb 是 kv 数据库,没有范围统计的场景,因此不存在【幻读】 问题

  • • boltdb 支持多个只读事务和读写事务并行,但是保证全局同一时刻只能有一个读写事务运行

基于以上,boltdb 在读写事务的执行模式上是串行化的,但是能够和只读事务之间并行,保证更好的读并发性.

下面通过源码,论证一遍上述的步骤.

首先,在读写事务启动时通过一把锁实现互斥,保证全局唯一性

// 启动读写事务
func (db *DB) beginRWTx() (*Tx, error) {
    // ...
    db.rwlock.Lock() // 读写事务互斥锁
    // ...
    return t, nil
}

 

只读事务运行时则不取事务锁,直接追加到 db.txs 中:

// 启动只读事务
func (db *DB) beginTx() (*Tx, error) {
    // ...
    // 追加只读事务进入 list
    db.txs = append(db.txs, t)
    // ...
    return t, nil
}

 

在一个事务关闭时,如果是读写事务,需要解开事务锁;如果是只读事务,则直接从 list 中移除

// 关闭事务
func (tx *Tx) close() {
    // ...
    // 读写事务
    if tx.writable {
        // 解除读写事务锁
        tx.db.rwlock.Unlock()
        // ...
    } else {
        // 从 list 中移除只读事务
        tx.db.removeTx(tx)
    }
    // ...
}

 

boltdb 针对解决可重复读问题的核心机制是 copy-on-write 加 pending 机制,实现步骤可以拆解如下:

  • • 每个事务都有一个全局唯一的 txid,txid 随着读写事务的迭代单调递增,对于只读事务,则每次都同步当前最大的 txid

  • • 事务开启时,找到 txid 最大的 meta page深拷贝出一个副本,并始终作为读取数据的版本,这样在事务整个生命周期中,数据版本是保持一致的. 所以接下来的问题就在于,如何保持这个版本下的数据正常存在,不被提前修改或者删除,避免产生【不可重复读】问题

  • • 读写事务提交时,记录老数据的旧 page 不会立即 release,而是会被置为 pending 状态

  • • 在release pending page 的流程中,会通过比较 txid 的大小,确保只有在不存在有更小编号的只读事务运行时,才把 pending 状态的 page 完全 release

 

针对上述涉及到的 pending page 和 release page 流程,涉及到与 freelist 模块的交互,其中包含两个核心成员属性:

  • • ids:所有已经 release,可直接复用的 page

  • • pending:某个事务下的 pending page list,当所有 < 该 txid 的只读事务都结束时,该组 pending page 会被 release

将 page 置为 pending 和 release 状态的方法为 freelist.free 和 freelist.release:

// 某个读写事务将某个 page 追加到 pending page list
func (*freelist) free(txid txid, p *page) {
    // ...
    // 获取对应事务的 pending page list
    txp := f.pending[txid]
    // ...
    // 将 free 的 page 追加到 pending page list 中
    for id := p.id; id <= p.id+pgid(p.overflow); id++ {
        // ...
        txp.ids = append(txp.ids, id)
        // ...
    }
}

 

// 将所有 <= txid 的 pending page 都进行 release:
func (*freelist) release(txid txid) {
    // ...
    for tid, txp := range f.pending {
        if tid <= txid {
            // 将 pending page 追加到 release list
            m = append(m, txp.ids...)
            // 将某个 tx id 的 pending page list 整个移除掉
            delete(f.pending, tid)
        }
    }
    // ...
}

 

 

在事务初始化时,会基于当前 txid 最大的 meta page 深拷贝出一份副本:

// 初始化事务
func (tx *Tx) init(db *DB) {
    // ...
    // 为事务创建一份 meta page 副本
    tx.meta = &meta{}
    // 从当前 db 最新的 meta page 中进行深拷贝
    db.meta().copy(tx.meta)


    // ...
    // 针对读写事务,递增 meta page 副本中的事务 id
    if tx.writable {
        tx.pages = make(map[pgid]*page)
        tx.meta.txid += txid(1)
    }
}

在整个事务运行过程中,使用的都是这份 meta page 副本对应的一个数据版本,因此只要该版本的数据不被修改,那么可重复读的语义就能够保证.

事务执行过程中,修改数据是将节点反序列化为 node 副本,在内存中完成修改后,再通过事务提交使副本生效. 这个过程中,不会直接对原本的 page 进行覆盖,而是会创建新的脏数据 page 副本承接数据,且针对原本使用的旧 page,不会立刻释放,而是将其添加到当前事务下的 pending page list 当中:

free page 的调用时机发生在事务提交流程中,此时会进入到每个 node 的 spill 流程中,把所有涉及改动的 node 副本的旧 page 添加到当前事务的 pending page list 中

// spill 一个发生过反序列化过的子节点
func (*node) spill() error {
    // ...将 node spill 成多个符合规格的节点 -> nodes
    // 遍历每个 spill 过的 node
    for _, node := range nodes {
        // 将 node 副本对应的 old page 追加到当前事务的 pending page list 中
        if node.pgid > 0 {
            tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid))
            node.pgid = 0
        }


        // 为 node 副本分配新的 page
        p, err := tx.allocate((node.size() + tx.db.pageSize - 1) / tx.db.pageSize)
        // ...
    }
    // ...
}

 

接下来我们关注的是,这部分 pending page 何时会得到 release 的机会.

每当有新读写事务启动时,会记录当前仍然在运行的最小 tx id,并把小于该值的事务对应的 pending page 全部 release 掉:

// 启动读写事务
func (db *DB) beginRWTx() (*Tx, error) {
    // ...
    // 释放所有已不再被读事务依赖的 pending page
    db.freePages()
    return t, nil
}

 

// 释放当前所有已经不再被读事务依赖的 pending page.
func (db *DB) freePages() {
    // 通过只读事务 list,获取当前仍在运行的最小只读事务 txid
    sort.Sort(txsById(db.txs))
    if len(db.txs) > 0 {
        minid = db.txs[0].meta.txid
    }
    
    // 把所有小于该值的 tx id 对应的 pending page 统统 release
    if minid > 0 {
        db.freelist.release(minid - 1)
    }
    // ...
}

 

2.4 持久性

最后是有关于事务持久性的保证,这部分通过 pwrite + fdatasync 操作,保证数据被稳定地持久化落盘:

这部分内容其实在本系列的第一、二篇中都做过介绍,本质就是通过 pwrite 操作将数据溢写到文件的指定 offset,再通过 fdatasync 操作确保该文件下的设备 io 操作完成之后,流程才继续向下.

该流程对应的源码梳理如下:

// 提交事务
func (tx *Tx) Commit() error {
    // ...
    // 基于 pwrite + fdatasync 操作实现脏数据 page 持久化落盘
    if err := tx.write(); err != nil {
        // ...
    }


    // ...
    // 基于 pwrite + fdatasync 操作实现 meta page 副本持久化落盘
    if err := tx.writeMeta(); err != nil {
        // ...
    }
    // ...
}

直到 meta page 副本成功落盘,这样一个读写事务就成功提交,一个新的数据版本也正式生效.

这个步骤尤为关键,如果 meta page 落盘发生错误,那么事务会视为失败,新生成的 meta page 会因为校验和检查失败被视为失效版本,此时会基于冗余的另一份 meta page 正常对外提供服务.

// 获取 db 中数据更新且有效的 meta page
func (db *DB) meta() *meta {
    // 优先获取 txid 更大的 meta page(保证数据实时性)
    metaA := db.meta0
    metaB := db.meta1
    if db.meta1.txid > db.meta0.txid {
        metaA = db.meta1
        metaB = db.meta0
    }


    // 保证会使用有效的 meta page(基于 checksum 校验)
    if err := metaA.validate(); err == nil {
        return metaA
    } else if err := metaB.validate(); err == nil {
        return metaB
    }


    // ...
}

 

3 主流程

接下来,我们从另一个视角切入,梳理事务运行的生命周期,分别针对事务启动、提交和回滚的流程进行源码走读.

3.1 隐式事务

首先,事务可以分为显式和隐式两类. 所谓显式事务,是使用方手动通过 begin 操作启动的事务,使用方需要通过 commit 或者 rollback 操作来将事务执行结果推向终态.

隐式事务则是将事务生命周期托管给 db,由 db 来决策何时应该 commit 或 rollback 事务,使用方无需关心.

首先,使用方可以通过 db.Update 方法启动一个隐式的读写事务,由 boltdb 帮助管理事务生命周期,完成事务的提交或者回滚. 该方法中涉及的核心步骤包括:

  • • 启动一个读写事务实例

  • • 将事务实例的 managed 标识置为 true,标识其为隐式事务

  • • 执行用户传入的闭包函数

  • • 根据执行结果,决定提交还是回滚事务

// Update 启动隐式读写事务
func (db *DB) Update(fn func(*Tx) error) error {
    // 启动一个新的读写事务实例
    t, err := db.Begin(true)
    // ...
    // 兜底保证进行回滚. 如果事务正常提交成功的话,必然会将 db 引用指针为 nil
    defer func() {
        if t.db != nil {
            t.rollback()
        }
    }()


    // 标记读写事务为隐式事务,不允许由使用方手动提交或者回滚
    t.managed = true


    // 执行使用方传入的闭包函数
    err = fn(t)
    t.managed = false
    // 中途发生错误,则进行回滚
    if err != nil {
        _ = t.Rollback()
        return err
    }
 
    // 闭包函数未发生错误,提交事务
    return t.Commit()
}

 

针对隐式只读事务,则可以通过 db.View 方法启动,大体步骤和 Update 方法类似,区别在于事务会以只读模式运行,并且不论成功与否,都会通过 rollback 操作来执行流程,这是因为事务具有的只读属性,因此即便回滚也不会对数据状况产生影响.

// View 启动隐式只读事务
func (db *DB) View(fn func(*Tx) error) error {
    // 启动只读事务
    t, err := db.Begin(false)


    // 兜底保证进行回滚. 如果事务正常结束的话,必然会将 db 引用指针为 nil 
    defer func() {
        if t.db != nil {
            t.rollback()
        }
    }()


    // 标记事务为隐式事务
    t.managed = true


    // 执行使用方传入的闭包函数
    err = fn(t)
    t.managed = false
    
    // 倘若闭包函数发生错误,进行事务回滚
    if err != nil {
        _ = t.Rollback()
        return err
    }


    // 因为是只读事务,不会对数据状况发生变化,因此事务可以通过回滚来结束
    return t.Rollback()
}

 

 

3.2 启动事务

下面是事务的启动流程,以 db.Begin 方法为入口,根据事务是否为读写事务,分为不同的实现:

// 启动事务. 进一步分为读写和只读两种模式
func (db *DB) Begin(writable bool) (*Tx, error) {
    if writable {
        return db.beginRWTx()
    }
    return db.beginTx()
}

针对读写事务的核心方法为 beginRWTx,核心步骤包括:

  • • 添加事务互斥锁

  • • meta page 互斥锁

  • • 构造读写事务实例

  • • 深拷贝构造 meta page 和 bucket 副本

  • • meta page 副本中事务 id 递增

  • • 当前读写事务设置为全局的 db.rwtx

  • • release 所有已不再被使用的 pending page

// 启动读写事务
func (db *DB) beginRWTx() (*Tx, error) {
    // ...
    // 事务互斥锁
    db.rwlock.Lock()


    // meta 模块互斥锁
    db.metalock.Lock()
    defer db.metalock.Unlock()


    // 构造事务实例
    t := &Tx{writable: true}
    // 初始化事务
    t.init(db)
    db.rwtx = t
    // 释放所有已不再被引用的 pending page
    db.freePages()
    return t, nil
}


// 初始化事务
func (tx *Tx) init(db *DB) {
    // ...
    // 深拷贝一份 meta page 副本
    tx.meta = &meta{}
    db.meta().copy(tx.meta)


    // 深拷贝一份 bucket 副本,并通过 meta page 副本指向其引用. 这会作为一个版本的数据源
    tx.root = newBucket(tx)
    tx.root.bucket = &bucket{}
    *tx.root.bucket = tx.meta.root


    // 如果是读写事务,需要在 meta page 副本中递增事务 id. 
    // 最终在事务提交, meta page 副本落盘时,这个递增的事务 id 才会在全局维度生效,保证这份 meta 有最高的优先级
    if tx.writable {
        tx.pages = make(map[pgid]*page)
        tx.meta.txid += txid(1)
    }
}

 

对于只读事务的启动流程,大部分流程和读写事务相同,但其中存在几处差异:

  • • 不需要添加事务互斥锁

  • • 构造事务实例为只读模式

  • • 初始化事务时,不对 meta page 副本的事务 id 递增

  • • 当前只读事务实例追加到 db.txs 列表

  • • 不需要执行 release page 流程

// 启动只读事务
func (db *DB) beginTx() (*Tx, error) {
    // ...
    // 加 meta page 互斥锁
    db.metalock.Lock()


    // 构造只读事务实例
    t := &Tx{}
    // 初始化事务
    t.init(db)


    // 只读事务添加到到 txs 中
    db.txs = append(db.txs, t)
    n := len(db.txs)


    // 解 meta page 互斥锁
    db.metalock.Unlock()


    // ...
    return t, nil
}

 

3.3 提交事务

最后整体再梳理一次提交事务的流程(通过前文得知,只有读写事务有机会执行 commit),核心步骤包括:

  • • 针对 b+ 树副本进行 rebalance

  • • 针对 b+ 树副本进行 spill,过程中会对所有涉及修改的 node 分配脏数据 page,以及把 old page 添加到当前事务的 pending page list

  • • 对变更后的 freelist 进行序列化,并更新当前的 meta page 副本的 freelist 引用

  • • 脏数据 page 溢写落盘

  • • meta page 副本溢写落盘(至此,事务执行成功)

  • • 关闭事务

  • • 执行使用方预设的事务提交回调函数

// 提交读写事务
func (tx *Tx) Commit() error {
    // ... 
    // 针对 b+ 树副本进行 rebalance
    tx.root.rebalance()
    // ...
    
    opgid := tx.meta.pgid
    // 针对 b+ 树进行 spill. 过程中,针对所有涉及修改的节点,会分配新的 page 副本
    // 嵌入到新版本的 b+ 树副本中,并通过 meta page 副本指向新版的 b+ 树
    if err := tx.root.spill(); err != nil {
        // ...
    }
    // ...
    tx.meta.root.root = tx.root.root


    // 更新 freelist 部分... 
    err := tx.commitFreelist()


    // 如果针对 mmap 进行了扩容,此处需要针对 file 进行 truncate 操作...


    // 脏数据 page 基于 pwrite + fdatasync 操作溢写落盘
    if err := tx.write(); err != nil {
        // ...
    }


    // meta page 副本基于 pwrite + fdatasync 操作溢写落盘
    if err := tx.writeMeta(); err != nil {
        // ...
    }
    
    // 至此事务提交成功
    
    // 关闭事务
    tx.close()


    // 执行事务回调函数
    for _, fn := range tx.commitHandlers {
        fn()
    }


    return nil
}

 

3.4 回滚事务

针对显式事务,使用方可以手动进行回滚操作 . 得益于 boltdb 的 copy-on-write 以及 pending 机制,使得事务的回滚操作异常简单,只需要把拟回滚事务对应的 pending page list 还原成正常 page 即可:

// 回滚事务
func (tx *Tx) Rollback() error {
    // ...
    tx.nonPhysicalRollback()
    return nil
}


func (tx *Tx) nonPhysicalRollback() {
    // ...
    // 针对读写事务,将 freelist 中置为 pending 状态的 page 进行恢复
    if tx.writable {
        tx.db.freelist.rollback(tx.meta.txid)
    }
    // 关闭事务
    tx.close()
}

 

// 从 freelist 中,将 tx id 对应的 pending page list 删除
func (*freelist) rollback(txid txid) {
    // ...
    delete(f.pending, txid)
    // ...
}

 

4 总结

祝贺!至此,我们完成了【etcd存储引擎】 系列的全部内容,这里我们一起对往期内容作个回顾:



小徐先生的编程世界
在学钢琴,主业码农
 最新文章