一致性缓存理论分析与技术实战

文摘   2024-05-11 14:23   北京  

0 前言

工程实践场景中,我们通常用数据库完成数据的持久存储,而数据存储侧的性能调优也是一个永恒经典的话题. 在一些请求量大、读多写少的场景中,一种性能优化方式是考虑在数据库之上添加一层缓存组件,这样一方面能减轻数据库的访问压力,一方面也能提升查询操作的性能.

然而由于缓存(如 redis)和数据库(如 mysql)是两个独立的存储组件,在操作过程中无法在跨组件的基础上保证“事务” 的语义,因此不可避免地会面临缓存数据与数据库数据不一致的问题.

我们把上述问题称为 “缓存一致性”问题,本篇将紧密围绕该问题,针对设计缓存与数据库的读、写流程进行串联梳理,总结出能够兼顾数据一致性与操作性能的执行方法论.

理论先行,实践紧随. 本篇所探讨的内容,将统一通过 go 语言在我的开源项目 consistent_cache 中予以实践验证——开源项目传送门:https://github.com/xiaoxuxiansheng/consistent_cache

 

1 理论分析

1.1 缓存一致性问题

缓存(cache) 相比于数据库(db) 而言更加轻便快捷,但与之相对的也存在成本高、容量小、稳定性弱的问题,因此 cache 中的数据通常是依附于 db 中的持久化数据而存在,一笔数据的更新操作和存储终态最终还是要以 db 为准.

从这个角度出发来看,在写操作密集的场景下,使用 cache 的收益并不明显;然而在读多写少的场景中,我们通过将一些相对稳定不变的数据从 db 冗余到 cache 中,由 cache 同一收口数据查询能力,能够很大程度提升查询性能并降低 db 的访问压力.

 

随之而来的首要问题就是 cache 中的数据如何与 db 保持一致. 展开来说就是,cache 和 db 本身是两个独立的存储组件,跨组件间的数据一致性问题属于分布式事务的范畴,本身有一套解决的方案,但受限于其高昂的实现成本,因此不适用于绝大多数场景,这部分内容可以参考我之前发表的文章——万字长文漫谈分布式事务实现原理.

在本篇中,我们更多将注意力集中在,如何针对数据的读写流程进行秩序地组织串联,以此来满足 cache 和 db 间数据的最终一致性,并尽可能追求即时一致性的语义.

 

1.2 读写流程串讲

当前比较主流的缓存一致性实现思路中,针对读、写流程的职责进行如下拆解:

流程缓存 cache数据库 db
写流程执行删除操作执行写入操作
读流程执行查询、写入操作执行查询操作
  • • 在写流程中:只负责写 db,不负责更新 cache;与之相对的,需要在写 db 前,先负责将 cache 中的脏数据删除,由后续到来的读操作负责将更新后的迁移到缓存中

  • • 在读流程中:尝试读 cache;如果 cache 中数据 miss,则读取 db,并将读到的数据更新到 cache 中,以便后续的读操作复用

 

此处我们针对上述流程的设计原则通过 qa 形式进行简要剖析:

question A:为什么写流程不在写完 db 后直接更新 cache 数据?

  • • 原因I:cache 相比于 db 空间小、成本高,因此希望尽可能将访问频繁的热数据加载到 cache 中进行复用. 而 cache 的目标是面向读密集场景,数据的热度由读操作的频率来决定,因此将写 cache 的执行权交由读流程负责,而非写流程;

  • • 原因II:在明确已经由读流程负责写 cache 的前提下,写流程就不再执行重复动作,以此实现流程简化,也能规避部分因并发场景而导致的 cache 数据一致性问题

 

question B:为什么写流程需要删除 cache 数据?

  • • 在读流程中,当 cache 中存在数据,会立即读取并返回结果. 因此倘若写操作不删除 cache 中的脏数据,那么后续到来的读操作都会因此读取到错误的结果.

  • • 当写操作删除 cache 删除后,读流程访问 cache 时发现数据 miss,就会读取 db,并将此时正确的数据重新写入 cache 中

 

question C:为什么写流程需要先删除 cache 再写 db?

  • • 逆向思考:倘若先写 db 后删 cache,由于两个操作无法保证原子性,一旦前者成功后者失败,就会导致 cache 中永久存在错误的数据;

  • • 反之,先删 cache 后写 db,哪怕前者成功后者失败,无非是额外增加一次将数据由 db 冗余到 cache 中的成本而已

 

question D:上述流程是严谨的吗?是否还存在哪些环节可能存在 cache 数据不一致问题?

这个问题留给各位读者,请大家短暂驻步于此, 脑中思考该问题的答案,并带着问题再进入到后续小节的阅读内容中.

 

1.3 缓存双删策略

question D 的答案是否定的.

我们需要明白,在实际场景中,一系列读、写流程是并发执行的,两个流程下各执行步骤的相对次序可能因为机器、网络等不确定因素发生变化. 此处我们就给出一个具体 badcase 加以分析说明:

 

 

  • • 背景:

数据库中有一笔 kv 数据,key:a; value:b

{
  "key": "a",
  "value": "b"
}

此时,一笔读操作和一笔写操作并发启动,

读操作读取 key:a 对应的数据:

get a

写操作执行指令,期望将 key:a 对应的 value 更新为 c

 

  • • 过程

按照 1.2 小节中介绍的流程,下面我们进行读写流程的步骤串联:

(1)moment1:写流程删除 cache 中的 key:a 数据

(2)moment2:读流程读 cache,发现 key:a 数据 miss,准备进一步读 db

(3)moment3:读流程读 db,此时数据还是老版本,value 为 b

(4)moment4:写流程写 db,将 key:a 对应 value 更新为 c

(5)moment5:读流程把读到的 value:b 作为 key:a 的映射数据写入 cache

至此,读、写流程均结束了,然而现状是,db 中 key:a 数据已经更新为 c,然而 cache 中 key:a 对应的还是脏数据 b.

产生上述问题的本质原因就是,在并发场景下,步骤(1)写流程删除 cache 数据后,并无法阻止在(1)-(4)期间内,读流程再次读 db 并将脏数据迁移到 cache 中.

 

此处,我们首先想到的应对方式是缓存双删策略——即在写流程写 db 前后,分别执行一次删除 cache 操作.

以上述案例加以说明,就是写流程在 moment4 之后,额外增加一个 moment6,再一次将 cache 中 key:a 对应的数据删除.

 

 

那么,到此为止,是否还存在 cache 数据不一致的可能性呢?

请大家短暂驻足思考,并带着你的答案进入下一小节.

 

1.4 缓存延时双删策略

1.3 小节中,缓存双删中存在的问题其实很简单,就在于我们无法保证,写流程中,第二次删除 cache 的动作一定能执行在读流程写 cache 的操作之后,也就是 moment5 和 moment6 两个时刻的相对次序是不稳定的:

 

 

因此,我们进一步引入缓存延时双删策略. 这里的“延时”就体现在,写流程写完 db 后,会等待指定时长,保证此期间可能持有脏数据的读流程都完成写 cache 操作后,再执行第二次的删 cache 操作,以此来实现缓存数据的“最终一致性”语义.

缓存延时双删策略已经是可用于工程化落地的实现方案,其核心的读写流程串联如下图:

 

 

1.5 写缓存禁用机制

然而,缓存延时双删机制同样有其短板所在,核心分为两点:

  • • cache 数据弱一致: 写 db 到延时执行二次删 cache 操作期间,cache 中都可能存在脏数据,因此无法保证 cache 数据的即时一致性,只能保证最终一致性语义

  • • 二次删 cache 操作不稳定: 延时二次删 cache 操作存在执行失败的可能性,一旦失败,cache 数据的“最终一致性语义”都将无法保证

 

在分布式场景,我们往往需要在流程性能和数据一致性之间进行权衡取舍. 为了进一步保证 cache 数据的强一致性语义,我们可以尝试引入“锁机制”.

最简单粗暴的实现方式,就是通过一把 key 维度的分布式读写锁,实现读流程和写流程的隔离. 这样两个流程下的执行步骤也就不会混淆在一起,上述问题自然也就得到了根除.

然而,该做法的代价却是对读操作性能的大幅度牺牲,针对某笔数据,一旦产生写操作,那么在此期间所有读操作都要陷入阻塞等待的状态,这并不是我们所乐于接受的.

 

 

在此基础上,我们进一步对实现思路进行升级. 在 1.3-1.4 小节的思路演进过程中,我们发现导致 cache 数据不一致的罪魁祸首,本质上是读流程把脏数据写入 cache 的操作,而非宏观意义上的整个读流程.

因此,我们要做的不是完全隔离读、写流程,也不需要使用分布式锁这么重的工具,而是退而求其次,针对一笔数据的维度启用一个 “开关” 机制,能够用于控制读流程是否启用写 cache 机制即可:

  • • 每当有写流程到达时,先将该笔数据的“开关”关闭,然后正常执行后续流程,执行完成后再重新将“开关”打开

  • • 在“开关”关闭期间,所有到达的读流程正常执行步骤,唯独不会在读 db 后执行写 cache 操作

通过上述的 “写缓存禁用”机制,就保证了数据的强一致性. 写流程执行期间,该笔数据对应的读流程不会阻塞,只是相当于 cache 机制被暂时屏蔽,读流程需要统一从 db 中获取最精确的数据而已.

 

此处值得一提的是,在写流程完成写 db 操作后,通过需要延迟一段时间再重新开启该笔数据下的 “写缓存机制”,其本质思路和缓存延时双删策略中“延时”的用意是一致的,就是避免在并发场景下,读取到 db 脏数据的读流程写 cache 操作恰好发生在写流程“写缓存机制”启用之后.

 

1.6 其他缓存相关问题

除此之外,在 cache 与 db 交互流程中,还存在几个经典问题,也将在本篇中展开探讨,并在技术实战环节中对具体的应对策略进行展示:

  • • 缓存雪崩

倘若导致大量 cache key 在同一时刻过期,那么这一瞬间纷涌而至的大量读请求都会因为 cache 数据 miss 而集体涌入到 db 中,导致 db 压力陡增.

缓存雪崩问题的常用解决思路是切断问题的直接导火索,对 cache key 的过期时间进行打散,比如可以在预设过期时间的基础上,加上随机扰动值,因此来避免大面积 cache key 同时失效的情形.

 

  • • 缓存穿透

倘若读操作频繁请求 db 中不存在的数据,那么该数据自然也无法写入 cache,最终所有请求都会直击 db,导致 db 压力较大.

针对缓存穿透问题的解法之一,是存储层之上额外封装一层布隆过滤器,用于提前过滤大部分不存在的 key,具体技术原理可参见我之前发表的文章——布隆过滤器技术原理及应用实战.

而在本篇中,我们将采用另一种解法:倘若在读流程中发现数据在 db 中不存在,则同样会写 cache,但会针对该笔数据加以特殊标记,让后续读操作通过读 cache 就能获得到数据不存在的信息.

 

2 技术实战

介绍理论基础部分,下面展示一致性缓存服务项目 consistent_cach 的源码实现,进入技术实战环节.

2.1 架构

整个操作流程涉及到对缓存模块 cache 和数据库模块 db 的操作,因此在实现上拆分出三个核心模块:

  • • 一致性缓存服务 service:整体上串联读、写流程,包含其中针对 cache 和 db 的每个交互节点. (保证缓存数据一致性的核心逻辑,项目中重点实现部分)

  • • 缓存模块 cache:对缓存组件的抽象(定义成一个抽象 interface 由使用方自行实现,项目中提供了一个 redis 实现版本)

  • • 数据库模块 db:对数据库组件的抽象(定义成一个抽象 interface 由使用方自行实现,项目中提供了一个 mysql 实现版本)

 

 

一致性缓存服务 service 的实现代码代码位于 ./service.go,核心成员属性包括其持有的缓存模块 cache 和数据库模块 db 的实例引用:

type Service struct {
    opts  *Options // 策略参数
    cache Cache // 缓存模块
    db    DB    // 数据库模块
}


// 构造一致性缓存服务. 缓存和数据库均由使用方提供具体的实现版本
func NewService(cache Cache, db DB, opts ...Option) *Service {
    s := Service{
        cache: cache,
        db:    db,
        opts:  &Options{},
    }


    for _, opt := range opts {
        opt(s.opts)
    }


    repair(s.opts)
    return &s
}

 

针对缓存模块抽象接口——Cache 的定义代码位于 ./interface.go,其中包含如下核心方法:

  • • Get: 在 cache 中查询数据. 如果数据在 cache 中不存在,需要返回指定错误:ErrorCacheMiss

  • • Del: 从 cache 中删除数据

  • • Disable: 针对某条数据禁用写缓存机制,为了防止意外会给一个兜底过期时间(写流程删 cache 前执行

  • • Enable: 针对某条数据,延时启用写缓存机制(写流程写 db 后执行)

  • • PutWhenEnable:只在写缓存机制启用的前提下,执行写 cache 操作读流程在读 db 后执行

var (
    // 数据在缓存中不存在
    ErrorCacheMiss    = errors.New("cache miss")
)


// 缓存模块的抽象接口定义
type Cache interface {
    // 启用某个 key 对应读流程写缓存机制(默认情况下为启用状态)
    Enable(ctx context.Context, key string, delayMilis int64) error
    // 禁用某个 key 对应读流程写缓存机制
    Disable(ctx context.Context, key string, expireSeconds int64) error
    // 读取 key 对应缓存
    Get(ctx context.Context, key string) (string, error)
    // 删除 key 对应缓存
    Del(ctx context.Context, key string) error
    // 校验某个 key 对应读流程写缓存机制是否启用,倘若启用则写入缓存(默认情况下为启用状态)
    PutWhenEnable(ctx context.Context, key, value string, expireSeconds int64) (bool, error)
}

 

针对数据库模块定义了抽象的 interface——DB:

  • • Put: 将一条数据 obj 写入(或更新)到 db

  • • Get: 从 db 中读取一条数据,此处 obj 作为接收数据的指针(倘若 db 中不存在数据,需要返回指定错误 ErrorDBMiss)

// 数据库中不存在数据
var ErrorDBMiss       = errors.New("db miss")


// 数据库模块的抽象接口定义
type DB interface {
    // 数据写入数据库
    Put(ctx context.Context, obj Object) error
    // 从数据库读取数据
    Get(ctx context.Context, obj Object) error
}

 

针对一条数据记录,使用抽象 interface——Object 进行抽象,其需要实现几个方法:

  • • KeyColumn: db 中唯一键的字段名,也会作为 cache 中的 key

  • • Key: 唯一键的值

  • • Write: 将数据记录 object 内容序列化成字符串

  • • Read: 读取字符串,将内容反序列化到数据记录 object 中

// 每次读写操作时,操作的一笔数据记录
type Object interface {
    // 获取 key 对应的字段名
    KeyColumn() string
    // 获取 key 对应的值
    Key() string


    // 将 object 序列化成字符串
    Write() (string, error)
    // 读取字符串内容,反序列化到 object 实例中
    Read(body string) error
}

 

2.2 缓存

在 consistent_cache 项目中,基于 redis 实现了一个缓存模块:

  • • disable 禁用写缓存机制: 通过 redis setEx 指令,在 redis 中设置与 key 一一对应的标识键 disableKey 实现

  • • enable 延时启用写缓存机制: 通过 redis expire 指令,给 disableKey 设置一个较短过期时间的方式来实现延时启用效果

  • • get、del:直接通过 redis get、del 操作实现

  • • putWhenEnable: 通过 lua 脚本实现,保证只在 disableKey 不存在时,才会写入 cache

 

redis 缓存模块实现类代码位于 ./redis/cache.go:

// redis 实现版本的缓存模块
type Cache struct {
    // 抽象的客户端模块. 实现版本为 redis 客户端
    client Client
}


// 构造器函数
func NewRedisCache(config *Config) *Cache {
    return &Cache{client: NewRClient(config)}
}

 

 

其中各核心方法的源码内容展示如下:

// 启用某个 key 对应读流程写缓存机制(默认情况下为启用状态)
func (*Cache) Enable(ctx context.Context, key string, delayMilis int64) error {
    // redis 中删除 key 对应的 disable key. 只要 disable key 标识不存在,则读流程写缓存机制视为启用状态
    // 给 disable key 设置一个相对较短的过期时间
    return c.client.PExpire(ctx, key, delayMilis)
}


// 禁用某个 key 的读流程写缓存机制
func (*Cache) Disable(ctx context.Context, key string, expireSeconds int64) error {
    // redis 中设置 key 对应的 disable key. 只要 disable key 标识存在,则读流程写缓存机制视为禁用状态
    return c.client.SetEx(ctx, c.disableKey(key), "1", expireSeconds)
}


// 读取 key 对应缓存内容
func (*Cache) Get(ctx context.Context, key string) (string, error) {
    // 从 redis 中读取 kv 对
    reply, err := c.client.Get(ctx, key)
    if err != nil && !errors.Is(err, redis.ErrNil) {
        return "", err
    }
    // 倘若缓存中不存在数据,返回指定错误 ErrorCacheMiss
    if errors.Is(err, redis.ErrNil) {
        return "", consistent_cache.ErrorCacheMiss
    }
    return reply, nil
}


// 删除 key 对应缓存
func (*Cache) Del(ctx context.Context, key string) error {
    // 从 reids 中删除 kv 对
    return c.client.Del(ctx, key)
}

 

其中,在putWhenEnable 方法中,因为需要对多个指令进行原子化执行,因此涉及到对 lua 脚本的使用:

// 校验某个 key 对应读流程写缓存机制是否启用,倘若启用则写入缓存(默认情况下为启用状态)
func (*Cache) PutWhenEnable(ctx context.Context, key, value string, expireSeconds int64) (bool, error) {
    // 运行 redis lua 脚本,保证只有在 disable key 不存在时,才会执行 key 的写入
    reply, err := c.client.Eval(ctx, LuaCheckEnableAndWriteCache, 2, []interface{}{
        c.disableKey(key),
        key,
        value,
        expireSeconds,
    })
    if err != nil {
        return false, err
    }
    return cast.ToInt(reply) == 1, nil
}

 

对应 lua 脚本代码如下,位于 ./redis.lua.go:

const (
    // 通过 lua 脚本确保在 disable key 不存在时,才执行 key value 对写入
    LuaCheckEnableAndWriteCache = `
    local disable_key = KEYS[1];
    local disable_flag = redis.call("get",disable_key);
    if disable_flag then
        return 0;
    end
    local key = KEYS[2];
    local value = ARGV[1];
    redis.call("set",key,value);
    local cache_expire_seconds = tonumber(ARGV[2]);
    redis.call("expire",key,cache_expire_seconds);
    return 1;
`
)

 

值得一提的是,由于在 putWhenEnable 对应 lua 脚本中,涉及到对数据 key 以及与其一一一应的 disableKey 的操作,因此需要通过 hash tag 保证这两个 key 在 redis cluster 模式下也能被分发到同一个节点,这样 lua 脚本的执行才是有效的.

// 基于 key 映射得到 v key 表达式
func (*Cache) disableKey(key string) string {
    // 通过 {hash_tag},保证在 redis 集群模式下,key 和 disable key 也会被分发到相同节点
    return fmt.Sprintf("Enable_Lock_Key_{%s}", key)
}

hash tag 的使用机制可以参见:https://redis.io/docs/latest/commands/cluster-keyslot/

 

2.3 数据库

在 consistent_cache 中,针对数据库模块 db 提供了一个 mysql 实现版本,连接 mysql 的客户端使用 gorm 版本. 这部分代码位于 ./mysql/*:

// 判断操作模型是否声明了表名
type tabler interface {
    TableName() string
}


// 数据库模块的抽象接口定义
type DB struct {
    db *gorm.DB
}

 

在数据写流程中:

  • • 执行 create 操作

  • • 倘若发生唯一键冲突错误,则改为执行 update 操作

更多细节通过源码注释的方式给出:

// 数据写入数据库
func (*DB) Put(ctx context.Context, obj consistent_cache.Object) error {
    db := d.db
    // 倘若 obj 显式声明了表名,则进行应用
    tabler, ok := obj.(tabler)
    if ok {
        db = db.Table(tabler.TableName())
    }


    // 此处通过两个非原子性动作实现 upsert 效果:
    // 1 尝试创建记录
    // 2 倘若发生唯一键冲突,则改为执行更新操作
    err := db.WithContext(ctx).Create(obj).Error
    if err == nil {
        return nil
    }


    // 判断是否为唯一键冲突,若是的话,则改为更新操作
    if IsDuplicateEntryErr(err) {
        return db.WithContext(ctx).Debug().Where(fmt.Sprintf("`%s` = ?", obj.KeyColumn()), obj.Key()).Updates(obj).Error
    }
    // 其他错误直接返回
    return err
}

 

针对 mysql 唯一键冲突的判断方法:

import "github.com/go-sql-driver/mysql"


func IsDuplicateEntryErr(err error) bool {
    var mysqlErr *mysql.MySQLError
    if errors.As(err, &mysqlErr) && mysqlErr.Number == DuplicateEntryErrCode {
        return true
    }
    return false
}

 

在读流程中:

  • • 通过数据唯一键作为检索条件进行 select 操作

  • • 倘若数据不存在,需要返回指定错误 ErrorDBMiss

// 从数据库读取数据
func (*DB) Get(ctx context.Context, obj consistent_cache.Object) error {
    db := d.db
    // 倘若 obj 显式声明了表名,则进行应用
    tabler, ok := obj.(tabler)
    if ok {
        db = db.Table(tabler.TableName())
    }


    // select 语句读取通过唯一键检索数据记录
    err := db.WithContext(ctx).Where(fmt.Sprintf("`%s` = ?", obj.KeyColumn()), obj.Key()).First(obj).Error
    // 倘若 db 中不存在数据,返回指定错误 ErrorDBMiss
    if errors.Is(err, gorm.ErrRecordNotFound) {
        return consistent_cache.ErrorDBMiss
    }
    return err
}

 

更多有关 gorm 的使用示例和底层原理可以参见我之前发表的文章:

 

2.4 一致性缓存服务

 

在一致性缓存服务 service 提供的写数据方法 Put 中,包含如下核心步骤:

  • • 通过缓存模块 cache 的 disable 操作,禁用写缓存机制

  • • 在 cache 中删除对应的数据 key

  • • 将数据写入到 db 中

  • • 调用 cache 的 enable 方法,实现延时启用写缓存机制的效果

// 写操作
func (*Service) Put(ctx context.Context, obj Object) error {
    // 1 针对 key 维度禁用读流程写缓存机制
    if err := s.cache.Disable(ctx, obj.Key(), s.opts.disableExpireSeconds); err != nil {
        return err
    }


    defer func() {
        go func() {
            tctx, cancel := context.WithTimeout(context.Background(), time.Second)
            defer cancel()
            if err := s.cache.Enable(tctx, obj.Key(), s.opts.enableDelayMilis); err != nil {
                s.opts.logger.Errorf("enable fail, key: %s, err: %v", obj.Key(), err)
            }
        }()
    }()


    // 2 删除 key 维度对应缓存
    if err := s.cache.Del(ctx, obj.Key()); err != nil {
        return err
    }


    // 3 数据写入 db
    return s.db.Put(ctx, obj)
}

 

在一致性缓存服务 service 提供的读数据方法 Get 中,包含如下核心步骤:

  • • 尝试从 cache 中读到空数据标识(针对缓存穿透问题启用的 NullData),则直接返回数据不存在的错误 ErrorDataNotExist

  • • 倘若 cache 缺失数据(ErrorCacheMiss),则从 db 中读数据

  • • 从 db 读到数据后,调用 cache 模块的 putWhenEnable 方法,保证只在写缓存机制启用的情况下,将数据写入 cache

  • • 倘若 db 中也没有数据(ErrorDBMiss),同样调用 cache 模块的 putWhenEnable 方法,保证只在写缓存机制启用的情况下,在 cache 中写入 NullData 标识

// 为响应缓存穿透问题,启用在缓存中写入空数据的标识
const NullData = "Err_Syntax_Null_Data"


// 数据在 cache 和 db 中均不存在
var ErrorDataNotExist = errors.New("data not exist")


// 2 读操作
func (*Service) Get(ctx context.Context, obj Object) (useCache bool, err error) {
    // 1 读取缓存
    v, err := s.cache.Get(ctx, obj.Key())
    // 2 非缓存 miss 类错误,直接抛出错误
    if err != nil && !errors.Is(err, ErrorCacheMiss) {
        return false, err
    }


    // 3 读取到缓存结果
    if err == nil {
        // 3.1 读取到的数据为 EmptyData. 是为了防止缓存穿透而设置的空值
        if v == NullData {
            return true, ErrorDataNotExist
        }
        // 3.2 正常读取到数据
        return true, obj.Read(v)
    }


    // 4 缓存 miss,读 db
    if err = s.db.Get(ctx, obj); err != nil && !errors.Is(err, ErrorDBMiss) {
        return false, err
    }


    // 5 db 中也没有数据,则尝试往 cache 中写入 NullData
    if errors.Is(err, ErrorDBMiss) {
        if ok, err := s.cache.PutWhenEnable(ctx, obj.Key(), NullData, s.opts.CacheExpireSeconds()); err != nil {
            s.opts.logger.Errorf("put null data into cache fail, key: %s, err: %v", obj.Key(), err)
        } else {
            s.opts.logger.Infof("put null data into cache resp, key: %s, ok: %t", obj.Key(), ok)
        }


        return false, ErrorDataNotExist
    }


    // 6 成功获取到数据了,则需要将其写入缓存
    v, err = obj.Write()
    if err != nil {
        return false, err
    }
    if ok, err := s.cache.PutWhenEnable(ctx, obj.Key(), v, s.opts.CacheExpireSeconds()); err != nil {
        s.opts.logger.Errorf("put data into cache fail, key: %s, data: %v, err: %v", obj.Key(), v, err)
    } else {
        s.opts.logger.Infof("put data into cache resp, key: %s, v: %v, ok: %t", obj.Key(), v, ok)
    }


    // 7 返回读取到的结果
    return false, nil
}

 

其中为了应对缓存雪崩问题,在读流程写缓存步骤中,针对过期时间可以添加一个随机扰动值:

func (*Options) CacheExpireSeconds() int64 {
    if !o.cacheExpireRandomMode {
        return o.cacheExpireSeconds
    }


    // 过期时间在 1~2倍之间取随机值
    return o.cacheExpireSeconds + o.rander.Int63n(o.cacheExpireSeconds+1)
}

 

3 使用示例

3.1 主流程

最后是关于整个一致性缓存服务的使用示例,这部分代码位于 ./example/example_test.go 中.

其中核心步骤包括:

  • • 输入 redis 地址、密码,启用 redis 缓存模块

  • • 输入 mysql dsn,启用 mysql 数据库模块

  • • 创建一致性缓存服务实例

  • • 执行写操作

  • • 执行读操作

const (
    redisAddress  = "请输入 redis 地址"
    redisPassword = "请输入 redis 密码"


    mysqlDSN = "请输入 mysql dsn"
)


func newService() *consistent_cache.Service {
    // 缓存模块
    cache := redis.NewRedisCache(&redis.Config{
        Address:  redisAddress,
        Password: redisPassword,
    })
    // 数据库模块
    db := mysql.NewDB(mysqlDSN)
    return consistent_cache.NewService(cache, db,
        consistent_cache.WithCacheExpireSeconds(120),
        consistent_cache.WithDisableExpireSeconds(1),
    )
}


func Test_consistent_Cache(*testing.T) {
    service := newService()
    ctx := context.Background()
    exp := Example{
        Key_: "test",
        Data: "test",
    }
    // 写操作
    if err := service.Put(ctx, &exp); err != nil {
        t.Error(err)
        return
    }


    // 读操作
    expReceiver := Example{
        Key_: "test",
    }
    if _, err := service.Get(ctx, &expReceiver); err != nil {
        t.Error(err)
        return
    }


    // 读取到的数据结果 以及是否使用到缓存
    t.Logf("read data: %s, ", expReceiver.Data)
}

 

3.2 object 实现

在使用示例中,同样给出了针对数据记录 Object interface 的实现类 Example,代码位于 ./example/example_po.go:

type Example struct {
    ID   uint   `json:"id" gorm:"primarykey"`
    Key_ string `json:"key" gorm:"column:key"`
    Data string `json:"data" gorm:"column:data"`
}


// 获取对应的表名
func (*Example) TableName() string {
    return "example"
}


// 获取 key 对应的字段名
func (*Example) KeyColumn() string {
    return "key"
}


// 获取 key 对应的值
func (*Example) Key() string {
    return e.Key_
}


func (*Example) DataColumn() []string {
    return []string{"data"}
}


// 将 object 序列化成字符串
func (*Example) Write() (string, error) {
    body, err := json.Marshal(e)
    if err != nil {
        return "", err
    }
    return string(body), nil
}


// 读取字符串内容,反序列化到 object 实例中
func (*Example) Read(body string) error {
    return json.Unmarshal([]byte(body), e)
}

 

4 总结

本篇和大家一起针对缓存 cache 与数据库 db 之间的数据一致性问题展开了理论探讨,推演了从缓存延时双删到写缓存禁用机制的演进过程.

基于上述理论,我通过 golang 开发了 lib 库,集成了缓存一致性读写流程中的核心步骤,并在本文中针对其中的技术细节进行了介绍,并在最后给出对应的使用示例.


小徐先生的编程世界
在学钢琴,主业码农
 最新文章