CubeFS 在 3.0.0 版本支持了纠删码(ErasureCode)子系统以应对更大规模的存储需求,降低存储成本。然而磁盘故障、节点宕机等问题依然是不能完全避免的,需要后台任务来保证集群故障后快速恢复以及数据完整性、数据均匀分布等,且由于 EC 存储和多副本存储模式底层实现上的不同,实现细节亦有不同。本文结合图文和源码,阐述 CubeFS 的纠删码存储子系统——Blobstore 的后台任务调度设计和实现,重点分析后台任务的互斥机制、任务队列以及不同后台任务具体执行过程中的共通性。
01
名词解释
为方便理解源码和文章内容,下面先对 Blobstore 的一些相关名词进行解释:
Volume:卷是用户数据的逻辑存储单元,通过 vid 唯一标识;
Blob:进行一次 EC 计算的数据大小,用户数据先按照 blob 大小进行拆分,然后进行计算,如图在 4+4 的纠删码模式下,产生 4 个数据块和 4 个校验块,写入到 chunk 中,这 8 份数据落在 8 个不同节点的磁盘上, 通过 bid 唯一标识;
Chunk:组成 volume 的基本单元,对应实际的物理存储空间,每个 chunk 只归属于一个卷,由 blobnode 节点管理;
Shard:组成 chunk 的小数据块,对应上述 blob 计算得到的 8 个小数据块,写入 chunk,这些 shard 组成对应用户数据的一条 EC 条带。
02
后台任务
纠删码子系统的后台任务可以大致分为两类:
Chunk 迁移相关任务:主要实现 chunk 在磁盘之间的数据迁移,包括:坏盘修复、chunk 均衡、节点/磁盘下线等;
Chunk 数据维护任务:主要用于保证 chunk 中数据的完整性,包括:数据巡检、数据修补、异步删除。
两种任务都以“基准 bid 列表“(BenchmarkBids,包含了 一个 chunk 上应该记录的所有有效数据的 bid)作为参照,通过 EC 计算进行数据恢复,后面将详细描述“基准 bid 列表”的获取和数据恢复的过程。整个后台任务(Task)调度的实现主要涉及四个模块:
Scheduler:负责任务从生成到清理整个生命周期的调度和状态流转;
Worker:负责任务的具体执行,运行在数据实际存储的机器上,周期性地向 Scheduler 获取任务和上报任务执行情况;
Kafka:用于 Scheduler 和 Worker 之间数据修补和删除任务消息的传递,通过将任务消息投递在不同的 kafka 主题,实现任务消费的优先级管理。
ClusterMgr(CM):集群的元数据管理中心,管理卷、chunk 以及磁盘之间的映射关系,记录任务的元数据。
互斥机制
Scheduler 启动后全局维持一个 VolTaskLocker,确保同一个卷只有一个 chunk 迁移类型的后台任务在执行,防止并发冲突,导致数据错乱。同一个卷的 chunk 数据维护任务可以和迁移类型任务可以并发,比如 chunk 迁移时正好有异步删除任务执行,这会使得源 chunk 上 shard 被删除但目的 chunk 上仍然存在,如果正好有修补任务执行,目的 chunk 上会有 shard 缺失。所以修补和删除任务执行完成后都需要二次校验机制来保证数据不被漏删或缺失。
任务队列
任务锁定机制
Scheduler 管理任务的整个生命周期并向 Worker 分配任务,任务保存在内存队列中,以两种状态进行区分:可分配任务和不可分配任务,两种状态可以相互转换,一个 Task 被 Scheduler 分配给 Worker 执行后,处于锁定状态,并且锁定时间可根据任务的执行情况进行调整。如图所示,红色 Task 表示不可分配任务,绿色 Task 表示可分配任务,任务锁定机制给队列中的每个任务设置一个 deadline,每次需要从队列中取出任务时,将 deadline 与当前时间戳进行比较,没有超过 deadline 的任务不会被取出,需要重试某个任务时,将该任务的 deadline 缩短,使得该任务能快速分配;需要延长任务执行时间时,对其进行续租。
缩短锁定时间场景:任务执行失败,需要分配给其他 Worker 重试;
延长锁定时间场景:Scheduler 周期性收到 Worker 上报的任务消息,若任务执行正常,则延长一段执行时间进行续租。
任务租约机制
Worker 后台启动一个协程定时向 Scheduler 发送请求续租所有执行中的任务,Scheduler 先判断该任务是否在队列中,如果存在,对其 deadline 字段进行累加,以保证该任务不会被分配。相反的,如果一个任务执行时间已抵达 deadline 又未续租,则会被重新分配。
任务状态机
当 Worker 向 Scheduler 获取一个任务时,Scheduler 首先遍历 Queue 的 doing 队列,找到可分配的 Task,如果 doing 队列中的 Task 都已分配,则将 todo 队列中的队首 Task 取出,插入 doing 队列队首,把它的 deadline 设置为当前时间加上任务锁定时间,将其转换为不可分配任务。
TaskQueue 和 WorkerTaskQueue 是 Scheduler 基于 Queue 结构的进一步封装,向上提供功能更丰富的接口:
TaskQueue:用于 Scheduler 内部任务状态的流转,比如管理准备执行的任务和已执行完成的任务,当这类任务执行失败(校验、更新 CM 元数据信息)时需要重试,所以除 Task 出入队列之外,对外还提供重试接口——TaskRetry;
WorkerTaskQueue:负责与 Worker 交互,当任务执行发生异常时,需要向 Worker 提供相应的接口进行调整:
Acquire(获取):从队列中分配一个任务给 Worker;
Cancel(取消):取消某个任务,将其重置为可分配状态;
Reclaim(回收):重新申请目的资源,将其重置为可分配状态;
Renewal(续租):续租任务,保持任务锁定状态。
Reclaim 通常用于迁移过程中目的资源不可写的情况,scheduler 收到 reclaim 请求后先重新申请目的资源,更新任务的元数据,将任务的 timeout 设置为 0,立即转化为可分配状态。
如图所示为任务状态机的流转示意图,包含了任务在流转过程种的 4 种状态:
Inited:初始化的任务状态,生成全局唯一的 TaskID,经过 prepare 阶段分配目标迁移资源,转化为 Prepared 状态;
Prepared:该状态的 Task 记录在 WorkerTaskQueue 中,任务执行失败时,可以取消、回收以重置任务状态,或者续租来延长任务执行时间;
Completed:任务执行完成,向 CM 更新任务元数据;
Finished:CM 上的卷和磁盘 chunk 映射关系已更新,删除 CM 上的任务元数据。
任务执行的幂等性
后台任务通常涉及大量数据读写,任务重复执行不仅可能导致数据不一致,也会占用系统读写资源,影响用户体验,所以确保任务执行的幂等性非常重要。
在 Blobstore 中,Scheduler 的任务队列维护一个以 TaskID 作为 Key 的哈希表,任务入队时,需要先校验该 Task 是否已存在,否则返回错误。同时需要保证 Task 在队列之间流转时,有出必有入,任务执行队列取消、回收、续租只修改 Task 的状态和元数据,没有生成新任务入队。迁移类型任务进入准备队列前,scheduler 会向 CM 获取对应卷最新的映射关系,如果任务的源 chunk 已不在映射中,说明迁移以完成,无需执行该任务。
Chunk 迁移完成后,Task 从执行队列加入到已完成队列,假设 scheduler 向 CM 更新卷映射失败(超时),此时将 Task 重新入队已完成任务队列,只需要执行更新映射的操作(幂等),无需进入执行队列重新执行迁移操作。
数据修补和删除任务本身具有幂等性,scheduler 直接向 worker 发生请求来执行任务,无需为 worker 分配资源和监听任务状态,所以 scheduler 只维护内存任务池来存储这些任务。
接下来通过坏盘修复任务来进一步理解 chunk 迁移类任务调度设计和实现:
坏盘修复
一个坏盘修复任务的执行单位是 chunk,通过 vuid 唯一标识,通常来说一个磁盘上会有多个 chunk。EC 计算得到的数据块和校验块以 shard 的形式存储在 chunk 上,通过 bid 来标识,每个 chunk 维护一个 bid 列表。一个坏盘修复任务由多个 chunk 迁移组成,其中关键信息包括:任务 ID、类型、状态以及坏盘的磁盘 ID 和 vuid(待迁移的 chunk),其他 chunk 迁移类型的任务类似。
如图所示为坏盘修复任务的整体架构图,Scheduler 负责任务调度,Worker 执行任务,CM 保存任务相关的元数据。Scheduler 通过不同的 Mgr 来管理不同类型的任务,比如 DiskRepairMgr 负责坏盘修复任务的整个生命周期,需要 Run() 方法启动多个个异步协程在任务队列中流转任务:
如下图所示,DiskRepairMgr 维护三个任务队列,在 Scheduler 和 Worker 之间流转任务,其中 prepareQueue 和 finishQueue 负责 Task 在 Scheduler 内部的流转,而 workQueue 是 WorkerTaskQueue 结构,主要负责与 Worker 的交互,一个 Task 的 生命周期大致如下:
从 CM 获取坏盘,生成任务,向 CM 注册任务,加入到 prepareQueue;
等待执行任务队列空闲,从 prepareQueue 获取一个 Task,进入 prepare 阶段(完善任务信息,比如 chunk 迁移目标等),向 CM 更新任务元数据,从 prepareQueue 队列移除,加入到 workQueue;
Worker 从 workQueue 获取 Task,此时 Task 被锁定;
workQueue 从 CM 获取已完成任务的消息,添加该 Task 加入到 finishQueue;
从 finishQueue 取出任务,向 CM 更新修复后的映射关系,删除该任务;
该坏盘的所有 chunk 修复完成后,向 CM 更新该坏盘状态为“已修复”,并清理任务;
清理垃圾任务;
Task 的元数据信息持久化记录在 CM,当 Scheduler 重新启动时,从 CM 拉取任务列表,并根据任务的状态添加到对应的队列,保证任务可以继续执行。
坏盘修复任务涉及多个 chunk 的迁移,需要根据 CM 记录的对应磁盘的任务列表 和 chunk(vuid)列表的一致性判断整个坏盘修复任务完成,分为两种情况:
仍从 CM 获取到该磁盘的修复任务,但该盘上已经没有 chunk 存在,则是因为网络超时导致了重复添加任务,这种视为垃圾任务需要清理;
从 CM 获取到该盘仍有未修复的 chunk,但是没有任务在执行修复,则需要对该盘进行修复重试。
Scheduler 负责任务调度,Worker 则负责任务的实际执行,如图为 WorkerService 的结构图:
WorkerService 负责与 Scheduler、blobnode(数据节点)之间的通信(worker 不与 CM 通信,由 Scheduler 向 CM 更新任务的元数据),通过 TaskRunnerMgr 管理所有后台任务的执行,每个具体的任务对应一个 TaskRunner,WorkerSerivice 启动后,周期性向 Scheduler 请求新任务,同时续租未完成任务、通知已完成任务。
Scheduler 生成的每一个任务,对应 worker 中的一个 taskRunner,ITaskWorker 定义了任务执行所需要的接口方法,包括用于获取前文提到的“基准 bid 列表”的方法,ITaskWorker 将“基准 bid 列表”拆分成多个 Tasklet 子任务执行来实现并发。
type TaskRunner struct {
taskID string
w ITaskWorker
schedulerCli scheduler.IMigrator
...
}
type ITaskWorker interface {
// split tasklets accord by volume benchmark bids
GenTasklets(ctx context.Context) ([]Tasklet, *WorkError)
// define tasklet execution operator ,eg:disk repair & migrate
ExecTasklet(ctx context.Context, t Tasklet) *WorkError
// check whether the task is executed successfully when volume task finish
Check(ctx context.Context) *WorkError
OperateArgs() scheduler.OperateTaskArgs
TaskType() (taskType proto.TaskType)
GetBenchmarkBids() []*ShardInfoSimple
}
“基准 bid 列表”是所有后台任务的参照,无论是 chunk 迁移还是数据巡检、修复,决定了要将哪些数据写入到目标 chunk,直接关系到集群数据的完整性,其获取的基本原则是:只多不少。有效数据一定在列表中,列表中没有的一定是垃圾数据,列表中可以有垃圾数据(上层部分写入成功,后续可以通过删除任务清理)。
坏盘迁移与均衡和磁盘下线任务同样是 chunk 迁移任务,但由于坏盘的 chunk 不可读,“基准 bid 列表”的获取也略有不同:当修复任务类型为坏盘修复时,该坏盘上 chunk 对应的 vuid 不参与计算“基准 bid 列表”(如图所示,vuid4 位于坏盘上,使得 bid3 虽然被标记删除但迁移时仍然需要恢复,而磁盘正常的场景下不需要迁移),然后校验用于修复的 vuid 个数是否满足 EC 计算的最小 shard 个数,将这些 vuid 对应的 bid 列表聚合,把有删除和标记删除以及缺失不可恢复的 bid 剔除,得到“基准 bid 列表”。在执行具体修复或者迁移时,将整个“基准 bid 列表”的数据写入新的 chunk,如何将“基准 bid 列表”的数据写入新 chunk,下面通过“数据修补”部分一起解析。
综上,以坏盘修复任务举例,描述了 CubeFS 的纠删码子系统的 chunk 迁移类型后台任务的调度设计和实现,chunk 均衡任务以及磁盘下线与其类似,大同小异,不再赘述。
数据巡检、修补
WorkerService 一方面从 Scheduler 拉取 chunk 迁移相关的任务,同时拉取巡检任务:根据“基准 bid 列表”比对 chunk 上的 bid 列表,巡检完成返回的结果是一个 MissedShard 数组,最后用这个巡检结果作为参数,向 Scheduler 发送巡检完成的请求。巡检任务通过 InspectTaskMgr 管理。
type MissedShard struct {
// 标识缺失数据的 chunk
Vuid Vuid `json:"vuid"`
// 标识缺失的 shard
Bid BlobID `json:"bid"`
}
消息回到 Scheduler,并不会直接将该消息作为一个修补任务加入任务列表,而是向 proxy 发送数据修补的请求,proxy 收到的修补任务请求分为两种,一种由数据巡检任务产生,一种由上传数据失败时发起,这两种任务对应 kafka 中两个不同的主题(topic),且设置不同的 topic 优先级,比如可以使用户自主发现的缺失修补任务优先执行。
同样,Scheduler 只负责任务的调度,具体的修补过程是由 worker 执行的,数据修补和坏盘修复任务本质都是对 chunk 上的数据进行恢复,不同在于坏盘修复任务基于 chunk 粒度,而修补则基于 bid 粒度。所以两种任务具体执行,都是依赖 ShardRecover 实现的,ShardRecover 维护以下信息:
replicas:待修复 shard 所在条带的所有 chunk 的 vuid;
chunksShardsBuf:完整的条带数据缓存;
codeMode:条带的编码模式;
shardGetter:访问具体某个 chunk 的信息,比如获取某个 shard、获取该 chunk 的 bid 列表;
repairBidsReadOnly:待修复 shard 对应的 bid 列表;
前置条件准备完成后,通过 EC 计算将完整数据记录到 ShardRecover 的缓存上,用于后续写入目标 chunk。
func (r *ShardRecover) RecoverShards(ctx context.Context, repairIdxs []uint8, direct bool) error {
...
// 通过 EC 计算将缺失的数据恢复到 repairIdx 对应的 chunk 上,记录在 chunksShardsBuf
}
func (r *ShardRecover) GetShard(idx uint8, bid proto.BlobID) ([]byte, error) {
// 从缓存中读取需要写入目标 shard 的数据
return r.chunksShardsBuf[idx].FetchShard(bid)
}
坏盘修复任务数据的恢复会被 ITaskWorker 拆成多个 Tasklet 子任务来执行,根据 Tasklet 构建 shardRecover 在缓存中恢复所有数据,最后将数据写入到目标 chunk。
03
总结
本文结合代码描述了 CubeFS 的纠删码存储子系统后台任务如何在各个系统组件之间调度,以“坏盘修复”任务为例,介绍任务在 Scheduler 内部如何生成,流转、分发、清理,其次从数据修复、迁移的角度围绕”基准 bid 列表“展开介绍不同类型任务的详细执行流程及共通性。
04
参考
作者介绍
Jian Xie,在 CubeFS 项目参与 Blobstore 的设计与研发。
CubeFS 简介
CubeFS 于2019年开源并在 SIGMOD 发表工业界论文,目前是云原生计算基金会 (CNCF) 托管的孵化阶段开源项目。作为新一代云原生分布式存储平台,兼容 S3、POSIX、HDFS 等协议,支持多副本和纠删码引擎,提供多租户,多 AZ 部署、跨区域复制等特性;适用于大数据、AI、容器平台、数据库及中间件存算分离,数据共享、数据保护等广泛场景。
►►►
往期推荐
文章转载自CubeFS。点击这里阅读原文了解更多。
CNCF概况(幻灯片)
扫描二维码联系我们!
CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux Foundation,是非营利性组织。
CNCF(云原生计算基金会)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。请关注CNCF微信公众号。