0 前言
欢迎回来,由我们一起继续推进技术分享专题—— 【基于go实现redis之存储引擎】 .
此前我已于 github 开源项目——goredis,由于我个人水平有限,如有实现不到位之处,欢迎批评指正:https://github.com/xiaoxuxiansheng/goredis
本系列正是围绕着开源项目goredis展开,共分为四篇内容,本文为其中的第三篇——存储引擎篇:
• 基于go实现redis之指令分发(已完成): 聚焦介绍 goredis 服务端如何启动和运行,并在接收客户端请求后实现指令协议的解析和分发
• 基于go实现redis之存储引擎(本篇): 聚焦介绍数据存储层中单协程无锁化执行框架,各类基本数据类型的底层实现细节,以及过期数据的惰性和定期回收机制
• 基于go实现redis之数据持久化(待填坑): 介绍goredis关于aof持久化机制的实现以及有关于aof重写策略的执行细节
1 架构梳理
在往期内容中,我们介绍了有关 goredis 的整体架构,探讨了 goredis 的 server 端如何运行,指令分发层如何遵循 RESP 协议将原始请求内容解析成可识别的操作指令,并将其发往存储引擎层进行执行.
接下来我们就要正式打开存储引擎模块的黑盒子一探究竟.
1.1 DB
在上层 handler 的视角中,整个存储引擎模块是一个整体 , 被抽象成一个名为 DB 的接口,通过方法暴露出一个用于执行指令并返回结果的 Do 方法. 接口定义代码位于 handler/struct.go 文件中:
type DB interface {
// 执行指令
Do(ctx context.Context, cmdLine [][]byte) Reply
// 关闭存储引擎
Close()
}
type Reply interface {
ToBytes() []byte
}
事实上从存储引擎内部视角中,可以将其自上而下拆分为三个模块:
• 数据触发层 Trigger: 属于一个代理层,封装指令并将其投递给 executor. 该模块采用并发模型
• 数据执行层 Executor: 接收来自 trigger 的指令,操作 dataStore 完成执行. 该模块采用单例串行的执行模式.
• 数据存储介质 DataStore: 底层存储数据的所在之处,收敛有关各种数据类型的实现模型和操作细节
1.2 Trigger
数据触发层 trigger 是狭义上针对 DB 接口的实现类.
从功能上看,trigger 负责封装指令并通过 channel 投递给底层的 executor,看起来只是扮演了一个简单的代理层角色,但是从职能架构上看,一方面实现了职能边界的拆分,另一方面实现了由并发模式到串行模式的过渡转换.
这部分代码位于 database/trigger.go:
type DBTrigger struct {
// 保证关闭操作单次执行的单例工具.
once sync.Once
// 依赖的数据库执行层 executor
executor Executor
}
func (d *DBTrigger) Do(ctx context.Context, cmdLine [][]byte) handler.Reply {
// ...
// 投递给到 executor
d.executor.Entrance() <- &cmd
// 监听 chan,直到接收到返回的 reply
return <-cmd.Receiver()
}
1.3 Executor
executor 是全局单例运行的,通过串行执行的 run 方法进行数据操作流程的收口,保证底层数据存储介质一定只会在同一个协程中进行读写,不存在任何并发行为,因此底层存储模型可以实现无锁化.
有关 executor 的实现类代码位于 database/executor.go,其中包含如何核心成员属性:
• ctx、cancel:生命周期控制
• ch:从上游 trigger 接收指令的入口 channel
• cmdHandlers:各种指令类型对应的处理函数
• dataStore:更底层的存储介质
• gcTicker:定时驱动执行过期数据回收的定时器
// 具体的数据库执行层实现类
type DBExecutor struct {
// 生命周期控制
ctx context.Context
cancel context.CancelFunc
// 用户接收 cmd 的入口
ch chan *Command
// 根据 cmd 类型映射到具体的处理方法
cmdHandlers map[CmdType]CmdHandler
// 数据存储介质
dataStore DataStore
// 驱动定期执行过期数据回收的定时器
gcTicker *time.Ticker
}
// executor 持续运行
func (e *DBExecutor) run() {
for {
select {
// ...
// 每隔一段时间批量回收过期的 key
case <-e.gcTicker.C:
e.dataStore.GC()
// 接收来自 trigger 发送的 cmd,并执行具体的操作
case cmd := <-e.ch:
cmdFunc, ok := e.cmdHandlers[cmd.cmd]
// ...
// 惰性回收机制实现过期 key 删除
e.dataStore.ExpirePreprocess(string(cmd.args[0]))
// 将执行结果通过 receiver chan 发送给 trigger
cmd.receiver <- cmdFunc(cmd)
}
}
}
1.4 DataStore
针对数据存储介质的实现类是 kvStore,其中包含两个 map 和一个有序 set:
• data map: 记录了 redis 中的全量 key-value 数据
• expiredAt map: 记录了各个 key 对应的过期时间
• expireTimeWheel 有序 set: 基于过期时间实现 key 的有序存储,便于后续通过时间范围批量回收过期数据的操作
这部分代码位于 datastore/kv_store.go:
type KVStore struct {
// 存储 keyvalue 数据的 map
data map[string]interface{}
// 记录 key 过期时间的 map
expiredAt map[string]time.Time
// 各 key 根据过期时间排序的有序集合
expireTimeWheel SortedSet
// 持久化模块
persister handler.Persister
}
2 数据类型
在 goredis 中,暂时只支持了 redis 中最简单的五种数据类型,对应的方法和用途如下:
2.1 string
对于其中最简单的字符串 string 类型,需要支持将文本内容以字节数组形式输出的核心方法:
// 字符串数据类型接口定义
type String interface {
// 将字符串内容转为字节数组表达形式
Bytes() []byte
database.CmdAdapter
}
// 字符串数据类型实现类
type stringEntity struct {
// key value
key, str string
}
// 将字符串内容转为字节数组表达形式
func (s *stringEntity) Bytes() []byte {
return []byte(s.str)
}
2.2 list
针对列表 list 类型,需要支持一系列数据的追加和弹出操作,并支持根据指定索引获取特定范围内的数据:
// 列表数据类型接口定义
type List interface {
// 从左侧推入数据
LPush(value []byte)
// 从左侧弹出数据
LPop(cnt int64) [][]byte
// 从右侧推入数据
RPush(value []byte)
// 从右侧弹出数据
RPop(cnt int64) [][]byte
// 列表中的数据个数
Len() int64
// 在指定索引范围内遍历列表
Range(start, stop int64) [][]byte
database.CmdAdapter
}
// 列表数据类型具体实现
type listEntity struct {
key string
data [][]byte
}
// 从左侧推入数据
func (l *listEntity) LPush(value []byte) {
l.data = append([][]byte{value}, l.data...)
}
// 从左侧弹出数据
func (l *listEntity) LPop(cnt int64) [][]byte {
if int64(len(l.data)) < cnt {
return nil
}
poped := l.data[:cnt]
l.data = l.data[cnt:]
return poped
}
// 从右侧推入数据
func (l *listEntity) RPush(value []byte) {
l.data = append(l.data, value)
}
// 从右侧弹出数据
func (l *listEntity) RPop(cnt int64) [][]byte {
if int64(len(l.data)) < cnt {
return nil
}
poped := l.data[int64(len(l.data))-cnt:]
l.data = l.data[:int64(len(l.data))-cnt]
return poped
}
// 列表中的数据个数
func (l *listEntity) Len() int64 {
return int64(len(l.data))
}
// 在指定索引范围内遍历列表
func (l *listEntity) Range(start, stop int64) [][]byte {
if stop == -1 {
stop = int64(len(l.data) - 1)
}
if start < 0 || start >= int64(len(l.data)) {
return nil
}
if stop < 0 || stop >= int64(len(l.data)) || stop < start {
return nil
}
return l.data[start : stop+1]
}
2.3 set
接下来是集合 set 类型,底层通过一个特殊定义的 map 来实现数据去重的功能:
// 集合数据类型的接口定义
type Set interface {
// 向集合中添加元素
Add(value string) int64
// 判断集合中是否存在指定元素
Exist(value string) int64
// 从集合中移除元素
Rem(value string) int64
database.CmdAdapter
}
// 集合数据类型的实现类
type setEntity struct {
// 整个集合维度对应的键
key string
// 集合数据载体
container map[string]struct{}
}
// 向集合中添加元素
func (s *setEntity) Add(value string) int64 {
if _, ok := s.container[value]; ok {
return 0
}
s.container[value] = struct{}{}
return 1
}
// 判断集合中是否存在指定元素
func (s *setEntity) Exist(value string) int64 {
if _, ok := s.container[value]; ok {
return 1
}
return 0
}
// 从集合中移除元素
func (s *setEntity) Rem(value string) int64 {
if _, ok := s.container[value]; ok {
delete(s.container, value)
return 1
}
return 0
}
2.4 hashmap
字典 hashmap 类型的底层就是对 golang map 数据结构的封装:
// map 数据类型的接口定义
type HashMap interface {
// 将 kv 对写入 map
Put(key string, value []byte)
// 从 map 中获取 k 对应的 v
Get(key string) []byte
// 从 map 中删除 k
Del(key string) int64
database.CmdAdapter
}
// map 数据类型的实现类
type hashMapEntity struct {
// 整个 map 维度对应的 key
key string
data map[string][]byte
}
// 将 kv 对写入 map
func (h *hashMapEntity) Put(key string, value []byte) {
h.data[key] = value
}
// 从 map 中获取 k 对应的 v
func (h *hashMapEntity) Get(key string) []byte {
return h.data[key]
}
// 从 map 中删除 k
func (h *hashMapEntity) Del(key string) int64 {
if _, ok := h.data[key]; !ok {
return 0
}
delete(h.data, key)
return 1
}
2.5 zset
zset(SortedSet)为有序集合,支持将存入的元素按照分数高低进行排序.
goredis 中通过跳表 skiplist 来实现 zset,此处在文字描述上重点展示其实现集合去重功能的一些特殊步骤,有关跳表结构本身的实现原理则不作赘述,感兴趣的同学可以参见我之前发表的文章——基于golang从零到一实现跳表.
首先是关于有序集合 zset 的接口定义,代码位于 struct/sorted_set.go:
// 有序集合数据类型的接口定义
type SortedSet interface {
// 往有序集合中添加元素
Add(score int64, member string)
// 从有序集合中移除元素
Rem(member string) int64
// 获取指定分数范围内的元素
Range(score1, score2 int64) []string
database.CmdAdapter
}
在跳表 skiplist 类定义中,包含如下核心成员属性:
• key: 整个 zset 维度对应的 key
• memberToScore: 记录了每个元素对应的分数. 通过该 map 也能实现 member 的去重
• scoreToNode: 记录分数与跳表节点的映射关系
• head: 跳表头节点
• rander: 随机数生成器,用于在插入新节点时决定其高度
// 采用跳表作为有序集合数据类型的实现类
type skiplist struct {
// 整个有序集合维度对应的键
key string
// 由分数映射到跳表中的节点
scoreToNode map[int64]*skipnode
// 由元素映射到其对应的分数
memberToScore map[string]int64
// 跳表的头节点
head *skipnode
// 随机数生成器
rander *rand.Rand
}
其中,每个分数对应一个跳表节点,里面会通过一个 members 集合,记录对应于该分数下的所有元素:
// 跳表中节点的实现类
type skipnode struct {
// 节点对应的分数
score int64
// 节点中包含的 member 集合
members map[string]struct{}
// 后继节点列表
nexts []*skipnode
}
由于 zset 本质上还是一个集合,所以在插入一个新元素时,首先需要判定其之前是否已经存在于有序集合中,如果是的话,需要对老数据进行删除,然后再遵循跳表的操作流程完成数据插入.
// 往跳表中添加元素.
func (s *skiplist) Add(score int64, member string) {
// member 此前已存在
oldScore, ok := s.memberToScore[member]
if ok {
// 如果分数相同,则无需处理,直接返回
if oldScore == score {
return
}
// 分数不同,则需要先移除 member 旧数据,再执行添加流程
s.rem(oldScore, member)
}
// 建立 member 映射到 score 的关系
s.memberToScore[member] = score
// 判断 score 对应的跳表节点是否存在
node, ok := s.scoreToNode[score]
if ok {
// 节点已存在,则在节点对应的 member 集合中添加 member 即可
node.members[member] = struct{}{}
return
}
// 构造出 score 对应的跳表节点
// roll 出新节点高度
height := s.roll()
for int64(len(s.head.nexts)) < height+1 {
s.head.nexts = append(s.head.nexts, nil)
}
// 构造节点实例
inserted := newSkipnode(score, height+1)
inserted.members[member] = struct{}{}
s.scoreToNode[score] = inserted
// 将新节点插入跳表
move := s.head
for i := height; i >= 0; i-- {
for move.nexts[i] != nil && move.nexts[i].score < score {
move = move.nexts[i]
continue
}
inserted.nexts[i] = move.nexts[i]
move.nexts[i] = inserted
}
}
roll 方法中通过随机数生成器 rander,随机生成新插入节点的高度:
// 随机抛出新节点的高度
func (s *skiplist) roll() int64 {
var level int64
for s.rander.Intn(2) > 0 {
level++
}
return level
}
在删除元素时,遵循如下步骤:
• 首先根据元素找到对应的分数
• 通过分数找到对应的跳表节点
• 从跳表节点的元素集合中删去该元素
• 倘若该元素为跳表节点中的最后一个元素,则遵循跳表操作流程完成该节点的删除
// 从跳表中移除元素
func (s *skiplist) Rem(member string) int64 {
// 判断 member 之前是否存在
score, ok := s.memberToScore[member]
if !ok {
return 0
}
// member 存在,进入专门的移除流程
s.rem(score, member)
return 1
}
// 从跳表中移除指定 score 下特定的某个 member
func (s *skiplist) rem(score int64, member string) {
// 删除 member 与 score 的映射关系
delete(s.memberToScore, member)
// 获取 member 所从属的跳表节点
skipnode := s.scoreToNode[score]
// 从跳表节点中删除该 member
delete(skipnode.members, member)
if len(skipnode.members) > 0 {
return
}
// 删除完 member 后,该跳表节点中数据为空,则需要回收该节点
delete(s.scoreToNode, score)
move := s.head
for i := len(s.head.nexts) - 1; i >= 0; i-- {
for move.nexts[i] != nil && move.nexts[i].score < score {
move = move.nexts[i]
}
if move.nexts[i] == nil || move.nexts[i].score > score {
continue
}
remed := move.nexts[i]
move.nexts[i] = move.nexts[i].nexts[i]
remed.nexts[i] = nil
}
}
最后是通过指定分数区间获取范围内元素的方法:
// 根据指定分数范围 [score1,score2],返回对应的元素列表
func (s *skiplist) Range(score1, score2 int64) []string {
// 右边界为 -1,视为上不封顶
if score2 == -1 {
score2 = math.MaxInt64
}
if score1 > score2 {
return []string{}
}
// 遍历跳表,找到首个 >= score1 的节点
move := s.head
for i := len(s.head.nexts) - 1; i >= 0; i-- {
for move.nexts[i] != nil && move.nexts[i].score < score1 {
move = move.nexts[i]
}
}
// 来到了 level0 层,move.nexts[i] 如果存在,就是首个 >= score1 的元素
if len(move.nexts) == 0 || move.nexts[0] == nil {
return []string{}
}
res := []string{}
for move.nexts[0] != nil && move.nexts[0].score >= score1 && move.nexts[0].score <= score2 {
for member := range move.nexts[0].members {
res = append(res, member)
}
move = move.nexts[0]
}
return res
}
3 指令执行
接下来,以 get 和 set 指令执行为例进行流程串联.
3.1 Trigger
首先,数据触发层 trigger 接收到来自 handler 分发的指令后,会对指令内容合法性进行校验,然后对指令进行封装,并通过 executor 暴露出的 channel 将其投递到执行层:
func (d *DBTrigger) Do(ctx context.Context, cmdLine [][]byte) handler.Reply {
// 1 指令校验.
// 1.1 长度合法性校验
if len(cmdLine) < 2 {
return handler.NewErrReply(fmt.Sprintf("invalid cmd line: %v", cmdLine))
}
// 1.2 指令类型校验
cmdType := CmdType(cmdLine[0])
if !d.executor.ValidCommand(cmdType) {
return handler.NewErrReply(fmt.Sprintf("unknown cmd '%s'", cmdLine[0]))
}
// 2 指令封装
cmd := Command{
ctx: ctx,
cmd: cmdType,
args: cmdLine[1:],
receiver: make(CmdReceiver),
}
// 3 指令投递
d.executor.Entrance() <- &cmd
// 4 监听 chan,直到接收到返回的 reply
return <-cmd.Receiver()
}
3.2 Executor
数据执行层 executor 属于全局单例执行模式,通过单一协程运行 run 方法,持续接收来自 trigger 层投递的指令,根据指令类型映射到数据存储介质模块中对应的执行方法:
func (e *DBExecutor) run() {
for {
select {
// ...
// 2 接收处理 cmd
case cmd := <-e.ch:
cmdFunc, ok := e.cmdHandlers[cmd.cmd]
// ...
cmd.receiver <- cmdFunc(cmd)
}
}
}
3.3 KVStore
数据存储介质中封装了一系列具体的数据操作方法,以 get 和 set 方法为例进行展示,代码位于 datastore/kv_store.go 文件中.
get 方法相对比较简单,首先根据 key 从 data map 中获取 value,然后将其断言成 string 类型并返回:
func (k *KVStore) Get(cmd *database.Command) handler.Reply {
args := cmd.Args()
key := string(args[0])
v, err := k.getAsString(key)
// ...
return handler.NewBulkReply(v.Bytes())
}
// 从 data map 中读取数据
func (k *KVStore) getAsString(key string) (String, error) {
v, ok := k.data[key]
// ...
str, ok := v.(String)
// ...
return str, nil
}
set 的本质流程是往 data map 中写入一笔 kv 数据,但还需要兼顾考虑 ex 和 nx 的附属指令:
func (k *KVStore) Set(cmd *database.Command) handler.Reply {
args := cmd.Args()
key := string(args[0])
value := string(args[1])
// 支持 NX EX
var (
insertStrategy bool
ttlStrategy bool
ttlSeconds int64
ttlIndex = -1
)
for i := 2; i < len(args); i++ {
flag := strings.ToLower(string(args[i]))
switch flag {
// 处理带 only insert 模式的 nx 指令
case "nx":
insertStrategy = true
// 处理带过期时间的 ex 指令
case "ex":
// 重复的 ex 指令
if ttlStrategy {
return handler.NewSyntaxErrReply()
}
if i == len(args)-1 {
return handler.NewSyntaxErrReply()
}
ttl, err := strconv.ParseInt(string(args[i+1]), 10, 64)
// ...
ttlStrategy = true
ttlSeconds = ttl
ttlIndex = i
i++
// ...
}
}
// 将 args 剔除 ex 部分,进行持久化
if ttlIndex != -1 {
args = append(args[:ttlIndex], args[ttlIndex+2:]...)
}
// 将 kv 数据写入到 data map
affected := k.put(key, value, insertStrategy)
// 过期时间设置
if affected > 0 && ttlStrategy {
expireAt := lib.TimeNow().Add(time.Duration(ttlSeconds) * time.Second)
_cmd := [][]byte{[]byte(database.CmdTypeExpireAt), []byte(key), []byte(lib.TimeSecondFormat(expireAt))}
_ = k.expireAt(cmd.Ctx(), _cmd, key, expireAt) // 其中会完成 ex 信息的持久化
}
// 过期时间处理
if affected > 0 {
// 指令持久化
k.persister.PersistCmd(cmd.Ctx(), append([][]byte{[]byte(database.CmdTypeSet)}, args...))
return handler.NewIntReply(affected)
}
return handler.NewNillReply()
}
// 将 kv 数据写入 data map
func (k *KVStore) put(key, value string, insertStrategy bool) int64 {
if _, ok := k.data[key]; ok && insertStrategy {
return 0
}
k.data[key] = NewString(key, value)
return 1
}
4 过期回收
接下来,我们针对 goredis 中数据过期时间设置以及过期数据回收的流程进行串联.
4.1 数据结构
在 kvStore 的类定义中,通过两个成员属性记录了有关过期时间的信息:
• expiredAt: 记录数据 key 对应的过期时间
• expireTimeWheel: 一个特殊的有序集合,其中根据过期时间对 key 进行排序
type KVStore struct {
// ...
expiredAt map[string]time.Time
expireTimeWheel SortedSet
// ...
}
基于以上设计,我们对数据过期流程进行如下设计:
• 设置数据过期时间流程: 则分别记录 key 对应过期时间,并且将 key 添加到基于时间排序的有序集合中
• 回收过期数据流程: (1)在执行某个 key 的操作指令时,先通过过期时间 map 获取其过期信息,倘若数据已经过期,则对其进行删除(惰性删除机制);(2)通过定时任务驱动,定期对有序集合中的已过期的数据进行回收.
4.2 设置ttl
在 goredis 的实现中,涉及到设置数据过期时间的指令包括 set(带 ex)、expire、expireAt:
以 expire 指令为例,首先推算出数据的过期时间点,然后将 key 对应的过期时间添加到过期时间 map,并将其添加到过期时间有序集合中:
func (k *KVStore) Expire(cmd *database.Command) handler.Reply {
args := cmd.Args()
key := string(args[0])
ttl, err := strconv.ParseInt(string(args[1]), 10, 64)
// ...
expireAt := lib.TimeNow().Add(time.Duration(ttl) * time.Second)
_cmd := [][]byte{[]byte(database.CmdTypeExpireAt), []byte(key), []byte(lib.TimeSecondFormat(expireAt))}
return k.expireAt(cmd.Ctx(), _cmd, key, expireAt)
}
func (k *KVStore) expireAt(ctx context.Context, cmd [][]byte, key string, expireAt time.Time) handler.Reply {
k.expire(key, expireAt)
k.persister.PersistCmd(ctx, cmd) // 持久化
return handler.NewOKReply()
}
func (k *KVStore) expire(key string, expiredAt time.Time) {
if _, ok := k.data[key]; !ok {
return
}
k.expiredAt[key] = expiredAt
k.expireTimeWheel.Add(expiredAt.Unix(), key)
}
4.3 过期回收
数据执行层 executor 运行过程中,通过 for + select 运行模式同时监听 gcTicker 和 cmdChan.
在接收到来自 gcTicker 信号时,会执行 GC 方法,通过过期时间有序集合获取到所有已过期的数据 key,然后一次性完成过期数据的回收:
func (e *DBExecutor) run() {
for {
select {
// ...
// 每隔 1 分钟批量一次过期的 key
case <-e.gcTicker.C:
e.dataStore.GC()
// ...
}
}
}
func (k *KVStore) GC() {
// 找出当前所有已过期的 key,批量回收
nowUnix := lib.TimeNow().Unix()
for _, expiredKey := range k.expireTimeWheel.Range(0, nowUnix) {
k.expireProcess(expiredKey)
}
}
func (k *KVStore) expireProcess(key string) {
delete(k.expiredAt, key)
delete(k.data, key)
k.expireTimeWheel.Rem(key)
}
在接收到来自 cmdChan 的操作指令时,则会针对 key 进行过期时间校验,倘若数据已过期,则在此时推动完成数据的回收:
func (e *DBExecutor) run() {
for {
select {
// ...
case cmd := <-e.ch:
// ...
// 惰性机制实现过期 key 删除
e.dataStore.ExpirePreprocess(string(cmd.args[0]))
// ...
}
}
}
func (k *KVStore) ExpirePreprocess(key string) {
expiredAt, ok := k.expiredAt[key]
if !ok {
return
}
if expiredAt.After(lib.TimeNow()) {
return
}
k.expireProcess(key)
}
5 展望
至此为存储引擎篇的全部内容,在此对本系列内容做个小结和展望:
• 基于go实现redis之指令分发(已完成): 聚焦介绍 goredis 服务端如何启动和运行,并在接收客户端请求后实现指令协议的解析和分发
• 基于go实现redis之存储引擎(已完成): 聚焦介绍数据存储层中单协程无锁化执行框架,各类基本数据类型的底层实现细节,以及过期数据的惰性和定期回收机制
• 基于go实现redis之数据持久化(待填坑): 介绍 goredis 关于 aof 持久化机制的实现以及有关于aof重写策略的执行细节