01 导语
在 CubeFS 技术揭秘系列 的历史文章中,对 CubeFS 基于 etcd/raft 实现纠删码存储的元数据管理有较为详细地介绍。然而,单 raft 组能承载的元数据量和吞吐始终有上限,无法满足其他需要水平扩展场景的需求。而 Multi-Raft 的广泛应用能够为 EB 级别集群的元数据管理提供解决方案。本文将结合 CubeFS 现有 Multi-Raft/tiglabs 设计和实现上的优势与不足,介绍基于 etcd/RawNode 实现的高性能 Multi-Raft 方案。
02 背景
目前,CubeFS 基于 tiglabs/raft 实现元数据分片管理模块 MetaNode 和数据分片 DataNode,tiglabs/raft 通过优化心跳机制和高内聚性设计,实现通信效率和易用性的提升。在日常迭代与运营过程中,CubeFS 面临一些性能和维护方面的挑战。如在性能方面,每个 Raft Group 至少需要对应 2 个协程,当单节点上的分区数量较多时(例如达到万级别以上),协程调度的开销较大,影响整体性能;此外,CubeFS 在修改 etcd 核心的 Raft 状态机流程时,后续特性更新需要对现有代码进行改造和适配,代码维护的复杂度较高。基于这些挑战,结合 tiglabs/raft 以及业界其他实现方案(如 CockroachDB,braft 等)的优缺点。本文将基于 etcd/RawNode,综合考虑心跳管理、协程池设计以及持久化优化等方面,提出一个高性能的 Multi-Raft 方案。
03 整体设计
图 1. 整体设计图
StateMachine: 应用状态机,应用层提交或处理 Raft 消息;
Storage: 持久化模块,如 WAL 和应用数据的持久化存储;
Propose Queue:消息提交队列;
Worker Pool: 处理 Raft 消息的协程池;
Group:单个 raft group,管理部分数据分片;
Transport: 负责数据同步、消息传输等;
Connetion pool:建立节点连接池,限制单节点连接数;
Loop: 状态机流转核心处理流程
关键问题
Raft Group 之间的频繁心跳带来的系统或网络开销
针对 Raft Goup WAL 日志持久化带来存储压力,提升 io 能力
如何调度处理 Raft Group 消息,提升消息轮转效率
设计目标
基于 etcd/raft 实现 Multi-Raft 方案,复用 etcd/raft RawNode/raft 等状态机流程
作为独立包,方便接入使用
心跳消息聚合,节点粒度实现心跳
采用协程池处理 Raft Group 消息,减少协程调度开销
04 核心模块
Raft Group
Raft Group 是基于 etcd/RawNode 进行封装的独立的 Raft 实例,对接 WAL 日志与数据持久化能力,提供 Propose、ReadIndex、成员管理、WAL 日志裁剪以及 Group 状态查询等功能。
// 状态机接口
type StateMachine interface {
Apply(cxt context.Context, pd []ProposalData, index uint64) (rets []interface{}, err error)
LeaderChange(peerID uint64) error
ApplyMemberChange(cc *Member, index uint64) error
Snapshot() Snapshot
ApplySnapshot(s Snapshot) error
}
// 单个 raft 组接口
type Group interface {
// 消息提交
Propose(ctx context.Context, msg *ProposalData) (ProposalResponse, error)
// leader 切换
LeaderTransfer(ctx context.Context, peerID uint64) error
// 基于 readIndex 方式读
ReadIndex(ctx context.Context) error
// 发起竞选
Campaign(ctx context.Context) error
// 日志切割
Truncate(ctx context.Context, index uint64) error
// 成员变更
MemberChange(ctx context.Context, mc *Member) error
// 当前 raft 分组状态
Stat() (*Stat, error)
Close() error
}
WAL
Write-Ahead Log(WAL)日志是实现高效、可靠的 Multi-Raft 架构的关键,同时在保障数据一致性和系统恢复方面起着重要作用。如图 2. 所示,本文主要从持久化存储分享一些思路或者策略:
图 2. WAL 持久化
1、整体上,WAL Log 采用 Key-Value 形式存储,多个 Raft Group 共享同一个 Storage 实例,避免因 Raft Group 过多导致随机 IO 问题;
2、在日志条目组织方面,每个 Raft Group 的 WAL Log 按照 GroupID 作为前缀拼接 Log Index 作为 Key 写入到 Storage 中,迭代时通过 Group ID 前缀以及 Log Index 拉取后续的 Wal Log,同时基于该前缀规则可实现快速的 Group 日志删除清理功能;
3、在日志存取方面,WAL Log 基于上层 Storage 接口实现,Storage 支持 Batch 操作接口、Get 接口以及 Iter 接口,支持批量写入操作,减少写 IO 次数,提升性能和吞吐。
// 持久化接口
type Storage interface {
Get(key []byte) (ValGetter, error)
Iter(prefix []byte) Iterator
NewBatch() Batch
Write(b Batch) error
}
快照
快照机制依旧是关键的一环,用于管理和优化日志的大小,同时加快数据恢复过程。实现快照的主要挑战在于如何高效地生成和应用快照,并确保所有副本的一致性。由于快照整体上和单个 raft 组没有明显的区别,本文主要聚焦于快照的发送和接收:
快照发送
1、快照通过 Raft Group 创建,每个 Raft Group 维护就近的快照版本,每次接收和处理快照消息时,都取当前最新的 applied index 和 term 构建快照 meta 元数据;
2、快照发送通过异步进行,通过 RPC Stream 方式发送至对端,数据包含头部和快照数据内容两部分,对端先接收快照头部后再继续接收快照数据内容;
3、对端全部接收和应用成功后,返回成功响应。同时通过 RawNode.Step 将快照消息体转发到 Raft 状态机进行推进;
4、快照发送成功后回调 Raft Group 删除接口清理对应的快照版本。
快照接收
1、快照接收先解析头部并校验 Group ID 等信息是否一致,不一致时返回错误;
2、构造 incoming Snapshot 结构,通过 RPC Stream 接收 Snapshot 主体数据,每个主体数据为一次 Write Batch,对应上层状态机的 Batch;
3、回调上层状态机的 Apply Snapshot 接口,接收和应用 Snaphsot。上层状态机需要清理旧数据并写入新数据;
4、状态机处理完成后,将 Header 中的 Snapshot 消息推进到 Raft 状态机中,通知 Worker 执行消息处理;
5、Worker 消息处理接收到 Snapshot 消息,将 Snapshot 中的 Meta 元信息(term/commit 等)保存至 Storage 持久化存储 HardState 中,接着更新当前 storage 的 apply index 为 Snapshot Meta 中的 commit index。
注意:Storage 不会执行 Raft 日志的清理动作,该动作留可以统一让上层 Truncate 日志时自动清理,由于 HardState 中已保存了最新的 Index 记录,因此旧日志不会影响 Raft 状态机的运行。
Transport
Transport 模块负责节点之间的通信,它保证了日志条目、快照以及其他控制信息能够正确、及时地在不同的 Raft 组节点之间高效传输。这里主要从连接控制方面介绍如何实现高性能数据传输:
图 3. transport 示意图
1、在协议方面,消息传输采用 grpc 协议,保证高效地传输;
2、每个节点维护两种连接类型的 grpc 连接,分别为 Normal 和 System,普通的业务提交的 RaftMessage 消息和 RaftSnapshot 消息会走 Normal 类型的连接通道,而合并的心跳消息等会走 System 类型的连接通道,避免互相阻塞,引发 Raft 切主等问题;
3、普通的 Raft Message 消息收发,复用同一个 grpc stream 流,并为这个 stream 流创建收和发两个协程。而 Raft Snapshot 的收发每次会创建一个新的 grpc stream 流,在完成 snapshot 收发以后会关闭掉该 stream,Snapshot 接收端会创建一个协程独立处理该 snapshot stream,在接收和应用完成后协程主动退出;
4、Transport 维护每个节点的连接池,同一个节点地址和同一种连接类型只会保留 1 个连接。
05 关键流程
下面将从状态机流转过程、协程池调度处理进行介绍,让大家能够清晰地了解整个 multi-raft 的设计及实现原理。
提交流程
提交是 Multi-Raft 体系中的关键流程之一。为了在提交流程中保证状态机流转的高效性与安全性,同时避免复杂的并发问题,设计上有以下几种理念:
1、同一时刻单个 Group 内只有一个 Worker 在处理消息:这可以避免同一个 Group 在并发处理消息时的状态不一致问题;
2、Worker 内无锁操作:由于同一个 Group 始终由单个 Worker 处理,因此 Worker 在处理其 group 内的消息时,可以免去对共享状态的额外同步操作,减少了锁竞争,提高了性能;
3、Worker 抢占式处理:抢占式处理指的是同个 Group 内的新任务可以立即接管处理权,而不需要等待当前任务完全结束,这使得任务处理更加灵活和高效。
具体流程:
1、每个分片管理自己的Raft Group,数据提交通过每个分片管理的Raft Group执行;
2、将消息投递至propose消息队列,并向workerChs 投递一个状态信号groupState,并阻塞等待;
3、worker从workerChs 取出信号,根据信号携带的groupID处理相应事务;
4、处理完成后通知应用层;
5、应用层收到通知,取消阻塞,请求结束,返回成功。
// 提交示例
func Propose(ctx context.Context, pdata *ProposalData) (resp ProposalResponse, err error) {
...
n := newNotify(timeoutCtx) // 超时机制
g.addNotify(pdata.notifyID, n) // 等待通知
// 提交处理
err = g.handler.HandlePropose(ctx, g.id, proposalRequest{
data: raw,
})
if err != nil {
return
}
// 收到信号,结束等待
ret, err := n.Wait(ctx)
...
}
// worker 处理示例
func HandlePropose(ctx context.Context, id uint64, req proposalRequest) error {
...
queue := v.(proposalQueue)
// 将消息投递至队列中
if err := queue.Push(ctx, req); err != nil {
return err
}
// 投递信号,通知 worker 进行处理
h.signalToWorker(id, stateProcessReady)
return nil
}
ReadIndex
ReadIndex 流程和提交流程相比,省掉了同步 log 的开销,可以较大幅提升读的吞吐,一定程度上降低读时延。其大致流程为:
1、Raft Group 在收到读请求时,记录下当前的 commit Index,称之为 ReadIndex
2、主节点向所有从节点发起一次心跳(确保当前主节点是否有效,避免网络分区时少数派 leader 仍处理请求)
3、等待状态机至少应用到 ReadIndex(即 apply index >= read index )
4、执行读请求,将状态机中的结果返回给客户端
func ReadIndex(ctx context.Context) error {
// 超时机制
timeoutCtx, cancel := context.WithTimeout(context.Background(), g.cfg.readIndexTimeout)
defer cancel()
n := newNotify(timeoutCtx)
// 开启通知
g.addNotify(notifyID, n)
...
// 进入 raft 状态机流转
rn.ReadIndex(notifyIDToBytes(notifyID))
// 通知协程池调度处理
...
g.handler.HandleSignalToWorker(ctx, g.id)
// 等待处理返回
n.Wait(ctx)
...
}
心跳机制
心跳处理是 Multi-Raft 中的核心问题之一,包括心跳发送与响应流程。当节点上维护的 Raft Group 增加至一定数量,Group 之间频繁的心跳消息会对系统造成负担。类似于 tiglabs/raft 设计与实现,心跳消息的发送与响应交由当前节点处理,Group 不独立生成心跳消息,这样可以有效减轻网络传输压力,提升网络传输效率。具体流程如下:
发送:
1、心跳消息发送通过manager 中的计时器定期触发,投递到propose消息队列;
2、 worker取出心跳消息进行处理,跳转执行处理状态机流转的流程;
3、状态机流转判断消息类型,如果是心跳类型消息,交由manager根据目标节点进行合并,消息体中携带groupID信息,统一定期发送;
响应:
4、当节点收到心跳消息,类似发送流程,经过状态机处理完成后,统一交由Manager进行响应。
func HandleMaybeCoalesceHeartbeat(ctx context.Context, groupID uint64, msg *raftpb.Message) bool {
...
var hbMap map[uint64][]RaftHeartbeat
// 根据不同的消息类型,取出消息表,进行合并
switch msg.Type {
case raftpb.MsgHeartbeat:
h.coalescedMu.Lock()
hbMap = h.coalescedMu.heartbeats
...
.
}
// 生成消息
hb := RaftHeartbeat{
GroupID: groupID,
To: msg.To,
From: msg.From,
Term: msg.Term,
Commit: msg.Commit,
}
...
// 根据节点合并消息
hbMap[msg.To] = append(hbMap[msg.To], hb)
h.coalescedMu.Unlock()
return true
}
上面大致介绍了消息提交、心跳等核心流程,并简要演示了 ReadIndex 的实现。这些流程集中于应用提交的视角,下面将从 worker 的视角,对协程调度、状态机流转展开介绍。
信号通知
信号通知机制用于保证同个 group 同一时刻有且仅有一个 worker 在处理消息,是协程池高效运行的主要保证。设计上,信号量中每个位代表一类消息,聚合同个 group 不同类型的消息到同一个信号。
图 4. 信号量设计
如图 4. 所示,bit1 代表当前 group 有正在处理的协程,避免了同个 group 同时被多个协程处理的风险;其次,若有新的提交或待处理的消息,可在当前信号量上进行修改,协程处理完后,会检查当前信号是否有更新,可选择继续处理;bit2、3、4 分别代表 group 中不同的消息类型。单个信号可以承载一种或多种类型消息,提升信号通知效率。
// 信号通知示例
func signalToWorker(groupID uint64, state uint8) {
// 判断是否需要入队
if !h.enqueueGroupState(groupID, state) {
return
}
...
for {
select {
// 投递至信号队列
case h.workerChs[int(count)%len(h.workerChs)] <- groupState{state: state, id: groupID}:
return
...
}
状态机流转
状态机流转(State Machine Transitions)主要负责跟随由 Raft 协议提交的命令进行状态更新。每个 Raft Group 都有一个关联的状态机,用于执行经过一致性协议确认后的操作。考虑到 Raft Group 数量过多带来的协程开销同时保证高性能的情况下,采用协程池负责处理节点上所有 Group 的状态流转。协程池调度管理其中主要基于信号通知机制,实现对每个 Group 消息的高效处理。如图 1 所示,状态机流转流程表示为 Loop 循环,下面围绕 Loop 过程介绍状态机的具体处理流程:
1、worker 定期从队列中获取信号量,根据信号携带的 groupID 从 Raft Group 消息队列中获取有待处理的 Raft Group 消息,根据 state 类型判断处理;
select {
case in := <-ch:
span, ctx := trace.StartSpanFromContextWithTraceID(context.Background(), "", "worker-"+strconv.Itoa(wid)+"/"+strconv.FormatUint(in.id, 10))
v, ok := h.groups.Load(in.id)
if !ok {
span.Warnf("group[%d] has been removed or not created yet", in.id)
// remove the group state as group has been removed
h.removeGroupStateForce(in.id)
continue
}
group := (*internalGroupProcessor)(v.(*group))
...
2、如果信号类型是 ProcessRaft(来自其他节点的 Raft 消息),则直接转发到 Raft 状态机 RaftGroup.RawNode->Step;
// process group request msg
if in.state&stateProcessRaftRequestMsg != 0 {
if h.processRaftRequestMsg(ctx, group) {
in.state |= stateProcessReady
}
}
3、如果信号类型是 ProcessReady,则执行 RaftGroup.RawNode->Ready 获取 Ready 消息体
// process group ready
if in.state&stateProcessReady != 0 {
h.processReady(ctx, group)
}
ProcessReady 类型是相对较为复杂的一个流程,需要处理通过 raft 一致性校验后的数据流转,主要分为如下几种:
a.首先处理上层消息提交,从 Group Propose 队列中获取所有待提交消息,同时清空 Propose 队列,执行 RaftGroup.RawNode->Step 将待提交消息转发到 Raft 状态机;
b.然后处理 SoftState 消息主节点变更消息,通过 RaftGroup.StateMachine-> LeaderChange 通知上层状态机变更;
c.接着判断是否有待发送消息 Messages,有则通过 RaftGroup->sendMessages 发送出去,sendMessages 遍历各个 msgs,根据消息目标节点,投递到 Transport 模块目标节点的消息队列中,等待发送出去;
d.其次处理 HardState 和 Entries(raft log)消息,将 HardState 与 entries 打包为一次 Write Batch 调用 Raft Group.Storage->Set 进行持久化,保证原子性;
e.接着处理 CommitedEntries,将 CommitedEntries 通过 RaftGroup.StateMachine->Apply 通知上层应用提交消息;
f.处理 ReadIndex 读请求,通知上层应用读操作
4、如果消息类型是 stateProcessTick,用于触发本地心跳消息或者选举
// process group tick
if in.state&stateProcessTick != 0 {
h.processTick(ctx, group)
in.state |= stateProcessReady
}
读到这里,Multi-Raft 的整体设计基本介绍完成。下面简要介绍关于 Multi-Raft 的接入使用,整体上接入使用与单个 etcd/raft 实例并无特别差异。
06 接入指南
Multi-Raft 作为独立的包,使用时类似于 etcd/raft,需实现 Storage 接口与 StateMachine 接口,基于 Manage 创建 Raft Group。整个接入过程较为清晰简单,其中关键步骤如下:
1、创建 storage 示例,实现 Storage 接口
kvStore, err := kvstore.NewKVStore(ctx, storagePath, kvstore.RocksdbLsmKVType, &kvstore.Option{
CreateIfMissing: true,
ColumnFamily: []kvstore.CF{testRaftCF, testStateMachineCF},
})
storage := &testStorage{kvStore: kvStore, cf: testRaftCF} // testStorage 实现 Storage 接口
2、初始化 raftManager
cfg := &raft.Config{
NodeID: member.NodeID,
...
TransportConfig: raft.TransportConfig{
Addr: member.Host,
MaxTimeoutMs: 30000,
ConnectTimeoutMs: 30000,
KeepaliveTimeoutS: 60,
ServerKeepaliveTimeoutS: 10,
MaxInflightMsgSize: 1024,
},
Logger: log.DefaultLogger, // 自定义 log 日志打印
Storage: storage, // 实现 Storage 接口的存储实例
Resolver: &addressResolver{nodes: nodesMap}, // 地址解析器
}
manager, err := raft.NewManager(cfg)
3、创建应用状态机,通过 manager 创建的 Raft Group
sm := newStateMachine(storage)
groupConfig := &raft.GroupConfig{
ID: 1,
Applied: 0,
Members: allNodes,
SM: sm,
}
group, err := manager.CreateRaftGroup(ctx, groupConfig) // 创建 Raft 组
// wait for all node receive leader change and read index done
sm.WaitLeaderChange()
4、应用层的提交通过 StateMachine 完成,manager 管理所有的 Raft Group。
07 总结
本文主要介绍 CubeFS 自主设计研发的 Multi-Raft 库,从整体架构、核心模块、实现细节以及接入使用方面作简要地介绍。
08 附录
基于现有实现,对 Multi-Raft 做了读写相关的性能测试,供各位读者参考。读基于 ReadIndex 接口,写基于 Propose 接口。
环境:3 节点,单节点 16C32GB
Raft Group 规模 | Tick Interval (s) | 读 QPS | 写 QPS |
500 | 1 | 141530 | 85247 |
100000 | 1 | 86789 | 49539 |
500 | 2 | 144569 | 87049 |
100000 | 2 | 121977 | 75462 |
500 | 3 | 145969 | 87148 |
100000 | 3 | 140357 | 86124 |
如上结果所示,在 raft group 数量比较多的情况下,心跳 ticker 的间隔会对节点整体产生较大的影响:
在心跳间隔为 1 秒时,10W 的 raft group 的吞吐会比 500 个 raft group 的吞吐下降 38%左右,平均时延增加 57%。
在心跳间隔为 2 秒时,吞吐差距缩小,相差仅为 15%。
在心跳间隔为 3 秒时,10W 的 raft group 吞吐几乎与 500 个 raft group 的吞吐一致。
作者介绍
Zongchao Hu,CubeFS Contributor 之一,负责纠删码元数据管理和存储引擎的研发。
CubeFS 简介
CubeFS 于2019年开源并在 SIGMOD 发表工业界论文,目前是云原生计算基金会 (CNCF) 托管的孵化阶段开源项目。作为新一代云原生分布式存储平台,兼容 S3、POSIX、HDFS 等协议,支持多副本和纠删码引擎,提供多租户,多 AZ 部署、跨区域复制等特性;适用于大数据、AI、容器平台、数据库及中间件存算分离,数据共享、数据保护等广泛场景。
►►►
往期推荐
文章转载自CubeFS。点击这里阅读原文了解更多。
联系Linux Foundation APAC
Linux基金会是非营利性组织,是技术生态系统的重要组成部分。
Linux基金会通过提供财务和智力资源、基础设施、服务、活动以及培训来支持创建永续开源生态系统。在共享技术的创建中,Linux基金会及其项目通过共同努力形成了非凡成功的投资。请关注LFAPAC(Linux Foundation APAC)微信公众号。