0 前言
本期和大家分享【一致性哈希算法】系列专题的下篇——从零到一落地实现一致性哈希算法.
日前我已经基于 golang 手写实现了一个一致性哈希算法模块,该项目已于 github 开源: http://github.com/xiaoxuxiansheng/consistent_hash
本篇文章文章会基于源码级的讲解,大家一起捋清一致性哈希算法中的诸多技术细节,同时会涉及到对跳表以及 redis zset 的使用讲解.
理论先行,实践紧随. 由于内容有着很强的关联性,强烈建议大家先完成上篇——一致性哈希算法原理解析的阅读学习.
本期实战篇分享内容的目录大纲如下:
1 一致性哈希算法原理
我们简单回顾一下一致性哈希算法的原理.
• 一致性哈希算法是一种适用于有状态服务集群的负载均衡策略
• 一致性哈希算法数据结构由一个首尾衔接的哈希环组成:
• 节点入环时,通过取哈希并对环长度取模,确定节点所在的位置
• 数据入环时,通过取哈希并对环长度取模,然后找到顺时针往下的第一个节点,作为训中的目标节点
• 一致性哈希算法最大的优势是,在集群节点数量发生变更时,只需要承担局部小范围的数据迁移成本
• 另外,一致性哈希算法中通过虚拟节点路由的方式,能够提高节点负载均衡能力,并能很好地支持带权分治的诉求
2 一致性哈希代码实现
从第 2 章开始,我们一起从零到一手撸出一个一致性哈希算法模块.
2.1 架构设计
在架构上,可以拆分为三个部分:
• 核心算法模块:聚焦了一致性哈希算法的核心逻辑,如节点、数据的增删改查处理逻辑. 这部分核心逻辑由 sdk 实现,以供复用.
• 哈希环:主要负责元数据的存储,内聚了包括虚拟节点存储、虚拟节点-真实节点映射、真实节点-状态数据映射等内容. 哈希环是一个抽象的 interface,可以由使用方自行实现后完成注入. 在项目中,我给出了基于单机跳表以及 redis zset 两种哈希环的实现版本,以供大家选择使用.
• 哈希编码器:实现将节点、数据映射到哈希环上指定位置的能力. 是一个抽象的 interface. 可以由使用方实现后完成注入. 在项目中,我给出了基于 murmur3 哈希算法实现的版本,以供大家选择使用.
本章我们主要聚焦探讨算法模块的内容,有关于哈希环的两种实现版本,我们放在本文第 3、4 章再作展开.
2.2 核心类
2.2.1 一致性哈希服务
下面是关于一致性哈希服务的类定义,该类会作为使用一致性哈希模块的统一入口:
// 一致性哈希服务
type ConsistentHash struct {
// 哈希环,是核心存储模块. 包括虚拟节点到真实节点的映射关系、真实节点对应的虚拟节点个数、以及哈希环上各个节点的位置
hashRing HashRing
// 数据迁移器. 当节点数量发生变更时,会使用迁移器完成状态数据的迁移
migrator Migrator
// 哈希散列函数,用于将原始输入内容映射到哈希环上的某个位置
encryptor Encryptor
// 用户自定义配置项
opts ConsistentHashOptions
}
func NewConsistentHash(hashRing HashRing, encryptor Encryptor, migrator Migrator, opts ...ConsistentHashOption) *ConsistentHash {
ch := ConsistentHash{
hashRing: hashRing,
migrator: migrator,
encryptor: encryptor,
}
for _, opt := range opts {
opt(&ch.opts)
}
repair(&ch.opts)
return &ch
}
在一致性哈希服务类 ConsistentHash 中包含了几个核心成员属性:
• hashRing:哈希环. 内聚了元数据的存储能力. 是一个抽象 interface,由使用方自行实现后完成注入. 项目中也提供了跳表版和 redis 版实现可供选择使用.
• migrator:迁移器,在哈希环中节点数量发生变更时,用于回调完成数据迁移的闭包函数. 由使用方自行定义执行逻辑,完成注入.
• encryptor:哈希编码器. 需要支持将节点、数据映射到哈希环指定位置的寻址能力. 是一个抽象 interface,由使用方自行实现完成注入,项目中提供了默认的 murmur3 版本.
2.2.2 哈希环
哈希环 HashRing 是一个抽象 interface,其中定义了诸多方法,使用方需要遵循各方法的语义进行实现:
type HashRing interface {
// 锁住整个哈希环,在分布式场景下需要使用分布式锁. expireSeconds 为锁的过期时间
Lock(ctx context.Context, expireSeconds int) error
// 解锁哈希环
Unlock(ctx context.Context) error
// 将一个节点添加到哈希环中, 其中 virtualScore 为虚拟节点在哈希环中的位置,nodeID 为真实节点的 index
Add(ctx context.Context, virtualScore int32, nodeID string) error
// 在哈希环中找到 virtualScore 顺时针往下的第一个虚拟节点位置
Ceiling(ctx context.Context, virtualScore int32) (int32, error)
// 在哈希环中好到 virtualScore 逆时针往上的第一个虚拟节点位置
Floor(ctx context.Context, virtualScore int32) (int32, error)
// 在哈希环 virtualScore 位置移除一个真实节点
Rem(ctx context.Context, virtualScore int32, nodeID string) error
// 查询哈希环中全量的真实节点,返回的结果为 map,其中 key 为真实节点 index,val 为真实节点对应的虚拟节点个数
Nodes(ctx context.Context) (map[string]int, error)
// 设置一个真实节点对应的虚拟节点个数,同时该操作背后的含义是将一个真实节点添加到一致性哈希模块中
AddNodeToReplica(ctx context.Context, nodeID string, replicas int) error
// 删除一个真实节点对应的虚拟节点个数,同时该操作背后的含义是将一个真实节点从一致性哈希模块中删除
DeleteNodeToReplica(ctx context.Context, nodeID string) error
// 查询哈希环 virtualScore 位置上对应的真实节点列表
Node(ctx context.Context, virtualScore int32) ([]string, error)
// 查询某个真实节点中存储的状态数据的 key 集合
DataKeys(ctx context.Context, nodeID string) (map[string]struct{}, error)
// 将一系列状态数据的 key 添加与某个真实节点建立映射关系
AddNodeToDataKeys(ctx context.Context, nodeID string, dataKeys map[string]struct{}) error
// 将一系列状态数据的 key 删除与某个真实节点的映射关系
DeleteNodeToDataKeys(ctx context.Context, nodeID string, dataKeys map[string]struct{}) error
}
哈希环涉及的方法较多,根据各大类职能拆分如下:
• 并发安全:
• Lock:哈希环加锁
• Unlock:哈希环解锁
• 节点存储管理:
• Add:真实节点入环
• Rem:真实节点出环
• Ceiling:寻找顺时针下一个虚拟节点
• Floor:寻找逆时针上一个虚拟节点
• 真实节点与虚拟节点映射:
• Nodes:全量的真实节点以及虚拟节点个数映射
• AddNodeToReplica:存储真实节点对应的虚拟节点个数
• DeleteNodeToReplica:删除真实节点对应的虚拟节点个数
• Node:查询虚拟节点对应的真实节点列表
• 真实节点与状态数据映射:
• DataKeys:查询一个真实节点下的全量状态数据 key
• AddNodeToDataKeys:添加某个真实节点与状态数据 key 的映射
• DeleteNodeToDataKey:删除真实节点与状态数据 key 的映射
此处值得一提的是,由于哈希冲突以及取模运算的存在,导致可能存在多个真实节点共用一个虚拟节点的情况. 此时,虚拟节点的节点列表 nodeIDs 的长度值会大于 1.
此时,我会遵循节点进入 nodeIDs 列表的先后顺序,保证在每次存储和查询数据时,都以列表中首个节点作为操作的目标.
2.2.3 哈希散列器
哈希编码器用于将原始输入映射到哈希环上的某个位置:
type Encryptor interface {
Encrypt(origin string) int32
}
这个模块被定义成一个抽象 interface,可以由使用方自行实现.
在项目中我也提供了一个 基于 murmur3 哈希算法实现的版本:
type MurmurHasher struct {
}
func NewMurmurHasher() *MurmurHasher {
return &MurmurHasher{}
}
func (m *MurmurHasher) Encrypt(origin string) int32 {
hasher := murmur3.New32()
_, _ = hasher.Write([]byte(origin))
return int32(hasher.Sum32() % math.MaxInt32)
}
• 首先,基于 murmur3 哈希算法将原始输入映射为一个 uint32 整数
• 接下来这个 uint32 整数对 math.MaxInt32 取模,获得其在哈希环上的位置
从上述流程可以看出,在这个 murmur3 版本实现中,哈希环的长度为 math.MaxInt32(2^31 - 1),哈希环上的虚拟节点数值范围为 0~2^32 - 2
2.2.4 数据迁移器
在一致性哈希算法原理篇中,我会提过,随着真实节点的出入环操作,状态数据和虚拟节点间映射关系会随之改变,因此需要执行局部数据迁移操作.
为解决这一诉求,此处预留了一个执行数据迁移的回调入口,保证在节点发生变更时,使用方能够按照自定义的逻辑完成状态数据的迁移. (这个操作可能会比较重)
// 用户需要注册好闭包函数进来,核心是执行数据迁移操作的
type Migrator func(ctx context.Context, dataKeys map[string]struct{}, from, to string) error
2.3 添加节点
下面理一下,在一致性哈希算法模块中,添加一个新节点的处理流程:
2.3.1 主流程
添加节点的核心步骤包括:
• 基于哈希环维度加锁,避免并发问题引起数据不一致
• 需要校验节点是否存在,避免重复添加
• 通过用户传入的权重值,确定对应的虚拟节点个数——replicas
• 执行哈希编码,在 replicas 个虚拟节点中添加和节点的映射关系
• 每完成一个虚拟节点的处理,将执行数据迁移的任务进行聚合,最后统一调用迁移函数
其它更详细的内容通过代码注释的方式给出:
// 在一致性哈希模块中添加新的节点
func (c *ConsistentHash) AddNode(ctx context.Context, nodeID string, weight int) error {
// 1 加全局分布式锁
if err := c.hashRing.Lock(ctx, c.opts.lockExpireSeconds); err != nil {
return err
}
// 2 defer 保证返回前执行解锁操作
defer func() {
_ = c.hashRing.Unlock(ctx)
}()
// 3 如果节点已经存在了,直接返回重复添加节点的错误
nodes, err := c.hashRing.Nodes(ctx)
if err != nil {
return err
}
for node := range nodes {
if node == nodeID {
return errors.New("repeat node")
}
}
// 4 根据用户传入的节点权重值 weight 以及配置项中配置好放大系数 replicas,计算出这个真实节点对应的虚拟节点个数
replicas := c.getValidWeight(weight) * c.opts.replicas
// 5 将计算得到的 replicas 个数与 nodeID 的映射关系放到 hash ring 中,同时也能标识出当前 nodeID 已经存在
if err = c.hashRing.AddNodeToReplica(ctx, nodeID, replicas); err != nil {
return err
}
// 6 按照虚拟节点的个数,分别将虚拟节点添加到哈希环中
var migrateTasks []func()
for i := 0; i < replicas; i++ {
// 6 使用 encryptor,推算出对应的 k 个虚拟节点的数值
nodeKey := c.getRawNodeKey(nodeID, i)
virtualScore := c.encryptor.Encrypt(nodeKey)
// 7 将一个虚拟节点添加到 hash ring 当中
if err := c.hashRing.Add(ctx, virtualScore, nodeKey); err != nil {
return err
}
// 8 调用 migrateIn 方法,获取需要执行的数据迁移任务信息
// from: 数据迁移起点的节点 id
// to: 数据迁移终点的节点 id
// data: 需要迁移的状态数据的 key
from, to, datas, err := c.migrateIn(ctx, virtualScore, nodeID)
if err != nil {
return err
}
// 倘若待迁移的数据长度为 0,则直接跳过
if len(datas) == 0 {
continue
}
// 数据迁移任务不是立即执行,只是追加到 list 中,最后会在 batchExecuteMigrator 方法中统一执行
migrateTasks = append(migrateTasks, func() {
_ = c.migrator(ctx, datas, from, to)
})
}
// 9 批量执行数据迁移任务
c.batchExecuteMigrator(migrateTasks)
return nil
}
2.3.2 数据迁移
每次新增虚拟节点入环时,需要从后继节点中找到从属于新节点的状态数据完成迁移操作.
以下图为例,倘若在哈希环中插入了 G,则需要从顺时针往下的 C 中,找到属于 (B,G] 范围的数据,将其迁移到新插入的 G 中.
下面是获取本次迁移任务明细的流程,在返回的响应结果中,from 为迁移的起点(真实节点);to 为迁移的终点(真实节点);datas 为需要执行迁移的状态数据的 key 集合.
migrateIn 方法的细节通过代码注释给出:
// 在 AddNode 添加节点流程中,获取需要执行数据迁移的任务明细
// from 数据迁移起点的节点 id
// to 数据迁移终点的节点 id
// datas 需要迁移的状态数据的 key
func (c *ConsistentHash) migrateIn(ctx context.Context, virtualScore int32, nodeID string) (from, to string, datas map[string]struct{}, _err error) {
// 1 倘若使用方没有注入迁移函数,则直接返回
if c.migrator == nil {
return
}
// 2 根据虚拟节点数值 virtualScore 查询哈希环,查看其映射的真实节点列表
// 2.2.2 小节中解释了,由于哈希冲突以及取模运算,导致可能存在一个虚拟节点数值对应多个真实节点的情况
nodes, err := c.hashRing.Node(ctx, virtualScore)
if err != nil {
_err = err
return
}
// 3 倘若 virtualScore 对应节点数量大于 1,则说明当前节点不是列表中 index = 0 的首个真实节点,则说明不需要执行数据迁移操作. 因为状态数据只会被分配到列表的首个节点中.
if len(nodes) > 1 {
return
}
// 4 执行 floor 操作,获取当前虚拟节点数值 virtualScore 逆时针往上的第一个虚拟节点数值 lastScore
lastScore, err := c.hashRing.Floor(ctx, c.decrScore(virtualScore))
if err != nil {
_err = err
return
}
// 5 倘若哈希环上不存在其他的虚拟节点,则无需执行数据迁移操作
if lastScore == -1 || lastScore == virtualScore {
return
}
// 6 执行 ceiling 操作,获取当前虚拟节点数值 virtualScore 顺时针往下的第一个虚拟节点数值 nextScore
nextScore, err := c.hashRing.Ceiling(ctx, c.incrScore(virtualScore))
if err != nil {
_err = err
return
}
// 7 倘若哈希环上不存在其他虚拟节点,则无需执行数据迁移操作
if nextScore == -1 || nextScore == virtualScore {
return
}
// 8 patternOne: last-0-cur-next 具体解释在代码下方给出
patternOne := lastScore > virtualScore
// 9 patternTwo: last-cur-0-next 具体解释在代码下方给出
patternTwo := nextScore < virtualScore
if patternOne {
lastScore -= math.MaxInt32
}
if patternTwo {
virtualScore -= math.MaxInt32
lastScore -= math.MaxInt32
}
// 10 获取到 nextScore 对应的真实节点列表
nextNodes, err := c.hashRing.Node(ctx, nextScore)
if err != nil {
_err = err
return
}
// 11 倘若 nextScore 对应的真实节点列表为空,直接返回
if len(nextNodes) == 0 {
return
}
// 12 获取到 nextScore 首个真实节点对应的状态数据的 key 列表
dataKeys, err := c.hashRing.DataKeys(ctx, c.getNodeID(nextNodes[0]))
if err != nil {
_err = err
return
}
datas = make(map[string]struct{})
// 13 遍历状态数据 key 列表,将其中满足迁移条件的部分添加到 datas 当中
for dataKey := range dataKeys {
// 14 依次将每个状态数据的 key 映射到哈希环上的某个位置
dataVirtualScore := c.encryptor.Encrypt(dataKey)
// 15 对应于 patternOne,需要将 (last,max] 范围内的数据统一减去哈希环的长度,具体原因在代码下方解释
if patternOne && dataVirtualScore > (lastScore+math.MaxInt32) {
dataVirtualScore -= math.MaxInt32
}
// 16 对应于 patternTwo,将数据统一减去哈希环的长度,具体原因在代码下方解释
if patternTwo {
dataVirtualScore -= math.MaxInt32
}
// 17 倘若数据不属于 (lastScore,virtuaslScore] 的范围,则无需迁移
if dataVirtualScore <= lastScore || dataVirtualScore > virtualScore {
continue
}
// 18 将需要迁移的数据添加到 datas 中
datas[dataKey] = struct{}{}
}
// 19 从 nextScore 对应的首个真实节点中删除这部分需要迁移的数据的 key
if err = c.hashRing.DeleteNodeToDataKeys(ctx, c.getNodeID(nextNodes[0]), datas); err != nil {
return "", "", nil, err
}
// 20 将这部分需要迁移的数据的 key 添加到新增节点 nodeID 中
if err = c.hashRing.AddNodeToDataKeys(ctx, nodeID, datas); err != nil {
return "", "", nil, err
}
// 21 返回结果
// 迁移起点 from: nextScore 对应的首个真实节点
// 迁移终点 to:新增节点 nodeID
// 迁移数据:datas
return c.getNodeID(nextNodes[0]), nodeID, datas, nil
}
// 虚拟节点数值加 1 (倘若来到环的终点,则将其置为 0)
func (c *ConsistentHash) incrScore(score int32) int32 {
if score == math.MaxInt32-1 {
return 0
}
return score + 1
}
// 虚拟节点数值减 1(倘若穿过环的起点,则将其置为 math.MaxInt32 - 1)
func (c *ConsistentHash) decrScore(score int32) int32 {
if score == 0 {
return math.MaxInt32 - 1
}
return score - 1
}
在代码注释 8、9 项中提到的 patternOne 和 patternTwo,指的是新插入节点 cur、逆时针上一个节点 prev 和顺时针下一个节点 next 之间存在一些特殊的位置关系. 具体展示如下:
• patternOne:
在 patternOne 中,存在的位置关系为 prev-0-cur-next. 即在 prev 和 cur 之间经过了哈希环的起点.
此时我们需要迁移的数据由 (prev, math.Int32) + [0, cur] 两部分共同组成.
• patternTwo:
在 patternTwo 中,存在的位置关系为 prev-cur-0-next,即在 cur 和 next 之间经过了哈希环的起点.
此时我们需要迁移的数据处于 (prev,cur] 的范围.
• 批量迁移:
在完成所有虚拟节点的插入操作后,会调用 batchExecuteMigrator 方法并发执行之前集成好的数据迁移操作.
func (c *ConsistentHash) batchExecuteMigrator(migrateTasks []func()) {
// 执行所有的数据迁移任务
var wg sync.WaitGroup
for _, migrateTask := range migrateTasks {
// shadow
migrateTask := migrateTask
wg.Add(1)
go func() {
defer func() {
if err := recover(); err != nil {
// log ...
}
wg.Done()
}()
migrateTask()
}()
}
wg.Wait()
}
2.4 删除节点
接下来梳理一下删除节点的流程.
2.4.1 主流程
当需要从一致性哈希模块删除一个节点时,涉及的核心步骤如下:
• 对哈希环加锁,避免产生并发问题
• 校验哈希环中是否存在目标节点
• 查询节点对应的虚拟节点个数
• 依次通过哈希编码器,检索哈希环上的各个虚拟节点
• 遍历每个虚拟节点,将待删除节点从虚拟节点的列表中删除
• 执行数据迁移操作
更完整的技术细节通过代码注释的方式给出:
// 从一致性哈希模块中删除节点
func (c *ConsistentHash) RemoveNode(ctx context.Context, nodeID string) error {
// 1 加全局分布式锁
if err := c.hashRing.Lock(ctx, c.opts.lockExpireSeconds); err != nil {
return err
}
// 2 通过 defer 保证方法退出前进行解锁
defer func() {
_ = c.hashRing.Unlock(ctx)
}()
// 3 查询哈希环中存在的所有真实节点
nodes, err := c.hashRing.Nodes(ctx)
if err != nil {
return err
}
// 4 校验待删除的节点在哈希环中是否存在
var (
nodeExist bool
replicas int
)
for node, _replicas := range nodes {
if node == nodeID {
nodeExist = true
replicas = _replicas
break
}
}
// 5 如果待删除的节点不存在,直接抛错返回
if !nodeExist {
return errors.New("invalid node id")
}
// 6 从哈希环中删除节点与虚拟节点个数的映射信息,这个操作背后的含义就是从哈希环中删去这个真实节点
if err = c.hashRing.DeleteNodeToReplica(ctx, nodeID); err != nil {
return err
}
var migrateTasks []func()
// 7 根据真实节点对应的虚拟节点个数,开始执行对应虚拟节点的删除操作
for i := 0; i < replicas; i++ {
// 8 使用 encryptor,推算出对应的 k 个虚拟节点数值
virtualScore := c.encryptor.Encrypt(fmt.Sprintf("%s_%d", nodeID, i))
// 9 首先调用 migrateOut 方法,获取迁移任务的明细信息
from, to, datas, err := c.migrateOut(ctx, virtualScore, nodeID)
if err != nil {
return err
}
// 10 从哈希环对应虚拟节点数值 virtualScore 的位置删去这个真实节点 nodeID
nodeKey := c.getRawNodeKey(nodeID, i)
if err = c.hashRing.Rem(ctx, virtualScore, nodeKey); err != nil {
return err
}
// 11 倘若待迁移的数据长度为 0,则直接跳过
if len(datas) == 0 {
continue
}
// 12 数据迁移任务不是立即执行,而是放在方法返回前统一批量执行
migrateTasks = append(migrateTasks, func() {
_ = c.migrator(ctx, datas, from, to)
})
}
// 13 批量执行数据迁移任务
c.batchExecuteMigrator(migrateTasks)
return nil
}
2.4.2 数据迁移
在删除节点时,也需要执行数据迁移任务,以下图为例:
当需要删除 G 时,需要把 G 中承载的从属于 (B,G] 范围的数据托付给顺时针往下的 C.
migrateOut 用于检索到一笔迁移任务的起点、终点以及待迁移数据的 key 集合. 核心步骤包括:
• 首先找到哈希环顺指针往下的第一个满足托付条件的真实节点
• 遍历被删除节点的数据,将满足迁移条件的数据 key 摘取出来
完整内容参见代码及注释:
// 获取在删除节点流程中,需要执行数据迁移任务的明细
// from:数据迁移起点的节点 id
// to:数据迁移终点的节点 id
// datas: 需要迁移的数据 key
func (c *ConsistentHash) migrateOut(ctx context.Context, virtualScore int32, nodeID string) (from, to string, datas map[string]struct{}, err error) {
// 1 使用方没有注入迁移函数,则直接返回
if c.migrator == nil {
return
}
// 2 defer 保证方法返回前执行:
// 2.1 从迁移起点的节点中将状态数据 key 删除
// 2.2 在迁移终点的节点中添加状态数据 key
defer func() {
if err != nil {
return
}
if to == "" || len(datas) == 0 {
return
}
if err = c.hashRing.DeleteNodeToDataKeys(ctx, nodeID, datas); err != nil {
return
}
err = c.hashRing.AddNodeToDataKeys(ctx, to, datas)
}()
from = nodeID
// 3 查看虚拟节点数值对应的真实节点列表
nodes, _err := c.hashRing.Node(ctx, virtualScore)
if _err != nil {
err = _err
return
}
// 4 如果真实节点不存在,直接返回(异常情况)
if len(nodes) == 0 {
return
}
// 5 如果待删除节点不是列表中的首个节点,直接返回. 因为非首节点不会存放对应于该 virtualsCore 的状态数据
if c.getNodeID(nodes[0]) != nodeID {
return
}
// 6 查看待删除节点中存放的状态数据 key 列表
var allDatas map[string]struct{}
if allDatas, err = c.hashRing.DataKeys(ctx, nodeID); err != nil {
return
}
// 7 倘若不存在状态数据,则无需迁移,直接返回
if len(allDatas) == 0 {
return
}
// 8 查询哈希环中虚拟节点数值 virtualScore 逆时针往前的第一个虚拟节点数值 lastScore
lastScore, _err := c.hashRing.Floor(ctx, c.decrScore(virtualScore))
if _err != nil {
err = _err
return
}
// 9 倘若哈希环中不存在其他的虚拟节点,则需要校验当前真实节点是否是环中的唯一真实节点,是的话直接报错,否则会需要将全量数据委托给列表中的下一个真实节点
var onlyScore bool
if lastScore == -1 || lastScore == virtualScore {
if len(nodes) == 1 {
err = errors.New("no other no")
return
}
onlyScore = true
}
// 10 判断是否是 lastScore-0-virtualScore-nextScore 的组成形式
pattern := lastScore > virtualScore
if pattern {
lastScore -= math.MaxInt32
}
datas = make(map[string]struct{})
// 11 遍历待删除节点中的状态数据 key,将需要进行迁移的数据添加到 datas 中
for data := range allDatas {
// 12 如果整个哈希环只有一个 virtualScore,且对应的真实节点数量为多个,则直接将全量数据委托给列表中的下一个真实节点
if onlyScore {
datas[data] = struct{}{}
continue
}
// 13 将位置位于 (lastScore, virtualScore] 的数据添加到 datas,需要进行迁移
dataScore := c.encryptor.Encrypt(data)
if pattern && dataScore > lastScore+math.MaxInt32 {
dataScore -= math.MaxInt32
}
if dataScore <= lastScore || dataScore > virtualScore {
continue
}
datas[data] = struct{}{}
}
// 14 如果当前 virtualScore 下存在多个节点,则直接委托给下一个节点
if len(nodes) > 1 {
to = c.getNodeID(nodes[1])
return
}
// 15 寻找后继节点
if to, err = c.getValidNextNode(ctx, virtualScore, nodeID, nil); err != nil {
err = _err
return
}
// 16 倘若未找到满足条件的后继节点,抛出错误
if to == "" {
err = errors.New("no other node")
}
return
}
在 migrateOut 方法中,getValidNextNode 方法用于满足迁移条件的后继节点,这里一方面需要考虑位置关系,另一方面要考虑后继节点不能和待删除节点是同一个真实节点.
func (c *ConsistentHash) getValidNextNode(ctx context.Context, score int32, nodeID string, ranged map[int32]struct{}) (string, error) {
// 1 首先通过 ceiling 方法,在哈希环上寻找顺时针往下的第一个虚拟节点数值 nextScore
nextScore, err := c.hashRing.Ceiling(ctx, c.incrScore(score))
if err != nil {
return "", err
}
// 2 nextScore 不存在,直接返回
if nextScore == -1 {
return "", nil
}
// 3 倘若已经检索了一整轮还未找到目标,直接返回
if _, ok := ranged[nextScore]; ok {
return "", nil
}
// 4 获取 nextScore 对应的真实节点列表
nextNodes, err := c.hashRing.Node(ctx, nextScore)
if err != nil {
return "", err
}
// 5 倘若不存在真实节点,抛错返回(这是不符合预期的. 一个 virtualScore 在真实节点列表为空时,会直接从环中移除)
if len(nextNodes) == 0 {
return "", errors.New("next node empty")
}
// 6 nextScore 对应的首个真实节点是否非当前待删除节点,如果满足条件直接返回,作为托付的后继节点
if nextNode := c.getNodeID(nextNodes[0]); nextNode != nodeID {
return nextNode, nil
}
// 7 倘若 nextScore 对应的真实节点列表长度大于 1,且首个真实节点对应为待删除节点,则取第二个节点作为后继节点
if len(nextNodes) > 1 {
return c.getNodeID(nextNodes[1]), nil
}
if ranged == nil {
ranged = make(map[int32]struct{})
}
ranged[score] = struct{}{}
// 8 倘若当前找到的 nextScore 对应的真实节点为待删除节点本身,则递归向下检索
return c.getValidNextNode(ctx, nextScore, nodeID, ranged)
}
2.5 查询数据对应节点
在执行一笔状态数据的读、写请求时,需要通过一致性哈希模块,检索到数据所对应的真实节点. 该流程核心步骤如下:
• 哈希环加锁,保证并发安全
• 通过哈希编码器,找到数据在哈希环上的位置
• 找到顺时针往下的第一个虚拟节点
• 找到虚拟节点对应的首个真实节点(可能存在一对多)
• 建立真实节点与状态数据之间的映射关系
func (c *ConsistentHash) GetNode(ctx context.Context, dataKey string) (string, error) {
// 1 加全局分布式锁
if err := c.hashRing.Lock(ctx, c.opts.lockExpireSeconds); err != nil {
return "", err
}
// 2 defer 保证方法退出前解锁
defer func() {
_ = c.hashRing.Unlock(ctx)
}()
// 3 输入一个数据的 key,根据 encryptor 计算出其从属于哈希环的位置 dataScore
dataScore := c.encryptor.Encrypt(dataKey)
// 4 执行 ceiling 检索流程,沿着顺时针方向,找到当前 dataKey 对应 dataScore 的下一个虚拟节点数值 ceilingScore
ceilingScore, err := c.hashRing.Ceiling(ctx, dataScore)
if err != nil {
return "", err
}
// 5 倘若未找到目标,则说明没有可用的目标节点
if ceilingScore == -1 {
return "", errors.New("no node available")
}
// 6 查询 ceilingScore 对应的真实节点列表
nodes, err := c.hashRing.Node(ctx, ceilingScore)
if err != nil {
return "", err
}
// 7 倘若真实节点列表为空直接返回错误(异常情况)
if len(nodes) == 0 {
return "", errors.New("no node available with empty score")
}
// 8 为 dataKey 选中真实节点后,需要将 dataKey 添加到真实节点的状态数据 key 列表中
if err = c.hashRing.AddNodeToDataKeys(ctx, c.getNodeID(nodes[0]), map[string]struct{}{
dataKey: {},
}); err != nil {
return "", err
}
// 9 返回选中的目标节点
return nodes[0], nil
}
至此,一致性哈希算法模块的内容我们介绍完毕,下面我们来探讨两种具体的哈希环实现方式.
3 单机版哈希环
第 3 章主要探讨如何基于单机跳表实现哈希环.
3.1 数据结构
首先,由于虚拟节点路由的存在,在哈希环中可能存在较大的数量的虚拟节点数据,且虚拟节点根据在哈希环上所处的位置,存在明确的顺序关系.
这里我选择基于有序表的方式来实现哈希环,以虚拟节点的数值(在哈希环上的位置)作为排序的键,这样每次在新增、删除、检索虚拟节点时,都能基于有序表的特性,做到基于 O(logN) 对数级别的时间复杂度完成任务.
在有序表的数据结构选型上,我个人比较推崇使用跳表 skiplist,因此本章我会基于跳表实现一款单机版的哈希环.
3.2 单机锁
基于单机跳表实现的哈希环也需要在支持加锁功能,以保证一些核心流程不产生并发问题.
同时,为了保持哈希环 interface 协议的一致性,此处的单机锁还需要支持一个过期后自动释放的功能. 这里我基于阻塞型和非阻塞型,分别实现了两个版本单机锁.
3.2.1 核心类
首先是基于异步编程实现的阻塞型版本的单机锁,其中包含如下图所示的几个核心成员属性:
// 带有过期释放功能的单机锁
type LockEntity struct {
// 内置一把 sync.Mutex,加解锁操作实际上是对它进行操作
lock sync.Mutex
// 另一把锁,加解锁流程会是一系列符合操作,为了保证操作的连贯性,需要通过 doubleLock 进行加锁
doubleLock sync.Mutex
// 守护协程声明周期控制器,用于停止过期释放锁的守护协程
// 为了保证单机锁过期时被自动释放,会异步启动一个守护协程负责在过期时进行解锁
cancel context.CancelFunc
// 标识锁的所有者身份
owner atomic.Value
}
3.2.2 加锁
为了保证单机锁具有过期后自动释放的能力,在加锁时需要额外开启一个异步守护协程执行解锁操作:
加锁方法核心步骤如下:
• 加 doubleLock,保证后续操作流程的原子性
• 获取 lock,调用 mutex.Lock 执行真正的加锁操作
• 加锁成功后,把 owner 字段设置为当前 goroutine 的身份标识 token,用于解锁时作身份校验
• 倘若设了过期时间,需要开启一个 goroutine,在指定时长后进行解锁操作
• 针对这个异步启动的 goroutine,通过 context 控制其生命周期,保证在手动释放锁后,会停止这个异步 goroutine,避免重复解锁以及协程泄漏问题
// 锁住哈希环,支持配置过期时间. 达到过期时间后,会自动释放锁
func (s *SkiplistHashRing) Lock(ctx context.Context, expireSeconds int) error {
// 1 首先加 doubleLock,保证加锁流程一系列复合步骤的连贯性
s.doubleLock.Lock()
defer s.doubleLock.Unlock()
// 2 执行真正的加锁操作
s.lock.Lock()
// 3 加锁成功后,将当前 goroutine 的身份标识作为 token
token := os.GetCurrentProcessAndGogroutineIDStr()
// 4 将当前 goroutine 的 token 进行存储,表示其对锁的所有券
s.owner.Store(token)
// 5 倘若传入的过期时间非正值,则直接返回,不启动解锁的异步 goroutine
if expireSeconds <= 0 {
return nil
}
// 6 开启异步 goroutine,在指定时长后进行解锁. 并通过 context cancel 函数控制 goroutine 的生命周期
cctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
go func() {
// 7 倘若锁已经被主动释放,则该 goroutine 会直接退出
select {
case <-cctx.Done():
return
// 8 达到指定时长后,会进行解锁操作
case <-time.After(time.Duration(expireSeconds) * time.Second):
s.unlock(ctx, token)
}
}()
return nil
}
下面基于 goroutine 获取身份标识 token 的方法:此处会通过进程的 pid 结合协程的 goroutine_id 拼接生成一个全局唯一的二维字符串.
// 获取由进程id+协程id组成的二位标识字符串
func GetCurrentProcessAndGogroutineIDStr() string {
pid := GetCurrentProcessID()
goroutineID := GetCurrentGoroutineID()
return fmt.Sprintf("%d_%s", pid, goroutineID)
}
// 获取当前的协程id
func GetCurrentGoroutineID() string {
buf := make([]byte, 128)
buf = buf[:runtime.Stack(buf, false)]
stackInfo := string(buf)
return strings.TrimSpace(strings.Split(strings.Split(stackInfo, "[running]")[0], "goroutine")[1])
}
// 获取当前的进程id
func GetCurrentProcessID() int {
return os.Getpid()
}
3.2.3 解锁
解锁流程的核心步骤如下:
• 获取当前 goroutine 的身份标识 token
• 加 doubleLock,保证后续流程的原子性
• 校验锁的 owner 字段是否于 token 一致
• 将 owner 字段置为空,代表锁未占用
• 倘若存在异步 goroutine,调用 cancel 将其释放
• 调用 mutex.Unlock 执行真正的解锁操作
// 解锁释放哈希环
func (s *SkiplistHashRing) Unlock(ctx context.Context) error {
// 获取当前 goroutine token
token := os.GetCurrentProcessAndGogroutineIDStr()
return s.unlock(ctx, token)
}
// 私有的解锁方法, token 由上层传入
func (s *SkiplistHashRing) unlock(ctx context.Context, token string) error {
// 1 首先加 doubleLock,保证解锁流程一系列复合步骤的连贯性
s.doubleLock.Lock()
defer s.doubleLock.Unlock()
// 2 解锁时发现锁不属于自己,直接终止解锁流程
owner, _ := s.owner.Load().(string)
if owner != token {
return errors.New("not your lock")
}
// 3 需要解锁了,则将 owner 清空,代表锁不被任何人占用
s.owner.Store("")
// 4 倘若 cancel 函数非空,则执行. 用于将过期释放锁的 goroutine 停止
if s.cancel != nil {
s.cancel()
}
// 5 执行真正的解锁操作
s.lock.Unlock()
return nil
}
3.2 小节中结合对 goroutine 和 context 的使用,通过异步编程的方式向大家展示了一种带有过期功能的阻塞型单机锁.
下面在 3.3 小节中向大家展示一种简化版的实现方式,这部分会基于同步串行的流程进行实现.
3.3 单机锁(简化版)
3.3.1 核心类
简化版单机锁的核心成员属性如下图所示:
type LockEntityV2 struct {
// 标识锁是否被占用,需要结合 expireAt 字段来看
locked bool
// 用于保证流程原子性的辅助锁
mutex sync.Mutex
// 锁的过期释放时间
expireAt time.Time
// 锁的持有者身份标识
owner string
}
func NewLockEntityV2() *LockEntityV2 {
return &LockEntityV2{}
}
3.3.2 加锁
加锁流程的核心步骤为:
• 加 mutex,保证后续流程的原子性
• 基于 locked 标志校验锁是否已被持有
• 基于 expireAt 标志校验锁是否已过期释放
• 加锁时做以下设置:
• 设置 locked 标识为 true
• 设置 expireAt 为过期释放的时间点
• 设置 owner 为当前 goroutine 的身份标识
func (l *LockEntityV2) Lock(ctx context.Context, expireSeconds int) error {
l.mutex.Lock()
defer l.mutex.Unlock()
now := time.Now()
if !l.locked || l.expireAt.Before(now) {
l.locked = true
l.expireAt = now.Add(time.Duration(expireSeconds) * time.Second)
l.owner = utils.GetProcessAndGoroutineIDStr()
return nil
}
return errors.New("accquire by others")
}
3.3.3 解锁
解锁流程的核心步骤为:
• 加 mutex,保证后续流程的原子性
• 校验 locked 标识以及 expireAt 标识,判断锁是否已被释放
• 校验 owner 标识,判断锁是否属于自己
• 解锁时将 locked 标识设置为 false
func (l *LockEntityV2) Unlock(ctx context.Context) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if !l.locked || l.expireAt.Before(time.Now()) {
return errors.New("not locked")
}
if l.owner != utils.GetProcessAndGoroutineIDStr() {
return errors.New("not your lock")
}
l.locked = false
return nil
}
3.4 跳表
聊完单机锁部分,下面进入跳表哈希环的核心数据结构和方法链路部分.
本文探讨的终点是一致性哈希算法,本章更多地是在探讨如何基于跳表实现哈希环结构,至于跳表本身的结构特性,不属于本文探讨的范畴,大家如果对这部分内容感兴趣的话,可以阅读我之前发表的文章——基于 golang 从零到一实现跳表.
3.4.1 核心类
在跳表版本哈希环的类定义中,包含如下核心成员属性:
• LockEntity:带有过期释放功能的单机锁,锁维度的是整个哈希环
• root:跳表的根节点,类型为 virtualNode
• nodeToReplicas:记录了每个节点对应的虚拟节点个数
• nodeToDataKey:记录了每个节点存储的状态数据的 key
// 基于本地跳表实现一个哈希环
type SkiplistHashRing struct {
LockEntity
root *virtualNode
// 每个真实节点对应的虚拟节点个数
nodeToReplicas map[string]int
// 每个真实节点存储的状态数据 key
nodeToDataKey map[string]map[string]struct{}
}
func NewSkiplistHashRing() *SkiplistHashRing {
return &SkiplistHashRing{
root: &virtualNode{},
nodeToReplicas: make(map[string]int),
nodeToDataKey: make(map[string]map[string]struct{}),
}
}
virtualNode 是哈希环中的虚拟节点:
• score:虚拟节点在哈希环上的位置
• nodeIDs:虚拟节点对应的真实节点列表
• nexts:后继虚拟节点列表(对应于跳表的多层链表结构)
// 跳表中的每个节点,对应于一个虚拟节点数值.
type virtualNode struct {
// 虚拟节点数值,标识节点在哈希环上的位置
score int32
// 该虚拟节点对应的真实节点列表
nodeIDs []string
// 该虚拟节点的后继节点,由于使用的是跳表,因此是多层指针结构
nexts []*virtualNode
}
3.4.2 Add
Add 方法对应的是添加一个节点到哈希环某个虚拟节点位置的流程,其核心步骤为:
• 判断对应位置下虚拟节点是否已存在,倘若已存在,直接追加到 nodeIDs 列表中即可
• 倘若虚拟节点不存在. 则创建一个新的虚拟节点,roll 出对应的层高,然后将其插入到跳表中
// 真实节点入环. 将一个真实节点添加到 score 对应的虚拟节点中:
func (s *SkiplistHashRing) Add(ctx context.Context, score int32, nodeID string) error {
// 1 倘若对应于 score 的虚拟节点已经存在了,则直接将真实节点 nodeID 追加到虚拟节点对应的真实节点列表中即可
targetNode, ok := s.get(score)
if ok {
for _, _nodeID := range targetNode.nodeIDs {
if _nodeID == nodeID {
return nil
}
}
targetNode.nodeIDs = append(targetNode.nodeIDs, nodeID)
return nil
}
// 2 score 对应的虚拟节点还不存在. 则需要创建出一个新的虚拟节点.
// 3 通过随机数,得出新增的虚拟节点在跳表中的高度
rLevel := s.roll()
// 4 如有必要,对跳表最大高度进行扩容适配
if len(s.root.nexts) < rLevel+1 {
difs := make([]*virtualNode, rLevel+1-len(s.root.nexts))
s.root.nexts = append(s.root.nexts, difs...)
}
// 5 创建出新的虚拟节点
newNode := virtualNode{
score: score,
nexts: make([]*virtualNode, rLevel+1),
// 6 将真实节点挂载在虚拟节点的真实节点列表中
nodeIDs: []string{nodeID},
}
// 7 新节点添加进入跳表中
move := s.root
for level := rLevel; level >= 0; level-- {
for move.nexts[level] != nil && move.nexts[level].score < score {
move = move.nexts[level]
}
newNode.nexts[level] = move.nexts[level]
move.nexts[level] = &newNode
}
return nil
}
func (s *SkiplistHashRing) roll() int {
rander := rand.New(rand.NewSource(time.Now().UnixNano()))
var level int
for rander.Intn(2) == 1 {
level++
}
return level
}
3.4.3 Rem
Rem 方法对应为将一个真实节点从指定虚拟节点中删除的流程:
• 根据 score 获取到对应的虚拟节点
• 倘若虚拟节点的真实节点列表长度大于 1,则只需要将待删除节点从列表中剔除即可
• 倘若待删除节点是列表中唯一的节点,则直接将整个虚拟节点从跳表中删除
// 从哈希环 score 对应的虚拟节点中删除真实节点 nodeID
func (s *SkiplistHashRing) Rem(ctx context.Context, score int32, nodeID string) error {
// 1 倘若 score 对应的虚拟节点不存在,直接返回错误
targetNode, ok := s.get(score)
if !ok {
return fmt.Errorf("score: %d not exist", score)
}
// 2 检索真实节点 nodeID 在虚拟节点真实节点列表 nodeIDs 中的 index
index := -1
for i := 0; i < len(targetNode.nodeIDs); i++ {
if targetNode.nodeIDs[i] == nodeID {
index = i
break
}
}
// 3 倘若待删除的真实节点在虚拟节点中不存在,直接返回错误
if index == -1 {
return fmt.Errorf("node: %s not exist in score: %d", nodeID, score)
}
// 4 倘若虚拟节点中不只待删除节点 nodeID 这一个节点,则将其从列表中摘除即可
if len(targetNode.nodeIDs) > 1 {
targetNode.nodeIDs = append(targetNode.nodeIDs[:index], targetNode.nodeIDs[index+1:]...)
return nil
}
// 5 倘若虚拟节点中对应只有待删除节点 nodeID 这一个节点,则需要将整个虚拟节点从哈希环中摘除
move := s.root
for level := len(s.root.nexts) - 1; level >= 0; level-- {
for move.nexts[level] != nil && move.nexts[level].score < score {
move = move.nexts[level]
}
if move.nexts[level] == nil || move.nexts[level].score > score {
continue
}
move.nexts[level] = move.nexts[level].nexts[level]
}
// 6 如有必要,对跳表最大高度进行缩容
for level := 0; level < len(s.root.nexts); level++ {
if s.root.nexts[level] != nil {
continue
}
s.root.nexts = s.root.nexts[:level]
break
}
return nil
}
3.4.4 Ceiling
Ceiling 方法对应的是根据 score 在哈希环上找到顺时针往下第一个虚拟节点数值的流程:
• 首先执行跳表 ceiling 操作,找到跳表中 >= score 的第一个虚拟节点数值
• 倘若前一步未找到目标,则执行跳表 first 操作,获取到跳表中最小的虚拟节点数值
// 在哈希环中,找到 score 顺时针往下的第一个虚拟节点数值
func (s *SkiplistHashRing) Ceiling(ctx context.Context, score int32) (int32, error) {
// 1 执行跳表的 ceiling 方法,查看大于等于 score 的目标虚拟节点是否存在,倘若存在,直接返回
target, ok := s.ceiling(score)
if ok {
return target, nil
}
// 2 倘若 ceiling 方法没找到合适的目标,则调用 first 方法找到跳表中最小的虚拟节点进行返回 (因为哈希环是呈首尾衔接的环状)
first, _ := s.first()
return first, nil
}
下面是跳表的 ceiling 方法:
// 在跳表中找到数值 >= score 且最接近于 score 的虚拟节点数值
func (s *SkiplistHashRing) ceiling(score int32) (int32, bool) {
if len(s.root.nexts) == 0 {
return -1, false
}
move := s.root
for level := len(s.root.nexts) - 1; level >= 0; level-- {
for move.nexts[level] != nil && move.nexts[level].score < score {
move = move.nexts[level]
}
}
if move.nexts[0] == nil {
return -1, false
}
return move.nexts[0].score, true
}
下面是跳表的 first 方法:
// 找到跳表中最小的虚拟节点数值
func (s *SkiplistHashRing) first() (int32, bool) {
if len(s.root.nexts) == 0 {
return -1, false
}
return s.root.nexts[0].score, true
}
3.4.5 Floor
Floor 方法对应的是从哈希环中找到 score 逆时针往上第一个虚拟节点数值的流程:
• 首先执行跳表 floor 操作,找到跳表中 <= score 的第一个虚拟节点数值
• 倘若前一步未找到目标,执行跳表 last 操作,找到跳表中最大的虚拟节点数值
// 从哈希环中找到 score 逆时针向上的第一个虚拟节点数值
func (s *SkiplistHashRing) Floor(ctx context.Context, score int32) (int32, error) {
// 1 先尝试从跳表中找到小于等于 score 且最接近于 score 的第一个虚拟节点数值
target, ok := s.floor(score)
if ok {
return target, nil
}
// 2 倘若 floor 未找到合适目标. 则尝试获取跳表中最大的一个虚拟节点数值.(因为哈希环是首尾衔接的)
last, _ := s.last()
return last, nil
}
下面是跳表的 floor 方法:
// 从跳表中查询小于等于 score 且最接近于 score 的虚拟节点数值
func (s *SkiplistHashRing) floor(score int32) (int32, bool) {
if len(s.root.nexts) == 0 {
return -1, false
}
move := s.root
for level := len(s.root.nexts) - 1; level >= 0; level-- {
for move.nexts[level] != nil && move.nexts[level].score < score {
move = move.nexts[level]
}
}
if move.nexts[0] != nil && move.nexts[0].score == score {
return score, true
}
if move == s.root {
return -1, false
}
return move.score, true
}
下面是跳表的 last 方法:
// 获取跳表中最大的虚拟节点数值
func (s *SkiplistHashRing) last() (int32, bool) {
// 层数从高到低
move := s.root
for level := len(s.root.nexts) - 1; level >= 0; level-- {
for move.nexts[level] != nil {
move = move.nexts[level]
}
}
if move == s.root {
return -1, false
}
return move.score, true
}
3.4.6 Node
Node 方法对应的是,通过虚拟节点数值 score 获取到其对应真实节点列表的方法:
• 通过 score 在跳表中检索到虚拟节点
• 返回虚拟节点的真实节点列表 nodeIDs
// 根据 score 获取到对应的真实节点列表
func (s *SkiplistHashRing) Node(ctx context.Context, score int32) ([]string, error) {
// 1 根据 score 从跳表中检索到目标虚拟节点
targetNode, ok := s.get(score)
if !ok {
return nil, fmt.Errorf("score: %d not exist", score)
}
// 2 返回虚拟节点中挂载的真实节点列表
return targetNode.nodeIDs, nil
}
4 redis版哈希环
聊完单机表,下面我们聊聊更贴近于生产环境的分布式版本.
4.1 数据结构
在工具选型上,我选择使用 redis 中的有序集合 zset/sorted set 实现哈希环,其底层数据结构本质上也是通过跳表实现(在数据量小时,会退化为链表)
zset 的官方文档链接:https://redis.io/docs/data-types/sorted-sets/
4.2 核心类
redis 版哈希环的核心成员属性如下图所示:
// 基于 redis 实现的哈希环
type RedisHashRing struct {
// 哈希环维度的唯一键
key string
// 连接 redis 的客户端
redisClient *Client
}
func NewRedisHashRing(key string, redisClient *Client) *RedisHashRing {
return &RedisHashRing{
key: key,
redisClient: redisClient,
}
}
4.3 redis 客户端
4.3.1 客户端定义
为了更好地支持哈希环的一系列增删改查操作,此处专门基于 redigo 定制实现了一版 redis 客户端,在其中封装好了与 zset 有关的各种方法:
// Client Redis 客户端.
type Client struct {
opts *ClientOptions
pool *redis.Pool
}
func NewClient(network, address, password string, opts ...ClientOption) *Client {
c := Client{
opts: &ClientOptions{
network: network,
address: address,
password: password,
},
}
for _, opt := range opts {
opt(c.opts)
}
repairClient(c.opts)
pool := c.getRedisPool()
return &Client{
pool: pool,
}
}
4.3.2 ZAdd
zadd 方法用于将 value 添加进入 zset 中. 其中 table 为 zset 的唯一键,score 为数据的分值,决定了其所处的位置.
zadd 指令的官方文档链接: https://redis.io/commands/zadd/
// ZAdd 执行Redis ZAdd 命令.
func (c *Client) ZAdd(ctx context.Context, table string, score int64, value string) error {
conn, err := c.pool.GetContext(ctx)
if err != nil {
return err
}
defer conn.Close()
_, err = conn.Do("ZADD", table, score, value)
return err
}
4.3.3 ZRangeByScore
zRangeByScore 方法用于从 zset 检索出对应于 score 范围的一系列数据. 底层是基于 redis 中的 zRange 指令实现,通过带上 "ByScore" 标识,标识检索操作依据 score 执行.
ZRange 指令的官方文档链接: https://redis.io/commands/zrange/
// ZRangeByScore 执行 redis zrangebyscore 命令
func (c *Client) ZRangeByScore(ctx context.Context, table string, score1, score2 int64) ([]*ScoreEntity, error) {
conn, err := c.pool.GetContext(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
raws, err := redis.Values(conn.Do("ZRANGE", table, score1, score2, "BYSCORE", "WITHSCORES"))
if err != nil {
return nil, err
}
if len(raws)&1 != 0 {
return nil, fmt.Errorf("invalid entity len: %d", len(raws))
}
scoreEntities := make([]*ScoreEntity, 0, len(raws)>>1)
for i := 0; i < len(raws)>>1; i++ {
scoreEntities = append(scoreEntities, &ScoreEntity{
Score: gocast.ToInt64(raws[i<<1|1]),
Val: gocast.ToString(raws[i<<1]),
})
}
return scoreEntities, nil
}
4.3.4 Ceiling
ceiling 方法用于从 zset 中找到 >= score 的第一个分值. 该方法底层基于也是基于 zRange 指令实现的,通过将检索的右边界设置为 +inf ,将范围设定为 [score,+∞) ,同时通过将 limit 设置为 1,代表只返回第一笔数据.
// 返回大于等于 score 的第一个目标
func (c *Client) Ceiling(ctx context.Context, table string, score int64) (*ScoreEntity, error) {
conn, err := c.pool.GetContext(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
raws, err := redis.Values(conn.Do("ZRANGE", table, score, "+inf", "BYSCORE", "LIMIT", 0, 1, "WITHSCORES"))
if err != nil {
return nil, err
}
if len(raws) != 2 {
return nil, fmt.Errorf("invalid len of entity: %d, err: %w", len(raws), ErrScoreNotExist)
}
return &ScoreEntity{
Score: gocast.ToInt64(raws[1]),
Val: gocast.ToString(raws[0]),
}, nil
}
4.3.5 Floor
floor 方法用于从 zset 中找到 <= score 的第一个分值. 底层同样基于 zRange 指令实现,通过将范围右边界设置为 -inf ,并通过 "REV" 标识实现取反操作,将检索范围设定为 (-∞,score],同时通过将 limit 设置为 1,代表只返回第一笔数据.
// 返回小于等于 score 的第一个目标
func (c *Client) Floor(ctx context.Context, table string, score int64) (*ScoreEntity, error) {
conn, err := c.pool.GetContext(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
raws, err := redis.Values(conn.Do("ZRANGE", table, score, "-inf", "REV", "BYSCORE", "LIMIT", 0, 1, "WITHSCORES"))
if err != nil {
return nil, err
}
if len(raws) != 2 {
return nil, fmt.Errorf("invalid len of entity: %d, err: %w", len(raws), ErrScoreNotExist)
}
return &ScoreEntity{
Score: gocast.ToInt64(raws[1]),
Val: gocast.ToString(raws[0]),
}, nil
}
4.3.6 FirstOrLast
firstOrLast 方法用于返回 zset 中最小或者最大的 score 分值. 底层同样基于 zrange 指令实现,通过合理设置 score 范围边界以及 limit 参数实现.
func (c *Client) FirstOrLast(ctx context.Context, table string, first bool) (*ScoreEntity, error) {
conn, err := c.pool.GetContext(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
var raws []interface{}
if first {
raws, err = redis.Values(conn.Do("ZRANGE", table, "-inf", "+inf", "BYSCORE", "LIMIT", 0, 1, "WITHSCORES"))
} else {
raws, err = redis.Values(conn.Do("ZRANGE", table, "+inf", "-inf", "REV", "BYSCORE", "LIMIT", 0, 1, "WITHSCORES"))
}
if err != nil {
return nil, err
}
if len(raws) != 2 {
return nil, fmt.Errorf("invalid len of entity: %d, err: %w", len(raws), ErrScoreNotExist)
}
return &ScoreEntity{
Score: gocast.ToInt64(raws[1]),
Val: gocast.ToString(raws[0]),
}, nil
}
4.3.7 ZRem
zRem 方法用于从跳表中移除对应于 score 的数据.
ZRem指令的官方文档链接: https://redis.io/commands/zrem/
func (c *Client) ZRem(ctx context.Context, table string, score int64) error {
conn, err := c.pool.GetContext(ctx)
if err != nil {
return err
}
defer conn.Close()
_, err = conn.Do("ZREMRANGEBYSCORE", table, score, score)
return err
}
4.4 加/解分布式锁
redis 版哈希环在加/解锁流程需要操作分布式锁. 这里使用的是基于 redis 实现的分布式锁,用到的是我此前开源的一款 redis 分布式锁客户端 sdk——redis_lock.
该项目对应的 github 开源地址为: http://github.com/xiaoxuxiansheng/redis_lock
// 锁住哈希环,支持配置过期时间. 达到过期时间后,会自动释放锁
func (r *RedisHashRing) Lock(ctx context.Context, expireSeconds int) error {
// 基于 redis 分布式锁实现
lock := redis_lock.NewRedisLock(r.getLockKey(), r.redisClient, redis_lock.WithExpireSeconds(int64(expireSeconds)))
return lock.Lock(ctx)
}
// 解锁哈希环.
func (r *RedisHashRing) Unlock(ctx context.Context) error {
lock := redis_lock.NewRedisLock(r.getLockKey(), r.redisClient)
return lock.Unlock(ctx)
}
4.4 Add
下面是将节点添加到 zset score 对应位置的处理流程:
// 真实节点入环. 将一个真实节点 nodeID 添加到 score 对应的虚拟节点中
func (r *RedisHashRing) Add(ctx context.Context, score int32, nodeID string) error {
// 1 首先基于 score,从哈希环中取出对应的虚拟节点
scoreEntities, err := r.redisClient.ZRangeByScore(ctx, r.getTableKey(), int64(score), int64(score))
if err != nil {
return fmt.Errorf("redis ring add failed, err: %w", err)
}
// 2 倘若存在多个节点,返回错误(异常情况)
if len(scoreEntities) > 1 {
return fmt.Errorf("invalid score entity len: %d", len(scoreEntities))
}
// 3 先查出来 score 对应的 val,将新增节点 nodeID 追加进去,再添加到 zset 中
var nodeIDs []string
if len(scoreEntities) == 1 {
if err = json.Unmarshal([]byte(scoreEntities[0].Val), &nodeIDs); err != nil {
return err
}
for _, _nodeID := range nodeIDs {
if _nodeID == nodeID {
return nil
}
}
if err = r.redisClient.ZRem(ctx, r.getTableKey(), scoreEntities[0].Score); err != nil {
return fmt.Errorf("redis ring zrem failed, err: %w", err)
}
}
nodeIDs = append(nodeIDs, nodeID)
newNodeIDs, _ := json.Marshal(nodeIDs)
// 4 将新的结果添加到虚拟节点 score 虚拟节点中
if err = r.redisClient.ZAdd(ctx, r.getTableKey(), int64(score), string(newNodeIDs)); err != nil {
return fmt.Errorf("redis ring zadd failed, err: %w", err)
}
return nil
}
4.5 Rem
下面是将节点从 zset score 对应位置中移除的方法流程:
// 从哈希环对应于 score 的虚拟节点删去真实节点 nodeID
func (r *RedisHashRing) Rem(ctx context.Context, score int32, nodeID string) error {
// 1 首先通过 score 检索获取到对应的虚拟节点
scoreEntities, err := r.redisClient.ZRangeByScore(ctx, r.getTableKey(), int64(score), int64(score))
if err != nil {
return fmt.Errorf("redis ring rem zrange by score failed, err: %w", err)
}
// 2 倘若虚拟节点个数不为 1,返回错误(异常情况)
if len(scoreEntities) != 1 {
return fmt.Errorf("redis ring rem failed, invalid score entity len: %d", len(scoreEntities))
}
var nodeIDs []string
if err = json.Unmarshal([]byte(scoreEntities[0].Val), &nodeIDs); err != nil {
return err
}
// 3 获取到待删除真实节点 nodeID 在虚拟节点的真实节点列表中的 index
index := -1
for i := 0; i < len(nodeIDs); i++ {
if nodeIDs[i] == nodeID {
index = i
break
}
}
if index == -1 {
return nil
}
// 4 首先删除对应的 score
if err = r.redisClient.ZRem(ctx, r.getTableKey(), scoreEntities[0].Score); err != nil {
return fmt.Errorf("redis ring rem zrem failed, err: %w", err)
}
nodeIDs = append(nodeIDs[:index], nodeIDs[index+1:]...)
if len(nodeIDs) == 0 {
return nil
}
// 5 在 score 对应的虚拟节点中添加删除 nodeID 后的真实节点列表
newNodeIDStr, _ := json.Marshal(nodeIDs)
if err = r.redisClient.ZAdd(ctx, r.getTableKey(), scoreEntities[0].Score, string(newNodeIDStr)); err != nil {
return fmt.Errorf("redis ring rem zadd failed, err: %w", err)
}
return nil
}
4.6 Ceiling
下面是从 redis 哈希环中找到 score 顺时针往下第一个分值的方法流程:
// 从哈希环中获取到 score 顺时针往下的第一个虚拟节点数值
func (r *RedisHashRing) Ceiling(ctx context.Context, score int32) (int32, error) {
// 1 首先执行 ceilng 检索到 zset 中大于等于 score 且最接近于 score 的节点
scoreEntity, err := r.redisClient.Ceiling(ctx, r.getTableKey(), int64(score))
if err != nil && !errors.Is(err, ErrScoreNotExist) {
return 0, fmt.Errorf("redis ring ceiling failed, err: %w", err)
}
// 2 倘若找到目标直接返回
if scoreEntity != nil {
return int32(scoreEntity.Score), nil
}
// 3 倘若 ceiling 流程未找到目标节点,则通过 first 方法获取到 zset 中 score 最小的节点进行返回
if scoreEntity, err = r.redisClient.FirstOrLast(ctx, r.getTableKey(), true); err != nil && !errors.Is(err, ErrScoreNotExist) {
return 0, fmt.Errorf("redis ring first failed, err: %w", err)
}
if scoreEntity != nil {
return int32(scoreEntity.Score), nil
}
return -1, nil
}
4.7 Floor
下面是从 redis 哈希环中找到 score 逆时针往上第一个分值的方法流程:
// 从哈希环中获取到 score 逆时针往上的第一个虚拟节点数值
func (r *RedisHashRing) Floor(ctx context.Context, score int32) (int32, error) {
// 1 从 zset 中获取到小于等于 score 且最接近于 score 的节点
scoreEntity, err := r.redisClient.Floor(ctx, r.getTableKey(), int64(score))
if err != nil && !errors.Is(err, ErrScoreNotExist) {
return 0, fmt.Errorf("redis ring floor failed, err: %w", err)
}
if scoreEntity != nil {
return int32(scoreEntity.Score), nil
}
// 2 倘若 floor 流程没找到节点,则通过 last 获取 zset 上 score 值最大的节点
if scoreEntity, err = r.redisClient.FirstOrLast(ctx, r.getTableKey(), false); err != nil && !errors.Is(err, ErrScoreNotExist) {
return 0, fmt.Errorf("redis ring last failed, err: %w", err)
}
if scoreEntity != nil {
return int32(scoreEntity.Score), nil
}
return -1, nil
}
4.8 Node
最后是从 redis 哈希环中获取 score 对应的节点列表的流程:
func (r *RedisHashRing) Node(ctx context.Context, score int32) ([]string, error) {
scoreEntities, err := r.redisClient.ZRangeByScore(ctx, r.getTableKey(), int64(score), int64(score))
if err != nil {
return nil, fmt.Errorf("redis ring node zrange by score failed, err: %w", err)
}
if len(scoreEntities) != 1 {
return nil, fmt.Errorf("redis ring node failed, invalid len of score entities: %d", len(scoreEntities))
}
var nodeIDs []string
if err = json.Unmarshal([]byte(scoreEntities[0].Val), &nodeIDs); err != nil {
return nil, err
}
return nodeIDs, nil
}
5 总结
至此,本次分享全文结束. 最后,我们回头做个总结. 在本期分享中:
• 基于 golang 从零到一实现了一致性哈希算法模块,并于 github 开源——consistent_hash. 开源地址:http://github.com/xiaoxuxiansheng/consistent_hash
• 一致性哈希算法模块中,实现了节点、数据的读写流程,并将哈希环和哈希编码器设定为抽象 interface,可由用户自行实现完成注入
• 在项目中基于跳表和 redis zset 分别实现了单机版和分布式版的哈希环,可供用户选择使用
• 项目中基于 murmur3 哈希算法实现了哈希编码器,可供用户选择使用