基于go实现redis之存储引擎

文摘   科技   2024-04-30 12:49   河北  

0 前言

欢迎回来,由我们一起继续推进技术分享专题—— 【基于go实现redis之存储引擎】 .

此前我已于 github 开源项目——goredis,由于我个人水平有限,如有实现不到位之处,欢迎批评指正:https://github.com/xiaoxuxiansheng/goredis

 

本系列正是围绕着开源项目goredis展开,共分为四篇内容,本文为其中的第三篇——存储引擎篇:

 

1 架构梳理

在往期内容中,我们介绍了有关 goredis 的整体架构,探讨了 goredis 的 server 端如何运行,指令分发层如何遵循 RESP 协议将原始请求内容解析成可识别的操作指令,并将其发往存储引擎层进行执行.

接下来我们就要正式打开存储引擎模块的黑盒子一探究竟.

 

1.1 DB

在上层 handler 的视角中,整个存储引擎模块是一个整体  被抽象成一个名为 DB 的接口,通过方法暴露出一个用于执行指令并返回结果的 Do 方法. 接口定义代码位于 handler/struct.go 文件中:

type DB interface {
    // 执行指令
    Do(ctx context.Context, cmdLine [][]byte) Reply
    // 关闭存储引擎
    Close()
}


type Reply interface {
    ToBytes() []byte
}

 

事实上从存储引擎内部视角中,可以将其自上而下拆分为三个模块:

  • • 数据触发层 Trigger: 属于一个代理层,封装指令并将其投递给 executor. 该模块采用并发模型

  • • 数据执行层 Executor: 接收来自 trigger 的指令,操作 dataStore 完成执行. 该模块采用单例串行的执行模式.

  • • 数据存储介质 DataStore: 底层存储数据的所在之处,收敛有关各种数据类型的实现模型和操作细节

 

1.2 Trigger

数据触发层 trigger 是狭义上针对 DB 接口的实现类.

从功能上看,trigger 负责封装指令并通过 channel 投递给底层的 executor,看起来只是扮演了一个简单的代理层角色,但是从职能架构上看,一方面实现了职能边界的拆分,另一方面实现了由并发模式到串行模式的过渡转换.

这部分代码位于 database/trigger.go:

type DBTrigger struct {
    // 保证关闭操作单次执行的单例工具.
    once     sync.Once
    // 依赖的数据库执行层 executor
    executor Executor
}

 

func (*DBTrigger) Do(ctx context.Context, cmdLine [][]byte) handler.Reply {
    // ...


    // 投递给到 executor
    d.executor.Entrance() <- &cmd


    // 监听 chan,直到接收到返回的 reply
    return <-cmd.Receiver()
}

 

1.3 Executor

executor 是全局单例运行的,通过串行执行的 run 方法进行数据操作流程的收口,保证底层数据存储介质一定只会在同一个协程中进行读写,不存在任何并发行为,因此底层存储模型可以实现无锁化.

有关 executor 的实现类代码位于 database/executor.go,其中包含如何核心成员属性:

  • • ctx、cancel:生命周期控制

  • • ch:从上游 trigger 接收指令的入口 channel

  • • cmdHandlers:各种指令类型对应的处理函数

  • • dataStore:更底层的存储介质

  • • gcTicker:定时驱动执行过期数据回收的定时器

// 具体的数据库执行层实现类
type DBExecutor struct {
    // 生命周期控制
    ctx    context.Context
    cancel context.CancelFunc
    
    // 用户接收 cmd 的入口
    ch     chan *Command


    // 根据 cmd 类型映射到具体的处理方法
    cmdHandlers map[CmdType]CmdHandler
    
    // 数据存储介质
    dataStore   DataStore


    // 驱动定期执行过期数据回收的定时器
    gcTicker *time.Ticker
}

 

// executor 持续运行
func (*DBExecutor) run() {
    for {
        select {
        // ...
        // 每隔一段时间批量回收过期的 key
        case <-e.gcTicker.C:
            e.dataStore.GC()
        // 接收来自 trigger 发送的 cmd,并执行具体的操作
        case cmd := <-e.ch:
            cmdFunc, ok := e.cmdHandlers[cmd.cmd]
            // ...
            // 惰性回收机制实现过期 key 删除
            e.dataStore.ExpirePreprocess(string(cmd.args[0])) 
            // 将执行结果通过 receiver chan 发送给 trigger
            cmd.receiver <- cmdFunc(cmd)
        }
    }
}

 

1.4 DataStore

针对数据存储介质的实现类是 kvStore,其中包含两个 map 和一个有序 set:

  • • data map: 记录了 redis 中的全量 key-value 数据

  • • expiredAt map: 记录了各个 key 对应的过期时间

  • • expireTimeWheel 有序 set: 基于过期时间实现 key 的有序存储,便于后续通过时间范围批量回收过期数据的操作

这部分代码位于 datastore/kv_store.go:

type KVStore struct {
    // 存储 keyvalue 数据的 map
    data      map[string]interface{}
    // 记录 key 过期时间的 map
    expiredAt map[string]time.Time
    // 各 key 根据过期时间排序的有序集合
    expireTimeWheel SortedSet
    // 持久化模块
    persister handler.Persister
}

 

2 数据类型

在 goredis 中,暂时只支持了 redis 中最简单的五种数据类型,对应的方法和用途如下:

 

2.1 string

对于其中最简单的字符串 string 类型,需要支持将文本内容以字节数组形式输出的核心方法:

// 字符串数据类型接口定义
type String interface {
    // 将字符串内容转为字节数组表达形式
    Bytes() []byte 
    database.CmdAdapter
}

 

// 字符串数据类型实现类
type stringEntity struct {
    // key value
    key, str string
}


// 将字符串内容转为字节数组表达形式 
func (*stringEntity) Bytes() []byte {
    return []byte(s.str)
}

 

2.2 list

针对列表 list 类型,需要支持一系列数据的追加和弹出操作,并支持根据指定索引获取特定范围内的数据:

// 列表数据类型接口定义
type List interface {
    // 从左侧推入数据
    LPush(value []byte)
    // 从左侧弹出数据
    LPop(cnt int64) [][]byte
    // 从右侧推入数据
    RPush(value []byte)
    // 从右侧弹出数据
    RPop(cnt int64) [][]byte
    // 列表中的数据个数
    Len() int64
    // 在指定索引范围内遍历列表
    Range(start, stop int64) [][]byte
    database.CmdAdapter
}

 

// 列表数据类型具体实现
type listEntity struct {
    key  string
    data [][]byte
}


// 从左侧推入数据
func (*listEntity) LPush(value []byte) {
    l.data = append([][]byte{value}, l.data...)
}


// 从左侧弹出数据
func (*listEntity) LPop(cnt int64) [][]byte {
    if int64(len(l.data)) < cnt {
        return nil
    }


    poped := l.data[:cnt]
    l.data = l.data[cnt:]
    return poped
}


// 从右侧推入数据
func (*listEntity) RPush(value []byte) {
    l.data = append(l.data, value)
}


// 从右侧弹出数据
func (*listEntity) RPop(cnt int64) [][]byte {
    if int64(len(l.data)) < cnt {
        return nil
    }


    poped := l.data[int64(len(l.data))-cnt:]
    l.data = l.data[:int64(len(l.data))-cnt]
    return poped
}


// 列表中的数据个数
func (*listEntity) Len() int64 {
    return int64(len(l.data))
}


// 在指定索引范围内遍历列表
func (*listEntity) Range(start, stop int64) [][]byte {
    if stop == -1 {
        stop = int64(len(l.data) - 1)
    }


    if start < 0 || start >= int64(len(l.data)) {
        return nil
    }


    if stop < 0 || stop >= int64(len(l.data)) || stop < start {
        return nil
    }


    return l.data[start : stop+1]
}

 

2.3 set

接下来是集合 set 类型,底层通过一个特殊定义的 map 来实现数据去重的功能:

// 集合数据类型的接口定义
type Set interface {
    // 向集合中添加元素
    Add(value string) int64
    // 判断集合中是否存在指定元素
    Exist(value string) int64
    // 从集合中移除元素
    Rem(value string) int64
    database.CmdAdapter
}

 

// 集合数据类型的实现类
type setEntity struct {
    // 整个集合维度对应的键
    key       string
    // 集合数据载体
    container map[string]struct{}
}


// 向集合中添加元素
func (*setEntity) Add(value string) int64 {
    if _, ok := s.container[value]; ok {
        return 0
    }
    s.container[value] = struct{}{}
    return 1
}


// 判断集合中是否存在指定元素
func (*setEntity) Exist(value string) int64 {
    if _, ok := s.container[value]; ok {
        return 1
    }
    return 0
}


// 从集合中移除元素
func (*setEntity) Rem(value string) int64 {
    if _, ok := s.container[value]; ok {
        delete(s.container, value)
        return 1
    }
    return 0
}

 

2.4 hashmap

字典 hashmap 类型的底层就是对 golang map 数据结构的封装:

// map 数据类型的接口定义
type HashMap interface {
    // 将 kv 对写入 map 
    Put(key string, value []byte)
    // 从 map 中获取 k 对应的 v
    Get(key string) []byte
    // 从 map 中删除 k
    Del(key string) int64
    database.CmdAdapter
}

 

// map 数据类型的实现类
type hashMapEntity struct {
    // 整个 map 维度对应的 key
    key  string
    data map[string][]byte
}


// 将 kv 对写入 map 
func (*hashMapEntity) Put(key string, value []byte) {
    h.data[key] = value
}


// 从 map 中获取 k 对应的 v
func (*hashMapEntity) Get(key string) []byte {
    return h.data[key]
}


// 从 map 中删除 k
func (*hashMapEntity) Del(key string) int64 {
    if _, ok := h.data[key]; !ok {
        return 0
    }
    delete(h.data, key)
    return 1
}

 

2.5 zset

zset(SortedSet)为有序集合,支持将存入的元素按照分数高低进行排序.

goredis 中通过跳表 skiplist 来实现 zset,此处在文字描述上重点展示其实现集合去重功能的一些特殊步骤,有关跳表结构本身的实现原理则不作赘述,感兴趣的同学可以参见我之前发表的文章——基于golang从零到一实现跳表.

首先是关于有序集合 zset 的接口定义,代码位于 struct/sorted_set.go:

// 有序集合数据类型的接口定义
type SortedSet interface {
    // 往有序集合中添加元素
    Add(score int64, member string)
    // 从有序集合中移除元素
    Rem(member string) int64
    // 获取指定分数范围内的元素
    Range(score1, score2 int64) []string
    database.CmdAdapter
}

 

在跳表 skiplist 类定义中,包含如下核心成员属性:

  • • key: 整个 zset 维度对应的 key

  • • memberToScore: 记录了每个元素对应的分数. 通过该 map 也能实现 member 的去重

  • • scoreToNode: 记录分数与跳表节点的映射关系

  • • head: 跳表头节点

  • • rander: 随机数生成器,用于在插入新节点时决定其高度

// 采用跳表作为有序集合数据类型的实现类
type skiplist struct {
    // 整个有序集合维度对应的键
    key           string
    // 由分数映射到跳表中的节点
    scoreToNode   map[int64]*skipnode
    // 由元素映射到其对应的分数
    memberToScore map[string]int64
    // 跳表的头节点
    head          *skipnode
    // 随机数生成器
    rander        *rand.Rand
}

 

其中,每个分数对应一个跳表节点,里面会通过一个 members 集合,记录对应于该分数下的所有元素:

// 跳表中节点的实现类
type skipnode struct {
    // 节点对应的分数
    score   int64
    // 节点中包含的 member 集合
    members map[string]struct{}
    // 后继节点列表
    nexts   []*skipnode
}

 

由于 zset 本质上还是一个集合,所以在插入一个新元素时,首先需要判定其之前是否已经存在于有序集合中,如果是的话,需要对老数据进行删除,然后再遵循跳表的操作流程完成数据插入.

// 往跳表中添加元素. 
func (*skiplist) Add(score int64, member string) {
    // member 此前已存在
    oldScore, ok := s.memberToScore[member]
    if ok {
        // 如果分数相同,则无需处理,直接返回
        if oldScore == score {
            return
        } 
        // 分数不同,则需要先移除 member 旧数据,再执行添加流程
        s.rem(oldScore, member)
    }


    // 建立 member 映射到 score 的关系
    s.memberToScore[member] = score
    // 判断 score 对应的跳表节点是否存在
    node, ok := s.scoreToNode[score]
    if ok {
        // 节点已存在,则在节点对应的 member 集合中添加 member 即可
        node.members[member] = struct{}{}
        return
    }


    // 构造出 score 对应的跳表节点
    // roll 出新节点高度
    height := s.roll()
    for int64(len(s.head.nexts)) < height+1 {
        s.head.nexts = append(s.head.nexts, nil)
    }


    // 构造节点实例
    inserted := newSkipnode(score, height+1)
    inserted.members[member] = struct{}{}
    s.scoreToNode[score] = inserted


    // 将新节点插入跳表
    move := s.head
    for i := height; i >= 0; i-- {
        for move.nexts[i] != nil && move.nexts[i].score < score {
            move = move.nexts[i]
            continue
        }


        inserted.nexts[i] = move.nexts[i]
        move.nexts[i] = inserted
    }
}

 

roll 方法中通过随机数生成器 rander,随机生成新插入节点的高度:

// 随机抛出新节点的高度
func (*skiplist) roll() int64 {
    var level int64
    for s.rander.Intn(2) > 0 {
        level++
    }
    return level
}

 

 

在删除元素时,遵循如下步骤:

  • • 首先根据元素找到对应的分数

  • • 通过分数找到对应的跳表节点

  • • 从跳表节点的元素集合中删去该元素

  • • 倘若该元素为跳表节点中的最后一个元素,则遵循跳表操作流程完成该节点的删除

// 从跳表中移除元素
func (*skiplist) Rem(member string) int64 {
    // 判断 member 之前是否存在
    score, ok := s.memberToScore[member]
    if !ok {
        return 0
    }
    // member 存在,进入专门的移除流程
    s.rem(score, member)
    return 1
}

 

// 从跳表中移除指定 score 下特定的某个 member
func (*skiplist) rem(score int64, member string) {
    // 删除 member 与 score 的映射关系
    delete(s.memberToScore, member)
    // 获取 member 所从属的跳表节点
    skipnode := s.scoreToNode[score]


     // 从跳表节点中删除该 member
    delete(skipnode.members, member)
    if len(skipnode.members) > 0 {
        return
    }


    // 删除完 member 后,该跳表节点中数据为空,则需要回收该节点
    delete(s.scoreToNode, score)
    move := s.head
    for i := len(s.head.nexts) - 1; i >= 0; i-- {
        for move.nexts[i] != nil && move.nexts[i].score < score {
            move = move.nexts[i]
        }


        if move.nexts[i] == nil || move.nexts[i].score > score {
            continue
        }


        remed := move.nexts[i]
        move.nexts[i] = move.nexts[i].nexts[i]
        remed.nexts[i] = nil
    }
}

 

最后是通过指定分数区间获取范围内元素的方法:

// 根据指定分数范围 [score1,score2],返回对应的元素列表 
func (*skiplist) Range(score1, score2 int64) []string {
    // 右边界为 -1,视为上不封顶
    if score2 == -1 {
        score2 = math.MaxInt64
    }


    if score1 > score2 {
        return []string{}
    }


    // 遍历跳表,找到首个 >= score1 的节点
    move := s.head
    for i := len(s.head.nexts) - 1; i >= 0; i-- {
        for move.nexts[i] != nil && move.nexts[i].score < score1 {
            move = move.nexts[i]
        }
    }


    // 来到了 level0 层,move.nexts[i] 如果存在,就是首个 >= score1 的元素
    if len(move.nexts) == 0 || move.nexts[0] == nil {
        return []string{}
    }


    res := []string{}
    for move.nexts[0] != nil && move.nexts[0].score >= score1 && move.nexts[0].score <= score2 {
        for member := range move.nexts[0].members {
            res = append(res, member)
        }
        move = move.nexts[0]
    }
    return res
}

 

3 指令执行

接下来,以 get 和 set 指令执行为例进行流程串联.

3.1 Trigger

首先,数据触发层 trigger 接收到来自 handler 分发的指令后,会对指令内容合法性进行校验,然后对指令进行封装,并通过 executor 暴露出的 channel 将其投递到执行层

func (*DBTrigger) Do(ctx context.Context, cmdLine [][]byte) handler.Reply {
    // 1 指令校验. 
    // 1.1 长度合法性校验
    if len(cmdLine) < 2 {
        return handler.NewErrReply(fmt.Sprintf("invalid cmd line: %v", cmdLine))
    }
    // 1.2 指令类型校验
    cmdType := CmdType(cmdLine[0])
    if !d.executor.ValidCommand(cmdType) {
        return handler.NewErrReply(fmt.Sprintf("unknown cmd '%s'", cmdLine[0]))
    }


    // 2 指令封装
    cmd := Command{
        ctx:      ctx,
        cmd:      cmdType,
        args:     cmdLine[1:],
        receiver: make(CmdReceiver),
    }


    // 3 指令投递
    d.executor.Entrance() <- &cmd


    // 4 监听 chan,直到接收到返回的 reply
    return <-cmd.Receiver()
}

 

3.2 Executor

数据执行层 executor 属于全局单例执行模式,通过单一协程运行 run 方法,持续接收来自 trigger 层投递的指令,根据指令类型映射到数据存储介质模块中对应的执行方法:

func (*DBExecutor) run() {
    for {
        select {
        // ...


        // 2 接收处理 cmd
        case cmd := <-e.ch:
            cmdFunc, ok := e.cmdHandlers[cmd.cmd]
            // ...
            cmd.receiver <- cmdFunc(cmd)
        }
    }
}

 

3.3 KVStore

数据存储介质中封装了一系列具体的数据操作方法,以 get 和 set 方法为例进行展示,代码位于 datastore/kv_store.go 文件中.

get 方法相对比较简单,首先根据 key 从 data map 中获取 value,然后将其断言成 string 类型并返回:

func (*KVStore) Get(cmd *database.Command) handler.Reply {
    args := cmd.Args()
    key := string(args[0])
    v, err := k.getAsString(key)
    // ...
    return handler.NewBulkReply(v.Bytes())
}


// 从 data map 中读取数据
func (*KVStore) getAsString(key string) (String, error) {
    v, ok := k.data[key]
    // ...


    str, ok := v.(String)
    // ...


    return str, nil
}

 

set 的本质流程是往 data map 中写入一笔 kv 数据,但还需要兼顾考虑 ex 和 nx 的附属指令:

func (*KVStore) Set(cmd *database.Command) handler.Reply {
    args := cmd.Args()
    key := string(args[0])
    value := string(args[1])


    // 支持 NX EX
    var (
        insertStrategy bool
        ttlStrategy    bool
        ttlSeconds     int64
        ttlIndex       = -1
    )


    for i := 2; i < len(args); i++ {
        flag := strings.ToLower(string(args[i]))
        switch flag {
        // 处理带 only insert 模式的 nx 指令
        case "nx":
            insertStrategy = true
        // 处理带过期时间的 ex 指令
        case "ex":
            // 重复的 ex 指令
            if ttlStrategy {
                return handler.NewSyntaxErrReply()
            }
            if i == len(args)-1 {
                return handler.NewSyntaxErrReply()
            }
            ttl, err := strconv.ParseInt(string(args[i+1]), 10, 64)
            // ...
            ttlStrategy = true
            ttlSeconds = ttl
            ttlIndex = i
            i++
        // ...
        }
    }


    // 将 args 剔除 ex 部分,进行持久化
    if ttlIndex != -1 {
        args = append(args[:ttlIndex], args[ttlIndex+2:]...)
    }


    // 将 kv 数据写入到 data map
    affected := k.put(key, value, insertStrategy)
    // 过期时间设置
    if affected > 0 && ttlStrategy {
        expireAt := lib.TimeNow().Add(time.Duration(ttlSeconds) * time.Second)
        _cmd := [][]byte{[]byte(database.CmdTypeExpireAt), []byte(key), []byte(lib.TimeSecondFormat(expireAt))}
        _ = k.expireAt(cmd.Ctx(), _cmd, key, expireAt) // 其中会完成 ex 信息的持久化
    }


    // 过期时间处理
    if affected > 0 {
        // 指令持久化
        k.persister.PersistCmd(cmd.Ctx(), append([][]byte{[]byte(database.CmdTypeSet)}, args...))
        return handler.NewIntReply(affected)
    }


    return handler.NewNillReply()
}

 

// 将 kv 数据写入 data map
func (*KVStore) put(key, value string, insertStrategy bool) int64 {
    if _, ok := k.data[key]; ok && insertStrategy {
        return 0
    }


    k.data[key] = NewString(key, value)
    return 1
}

 

4 过期回收

接下来,我们针对 goredis 中数据过期时间设置以及过期数据回收的流程进行串联.

4.1 数据结构

在 kvStore 的类定义中,通过两个成员属性记录了有关过期时间的信息:

  • • expiredAt: 记录数据 key 对应的过期时间

  • • expireTimeWheel: 一个特殊的有序集合,其中根据过期时间对 key 进行排序

type KVStore struct {
    // ...
    expiredAt map[string]time.Time
    expireTimeWheel SortedSet
    // ...
}

基于以上设计,我们对数据过期流程进行如下设计:

  • • 设置数据过期时间流程: 则分别记录 key 对应过期时间,并且将 key 添加到基于时间排序的有序集合中

  • • 回收过期数据流程: (1)在执行某个 key 的操作指令时,先通过过期时间 map 获取其过期信息,倘若数据已经过期,则对其进行删除(惰性删除机制);(2)通过定时任务驱动,定期对有序集合中的已过期的数据进行回收.

 

4.2 设置ttl

在 goredis 的实现中,涉及到设置数据过期时间的指令包括 set(带 ex)、expire、expireAt:

 

以 expire 指令为例,首先推算出数据的过期时间点,然后将 key 对应的过期时间添加到过期时间 map,并将其添加到过期时间有序集合中:

func (*KVStore) Expire(cmd *database.Command) handler.Reply {
    args := cmd.Args()
    key := string(args[0])
    ttl, err := strconv.ParseInt(string(args[1]), 10, 64)
    // ... 


    expireAt := lib.TimeNow().Add(time.Duration(ttl) * time.Second)
    _cmd := [][]byte{[]byte(database.CmdTypeExpireAt), []byte(key), []byte(lib.TimeSecondFormat(expireAt))}
    return k.expireAt(cmd.Ctx(), _cmd, key, expireAt)
}

 

func (*KVStore) expireAt(ctx context.Context, cmd [][]byte, key string, expireAt time.Time) handler.Reply {
    k.expire(key, expireAt)
    k.persister.PersistCmd(ctx, cmd) // 持久化
    return handler.NewOKReply()
}

 

func (*KVStore) expire(key string, expiredAt time.Time) {
    if _, ok := k.data[key]; !ok {
        return
    }
    k.expiredAt[key] = expiredAt
    k.expireTimeWheel.Add(expiredAt.Unix(), key)
}

 

4.3 过期回收

 

数据执行层 executor 运行过程中,通过 for + select 运行模式同时监听 gcTicker 和 cmdChan.

在接收到来自 gcTicker 信号时,会执行 GC 方法,通过过期时间有序集合获取到所有已过期的数据 key,然后一次性完成过期数据的回收:

func (*DBExecutor) run() {
    for {
        select {
        // ...
        // 每隔 1 分钟批量一次过期的 key
        case <-e.gcTicker.C:
            e.dataStore.GC()
        // ...
        }
    }
}

 

func (*KVStore) GC() {
    // 找出当前所有已过期的 key,批量回收
    nowUnix := lib.TimeNow().Unix()
    for _, expiredKey := range k.expireTimeWheel.Range(0, nowUnix) {
        k.expireProcess(expiredKey)
    }
}

 

func (*KVStore) expireProcess(key string) {
    delete(k.expiredAt, key)
    delete(k.data, key)
    k.expireTimeWheel.Rem(key)
}

 

在接收到来自 cmdChan 的操作指令时,则会针对 key 进行过期时间校验,倘若数据已过期,则在此时推动完成数据的回收:

func (*DBExecutor) run() {
    for {
        select {
        // ...
        case cmd := <-e.ch:
            // ...
            // 惰性机制实现过期 key 删除
            e.dataStore.ExpirePreprocess(string(cmd.args[0])) 
            // ...
        }
    }
}

 

func (*KVStore) ExpirePreprocess(key string) {
    expiredAt, ok := k.expiredAt[key]
    if !ok {
        return
    }


    if expiredAt.After(lib.TimeNow()) {
        return
    }


    k.expireProcess(key)
}

 

5 展望

至此为存储引擎篇的全部内容,在此对本系列内容做个小结和展望:



小徐先生的编程世界
在学钢琴,主业码农
 最新文章