基于go实现redis之主干框架

文摘   科技   2024-04-21 14:47   北京  

0 前言

本着学习和实践的目的,从本期开始,我将和大家一起走进一个新的专题—— 【基于go实现redis】 .

该专题围绕着我的一个开源项目——goredis展开. 由于个人水平有限,如有不到位之处,欢迎批评指正:https://github.com/xiaoxuxiansheng/goredis

本系列计划分为四篇内容:

  • • 基于go实现redis之主干框架(本篇): 在宏观视角下纵览 goredis 整体架构,梳理各模块间的关联性

  • • 基于go实现redis之指令分发(待填坑): 聚焦介绍 goredis 服务端如何启动和运行,并在接收客户端请求后实现指令协议的解析和分发

  • • 基于go实现redis之存储引擎(待填坑): 聚焦介绍数据存储层中单协程无锁化执行框架,各类基本数据类型的底层实现细节,以及过期数据的惰性和定期回收机制

  • • 基于go实现redis之数据持久化(待填坑): 介绍goredis关于aof持久化机制的实现以及有关于aof重写策略的执行细节

(此外,这里需要特别提及一下,在学习过程中,很大程度上需借助了 hdt3213 系列博客和项目的帮助,在此致敬一下:https://www.cnblogs.com/Finley/category/1598973.html)

 

1 实现目标

redis(https://redis.io/)是一款高性能的key-value内存存储组件,广泛应用于数据缓存、分布式协调、消息队列等使用场景中.

我此前也做过一些有关redis的技术分享,主要围绕着分布式锁和消息队列的主题展开:

 

有关redis的技术细节博大精妙,受限于个人水平,本系列中我只能说通过go语言仿制出一个“乞丐版”redis,尝试窥探其中的冰山一角. 项目中涉及实现到的功能点包括:

  • • tcp服务端搭建

  •     • 基于go自带netpoller实现io多路复用

  •     • 还原redis数据解析协议

  • • 常规数据类型与操作指令支持

  •     • string——get/mget/set/mset

  •     • list——lpush/lpop/rpush/rpop/lrange

  •     • set——sadd/sismember/srem

  •     • hashmap——hset/hget/hdel

  •     • sortedset——zadd/zrem/zrangebyscore

  • • 数据持久化机制

  •     • appendonlyfile落盘与重写

 

除此之外,为了效仿redis. 项目在与底层数据模型交互时,会基于单协程模型实现. 数据存储模型则是由一个无锁化的map结构实现.

 

2 架构总览

如上图所示,goredis的运行架构在纵向上可以拆分为三个模块:

  • • 服务运行层: 支持goredis作为tcp服务端,接收和处理来自客户端的连接与请求

  •     • 应用程序application

  •     • 服务端server

  • • 指令分发层: 在服务端接收到请求后,遵循Redis Serialization Protocol转换为操作指令,并分发到存储引擎层

  •     • 指令分发器handler

  •     • 协议解析器parser

  • • 存储引擎层: 与存储介质交互,完成数据存储、过期回收以及持久化处理

  •     • 数据持久化persister

  •     • 存储介质kvstore

  •     • 数据库执行层executor

  •     • 数据库触发层trigger

在宏观上,goredis接收并处理一笔请求的途径链路为:

server->handler(parser)->dbtrigger->dbexecutor->kvstore(persister)

 

goredis项目遵循依赖注入的实现风格,基于dig(https://github.com/uber-go/dig)实现各模块和组件的注入与管理,具体使用方式可以参见我之前的文章——低配 Spring—Golang IOC 框架 dig 原理解析

 

 

在文件 app/factory.go 中作了统一收口,可以基于全局视角一览各部分参与的模块组件:

var container = dig.New()


func init() {
    /**
       其它
    **/
    // 配置加载 conf
    _ = container.Provide(SetUpConfig)
    _ = container.Provide(PersistThinker)
    // 日志打印 logger
    _ = container.Provide(log.GetDefaultLogger)


    /**
       存储引擎层
    **/
    // 数据持久化
    _ = container.Provide(persist.NewPersister)
    // 存储介质
    _ = container.Provide(datastore.NewKVStore)
    // 执行器
    _ = container.Provide(database.NewDBExecutor)
    // 触发器
    _ = container.Provide(database.NewDBTrigger)


    /**
       指令分发层
    **/
    // 协议解析器
    _ = container.Provide(protocol.NewParser)
    // 指令分发器
    _ = container.Provide(handler.NewHandler)


    /**
       服务端运行层
    **/
    _ = container.Provide(server.NewServer)
}

 

3 服务运行层

 

服务运行层支撑了整个goredis应用的运行,包含两个核心子模块:

  • • Application: 对应为goredis应用程序的抽象

  • • Server: 对应为goredis接收和响应客户端请求的运行服务端

3.1 应用程序

goredis应用程序启动的入口代码位于main.go,核心步骤包括:

  • • 构造server与application实例

  • • 一键运行application

import "github.com/xiaoxuxiansheng/goredis/app"


func main() {
    // 1 构造 server 实例
    server, err := app.ConstructServer()
    if err != nil {
        panic(err)
    }


    // 2 构造 application 实例
    app := app.NewApplication(server, app.SetUpConfig())
    defer app.Stop()


    // 3 运行 application
    if err := app.Run(); err != nil {
        panic(err)
    }
}

 

有关 application类定义代码位于:app/app.go,包含两个核心成员属性:

  • • server: 持有的服务端实例

  • • conf: 聚合了使用方关于goredis的定制化配置

type Application struct {
    server *server.Server
    conf   *Config
}

 

func (*Application) Run() error {
    return a.server.Serve(a.conf.Address())
}


func (*Application) Stop() {
    a.server.Stop()
}

 

3.2 服务端

服务端类定义代码位于 server/server.go,其中持有了指令分发模块handler的引用:

type Server struct {
    // ...
    handler  Handler
    // ...
}

 

server运行时,会启动handler实例,并且启动tcp服务并监听tcp端口,将每个到来的tcp连接移交给handler层进行处理:

func (*Server) Serve(address string) error {
    // 1 启动 handler
    s.handler.Start()
    // ...


    // 2 运行 tcp 服务
    s.listenAndServe(listener, closec)
    // ...
}


func (*Server) listenAndServe(listener net.Listener, closec chan struct{}) {
    // ...
    // io 多路复用模型,goroutine for per conn
    for {
        // 接收到来的 tcp 连接
        conn, err := listener.Accept()
        // ...
        // 为每个到来的 conn 分配一个 goroutine 处理
        pool.Submit(func() {
            // ...
            s.handler.Handle(ctx, conn)
        })
    }


    // ...
}


func (*Server) Stop() {
    s.stopOnce.Do(func() {
        close(s.stopc)
    })
}

其中,pool.Submit方法类似于go func一键启动协程,只不过本项目中采用的是协程池模式:

import (
    "runtime/debug"
    "strings"


    "github.com/panjf2000/ants"
)


var pool *ants.Pool


func init() {
    _pool, err := ants.NewPool(50000, ants.WithPanicHandler(func(interface{}) {
        // ...
    }))
    if err != nil {
        panic(err)
    }
    pool = _pool
}


func Submit(task func()) {
    pool.Submit(task)
}

协程池使用到的是ants(https://github.com/panjf2000/ants),之前我也发表了一篇对应的技术文章:Golang 协程池 Ants 实现原理

 

4 指令分发层

指令分发层的核心作用是,接收到请求后,遵循Redis协议将其解析为操作指令,然后将指令分发到存储引擎层进行执行.

4.1 指令分发器

指令分发器的接口定义代码位于 server/server.go,包含两个核心方法:

  • • Start:启动handler

  • • Handle:持续处理到来的一笔连接

// 逻辑处理器
type Handler interface {
    Start() error // 启动 handler
    // 处理到来的每一笔 tcp 连接
    Handle(ctx context.Context, conn net.Conn)
    // 关闭处理器
    Close()
}

 

关于Handler的实现类位于handler/handler.go,其中包含三个核心成员属性:

  • • db: 对下游存储引擎层的抽象

  • • parser: 遵循redis协议,将到来的请求参数解析成对应的redis指令

  • • persister: handler初启动时,通过persister读取此前的持久化内容,将其分发到存储引擎层还原出对应的数据

type Handler struct {
    // ...
    db        DB
    parser    Parser
    persister Persister
    // ...
}

 

启动Handler的Start方法:

func (*Handler) Start() error {
    // 加载持久化文件,还原内容
    reloader, err := h.persister.Reloader()
    // ...
    // 读取持久化文件内容,还原内存数据库
    h.handle(SetLoadingPattern(context.Background()), newFakeReaderWriter(reloader))
    return nil
}

 

处理到来连接的Handle方法——通过parser将请求参数转化为指令,然后分发给db:

// 处理到来的 tcp 连接
func (*Handler) Handle(ctx context.Context, conn net.Conn) {
    // ...
    h.handle(ctx, conn)
}


func (*Handler) handle(ctx context.Context, conn io.ReadWriter) {
    // 借助 protocol parser 将到来的指令转而通过 stream channel 输出
    stream := h.parser.ParseStream(conn)
    for {
        select {
        // ...
        // 读取 stream channel 中到来的指令
        case droplet := <-stream:
            if err := h.handleDroplet(ctx, conn, droplet); err != nil {
                // ...
            }
        }
    }
}


// 处理每一笔指令
func (*Handler) handleDroplet(ctx context.Context, conn io.ReadWriter, droplet *Droplet) error {
    // ...
    // 请求指令类型转换
    multiReply, ok := droplet.Reply.(MultiReply)
    // ...
    // 通过 db 引擎处理请求指令
    if reply := h.db.Do(ctx, multiReply.Args()); reply != nil {
        // 返回响应结果
        _, _ = conn.Write(reply.ToBytes())
        // ...
    }


    // ...
}

 

func (*Handler) Close() {  
    // ...
    h.db.Close()
    h.persister.Close()
}

 

4.2 协议解析器

协议解析器的接口定义代码位于handler/struct.go,通过ParseStream方法将连接转换成channel的形式,把每一笔请求参数转为redis操作指令,并通过channel传递给handler进行处理:

// 协议解析器
type Parser interface {
    ParseStream(reader io.Reader) <-chan *Droplet
}

 

Parser实现类代码位于protocol/parser.go:

type Parser struct {
    lineParsers map[byte]lineParser
    // ...
}

 

// 将连接转为 stream channel 形式,并通过异步协程执行 parse 方法持续往 channel 中传送到来的指令
func (*Parser) ParseStream(reader io.Reader) <-chan *handler.Droplet {
    ch := make(chan *handler.Droplet)
    pool.Submit(
        func() {
            p.parse(reader, ch)
        })
    return ch
}

 

func (*Parser) parse(rawReader io.Reader, ch chan<- *handler.Droplet) {
    reader := bufio.NewReader(rawReader)
    for {
        // 逐行读取数据
        firstLine, err := reader.ReadBytes('\n')
        
        // 解析请求参数,分发给对应的指令解析函数
        firstLine = bytes.TrimSuffix(firstLine, []byte{'\r', '\n'})
        lineParseFunc, ok := p.lineParsers[firstLine[0]]
        // ...
        // 将解析后的指令传送到 stream channel
        ch <- lineParseFunc(firstLine, reader)
    }
}

 

5 存储引擎层

 

存储引擎层模块封装了有关数据存储交互的流程,其中分为四个核心子模块:

  • • 触发层trigger: 负责接收来自handler的操作指令,将其通过channel移交给executor

  • • 执行层executor: 采用单协程模式运行,负责完成存储数据的crud操作

  • • 存储介质datastore: 存储数据所在之处,包含各类数据类型的实现模型

  • • 持久化模块persister: 支持内存数据的持久化和重加载功能

 

5.1 数据库触发层

有关整个数据库存储引擎的接口定义位于 handler/struct.go,通过Do方法完成指令的执行,并接收其响应结果:

type DB interface {
    Do(ctx context.Context, cmdLine [][]byte) Reply
    Close()
}

 

对DB的实现类是数据库触发层DBTrigger,代码位于 database/trigger.go,其中依赖了executor实例:

// 数据库触发器
type DBTrigger struct {
    // ...
    executor Executor
}

 

DBTrigger接收到指令,会对其进行包装,然后通过channel(executor.Entrance)将其发送给全局单例的executor中:

// 处理指令
func (*DBTrigger) Do(ctx context.Context, cmdLine [][]byte) handler.Reply {
    // ...


    // 组装指令
    cmd := Command{
        ctx:      ctx,
        cmd:      cmdType,
        args:     cmdLine[1:],
        receiver: make(CmdReceiver),
    }


    // 将指令投递到 executor
    d.executor.Entrance() <- &cmd


    // 接收来自 executor 返回的 reply
    return <-cmd.Receiver()
}

 

5.2 数据库执行层

有关数据执行器接口的定义代码位于 database/struct.go,其会通过Entrance入口完成指令的接收:

type Executor interface {
    // 执行器入口,用于接收指令
    Entrance() chan<- *Command
    // 校验指令是否合法
    ValidCommand(cmd CmdType) bool
    Close()
}

 

DBExecutor是数据库执行器的实现类,代码位于 database/executor.go,核心成员属性包括:

  • • ch:通过该channel接收到来自上游trigger发送的指令

  • • cmdHandlers:基于指令类型映射到对应的执行方法

  • • dataStore:真正存储数据的地方

  • • gcTicker:用于驱动定期回收过期数据的定时器

type DBExecutor struct {
    // ...
    // 接收来自 trigger 指令的 channel
    ch     chan *Command
    // 指令分发
    cmdHandlers map[CmdType]CmdHandler
    // 数据存储介质
    dataStore   DataStore
    // 回收过期数据的定时器
    gcTicker *time.Ticker
}

 

func (*DBExecutor) Entrance() chan<- *Command {
    return e.ch
}

DBExecutor会以全局单例的形式存在,并在其初始化时启动run方法,持续接收来自trigger和gcTicker的指令,对dataStore中的数据进行操作. 正因为单协程运行的executor是dataStore的唯一操作入口,因此存储数据时可以放心地采用无锁化模型

// 数据库执行层单协程运行模式
func (*DBExecutor) run() {
    for {
        // ...        
        // 定期批量回收一次过期数据
        case <-e.gcTicker.C:
            e.dataStore.GC()
        // 接收并处理到来的指令
        case cmd := <-e.ch:
            // 获取指令对应的处理函数
            cmdFunc, ok := e.cmdHandlers[cmd.cmd]
            // ...
            // 基于懒加载机制,对 key 进行回收
            e.dataStore.ExpirePreprocess(string(cmd.args[0]))
            // 执行指令处理函数,并将执行结果通过 receiver 发送给 trigger
            cmd.receiver <- cmdFunc(cmd)
        }
    }
}

 

5.3 数据存储介质

存储介质DataStore的接口定义代码位于 database/struct.go,其中包含了各类数据操作指令犯法:

type DataStore interface {
    // ...
    // 过期相关指令
    GC()
    Expire(*Command) handler.Reply
    ExpireAt(*Command) handler.Reply


    // string 数据类型
    Get(*Command) handler.Reply
    Set(*Command) handler.Reply
    // ...


    // list 数据类型
    LPush(*Command) handler.Reply
    // ...


    // set 数据类型
    SAdd(*Command) handler.Reply
    // ...


    // hashmap 数据类型
    HSet(*Command) handler.Reply
    // ...


    // sortedset 数据类型
    ZAdd(*Command) handler.Reply
    // ...
}

 

dataStore的实现类为KVStore,对应代码位于 database/kv_store.go:

  • • data:实际存储数据的map结构,因为不存在并发,所以无需加锁保护

  • • expiredAt:记录了每个key对应的过期时间

type KVStore struct {
    // 实际存储数据的 map
    data      map[string]interface{}
    // 记录 key 过期时间的 map
    expiredAt map[string]time.Time


    // 执行 key 过期回收任务的时间轮
    expireTimeWheel SortedSet


    // 数据持久化模块
    persister handler.Persister
}

 

5.4 数据持久化模块

数据持久化模块的接口定义代码位于 handler/persist.go,核心方法包括:

  • • Reloader:获取用于重加载数据的reloader

  • • PersistCmd:完成一笔指令的持久化

type Persister interface {
    // 加载持久化数据的 loader
    Reloader() (io.ReadCloser, error)
    // 对指令进行持久化
    PersistCmd(ctx context.Context, cmd [][]byte)
    Close()
}

 

goredis中,针对持久化模块仅仅实现了aof模式,对应代码位于 persist/aof.go,相关内容将在本系列第4篇中展开.

type aofPersister struct {
    // ...
    // 接收持久化指令的通道
    buffer                 chan [][]byte
    // aof 文件
    aofFile                *os.File
    // aof 文件名
    aofFileName            string
    // aof 持久化策略
    appendFsync            appendSyncStrategy
    // aof 重写策略
    autoAofRewriteAfterCmd int64
    aofCounter             atomic.Int64


    // ...
}

 

 

6 展望

至此,本篇正文结束. 本期向大家介绍的是goredis系列的开篇之作,在此对未来拟展开部分内容作个展望:

  • • 基于go实现redis之主干框架(已完成): 在宏观视角下纵览 goredis 整体架构,梳理各模块间的关联性

  • • 基于go实现redis之指令分发(待填坑): 聚焦介绍 goredis 服务端如何启动和运行,并在接收客户端请求后实现指令协议的解析和分发

  • • 基于go实现redis之存储引擎(待填坑): 聚焦介绍数据存储层中单协程无锁化执行框架,各类基本数据类型的底层实现细节,以及过期数据的惰性和定期回收机制

  • • 基于go实现redis之数据持久化(待填坑): 介绍goredis关于aof持久化机制的实现以及有关于aof重写策略的执行细节



小徐先生的编程世界
在学钢琴,主业码农
 最新文章