基于etcd/RawNode的Multi-Raft设计与实现

文摘   2024-07-16 09:36   中国香港  


01 导语



在 CubeFS 技术揭秘系列 的历史文章中,对 CubeFS 基于 etcd/raft 实现纠删码存储的元数据管理有较为详细地介绍。然而,单 raft 组能承载的元数据量和吞吐始终有上限,无法满足其他需要水平扩展场景的需求。而 Multi-Raft 的广泛应用能够为 EB 级别集群的元数据管理提供解决方案。本文将结合 CubeFS 现有 Multi-Raft/tiglabs 设计和实现上的优势与不足,介绍基于 etcd/RawNode 实现的高性能 Multi-Raft 方案。



02 背景



目前,CubeFS 基于 tiglabs/raft 实现元数据分片管理模块 MetaNode 和数据分片 DataNode,tiglabs/raft 通过优化心跳机制和高内聚性设计,实现通信效率和易用性的提升。在日常迭代与运营过程中,CubeFS 面临一些性能和维护方面的挑战。如在性能方面,每个 Raft Group 至少需要对应 2 个协程,当单节点上的分区数量较多时(例如达到万级别以上),协程调度的开销较大,影响整体性能;此外,CubeFS 在修改 etcd 核心的 Raft 状态机流程时,后续特性更新需要对现有代码进行改造和适配,代码维护的复杂度较高。基于这些挑战,结合 tiglabs/raft 以及业界其他实现方案(如 CockroachDB,braft 等)的优缺点。本文将基于 etcd/RawNode,综合考虑心跳管理、协程池设计以及持久化优化等方面,提出一个高性能的 Multi-Raft 方案。



03 整体设计



图 1. 整体设计图


StateMachine: 应用状态机,应用层提交或处理 Raft 消息;

Storage: 持久化模块,如 WAL 和应用数据的持久化存储;

Propose Queue:消息提交队列;

Worker Pool: 处理 Raft 消息的协程池;

Group:单个 raft group,管理部分数据分片;

Transport: 负责数据同步、消息传输等;

Connetion pool:建立节点连接池,限制单节点连接数;

Loop: 状态机流转核心处理流程


关键问题


  • Raft Group 之间的频繁心跳带来的系统或网络开销

  • 针对 Raft Goup WAL 日志持久化带来存储压力,提升 io 能力

  • 如何调度处理 Raft Group 消息,提升消息轮转效率


设计目标


  • 基于 etcd/raft 实现 Multi-Raft 方案,复用 etcd/raft RawNode/raft 等状态机流程

  • 作为独立包,方便接入使用

  • 心跳消息聚合,节点粒度实现心跳

  • 采用协程池处理 Raft Group 消息,减少协程调度开销



04 核心模块



Raft Group


Raft Group 是基于 etcd/RawNode 进行封装的独立的 Raft 实例,对接 WAL 日志与数据持久化能力,提供 Propose、ReadIndex、成员管理、WAL 日志裁剪以及 Group 状态查询等功能。


// 状态机接口type StateMachine interface {    Apply(cxt context.Context, pd []ProposalData, index uint64) (rets []interface{}, err error)    LeaderChange(peerID uint64) error    ApplyMemberChange(cc *Member, index uint64) error    Snapshot() Snapshot    ApplySnapshot(s Snapshot) error}// 单个 raft 组接口type Group interface {  // 消息提交  Propose(ctx context.Context, msg *ProposalData) (ProposalResponse, error)  // leader 切换  LeaderTransfer(ctx context.Context, peerID uint64) error  // 基于 readIndex 方式读  ReadIndex(ctx context.Context) error  // 发起竞选  Campaign(ctx context.Context) error  // 日志切割  Truncate(ctx context.Context, index uint64) error  // 成员变更  MemberChange(ctx context.Context, mc *Member) error  // 当前 raft 分组状态  Stat() (*Stat, error)  Close() error}


WAL


Write-Ahead Log(WAL)日志是实现高效、可靠的 Multi-Raft 架构的关键,同时在保障数据一致性和系统恢复方面起着重要作用。如图 2. 所示,本文主要从持久化存储分享一些思路或者策略:


图 2. WAL 持久化


1、整体上,WAL Log 采用 Key-Value 形式存储,多个 Raft Group 共享同一个 Storage 实例,避免因 Raft Group 过多导致随机 IO 问题;


2、在日志条目组织方面,每个 Raft Group 的 WAL Log 按照 GroupID 作为前缀拼接 Log Index 作为 Key 写入到 Storage 中,迭代时通过 Group ID 前缀以及 Log Index 拉取后续的 Wal Log,同时基于该前缀规则可实现快速的 Group 日志删除清理功能;


3、在日志存取方面,WAL Log 基于上层 Storage 接口实现,Storage 支持 Batch 操作接口、Get 接口以及 Iter 接口,支持批量写入操作,减少写 IO 次数,提升性能和吞吐。


// 持久化接口type Storage interface {    Get(key []byte) (ValGetter, error)    Iter(prefix []byte) Iterator    NewBatch() Batch    Write(b Batch) error}


快照


快照机制依旧是关键的一环,用于管理和优化日志的大小,同时加快数据恢复过程。实现快照的主要挑战在于如何高效地生成和应用快照,并确保所有副本的一致性。由于快照整体上和单个 raft 组没有明显的区别,本文主要聚焦于快照的发送和接收:


快照发送


1、快照通过 Raft Group 创建,每个 Raft Group 维护就近的快照版本,每次接收和处理快照消息时,都取当前最新的 applied index 和 term 构建快照 meta 元数据;


2、快照发送通过异步进行,通过 RPC Stream 方式发送至对端,数据包含头部和快照数据内容两部分,对端先接收快照头部后再继续接收快照数据内容;


3、对端全部接收和应用成功后,返回成功响应。同时通过 RawNode.Step 将快照消息体转发到 Raft 状态机进行推进;


4、快照发送成功后回调 Raft Group 删除接口清理对应的快照版本。


快照接收


1、快照接收先解析头部并校验 Group ID 等信息是否一致,不一致时返回错误;


2、构造 incoming Snapshot 结构,通过 RPC Stream 接收 Snapshot 主体数据,每个主体数据为一次 Write Batch,对应上层状态机的 Batch;


3、回调上层状态机的 Apply Snapshot 接口,接收和应用 Snaphsot。上层状态机需要清理旧数据并写入新数据;


4、状态机处理完成后,将 Header 中的 Snapshot 消息推进到 Raft 状态机中,通知 Worker 执行消息处理;


5、Worker 消息处理接收到 Snapshot 消息,将 Snapshot 中的 Meta 元信息(term/commit 等)保存至 Storage 持久化存储 HardState 中,接着更新当前 storage 的 apply index 为 Snapshot Meta 中的 commit index。


注意Storage 不会执行 Raft 日志的清理动作,该动作留可以统一让上层 Truncate 日志时自动清理,由于 HardState 中已保存了最新的 Index 记录,因此旧日志不会影响 Raft 状态机的运行。


Transport


Transport 模块负责节点之间的通信,它保证了日志条目、快照以及其他控制信息能够正确、及时地在不同的 Raft 组节点之间高效传输。这里主要从连接控制方面介绍如何实现高性能数据传输:


图 3. transport 示意图


1、在协议方面,消息传输采用 grpc 协议,保证高效地传输;


2、每个节点维护两种连接类型的 grpc 连接,分别为 Normal 和 System,普通的业务提交的 RaftMessage 消息和 RaftSnapshot 消息会走 Normal 类型的连接通道,而合并的心跳消息等会走 System 类型的连接通道,避免互相阻塞,引发 Raft 切主等问题;


3、普通的 Raft Message 消息收发,复用同一个 grpc stream 流,并为这个 stream 流创建收和发两个协程。而 Raft Snapshot 的收发每次会创建一个新的 grpc stream 流,在完成 snapshot 收发以后会关闭掉该 stream,Snapshot 接收端会创建一个协程独立处理该 snapshot stream,在接收和应用完成后协程主动退出;


4、Transport 维护每个节点的连接池,同一个节点地址和同一种连接类型只会保留 1 个连接。



05 关键流程



下面将从状态机流转过程、协程池调度处理进行介绍,让大家能够清晰地了解整个 multi-raft 的设计及实现原理。


提交流程

提交是 Multi-Raft 体系中的关键流程之一。为了在提交流程中保证状态机流转的高效性与安全性,同时避免复杂的并发问题,设计上有以下几种理念:


1、同一时刻单个 Group 内只有一个 Worker 在处理消息:这可以避免同一个 Group 在并发处理消息时的状态不一致问题;


2、Worker 内无锁操作:由于同一个 Group 始终由单个 Worker 处理,因此 Worker 在处理其 group 内的消息时,可以免去对共享状态的额外同步操作,减少了锁竞争,提高了性能;


3、Worker 抢占式处理:抢占式处理指的是同个 Group 内的新任务可以立即接管处理权,而不需要等待当前任务完全结束,这使得任务处理更加灵活和高效。


具体流程:


1、每个分片管理自己的Raft Group,数据提交通过每个分片管理的Raft Group执行;


2、将消息投递至propose消息队列,并向workerChs 投递一个状态信号groupState,并阻塞等待;


3、worker从workerChs 取出信号,根据信号携带的groupID处理相应事务;


4、处理完成后通知应用层;


5、应用层收到通知,取消阻塞,请求结束,返回成功。


// 提交示例func Propose(ctx context.Context, pdata *ProposalData) (resp ProposalResponse, err error) {  ...  n := newNotify(timeoutCtx) // 超时机制  g.addNotify(pdata.notifyID, n) // 等待通知        // 提交处理  err = g.handler.HandlePropose(ctx, g.id, proposalRequest{    data: raw,  })  if err != nil {    return  }    // 收到信号,结束等待  ret, err := n.Wait(ctx)  ...}// worker 处理示例func HandlePropose(ctx context.Context, id uint64, req proposalRequest) error {  ...  queue := v.(proposalQueue)    // 将消息投递至队列中  if err := queue.Push(ctx, req); err != nil {    return err  }    // 投递信号,通知 worker 进行处理  h.signalToWorker(id, stateProcessReady)  return nil}


ReadIndex


ReadIndex 流程和提交流程相比,省掉了同步 log 的开销,可以较大幅提升读的吞吐,一定程度上降低读时延。其大致流程为:


1、Raft Group 在收到读请求时,记录下当前的 commit Index,称之为 ReadIndex


2、主节点向所有从节点发起一次心跳(确保当前主节点是否有效,避免网络分区时少数派 leader 仍处理请求)


3、等待状态机至少应用到 ReadIndex(即 apply index >= read index )


4、执行读请求,将状态机中的结果返回给客户端


func ReadIndex(ctx context.Context) error {  // 超时机制  timeoutCtx, cancel := context.WithTimeout(context.Background(), g.cfg.readIndexTimeout)  defer cancel()  n := newNotify(timeoutCtx)  // 开启通知  g.addNotify(notifyID, n)  ...  // 进入 raft 状态机流转  rn.ReadIndex(notifyIDToBytes(notifyID))  // 通知协程池调度处理  ...    g.handler.HandleSignalToWorker(ctx, g.id)  // 等待处理返回  n.Wait(ctx)  ...}


心跳机制


心跳处理是 Multi-Raft 中的核心问题之一,包括心跳发送与响应流程。当节点上维护的 Raft Group 增加至一定数量,Group 之间频繁的心跳消息会对系统造成负担。类似于 tiglabs/raft 设计与实现,心跳消息的发送与响应交由当前节点处理,Group 不独立生成心跳消息,这样可以有效减轻网络传输压力,提升网络传输效率。具体流程如下:


发送:


1、心跳消息发送通过manager 中的计时器定期触发,投递到propose消息队列;


2、 worker取出心跳消息进行处理,跳转执行处理状态机流转的流程;


3、状态机流转判断消息类型,如果是心跳类型消息,交由manager根据目标节点进行合并,消息体中携带groupID信息,统一定期发送;


响应:


4、当节点收到心跳消息,类似发送流程,经过状态机处理完成后,统一交由Manager进行响应。


func HandleMaybeCoalesceHeartbeat(ctx context.Context, groupID uint64, msg *raftpb.Message) bool {    ...      var hbMap map[uint64][]RaftHeartbeat    // 根据不同的消息类型,取出消息表,进行合并  switch msg.Type {  case raftpb.MsgHeartbeat:    h.coalescedMu.Lock()    hbMap = h.coalescedMu.heartbeats    ....    }  // 生成消息  hb := RaftHeartbeat{    GroupID: groupID,    To:      msg.To,    From:    msg.From,    Term:    msg.Term,    Commit:  msg.Commit,  }    ...        // 根据节点合并消息  hbMap[msg.To] = append(hbMap[msg.To], hb)  h.coalescedMu.Unlock()  return true}


上面大致介绍了消息提交、心跳等核心流程,并简要演示了 ReadIndex 的实现。这些流程集中于应用提交的视角,下面将从 worker 的视角,对协程调度、状态机流转展开介绍。


信号通知


信号通知机制用于保证同个 group 同一时刻有且仅有一个 worker 在处理消息,是协程池高效运行的主要保证。设计上,信号量中每个位代表一类消息,聚合同个 group 不同类型的消息到同一个信号。


图 4. 信号量设计


如图 4. 所示,bit1 代表当前 group 有正在处理的协程,避免了同个 group 同时被多个协程处理的风险;其次,若有新的提交或待处理的消息,可在当前信号量上进行修改,协程处理完后,会检查当前信号是否有更新,可选择继续处理;bit2、3、4 分别代表 group 中不同的消息类型。单个信号可以承载一种或多种类型消息,提升信号通知效率。


// 信号通知示例func signalToWorker(groupID uint64, state uint8) {    // 判断是否需要入队    if !h.enqueueGroupState(groupID, state) {    return  }    ...  for {    select {        // 投递至信号队列    case h.workerChs[int(count)%len(h.workerChs)] <- groupState{state: state, id: groupID}:      return    ...}


状态机流转


状态机流转(State Machine Transitions)主要负责跟随由 Raft 协议提交的命令进行状态更新。每个 Raft Group 都有一个关联的状态机,用于执行经过一致性协议确认后的操作。考虑到 Raft Group 数量过多带来的协程开销同时保证高性能的情况下,采用协程池负责处理节点上所有 Group 的状态流转。协程池调度管理其中主要基于信号通知机制,实现对每个 Group 消息的高效处理。如图 1 所示,状态机流转流程表示为 Loop 循环,下面围绕 Loop 过程介绍状态机的具体处理流程:


1、worker 定期从队列中获取信号量,根据信号携带的 groupID 从 Raft Group 消息队列中获取有待处理的 Raft Group 消息,根据 state 类型判断处理;


select {case in := <-ch:  span, ctx := trace.StartSpanFromContextWithTraceID(context.Background(), "", "worker-"+strconv.Itoa(wid)+"/"+strconv.FormatUint(in.id, 10))  v, ok := h.groups.Load(in.id)  if !ok {    span.Warnf("group[%d] has been removed or not created yet", in.id)    // remove the group state as group has been removed    h.removeGroupStateForce(in.id)    continue  }  group := (*internalGroupProcessor)(v.(*group))  ...


2、如果信号类型是 ProcessRaft(来自其他节点的 Raft 消息),则直接转发到 Raft 状态机 RaftGroup.RawNode->Step;


// process group request msgif in.state&stateProcessRaftRequestMsg != 0 {    if h.processRaftRequestMsg(ctx, group) {      in.state |= stateProcessReady     }}


3、如果信号类型是 ProcessReady,则执行 RaftGroup.RawNode->Ready 获取 Ready 消息体


// process group readyif in.state&stateProcessReady != 0 {    h.processReady(ctx, group)}


ProcessReady 类型是相对较为复杂的一个流程,需要处理通过 raft 一致性校验后的数据流转,主要分为如下几种:


a.首先处理上层消息提交,从 Group Propose 队列中获取所有待提交消息,同时清空 Propose 队列,执行 RaftGroup.RawNode->Step 将待提交消息转发到 Raft 状态机;


b.然后处理 SoftState 消息主节点变更消息,通过 RaftGroup.StateMachine-> LeaderChange 通知上层状态机变更;


c.接着判断是否有待发送消息 Messages,有则通过 RaftGroup->sendMessages 发送出去,sendMessages 遍历各个 msgs,根据消息目标节点,投递到 Transport 模块目标节点的消息队列中,等待发送出去;


d.其次处理 HardState 和 Entries(raft log)消息,将 HardState 与 entries 打包为一次 Write Batch 调用 Raft Group.Storage->Set 进行持久化,保证原子性;


e.接着处理 CommitedEntries,将 CommitedEntries 通过 RaftGroup.StateMachine->Apply 通知上层应用提交消息;


f.处理 ReadIndex 读请求,通知上层应用读操作


4、如果消息类型是 stateProcessTick,用于触发本地心跳消息或者选举

// process group tickif in.state&stateProcessTick != 0 {    h.processTick(ctx, group)    in.state |= stateProcessReady}


读到这里,Multi-Raft 的整体设计基本介绍完成。下面简要介绍关于 Multi-Raft 的接入使用,整体上接入使用与单个 etcd/raft 实例并无特别差异。



06 接入指南



Multi-Raft 作为独立的包,使用时类似于 etcd/raft,需实现 Storage 接口与 StateMachine 接口,基于 Manage 创建 Raft Group。整个接入过程较为清晰简单,其中关键步骤如下:


1、创建 storage 示例,实现 Storage 接口


kvStore, err := kvstore.NewKVStore(ctx, storagePath, kvstore.RocksdbLsmKVType, &kvstore.Option{    CreateIfMissing: true,    ColumnFamily:    []kvstore.CF{testRaftCF, testStateMachineCF},})storage := &testStorage{kvStore: kvStore, cf: testRaftCF} // testStorage 实现 Storage 接口


2、初始化 raftManager


cfg := &raft.Config{    NodeID:                        member.NodeID,    ...    TransportConfig: raft.TransportConfig{      Addr:                    member.Host,      MaxTimeoutMs:            30000,      ConnectTimeoutMs:        30000,      KeepaliveTimeoutS:       60,      ServerKeepaliveTimeoutS: 10,      MaxInflightMsgSize:      1024,    },    Logger:   log.DefaultLogger, // 自定义 log 日志打印    Storage:  storage, // 实现 Storage 接口的存储实例    Resolver: &addressResolver{nodes: nodesMap}, // 地址解析器}manager, err := raft.NewManager(cfg)


3、创建应用状态机,通过 manager 创建的 Raft Group


sm := newStateMachine(storage)groupConfig := &raft.GroupConfig{    ID:      1,    Applied: 0,    Members: allNodes,    SM:      sm,}group, err := manager.CreateRaftGroup(ctx, groupConfig) // 创建 Raft 组// wait for all node receive leader change and read index donesm.WaitLeaderChange()


4、应用层的提交通过 StateMachine 完成,manager 管理所有的 Raft Group。



07 总结



本文主要介绍 CubeFS 自主设计研发的 Multi-Raft 库,从整体架构、核心模块、实现细节以及接入使用方面作简要地介绍。



08 附录



基于现有实现,对 Multi-Raft 做了读写相关的性能测试,供各位读者参考。读基于 ReadIndex 接口,写基于 Propose 接口。


环境:3 节点,单节点 16C32GB

Raft Group 规模

Tick Interval (s)

读 QPS

写 QPS

500

1

141530

85247

100000

1

86789

49539

500

2

144569

87049

100000

2

121977

75462

500

3

145969

87148

100000

3

140357

86124


如上结果所示,在 raft group 数量比较多的情况下,心跳 ticker 的间隔会对节点整体产生较大的影响:


在心跳间隔为 1 秒时,10W 的 raft group 的吞吐会比 500 个 raft group 的吞吐下降 38%左右,平均时延增加 57%。


在心跳间隔为 2 秒时,吞吐差距缩小,相差仅为 15%。


在心跳间隔为 3 秒时,10W 的 raft group 吞吐几乎与 500 个 raft group 的吞吐一致。


作者介绍


Zongchao Hu,CubeFS Contributor 之一,负责纠删码元数据管理和存储引擎的研发。




CubeFS 简介




CubeFS 于2019年开源并在 SIGMOD 发表工业界论文,目前是云原生计算基金会 (CNCF) 托管的孵化阶段开源项目。作为新一代云原生分布式存储平台,兼容 S3、POSIX、HDFS 等协议,支持多副本和纠删码引擎,提供多租户,多 AZ 部署、跨区域复制等特性;适用于大数据、AI、容器平台、数据库及中间件存算分离,数据共享、数据保护等广泛场景。



►►►

往期推荐


文章转载自CubeFS点击这里阅读原文了解更多


联系Linux Foundation APAC




Linux基金会是非营利性组织,是技术生态系统的重要组成部分。

Linux基金会通过提供财务和智力资源、基础设施、服务、活动以及培训来支持创建永续开源生态系统。在共享技术的创建中,Linux基金会及其项目通过共同努力形成了非凡成功的投资。请关注LFAPAC(Linux Foundation APAC)微信公众号。

LFAPAC
Linux基金会通过提供财务和智力资源、基础设施、服务、活动以及培训来支持创建永续开源生态系统。在共享技术的创建中,Linux基金会及其项目通过共同努力形成了非凡成功的投资。
 最新文章