0 前言
工程实践场景中,我们通常用数据库完成数据的持久存储,而数据存储侧的性能调优也是一个永恒经典的话题. 在一些请求量大、读多写少的场景中,一种性能优化方式是考虑在数据库之上添加一层缓存组件,这样一方面能减轻数据库的访问压力,一方面也能提升查询操作的性能.
然而由于缓存(如 redis)和数据库(如 mysql)是两个独立的存储组件,在操作过程中无法在跨组件的基础上保证“事务” 的语义,因此不可避免地会面临缓存数据与数据库数据不一致的问题.
我们把上述问题称为 “缓存一致性”问题,本篇将紧密围绕该问题,针对设计缓存与数据库的读、写流程进行串联梳理,总结出能够兼顾数据一致性与操作性能的执行方法论.
理论先行,实践紧随. 本篇所探讨的内容,将统一通过 go 语言在我的开源项目 consistent_cache 中予以实践验证——开源项目传送门:https://github.com/xiaoxuxiansheng/consistent_cache
1 理论分析
1.1 缓存一致性问题
缓存(cache) 相比于数据库(db) 而言更加轻便快捷,但与之相对的也存在成本高、容量小、稳定性弱的问题,因此 cache 中的数据通常是依附于 db 中的持久化数据而存在,一笔数据的更新操作和存储终态最终还是要以 db 为准.
从这个角度出发来看,在写操作密集的场景下,使用 cache 的收益并不明显;然而在读多写少的场景中,我们通过将一些相对稳定不变的数据从 db 冗余到 cache 中,由 cache 同一收口数据查询能力,能够很大程度提升查询性能并降低 db 的访问压力.
随之而来的首要问题就是 cache 中的数据如何与 db 保持一致. 展开来说就是,cache 和 db 本身是两个独立的存储组件,跨组件间的数据一致性问题属于分布式事务的范畴,本身有一套解决的方案,但受限于其高昂的实现成本,因此不适用于绝大多数场景,这部分内容可以参考我之前发表的文章——万字长文漫谈分布式事务实现原理.
在本篇中,我们更多将注意力集中在,如何针对数据的读写流程进行秩序地组织串联,以此来满足 cache 和 db 间数据的最终一致性,并尽可能追求即时一致性的语义.
1.2 读写流程串讲
当前比较主流的缓存一致性实现思路中,针对读、写流程的职责进行如下拆解:
流程 | 缓存 cache | 数据库 db |
写流程 | 执行删除操作 | 执行写入操作 |
读流程 | 执行查询、写入操作 | 执行查询操作 |
• 在写流程中:只负责写 db,不负责更新 cache;与之相对的,需要在写 db 前,先负责将 cache 中的脏数据删除,由后续到来的读操作负责将更新后的迁移到缓存中
• 在读流程中:尝试读 cache;如果 cache 中数据 miss,则读取 db,并将读到的数据更新到 cache 中,以便后续的读操作复用
此处我们针对上述流程的设计原则通过 qa 形式进行简要剖析:
question A:为什么写流程不在写完 db 后直接更新 cache 数据?
• 原因I:cache 相比于 db 空间小、成本高,因此希望尽可能将访问频繁的热数据加载到 cache 中进行复用. 而 cache 的目标是面向读密集场景,数据的热度由读操作的频率来决定,因此将写 cache 的执行权交由读流程负责,而非写流程;
• 原因II:在明确已经由读流程负责写 cache 的前提下,写流程就不再执行重复动作,以此实现流程简化,也能规避部分因并发场景而导致的 cache 数据一致性问题
question B:为什么写流程需要删除 cache 数据?
• 在读流程中,当 cache 中存在数据,会立即读取并返回结果. 因此倘若写操作不删除 cache 中的脏数据,那么后续到来的读操作都会因此读取到错误的结果.
• 当写操作删除 cache 删除后,读流程访问 cache 时发现数据 miss,就会读取 db,并将此时正确的数据重新写入 cache 中
question C:为什么写流程需要先删除 cache 再写 db?
• 逆向思考:倘若先写 db 后删 cache,由于两个操作无法保证原子性,一旦前者成功后者失败,就会导致 cache 中永久存在错误的数据;
• 反之,先删 cache 后写 db,哪怕前者成功后者失败,无非是额外增加一次将数据由 db 冗余到 cache 中的成本而已
question D:上述流程是严谨的吗?是否还存在哪些环节可能存在 cache 数据不一致问题?
这个问题留给各位读者,请大家短暂驻步于此, 脑中思考该问题的答案,并带着问题再进入到后续小节的阅读内容中.
1.3 缓存双删策略
question D 的答案是否定的.
我们需要明白,在实际场景中,一系列读、写流程是并发执行的,两个流程下各执行步骤的相对次序可能因为机器、网络等不确定因素发生变化. 此处我们就给出一个具体 badcase 加以分析说明:
• 背景:
数据库中有一笔 kv 数据,key:a; value:b
{
"key": "a",
"value": "b"
}
此时,一笔读操作和一笔写操作并发启动,
读操作读取 key:a 对应的数据:
get a
写操作执行指令,期望将 key:a 对应的 value 更新为 c
• 过程
按照 1.2 小节中介绍的流程,下面我们进行读写流程的步骤串联:
(1)moment1:写流程删除 cache 中的 key:a 数据
(2)moment2:读流程读 cache,发现 key:a 数据 miss,准备进一步读 db
(3)moment3:读流程读 db,此时数据还是老版本,value 为 b
(4)moment4:写流程写 db,将 key:a 对应 value 更新为 c
(5)moment5:读流程把读到的 value:b 作为 key:a 的映射数据写入 cache
至此,读、写流程均结束了,然而现状是,db 中 key:a 数据已经更新为 c,然而 cache 中 key:a 对应的还是脏数据 b.
产生上述问题的本质原因就是,在并发场景下,步骤(1)写流程删除 cache 数据后,并无法阻止在(1)-(4)期间内,读流程再次读 db 并将脏数据迁移到 cache 中.
此处,我们首先想到的应对方式是缓存双删策略——即在写流程写 db 前后,分别执行一次删除 cache 操作.
以上述案例加以说明,就是写流程在 moment4 之后,额外增加一个 moment6,再一次将 cache 中 key:a 对应的数据删除.
那么,到此为止,是否还存在 cache 数据不一致的可能性呢?
请大家短暂驻足思考,并带着你的答案进入下一小节.
1.4 缓存延时双删策略
1.3 小节中,缓存双删中存在的问题其实很简单,就在于我们无法保证,写流程中,第二次删除 cache 的动作一定能执行在读流程写 cache 的操作之后,也就是 moment5 和 moment6 两个时刻的相对次序是不稳定的:
因此,我们进一步引入缓存延时双删策略. 这里的“延时”就体现在,写流程写完 db 后,会等待指定时长,保证此期间可能持有脏数据的读流程都完成写 cache 操作后,再执行第二次的删 cache 操作,以此来实现缓存数据的“最终一致性”语义.
缓存延时双删策略已经是可用于工程化落地的实现方案,其核心的读写流程串联如下图:
1.5 写缓存禁用机制
然而,缓存延时双删机制同样有其短板所在,核心分为两点:
• cache 数据弱一致: 写 db 到延时执行二次删 cache 操作期间,cache 中都可能存在脏数据,因此无法保证 cache 数据的即时一致性,只能保证最终一致性语义
• 二次删 cache 操作不稳定: 延时二次删 cache 操作存在执行失败的可能性,一旦失败,cache 数据的“最终一致性语义”都将无法保证
在分布式场景,我们往往需要在流程性能和数据一致性之间进行权衡取舍. 为了进一步保证 cache 数据的强一致性语义,我们可以尝试引入“锁机制”.
最简单粗暴的实现方式,就是通过一把 key 维度的分布式读写锁,实现读流程和写流程的隔离. 这样两个流程下的执行步骤也就不会混淆在一起,上述问题自然也就得到了根除.
然而,该做法的代价却是对读操作性能的大幅度牺牲,针对某笔数据,一旦产生写操作,那么在此期间所有读操作都要陷入阻塞等待的状态,这并不是我们所乐于接受的.
在此基础上,我们进一步对实现思路进行升级. 在 1.3-1.4 小节的思路演进过程中,我们发现导致 cache 数据不一致的罪魁祸首,本质上是读流程把脏数据写入 cache 的操作,而非宏观意义上的整个读流程.
因此,我们要做的不是完全隔离读、写流程,也不需要使用分布式锁这么重的工具,而是退而求其次,针对一笔数据的维度启用一个 “开关” 机制,能够用于控制读流程是否启用写 cache 机制即可:
• 每当有写流程到达时,先将该笔数据的“开关”关闭,然后正常执行后续流程,执行完成后再重新将“开关”打开
• 在“开关”关闭期间,所有到达的读流程正常执行步骤,唯独不会在读 db 后执行写 cache 操作
通过上述的 “写缓存禁用”机制,就保证了数据的强一致性. 写流程执行期间,该笔数据对应的读流程不会阻塞,只是相当于 cache 机制被暂时屏蔽,读流程需要统一从 db 中获取最精确的数据而已.
此处值得一提的是,在写流程完成写 db 操作后,通过需要延迟一段时间再重新开启该笔数据下的 “写缓存机制”,其本质思路和缓存延时双删策略中“延时”的用意是一致的,就是避免在并发场景下,读取到 db 脏数据的读流程写 cache 操作恰好发生在写流程“写缓存机制”启用之后.
1.6 其他缓存相关问题
除此之外,在 cache 与 db 交互流程中,还存在几个经典问题,也将在本篇中展开探讨,并在技术实战环节中对具体的应对策略进行展示:
• 缓存雪崩
倘若导致大量 cache key 在同一时刻过期,那么这一瞬间纷涌而至的大量读请求都会因为 cache 数据 miss 而集体涌入到 db 中,导致 db 压力陡增.
缓存雪崩问题的常用解决思路是切断问题的直接导火索,对 cache key 的过期时间进行打散,比如可以在预设过期时间的基础上,加上随机扰动值,因此来避免大面积 cache key 同时失效的情形.
• 缓存穿透
倘若读操作频繁请求 db 中不存在的数据,那么该数据自然也无法写入 cache,最终所有请求都会直击 db,导致 db 压力较大.
针对缓存穿透问题的解法之一,是存储层之上额外封装一层布隆过滤器,用于提前过滤大部分不存在的 key,具体技术原理可参见我之前发表的文章——布隆过滤器技术原理及应用实战.
而在本篇中,我们将采用另一种解法:倘若在读流程中发现数据在 db 中不存在,则同样会写 cache,但会针对该笔数据加以特殊标记,让后续读操作通过读 cache 就能获得到数据不存在的信息.
2 技术实战
介绍理论基础部分,下面展示一致性缓存服务项目 consistent_cach 的源码实现,进入技术实战环节.
2.1 架构
整个操作流程涉及到对缓存模块 cache 和数据库模块 db 的操作,因此在实现上拆分出三个核心模块:
• 一致性缓存服务 service:整体上串联读、写流程,包含其中针对 cache 和 db 的每个交互节点. (保证缓存数据一致性的核心逻辑,项目中重点实现部分)
• 缓存模块 cache:对缓存组件的抽象(定义成一个抽象 interface 由使用方自行实现,项目中提供了一个 redis 实现版本)
• 数据库模块 db:对数据库组件的抽象(定义成一个抽象 interface 由使用方自行实现,项目中提供了一个 mysql 实现版本)
一致性缓存服务 service 的实现代码代码位于 ./service.go,核心成员属性包括其持有的缓存模块 cache 和数据库模块 db 的实例引用:
type Service struct {
opts *Options // 策略参数
cache Cache // 缓存模块
db DB // 数据库模块
}
// 构造一致性缓存服务. 缓存和数据库均由使用方提供具体的实现版本
func NewService(cache Cache, db DB, opts ...Option) *Service {
s := Service{
cache: cache,
db: db,
opts: &Options{},
}
for _, opt := range opts {
opt(s.opts)
}
repair(s.opts)
return &s
}
针对缓存模块抽象接口——Cache 的定义代码位于 ./interface.go,其中包含如下核心方法:
• Get: 在 cache 中查询数据. 如果数据在 cache 中不存在,需要返回指定错误:ErrorCacheMiss
• Del: 从 cache 中删除数据
• Disable: 针对某条数据禁用写缓存机制,为了防止意外会给一个兜底过期时间(写流程删 cache 前执行)
• Enable: 针对某条数据,延时启用写缓存机制(写流程写 db 后执行)
• PutWhenEnable:只在写缓存机制启用的前提下,执行写 cache 操作(读流程在读 db 后执行)
var (
// 数据在缓存中不存在
ErrorCacheMiss = errors.New("cache miss")
)
// 缓存模块的抽象接口定义
type Cache interface {
// 启用某个 key 对应读流程写缓存机制(默认情况下为启用状态)
Enable(ctx context.Context, key string, delayMilis int64) error
// 禁用某个 key 对应读流程写缓存机制
Disable(ctx context.Context, key string, expireSeconds int64) error
// 读取 key 对应缓存
Get(ctx context.Context, key string) (string, error)
// 删除 key 对应缓存
Del(ctx context.Context, key string) error
// 校验某个 key 对应读流程写缓存机制是否启用,倘若启用则写入缓存(默认情况下为启用状态)
PutWhenEnable(ctx context.Context, key, value string, expireSeconds int64) (bool, error)
}
针对数据库模块定义了抽象的 interface——DB:
• Put: 将一条数据 obj 写入(或更新)到 db
• Get: 从 db 中读取一条数据,此处 obj 作为接收数据的指针(倘若 db 中不存在数据,需要返回指定错误 ErrorDBMiss)
// 数据库中不存在数据
var ErrorDBMiss = errors.New("db miss")
// 数据库模块的抽象接口定义
type DB interface {
// 数据写入数据库
Put(ctx context.Context, obj Object) error
// 从数据库读取数据
Get(ctx context.Context, obj Object) error
}
针对一条数据记录,使用抽象 interface——Object 进行抽象,其需要实现几个方法:
• KeyColumn: db 中唯一键的字段名,也会作为 cache 中的 key
• Key: 唯一键的值
• Write: 将数据记录 object 内容序列化成字符串
• Read: 读取字符串,将内容反序列化到数据记录 object 中
// 每次读写操作时,操作的一笔数据记录
type Object interface {
// 获取 key 对应的字段名
KeyColumn() string
// 获取 key 对应的值
Key() string
// 将 object 序列化成字符串
Write() (string, error)
// 读取字符串内容,反序列化到 object 实例中
Read(body string) error
}
2.2 缓存
在 consistent_cache 项目中,基于 redis 实现了一个缓存模块:
• disable 禁用写缓存机制: 通过 redis setEx 指令,在 redis 中设置与 key 一一对应的标识键 disableKey 实现
• enable 延时启用写缓存机制: 通过 redis expire 指令,给 disableKey 设置一个较短过期时间的方式来实现延时启用效果
• get、del:直接通过 redis get、del 操作实现
• putWhenEnable: 通过 lua 脚本实现,保证只在 disableKey 不存在时,才会写入 cache
redis 缓存模块实现类代码位于 ./redis/cache.go:
// redis 实现版本的缓存模块
type Cache struct {
// 抽象的客户端模块. 实现版本为 redis 客户端
client Client
}
// 构造器函数
func NewRedisCache(config *Config) *Cache {
return &Cache{client: NewRClient(config)}
}
其中各核心方法的源码内容展示如下:
// 启用某个 key 对应读流程写缓存机制(默认情况下为启用状态)
func (c *Cache) Enable(ctx context.Context, key string, delayMilis int64) error {
// redis 中删除 key 对应的 disable key. 只要 disable key 标识不存在,则读流程写缓存机制视为启用状态
// 给 disable key 设置一个相对较短的过期时间
return c.client.PExpire(ctx, key, delayMilis)
}
// 禁用某个 key 的读流程写缓存机制
func (c *Cache) Disable(ctx context.Context, key string, expireSeconds int64) error {
// redis 中设置 key 对应的 disable key. 只要 disable key 标识存在,则读流程写缓存机制视为禁用状态
return c.client.SetEx(ctx, c.disableKey(key), "1", expireSeconds)
}
// 读取 key 对应缓存内容
func (c *Cache) Get(ctx context.Context, key string) (string, error) {
// 从 redis 中读取 kv 对
reply, err := c.client.Get(ctx, key)
if err != nil && !errors.Is(err, redis.ErrNil) {
return "", err
}
// 倘若缓存中不存在数据,返回指定错误 ErrorCacheMiss
if errors.Is(err, redis.ErrNil) {
return "", consistent_cache.ErrorCacheMiss
}
return reply, nil
}
// 删除 key 对应缓存
func (c *Cache) Del(ctx context.Context, key string) error {
// 从 reids 中删除 kv 对
return c.client.Del(ctx, key)
}
其中,在putWhenEnable 方法中,因为需要对多个指令进行原子化执行,因此涉及到对 lua 脚本的使用:
// 校验某个 key 对应读流程写缓存机制是否启用,倘若启用则写入缓存(默认情况下为启用状态)
func (c *Cache) PutWhenEnable(ctx context.Context, key, value string, expireSeconds int64) (bool, error) {
// 运行 redis lua 脚本,保证只有在 disable key 不存在时,才会执行 key 的写入
reply, err := c.client.Eval(ctx, LuaCheckEnableAndWriteCache, 2, []interface{}{
c.disableKey(key),
key,
value,
expireSeconds,
})
if err != nil {
return false, err
}
return cast.ToInt(reply) == 1, nil
}
对应 lua 脚本代码如下,位于 ./redis.lua.go:
const (
// 通过 lua 脚本确保在 disable key 不存在时,才执行 key value 对写入
LuaCheckEnableAndWriteCache = `
local disable_key = KEYS[1];
local disable_flag = redis.call("get",disable_key);
if disable_flag then
return 0;
end
local key = KEYS[2];
local value = ARGV[1];
redis.call("set",key,value);
local cache_expire_seconds = tonumber(ARGV[2]);
redis.call("expire",key,cache_expire_seconds);
return 1;
`
)
值得一提的是,由于在 putWhenEnable 对应 lua 脚本中,涉及到对数据 key 以及与其一一一应的 disableKey 的操作,因此需要通过 hash tag 保证这两个 key 在 redis cluster 模式下也能被分发到同一个节点,这样 lua 脚本的执行才是有效的.
// 基于 key 映射得到 v key 表达式
func (c *Cache) disableKey(key string) string {
// 通过 {hash_tag},保证在 redis 集群模式下,key 和 disable key 也会被分发到相同节点
return fmt.Sprintf("Enable_Lock_Key_{%s}", key)
}
hash tag 的使用机制可以参见:https://redis.io/docs/latest/commands/cluster-keyslot/
2.3 数据库
在 consistent_cache 中,针对数据库模块 db 提供了一个 mysql 实现版本,连接 mysql 的客户端使用 gorm 版本. 这部分代码位于 ./mysql/*:
// 判断操作模型是否声明了表名
type tabler interface {
TableName() string
}
// 数据库模块的抽象接口定义
type DB struct {
db *gorm.DB
}
在数据写流程中:
• 执行 create 操作
• 倘若发生唯一键冲突错误,则改为执行 update 操作
更多细节通过源码注释的方式给出:
// 数据写入数据库
func (d *DB) Put(ctx context.Context, obj consistent_cache.Object) error {
db := d.db
// 倘若 obj 显式声明了表名,则进行应用
tabler, ok := obj.(tabler)
if ok {
db = db.Table(tabler.TableName())
}
// 此处通过两个非原子性动作实现 upsert 效果:
// 1 尝试创建记录
// 2 倘若发生唯一键冲突,则改为执行更新操作
err := db.WithContext(ctx).Create(obj).Error
if err == nil {
return nil
}
// 判断是否为唯一键冲突,若是的话,则改为更新操作
if IsDuplicateEntryErr(err) {
return db.WithContext(ctx).Debug().Where(fmt.Sprintf("`%s` = ?", obj.KeyColumn()), obj.Key()).Updates(obj).Error
}
// 其他错误直接返回
return err
}
针对 mysql 唯一键冲突的判断方法:
import "github.com/go-sql-driver/mysql"
func IsDuplicateEntryErr(err error) bool {
var mysqlErr *mysql.MySQLError
if errors.As(err, &mysqlErr) && mysqlErr.Number == DuplicateEntryErrCode {
return true
}
return false
}
在读流程中:
• 通过数据唯一键作为检索条件进行 select 操作
• 倘若数据不存在,需要返回指定错误 ErrorDBMiss
// 从数据库读取数据
func (d *DB) Get(ctx context.Context, obj consistent_cache.Object) error {
db := d.db
// 倘若 obj 显式声明了表名,则进行应用
tabler, ok := obj.(tabler)
if ok {
db = db.Table(tabler.TableName())
}
// select 语句读取通过唯一键检索数据记录
err := db.WithContext(ctx).Where(fmt.Sprintf("`%s` = ?", obj.KeyColumn()), obj.Key()).First(obj).Error
// 倘若 db 中不存在数据,返回指定错误 ErrorDBMiss
if errors.Is(err, gorm.ErrRecordNotFound) {
return consistent_cache.ErrorDBMiss
}
return err
}
更多有关 gorm 的使用示例和底层原理可以参见我之前发表的文章:
2.4 一致性缓存服务
在一致性缓存服务 service 提供的写数据方法 Put 中,包含如下核心步骤:
• 通过缓存模块 cache 的 disable 操作,禁用写缓存机制
• 在 cache 中删除对应的数据 key
• 将数据写入到 db 中
• 调用 cache 的 enable 方法,实现延时启用写缓存机制的效果
// 写操作
func (s *Service) Put(ctx context.Context, obj Object) error {
// 1 针对 key 维度禁用读流程写缓存机制
if err := s.cache.Disable(ctx, obj.Key(), s.opts.disableExpireSeconds); err != nil {
return err
}
defer func() {
go func() {
tctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := s.cache.Enable(tctx, obj.Key(), s.opts.enableDelayMilis); err != nil {
s.opts.logger.Errorf("enable fail, key: %s, err: %v", obj.Key(), err)
}
}()
}()
// 2 删除 key 维度对应缓存
if err := s.cache.Del(ctx, obj.Key()); err != nil {
return err
}
// 3 数据写入 db
return s.db.Put(ctx, obj)
}
在一致性缓存服务 service 提供的读数据方法 Get 中,包含如下核心步骤:
• 尝试从 cache 中读到空数据标识(针对缓存穿透问题启用的 NullData),则直接返回数据不存在的错误 ErrorDataNotExist
• 倘若 cache 缺失数据(ErrorCacheMiss),则从 db 中读数据
• 从 db 读到数据后,调用 cache 模块的 putWhenEnable 方法,保证只在写缓存机制启用的情况下,将数据写入 cache
• 倘若 db 中也没有数据(ErrorDBMiss),同样调用 cache 模块的 putWhenEnable 方法,保证只在写缓存机制启用的情况下,在 cache 中写入 NullData 标识
// 为响应缓存穿透问题,启用在缓存中写入空数据的标识
const NullData = "Err_Syntax_Null_Data"
// 数据在 cache 和 db 中均不存在
var ErrorDataNotExist = errors.New("data not exist")
// 2 读操作
func (s *Service) Get(ctx context.Context, obj Object) (useCache bool, err error) {
// 1 读取缓存
v, err := s.cache.Get(ctx, obj.Key())
// 2 非缓存 miss 类错误,直接抛出错误
if err != nil && !errors.Is(err, ErrorCacheMiss) {
return false, err
}
// 3 读取到缓存结果
if err == nil {
// 3.1 读取到的数据为 EmptyData. 是为了防止缓存穿透而设置的空值
if v == NullData {
return true, ErrorDataNotExist
}
// 3.2 正常读取到数据
return true, obj.Read(v)
}
// 4 缓存 miss,读 db
if err = s.db.Get(ctx, obj); err != nil && !errors.Is(err, ErrorDBMiss) {
return false, err
}
// 5 db 中也没有数据,则尝试往 cache 中写入 NullData
if errors.Is(err, ErrorDBMiss) {
if ok, err := s.cache.PutWhenEnable(ctx, obj.Key(), NullData, s.opts.CacheExpireSeconds()); err != nil {
s.opts.logger.Errorf("put null data into cache fail, key: %s, err: %v", obj.Key(), err)
} else {
s.opts.logger.Infof("put null data into cache resp, key: %s, ok: %t", obj.Key(), ok)
}
return false, ErrorDataNotExist
}
// 6 成功获取到数据了,则需要将其写入缓存
v, err = obj.Write()
if err != nil {
return false, err
}
if ok, err := s.cache.PutWhenEnable(ctx, obj.Key(), v, s.opts.CacheExpireSeconds()); err != nil {
s.opts.logger.Errorf("put data into cache fail, key: %s, data: %v, err: %v", obj.Key(), v, err)
} else {
s.opts.logger.Infof("put data into cache resp, key: %s, v: %v, ok: %t", obj.Key(), v, ok)
}
// 7 返回读取到的结果
return false, nil
}
其中为了应对缓存雪崩问题,在读流程写缓存步骤中,针对过期时间可以添加一个随机扰动值:
func (o *Options) CacheExpireSeconds() int64 {
if !o.cacheExpireRandomMode {
return o.cacheExpireSeconds
}
// 过期时间在 1~2倍之间取随机值
return o.cacheExpireSeconds + o.rander.Int63n(o.cacheExpireSeconds+1)
}
3 使用示例
3.1 主流程
最后是关于整个一致性缓存服务的使用示例,这部分代码位于 ./example/example_test.go 中.
其中核心步骤包括:
• 输入 redis 地址、密码,启用 redis 缓存模块
• 输入 mysql dsn,启用 mysql 数据库模块
• 创建一致性缓存服务实例
• 执行写操作
• 执行读操作
const (
redisAddress = "请输入 redis 地址"
redisPassword = "请输入 redis 密码"
mysqlDSN = "请输入 mysql dsn"
)
func newService() *consistent_cache.Service {
// 缓存模块
cache := redis.NewRedisCache(&redis.Config{
Address: redisAddress,
Password: redisPassword,
})
// 数据库模块
db := mysql.NewDB(mysqlDSN)
return consistent_cache.NewService(cache, db,
consistent_cache.WithCacheExpireSeconds(120),
consistent_cache.WithDisableExpireSeconds(1),
)
}
func Test_consistent_Cache(t *testing.T) {
service := newService()
ctx := context.Background()
exp := Example{
Key_: "test",
Data: "test",
}
// 写操作
if err := service.Put(ctx, &exp); err != nil {
t.Error(err)
return
}
// 读操作
expReceiver := Example{
Key_: "test",
}
if _, err := service.Get(ctx, &expReceiver); err != nil {
t.Error(err)
return
}
// 读取到的数据结果 以及是否使用到缓存
t.Logf("read data: %s, ", expReceiver.Data)
}
3.2 object 实现
在使用示例中,同样给出了针对数据记录 Object interface 的实现类 Example,代码位于 ./example/example_po.go:
type Example struct {
ID uint `json:"id" gorm:"primarykey"`
Key_ string `json:"key" gorm:"column:key"`
Data string `json:"data" gorm:"column:data"`
}
// 获取对应的表名
func (e *Example) TableName() string {
return "example"
}
// 获取 key 对应的字段名
func (e *Example) KeyColumn() string {
return "key"
}
// 获取 key 对应的值
func (e *Example) Key() string {
return e.Key_
}
func (e *Example) DataColumn() []string {
return []string{"data"}
}
// 将 object 序列化成字符串
func (e *Example) Write() (string, error) {
body, err := json.Marshal(e)
if err != nil {
return "", err
}
return string(body), nil
}
// 读取字符串内容,反序列化到 object 实例中
func (e *Example) Read(body string) error {
return json.Unmarshal([]byte(body), e)
}
4 总结
本篇和大家一起针对缓存 cache 与数据库 db 之间的数据一致性问题展开了理论探讨,推演了从缓存延时双删到写缓存禁用机制的演进过程.
基于上述理论,我通过 golang 开发了 lib 库,集成了缓存一致性读写流程中的核心步骤,并在本文中针对其中的技术细节进行了介绍,并在最后给出对应的使用示例.