0 前言
本着学习和实践的目的,从本期开始,我将和大家一起走进一个新的专题—— 【基于go实现redis】 .
该专题围绕着我的一个开源项目——goredis展开. 由于个人水平有限,如有不到位之处,欢迎批评指正:https://github.com/xiaoxuxiansheng/goredis
本系列计划分为四篇内容:
• 基于go实现redis之主干框架(本篇): 在宏观视角下纵览 goredis 整体架构,梳理各模块间的关联性
• 基于go实现redis之指令分发(待填坑): 聚焦介绍 goredis 服务端如何启动和运行,并在接收客户端请求后实现指令协议的解析和分发
• 基于go实现redis之存储引擎(待填坑): 聚焦介绍数据存储层中单协程无锁化执行框架,各类基本数据类型的底层实现细节,以及过期数据的惰性和定期回收机制
• 基于go实现redis之数据持久化(待填坑): 介绍goredis关于aof持久化机制的实现以及有关于aof重写策略的执行细节
(此外,这里需要特别提及一下,在学习过程中,很大程度上需借助了 hdt3213 系列博客和项目的帮助,在此致敬一下:https://www.cnblogs.com/Finley/category/1598973.html)
1 实现目标
redis(https://redis.io/)是一款高性能的key-value内存存储组件,广泛应用于数据缓存、分布式协调、消息队列等使用场景中.
我此前也做过一些有关redis的技术分享,主要围绕着分布式锁和消息队列的主题展开:
有关redis的技术细节博大精妙,受限于个人水平,本系列中我只能说通过go语言仿制出一个“乞丐版”redis,尝试窥探其中的冰山一角. 项目中涉及实现到的功能点包括:
• tcp服务端搭建
• 基于go自带netpoller实现io多路复用
• 还原redis数据解析协议
• 常规数据类型与操作指令支持
• string——get/mget/set/mset
• list——lpush/lpop/rpush/rpop/lrange
• set——sadd/sismember/srem
• hashmap——hset/hget/hdel
• sortedset——zadd/zrem/zrangebyscore
• 数据持久化机制
• appendonlyfile落盘与重写
除此之外,为了效仿redis. 项目在与底层数据模型交互时,会基于单协程模型实现. 数据存储模型则是由一个无锁化的map结构实现.
2 架构总览
如上图所示,goredis的运行架构在纵向上可以拆分为三个模块:
• 服务运行层: 支持goredis作为tcp服务端,接收和处理来自客户端的连接与请求
• 应用程序application
• 服务端server
• 指令分发层: 在服务端接收到请求后,遵循Redis Serialization Protocol转换为操作指令,并分发到存储引擎层
• 指令分发器handler
• 协议解析器parser
• 存储引擎层: 与存储介质交互,完成数据存储、过期回收以及持久化处理
• 数据持久化persister
• 存储介质kvstore
• 数据库执行层executor
• 数据库触发层trigger
在宏观上,goredis接收并处理一笔请求的途径链路为:
server->handler(parser)->dbtrigger->dbexecutor->kvstore(persister)
goredis项目遵循依赖注入的实现风格,基于dig(https://github.com/uber-go/dig)实现各模块和组件的注入与管理,具体使用方式可以参见我之前的文章——低配 Spring—Golang IOC 框架 dig 原理解析
在文件 app/factory.go 中作了统一收口,可以基于全局视角一览各部分参与的模块组件:
var container = dig.New()
func init() {
/**
其它
**/
// 配置加载 conf
_ = container.Provide(SetUpConfig)
_ = container.Provide(PersistThinker)
// 日志打印 logger
_ = container.Provide(log.GetDefaultLogger)
/**
存储引擎层
**/
// 数据持久化
_ = container.Provide(persist.NewPersister)
// 存储介质
_ = container.Provide(datastore.NewKVStore)
// 执行器
_ = container.Provide(database.NewDBExecutor)
// 触发器
_ = container.Provide(database.NewDBTrigger)
/**
指令分发层
**/
// 协议解析器
_ = container.Provide(protocol.NewParser)
// 指令分发器
_ = container.Provide(handler.NewHandler)
/**
服务端运行层
**/
_ = container.Provide(server.NewServer)
}
3 服务运行层
服务运行层支撑了整个goredis应用的运行,包含两个核心子模块:
• Application: 对应为goredis应用程序的抽象
• Server: 对应为goredis接收和响应客户端请求的运行服务端
3.1 应用程序
goredis应用程序启动的入口代码位于main.go,核心步骤包括:
• 构造server与application实例
• 一键运行application
import "github.com/xiaoxuxiansheng/goredis/app"
func main() {
// 1 构造 server 实例
server, err := app.ConstructServer()
if err != nil {
panic(err)
}
// 2 构造 application 实例
app := app.NewApplication(server, app.SetUpConfig())
defer app.Stop()
// 3 运行 application
if err := app.Run(); err != nil {
panic(err)
}
}
有关 application类定义代码位于:app/app.go,包含两个核心成员属性:
• server: 持有的服务端实例
• conf: 聚合了使用方关于goredis的定制化配置
type Application struct {
server *server.Server
conf *Config
}
func (a *Application) Run() error {
return a.server.Serve(a.conf.Address())
}
func (a *Application) Stop() {
a.server.Stop()
}
3.2 服务端
服务端类定义代码位于 server/server.go,其中持有了指令分发模块handler的引用:
type Server struct {
// ...
handler Handler
// ...
}
server运行时,会启动handler实例,并且启动tcp服务并监听tcp端口,将每个到来的tcp连接移交给handler层进行处理:
func (s *Server) Serve(address string) error {
// 1 启动 handler
s.handler.Start()
// ...
// 2 运行 tcp 服务
s.listenAndServe(listener, closec)
// ...
}
func (s *Server) listenAndServe(listener net.Listener, closec chan struct{}) {
// ...
// io 多路复用模型,goroutine for per conn
for {
// 接收到来的 tcp 连接
conn, err := listener.Accept()
// ...
// 为每个到来的 conn 分配一个 goroutine 处理
pool.Submit(func() {
// ...
s.handler.Handle(ctx, conn)
})
}
// ...
}
func (s *Server) Stop() {
s.stopOnce.Do(func() {
close(s.stopc)
})
}
其中,pool.Submit方法类似于go func一键启动协程,只不过本项目中采用的是协程池模式:
import (
"runtime/debug"
"strings"
"github.com/panjf2000/ants"
)
var pool *ants.Pool
func init() {
_pool, err := ants.NewPool(50000, ants.WithPanicHandler(func(i interface{}) {
// ...
}))
if err != nil {
panic(err)
}
pool = _pool
}
func Submit(task func()) {
pool.Submit(task)
}
协程池使用到的是ants(https://github.com/panjf2000/ants),之前我也发表了一篇对应的技术文章:Golang 协程池 Ants 实现原理
4 指令分发层
指令分发层的核心作用是,接收到请求后,遵循Redis协议将其解析为操作指令,然后将指令分发到存储引擎层进行执行.
4.1 指令分发器
指令分发器的接口定义代码位于 server/server.go,包含两个核心方法:
• Start:启动handler
• Handle:持续处理到来的一笔连接
// 逻辑处理器
type Handler interface {
Start() error // 启动 handler
// 处理到来的每一笔 tcp 连接
Handle(ctx context.Context, conn net.Conn)
// 关闭处理器
Close()
}
关于Handler的实现类位于handler/handler.go,其中包含三个核心成员属性:
• db: 对下游存储引擎层的抽象
• parser: 遵循redis协议,将到来的请求参数解析成对应的redis指令
• persister: handler初启动时,通过persister读取此前的持久化内容,将其分发到存储引擎层还原出对应的数据
type Handler struct {
// ...
db DB
parser Parser
persister Persister
// ...
}
启动Handler的Start方法:
func (h *Handler) Start() error {
// 加载持久化文件,还原内容
reloader, err := h.persister.Reloader()
// ...
// 读取持久化文件内容,还原内存数据库
h.handle(SetLoadingPattern(context.Background()), newFakeReaderWriter(reloader))
return nil
}
处理到来连接的Handle方法——通过parser将请求参数转化为指令,然后分发给db:
// 处理到来的 tcp 连接
func (h *Handler) Handle(ctx context.Context, conn net.Conn) {
// ...
h.handle(ctx, conn)
}
func (h *Handler) handle(ctx context.Context, conn io.ReadWriter) {
// 借助 protocol parser 将到来的指令转而通过 stream channel 输出
stream := h.parser.ParseStream(conn)
for {
select {
// ...
// 读取 stream channel 中到来的指令
case droplet := <-stream:
if err := h.handleDroplet(ctx, conn, droplet); err != nil {
// ...
}
}
}
}
// 处理每一笔指令
func (h *Handler) handleDroplet(ctx context.Context, conn io.ReadWriter, droplet *Droplet) error {
// ...
// 请求指令类型转换
multiReply, ok := droplet.Reply.(MultiReply)
// ...
// 通过 db 引擎处理请求指令
if reply := h.db.Do(ctx, multiReply.Args()); reply != nil {
// 返回响应结果
_, _ = conn.Write(reply.ToBytes())
// ...
}
// ...
}
func (h *Handler) Close() {
// ...
h.db.Close()
h.persister.Close()
}
4.2 协议解析器
协议解析器的接口定义代码位于handler/struct.go,通过ParseStream方法将连接转换成channel的形式,把每一笔请求参数转为redis操作指令,并通过channel传递给handler进行处理:
// 协议解析器
type Parser interface {
ParseStream(reader io.Reader) <-chan *Droplet
}
Parser实现类代码位于protocol/parser.go:
type Parser struct {
lineParsers map[byte]lineParser
// ...
}
// 将连接转为 stream channel 形式,并通过异步协程执行 parse 方法持续往 channel 中传送到来的指令
func (p *Parser) ParseStream(reader io.Reader) <-chan *handler.Droplet {
ch := make(chan *handler.Droplet)
pool.Submit(
func() {
p.parse(reader, ch)
})
return ch
}
func (p *Parser) parse(rawReader io.Reader, ch chan<- *handler.Droplet) {
reader := bufio.NewReader(rawReader)
for {
// 逐行读取数据
firstLine, err := reader.ReadBytes('\n')
// 解析请求参数,分发给对应的指令解析函数
firstLine = bytes.TrimSuffix(firstLine, []byte{'\r', '\n'})
lineParseFunc, ok := p.lineParsers[firstLine[0]]
// ...
// 将解析后的指令传送到 stream channel
ch <- lineParseFunc(firstLine, reader)
}
}
5 存储引擎层
存储引擎层模块封装了有关数据存储交互的流程,其中分为四个核心子模块:
• 触发层trigger: 负责接收来自handler的操作指令,将其通过channel移交给executor
• 执行层executor: 采用单协程模式运行,负责完成存储数据的crud操作
• 存储介质datastore: 存储数据所在之处,包含各类数据类型的实现模型
• 持久化模块persister: 支持内存数据的持久化和重加载功能
5.1 数据库触发层
有关整个数据库存储引擎的接口定义位于 handler/struct.go,通过Do方法完成指令的执行,并接收其响应结果:
type DB interface {
Do(ctx context.Context, cmdLine [][]byte) Reply
Close()
}
对DB的实现类是数据库触发层DBTrigger,代码位于 database/trigger.go,其中依赖了executor实例:
// 数据库触发器
type DBTrigger struct {
// ...
executor Executor
}
DBTrigger接收到指令,会对其进行包装,然后通过channel(executor.Entrance)将其发送给全局单例的executor中:
// 处理指令
func (d *DBTrigger) Do(ctx context.Context, cmdLine [][]byte) handler.Reply {
// ...
// 组装指令
cmd := Command{
ctx: ctx,
cmd: cmdType,
args: cmdLine[1:],
receiver: make(CmdReceiver),
}
// 将指令投递到 executor
d.executor.Entrance() <- &cmd
// 接收来自 executor 返回的 reply
return <-cmd.Receiver()
}
5.2 数据库执行层
有关数据执行器接口的定义代码位于 database/struct.go,其会通过Entrance入口完成指令的接收:
type Executor interface {
// 执行器入口,用于接收指令
Entrance() chan<- *Command
// 校验指令是否合法
ValidCommand(cmd CmdType) bool
Close()
}
DBExecutor是数据库执行器的实现类,代码位于 database/executor.go,核心成员属性包括:
• ch:通过该channel接收到来自上游trigger发送的指令
• cmdHandlers:基于指令类型映射到对应的执行方法
• dataStore:真正存储数据的地方
• gcTicker:用于驱动定期回收过期数据的定时器
type DBExecutor struct {
// ...
// 接收来自 trigger 指令的 channel
ch chan *Command
// 指令分发
cmdHandlers map[CmdType]CmdHandler
// 数据存储介质
dataStore DataStore
// 回收过期数据的定时器
gcTicker *time.Ticker
}
func (e *DBExecutor) Entrance() chan<- *Command {
return e.ch
}
DBExecutor会以全局单例的形式存在,并在其初始化时启动run方法,持续接收来自trigger和gcTicker的指令,对dataStore中的数据进行操作. 正因为单协程运行的executor是dataStore的唯一操作入口,因此存储数据时可以放心地采用无锁化模型
// 数据库执行层单协程运行模式
func (e *DBExecutor) run() {
for {
// ...
// 定期批量回收一次过期数据
case <-e.gcTicker.C:
e.dataStore.GC()
// 接收并处理到来的指令
case cmd := <-e.ch:
// 获取指令对应的处理函数
cmdFunc, ok := e.cmdHandlers[cmd.cmd]
// ...
// 基于懒加载机制,对 key 进行回收
e.dataStore.ExpirePreprocess(string(cmd.args[0]))
// 执行指令处理函数,并将执行结果通过 receiver 发送给 trigger
cmd.receiver <- cmdFunc(cmd)
}
}
}
5.3 数据存储介质
存储介质DataStore的接口定义代码位于 database/struct.go,其中包含了各类数据操作指令犯法:
type DataStore interface {
// ...
// 过期相关指令
GC()
Expire(*Command) handler.Reply
ExpireAt(*Command) handler.Reply
// string 数据类型
Get(*Command) handler.Reply
Set(*Command) handler.Reply
// ...
// list 数据类型
LPush(*Command) handler.Reply
// ...
// set 数据类型
SAdd(*Command) handler.Reply
// ...
// hashmap 数据类型
HSet(*Command) handler.Reply
// ...
// sortedset 数据类型
ZAdd(*Command) handler.Reply
// ...
}
dataStore的实现类为KVStore,对应代码位于 database/kv_store.go:
• data:实际存储数据的map结构,因为不存在并发,所以无需加锁保护
• expiredAt:记录了每个key对应的过期时间
type KVStore struct {
// 实际存储数据的 map
data map[string]interface{}
// 记录 key 过期时间的 map
expiredAt map[string]time.Time
// 执行 key 过期回收任务的时间轮
expireTimeWheel SortedSet
// 数据持久化模块
persister handler.Persister
}
5.4 数据持久化模块
数据持久化模块的接口定义代码位于 handler/persist.go,核心方法包括:
• Reloader:获取用于重加载数据的reloader
• PersistCmd:完成一笔指令的持久化
type Persister interface {
// 加载持久化数据的 loader
Reloader() (io.ReadCloser, error)
// 对指令进行持久化
PersistCmd(ctx context.Context, cmd [][]byte)
Close()
}
goredis中,针对持久化模块仅仅实现了aof模式,对应代码位于 persist/aof.go,相关内容将在本系列第4篇中展开.
type aofPersister struct {
// ...
// 接收持久化指令的通道
buffer chan [][]byte
// aof 文件
aofFile *os.File
// aof 文件名
aofFileName string
// aof 持久化策略
appendFsync appendSyncStrategy
// aof 重写策略
autoAofRewriteAfterCmd int64
aofCounter atomic.Int64
// ...
}
6 展望
至此,本篇正文结束. 本期向大家介绍的是goredis系列的开篇之作,在此对未来拟展开部分内容作个展望:
• 基于go实现redis之主干框架(已完成): 在宏观视角下纵览 goredis 整体架构,梳理各模块间的关联性
• 基于go实现redis之指令分发(待填坑): 聚焦介绍 goredis 服务端如何启动和运行,并在接收客户端请求后实现指令协议的解析和分发
• 基于go实现redis之存储引擎(待填坑): 聚焦介绍数据存储层中单协程无锁化执行框架,各类基本数据类型的底层实现细节,以及过期数据的惰性和定期回收机制
• 基于go实现redis之数据持久化(待填坑): 介绍goredis关于aof持久化机制的实现以及有关于aof重写策略的执行细节