基于go实现redis之指令分发

文摘   科技   2024-04-27 11:53   北京  

0 前言

欢迎回来,由我们一起继续推进技术分享专题—— 【基于go实现redis】 .

此前我已于 github 开源项目——goredis,由于我个人水平有限,如有实现不到位之处,欢迎批评指正:https://github.com/xiaoxuxiansheng/goredis

 

本系列正是围绕着开源项目goredis展开,共分为四篇内容,本本为其中的第二篇——指令分发篇

  • • 基于go实现redis之主干框架(已完成): 在宏观视角下纵览 goredis 整体架构,梳理各模块间的关联性

  • • 基于go实现redis之指令分发(本篇): 聚焦介绍 goredis 服务端如何启动和运行,并在接收客户端请求后实现指令协议的解析和分发

  • • 基于go实现redis之存储引擎(待填坑): 聚焦介绍数据存储层中单协程无锁化执行框架,各类基本数据类型的底层实现细节,以及过期数据的惰性和定期回收机制

  • • 基于go实现redis之数据持久化(待填坑): 介绍goredis关于aof持久化机制的实现以及有关于aof重写策略的执行细节

 

1 架构梳理

正如前言中所介绍,本篇我们聚焦探讨两个问题:

  • • goredis 服务端如何运行: 即如何快速搭建一个 tcp server,能够正常接收处理来自客户端的 tcp 连接和请求

  • • goredis 请求内容解析: 即遵循怎样的协议,针对 tcp 请求内容进行解析转换,并给出遵循协议的响应格式

 

为了解决上述问题,本篇涉及到的模块包括:

  • • 应用程序 application: 对应为全局 goredis 应用程序的抽象载体(第2章介绍)

  • • 服务端 server: 对应为 tcp server 的具体实现模块(第2章介绍)

  • • 指令分发器 handler: 负责为到来的 tcp 连接服务,将请求内容转为操作指令,并分发给存储引擎层(第3章介绍)

  • • 协议解析器 parser: 遵循 Redis 文本解析协议,将请求内容解析为 redis 操作指令(第3章介绍)

 

2 服务运行

本章中,我们将展开介绍,如何支撑 goredis 应用程序的启动,并作为一个 tcp server 面向客户端提供服务.

goredis 启动与运行流程如上图所示,分为几个核心步骤:

  • • 配置加载: 感知并启用用户设定的运行策略

  • • server 启动: server 启动后持续监听 tcp 端口,接收到来的 tcp 连接

  • • handler 启动:handler 用于服务到来的 tcp 连接,将请求内容转为 redis 指令,并分发至存储引擎侧处理

 

2.1 启动入口

goredis 程序启动入口位于 main.go,完成对 conf 的加载,server 和 application 实例的构造,然后通过 application 一键启动程序:

func main() {
    server, err := app.ConstructServer()
    // ...


    app := app.NewApplication(server, app.SetUpConfig())
    defer app.Stop()


    if err := app.Run(); err != nil {
        // ...
    }
}

 

2.2 配置加载

redis 运行策略的配置文件位于 ./redis.conf . 目前支持配置的策略包括:

  • • bind: 服务端支持接收的客户端源 ip

  • • port: 服务端启动时监听的端口

  • • appendonly: (true/false)是否启用 aof 持久化机制

  • • appendfilename: 持久化时使用的 aof 文件路径+名称

  • • appendfsync:aof 持久化策略. (1)always: 针对每个指令进行 fsync 落盘; (2)everysec: 指令先集中溢写到设备 buffer, 每秒集中 fsync 落盘一次; (3)no: 指令溢写到设备 buffer 后即结束流程,由设备自行决定何时落盘 (有关于 fsync 的概念,可参见我之前的文章——etcd存储引擎之存储设计一文中的 3.2 小节)

  • • auto-aof-rewrite-after-cmds: 每持久化多少条指令后进行一次 aof 文件重写

# 接收的源 ip 地址. 设置为 0.0.0.0 代表接受任意源 ip.
bind 0.0.0.0
# 程序绑定的端口号
port 6379


# 是否启用 aof 持久化策略
appendonly yes
# aof 文件名
appendfilename appendonly.aof
# aof 级别,分为 always | everysec | no
appendfsync everysec
# 每执行多少次 aof 指令后,进行一次指令重写
auto-aof-rewrite-after-cmds 1000

 

goredis 启动时,首先会通过 SetUpConfig 方法对配置文件进行加载,生成 Config 实例并应用到服务各个模块中.

此处代码位于 app/config.go:

// 与 redis.conf 一一对应的各项配置参数
type Config struct {
    Bind                    string `cfg:"bind"`                        // 接收的源 ip 地址
    Port                    int    `cfg:"port"`                        // 程序绑定的端口号
    AppendOnly_             bool   `cfg:"appendonly"`                  // 是否启用 aof 持久化
    AppendFileName_         string `cfg:"appendfilename"`              // aof 文件路径+名称
    AppendFsync_            string `cfg:"appendfsync"`                 // aof 持久化策略
    AutoAofRewriteAfterCmd_ int    `cfg:"auto-aof-rewrite-after-cmds"` // 每执行多少次 aof 操作后,进行一次重写
}


// 加载生成配置项实例
func SetUpConfig() *Config {
    // 懒加载机制,保证全局只加载一次配置
    confOnce.Do(func() {
        defer func() {
            // 倘若配置加载失败,则使用默认的兜底策略
            if globalConf == nil {
                globalConf = defaultConf()
            }
        }()
        // 打开根目录下的 redis.conf 文件
        file, err := os.Open("./redis.conf")
        // ...
        // 读取文件内容,反序列化为 conf 实例
        globalConf = setUpConfig(file)
    })


    return globalConf
}


// 默认的兜底策略
func defaultConf() *Config {
    return &Config{
        Bind:        "0.0.0.0", // 默认面向所有源 ip 地址开放
        Port:        6379, // 默认端口号为 6379
        AppendOnly_: false, // 默认不启用 aof
    }
}

 

2.3 服务运行

支撑 goredis 运行 tcp server 的核心代码位于 server/server.go 文件. 在实现过程中,体现到的核心技术细节主要包含三点:

  • • io 多路复用技术: server 运行过程中,在监听 tcp 端口和处理 tcp 连接时,通过对 go 语言 io netpoller 的使用,隐藏了这部分技术细节,以 linux 系统为例,底层会使用到 io 多路复用中的 epoll 指令(例如 listener 阻塞等待 tcp 连接到达的额accept 操作以及在 tcp 连接到达后执行的读写操作,都涉及对 io 多路复用技术的使用. 这部分技术细节内容,隐藏在 go 原生 io 标准库中,感兴趣的同学可以查看我之前发表的技术文章——解析 Golang 网络 IO 模型之 Epoll

  • • 一比一异步协程服务连接: 对于到来的每个 tcp 连接,会为其一比一启动 goroutine,并将其分配给 handler 层进行处理

  • • 优雅关闭策略: 在处理 tcp 连接时,会通过 context 和 waitGroup 工具控制和守护其生命周期,实现优雅关闭策略

 

 

有关服务端 server 的类定义如下,其中包含如下核心成员属性:

  • • 单例工具 xxOnce: 通过 runOnce、stopOnce 两个单例工具,规避单 server 实例下启动和停止动作的重复执行

  • • 指令分发器实例 handler: server accept 到 tcp 连接后,会分配给 handler 进行处理

  • • 关闭控制器 stopc: 通过 stopc 感知到服务退出信号,进行资源回收

type Server struct {
    runOnce  sync.Once
    stopOnce sync.Once
    handler  Handler
    // ...
    stopc    chan struct{}
}

 

服务端启动的入口方法为 server.Serve():

  • • 退出信号感知: 启动一个异步协程探测是否有程序信号,以便完成退出前的资源回收、优雅关闭

  • • 端口监听器初始化: 根据配置的 ip port,创建 tcp listener 实例

  • • 服务端主运行流程: 基于 for 循环 + listener accept 模型,持续探测并接收到来的 tcp 连接. 为每个到来的 tcp 连接一比一分配 goroutine,并将其托付给指令分发器 handler 进行处理

func (s *Server) Serve(address string) error {
    if err := s.handler.Start(); err != nil {
        return err
    }
    var _err error
    s.runOnce.Do(func() {
        // 监听进程信号,完成程序退出前的资源回收
        exitWords := []os.Signal{syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT}


        sigc := make(chan os.Signal, 1)
        signal.Notify(sigc, exitWords...)
        closec := make(chan struct{}, 4)
        pool.Submit(func() {
            for {
                select {
                // 进程退出信号
                case signal := <-sigc:
                    switch signal {
                    case exitWords[0], exitWords[1], exitWords[2], exitWords[3]:
                        closec <- struct{}{}
                        return
                    default:
                    }
                // 显式关闭信号
                case <-s.stopc:
                    closec <- struct{}{}
                    return
                }
            }
        })
        // 为指定端口申请监听器
        listener, err := net.Listen("tcp", address)
        if err != nil {
            _err = err
            return
        }
        // 启动经典的 tcp server 运行框架
        s.listenAndServe(listener, closec)
    })


    return _err
}

 

func (s *Server) listenAndServe(listener net.Listener, closec chan struct{}) {
    errc := make(chan error, 1)
    defer close(errc)


    // 接收到退出信号,则回收资源,终止流程
    ctx, cancel := context.WithCancel(context.Background())
    pool.Submit(
        func() {
            // 服务单退出处理
            select {
            case <-closec:
                // ...
            case err := <-errc:
                // ...
            }
            cancel()
            // 关闭 handler
            s.handler.Close()
            // 关闭 listener
            if err := listener.Close(); err != nil {
                s.logger.Errorf("[server]server close listener err: %s", err.Error())
            }
        })


    // ...
    // 服务端运行框架  for 循环 + listener accept 模型
    var wg sync.WaitGroup
    // io 多路复用模型,goroutine for per conn
    for {
        conn, err := listener.Accept()
        if err != nil {
            // 超时类错误,忽略
            if ne, ok := err.(net.Error); ok && ne.Timeout() {
                time.Sleep(5 * time.Millisecond)
                continue
            }


            // 意外错误,则停止运行
            errc <- err
            break
        }


        // 为每个到来的 conn 分配一个 goroutine 处理
        wg.Add(1)
        pool.Submit(func() {
            defer wg.Done()
            s.handler.Handle(ctx, conn)
        })
    }


    // 通过 waitGroup 保证优雅退出
    wg.Wait()
}

 

3 指令分发

3.1 指令分发器

每当有 tcp 连接到达后,server 层会将其统一分配到下层,由 handler 为其提供服务.

入口方法为 handler.Handle(),为了更好地支持优雅关闭策略,handler 会缓存活跃的连接,以便在退出前对连接进行关闭:

func (h *Handler) Handle(ctx context.Context, conn net.Conn) {
    h.mu.Lock()
    // 判断 db 是否已经关闭
    if h.closed.Load() {
        h.mu.Unlock()
        return
    }


    // 当前 conn 缓存起来
    h.conns[conn] = struct{}{}
    h.mu.Unlock()


    // 核心逻辑所在
    h.handle(ctx, conn)
}
// 关闭 handler
func (h *Handler) Close() {
    h.Once.Do(func() {
        // ...
        h.closed.Store(true)
        h.mu.RLock()
        defer h.mu.RUnlock()
                // 依次关闭连接
        for conn := range h.conns {
            if err := conn.Close(); err != nil {
                // ...
            }
        }
        h.conns = nil
                // 关闭存储引擎
        h.db.Close()
                // 关闭持久化模块
        h.persister.Close()
    })
}

 

 

handler 层会紧密依赖到对协议解析器 parser 的使用:(1)将到来的 tcp 连接转换成 go 语言中普遍受欢迎的通道 channel 形式;(2)然后借由 parser 的能力将到来的请求内容一一转换成符合 Redis 文本协议的指令内容;(3)进一步将其分发给下层的存储引擎 db 进行处理.

 

在此处的实现上,我将用于传递指令的通道类比为一股源源不断的水流——stream,将到来的每一个指令类比为水流中的一个水滴——droplet,并采用 go 语言中经典的 for + select 模式组织 handler 流程的运行框架.

func (h *Handler) handle(ctx context.Context, conn io.ReadWriter) {
    // 将连接转换成通道 channel 实例,抽象为一股水流
    stream := h.parser.ParseStream(conn)
    for {
        select {
        // ...
        // 承载水流中的每一个水滴
        case droplet := <-stream:
            if err := h.handleDroplet(ctx, conn, droplet); err != nil {
                // ...
            }
        }
    }
}

 

// 处理到来的水滴(请求指令)
func (h *Handler) handleDroplet(ctx context.Context, conn io.ReadWriter, droplet *Droplet) error {
    // 请求是否终止. 如包含了 EOF 类错误
    if droplet.Terminated() {
        return droplet.Err
    }
     
    // ...
    // 请求指令必须为 multiBulkReply 类型,稍后展开介绍
    multiReply, ok := droplet.Reply.(MultiReply)
    // ...


    // 将请求指令发往存储引擎 db
    if reply := h.db.Do(ctx, multiReply.Args()); reply != nil {
       // 将 db 给予的响应通过 tcp 连接返回给客户端
        _, _ = conn.Write(reply.ToBytes())
        return nil
    }


    // ...
}

 

3.2 RESP

在正式介绍协议解析模块之前,我们有必要先对 Redis 文本解析协议的理论知识进行补充.

Resp 全称 Redis Serialization Protocol,是 Redis 中用于解析 tcp 文本内容采用的协议:

  • • 内容分类: Resp 中将文本内容分为简单字符串、错误信息、整数、定长字符串以及数组五类

  • • 换行分隔符: Resp 以行为解析单元,文本内容一律以换行符 \r\n 进行分隔

  • • 二进制安全: Resp 中定长字符串和数组类型的文本内容能够保证二进制安全,即使正文内容混淆了类似 \r\n 这样的特殊字符,也不会发生混淆歧义

 

接下来我们针对上文提到的五类内容注意展开介绍:

  • • 简单字符串(Simple String): 以 "+" 开头,随后紧跟文本内容直到换行符出现.

简单字符串非二进制安全,主要用于服务端完成指令处理后给予客户端的一些简单响应中,其示意如下:

+OK\r\n

 

  • • 错误(Error): 以 “-” 开头,随后紧跟错误信息直到换行符出现.

错误非二进制安全,主要用于服务端给予客户端的报错信息响应中,其示意如下:

-Err syntax error\r\n

 

  • • 整数(Integer): 以 ":" 开头,随后紧跟一个64位有符号整数

整数非二进制安全,主要用于服务端给予客户端的一些统计类响应中,其示意如下:

:1\r\n

 

  • • 定长字符串(Bulk String): 由两行内容组成,(1)第一行以 "$" 开头,随后标识出字符串长度;(2)第二行展示对应长度字符串.

定长字符串由于存在长度表示,因此是二进制安全的,即便 string 中包含了 \r\n 也能被正常解析. 例如服务端执行 get 指令获取到的 value 值就以定长字符串的协议给予客户端响应.

定长字符串的示意内容如下,表示一个长度为 3 的字符串——"abc"

$3\r\n
abc\r\n

 

  • • 数组(Array,也称 Multi Bulk String): 由 2n + 1 行内容组成(n为数组长度) :(1)第一行以 "*" 开头,随后标识出数组长度 n;(2)接下来每 2 行形成一组,遵循定长字符串 Bulk String 的表达协议

数组同样是二进制安全的表达形式. 例如客户端发放服务端的指令,或是服务端基于客户端的复杂数据结构响应,均采用数组的表达形式.

数组的示意内容如下,表示一个包含 3 个元素的数组,对应形成一笔由客户端发往服务端的 set 指令——【set a b】

*2\r\n
$3\r\n
set\r\n
$1\r\n
a\r\n
$1\r\n
b\r\n

 

3.3 协议解析器

协议解析器 parser 模块遵循 Resp,针对各类文本内容,定义了响应的解析方法.

对应代码位于 protocol/parser.go 文件中:

// 协议解析器 parser
type Parser struct {
    // 以本文首个字符为标志,映射到不同文本类型的解析方法
    lineParsers map[byte]lineParser
    // ...
}


// 协议解析器构造器函数
func NewParser(logger log.Logger) handler.Parser {
    p := Parser{
        // ...
    }
    // 以本文首个字符为标志,映射到不同文本类型的解析方法
    p.lineParsers = map[byte]lineParser{
        // 解析简单字符串
        '+': p.parseSimpleString,
        // 解析错误
        '-': p.parseError,
        // 解析整数
        ':': p.parseInt,
        // 解析定长字符串
        '$': p.parseBulk,
        // 解析数组
        '*': p.parseMultiBulk,
    }
    return &p
}

 

指令分发器 handler 接收到 tcp 连接时,会调用 parser.ParseStream 方法:

(1)其中 parser 会构造出一个 channel 实例返回给 handler

(2)并异步启动一个 goroutine 持续接收来自 tcp 连接中的请求,并遵循 RESP 将其解析成 redis 指令,然后通过 channel 发往 handler:

// 由 parser 提供能力,将 tcp 连接中的请求内容解析成指令并通过 channel 发往 handler
func (p *Parser) ParseStream(reader io.Reader) <-chan *handler.Droplet {
    // 构造 channel 实例
    ch := make(chan *handler.Droplet)
    pool.Submit(
        func() {
            // 异步启动 goroutine,负责完成 tcp 请求内容解析,并通过 channel 传输
            p.parse(reader, ch)
        })
    return ch
}

 

// 负责完成 tcp 请求内容解析,并通过 channel 传输
func (p *Parser) parse(rawReader io.Reader, ch chan<- *handler.Droplet) {
    // reader 封装
    reader := bufio.NewReader(rawReader)
    for {
        // 以换行符为分割,读取首行内容
        firstLine, err := reader.ReadBytes('\n')
        // ...
    
        length := len(firstLine)
        if length <= 2 || firstLine[length-1] != '\n' || firstLine[length-2] != '\r' {
            continue
        }
        
        firstLine = bytes.TrimSuffix(firstLine, []byte{'\r', '\n'})
        // 以本文首个字符为标志,映射到不同文本类型的解析方法
        lineParseFunc, ok := p.lineParsers[firstLine[0]]
        if !ok {
            p.logger.Errorf("[parser] invalid line handler: %s", firstLine[0])
            continue
        }
        // 解析成指令内容后,通过 channel 发送往 handler
        ch <- lineParseFunc(firstLine, reader)
    }
}

 

4 使用示例

介绍完 goredis 的服务运行框架以及文本解析协议后,接下来于本章中给出 goredis 的使用示例.

 

(1)首先,通过 goredis 下的启动脚本一键启动服务:

./start.sh

 

(2)接下来,访问到服务对应的 tcp 端口,建立 tcp 连接:

telnet localhost 6379

 

(3)成功建立 tcp 连接后,遵循 Resp 下的 Multi Bulk String 协议,执行一笔 set 指令——【set a b】

*3         
$3
set
$1
a
$1
b

 

(4)设置成功后,获得响应:

:1

 

(5)接下来遵循 Resp 下的 Multi Bulk String 协议,执行一笔 get 指令——【get a】

*2
$3
get
$1
a
$1

 

(6)最终成功获得响应:

$1
b

 

5 展望

至此为指令分发篇的全部内容,在此对本系列内容做个小结和展望:

  • • 基于go实现redis之主干框架(已完成): 在宏观视角下纵览 goredis 整体架构,梳理各模块间的关联性

  • • 基于go实现redis之指令分发(已完成): 聚焦介绍 goredis 服务端如何启动和运行,并在接收客户端请求后实现指令协议的解析和分发

  • • 基于go实现redis之存储引擎(待填坑): 聚焦介绍数据存储层中单协程无锁化执行框架,各类基本数据类型的底层实现细节,以及过期数据的惰性和定期回收机制

  • • 基于go实现redis之数据持久化(待填坑): 介绍 goredis 关于 aof 持久化机制的实现以及有关于aof重写策略的执行细节



小徐先生的编程世界
在学钢琴,主业码农
 最新文章