CubeFS 在 3.0.0 版本推出了纠删码特性[1],在保障数据耐久性的前提下提供更低的数据冗余度,也是大数据降冷、数据库备份等业务场景的首选。为了让社区用户更加深入的理解纠删码设计细节,我们将从源码入手逐步讲解各个模块。
本文结合图文和源码,阐述 CubeFS 的纠删码存储子系统——Blobstore 的IO流程设计和实现,从顶层到底层介绍一下基本的读、写、删除流程在access、proxy、clusterMgr、scheduler、blobnode几个模块之间的交互和流转。
访问blobstore
模块简介
Access:请求接入网关,提供数据读、写、删等基本接口;
BlobNode:单机存储引擎,管理整机的磁盘数据,负责数据的持久化存储, 执行卷修补、迁移和回收任务;
ClusterManager:元数据管理模块, 负责集群资源(如磁盘、节点、存储空间单元)的管理;
Proxy:ClusterManager 与异步消息代理模块,提供数据写入空间的分配、删除与修补消息转发等;
Scheduler:异步任务调度中心,负责磁盘修复、磁盘下线、数据均衡、数据巡检、数据修补以及数据删除等任务的生成和调度。
服务注册与发现
在搭建一个 blobstore 集群的时候,会利用 consul 服务注册与发现,注册多个集群,并将可以访问这些集群的一篮子 access 注册至该 consul 地址。上层应用在访问 blobstore 子系统时,会解析 consul 地址,通过一定策略(随机、轮询等)拿到相应的 access。
用户只会直接访问 Access,通过 Access 为入口来访问整个 blobstore 子系统。Access 内部会通过一些策略(随机、容量、地域就近等)选择某个集群,或者由上层记录了 ClusterID 指定访问某个集群。
名词解释
为方便理解源码和文章内容,下面先对 Blobstore 的一些相关名词进行解释:
Volume:卷是用户数据的逻辑存储单元,通过 vid 唯一标识;
Blob:进行一次 EC 计算的数据大小,用户数据先按照 Slice大小进行拆分,然后进行计算,如图在 4+4 的纠删码模式下,产生 4 个数据块和 4 个校验块,写入到 chunk 中,这 8 份数据落在 8 个不同节点的磁盘上, 通过 bid 唯一标识;
Chunk:组成 volume 的基本单元,对应实际的物理存储空间,每个 chunk 只归属于一个卷,由 blobnode 节点管理;
Shard:组成 SliceUnit的小数据块,对应上述 单个Slice EC计算得到的 8 个数据块和校验块,每个Slice Unit追加写入到物理Chunk中,这些 shard 组成对应用户数据的一条 EC 条带。
基本流程
下面我们将以 access 模块作为入口,介绍一下基本的 IO 流程。
上层用户访问 blobstore 子系统,一般会通过 access 模块访问,它对外暴露两种形式的 client:
access 作为进程提供的 http API 接口,access 封装的 SDK 集成到上层使用者,以下统称为 access,需要区分这两种形式时简称 api 和 sdk。sdk 和 api 都提供了基本的读、写、删等操作。
读流程
上层通过一定策略(负载均衡等)挑选一个 access,发起读请求
access 模块接受来自上层的读请求,检查有效性、合法性
access 模块拿到卷信息(卷信息、磁盘信息,都有本地缓存,本地缓存失效,读 Proxy 缓存,再读 ClusterMgr)。然后生成更具体的 Blob 对象信息
如果是小对象(对应 1 个 blob,并且读的 blobsize 比较小),会优化为直读,根据 diskID 找到对应 blobnode 的 host,依次只读取该 blob 下所有 shard 数据块,不读校验块
否则普通对象,遍历 blob 数组,依次读取 blob。根据 volume 的 codemode 获取 N,根据 idc 排序 vuid,并行读取 N+1 个块。如果有 N 份数据正常,则可以正常拼接为期望的对象,也就是这 N+1 里最多允许有 1 个失败,如果有 2 个及以上失败则继续读剩余的 vuid 对应数据修复读
blobnode 模块接受 access 模块的 Get 请求,检查有效性
blobnode 拿到 disk,根据 vuid 得到 chunkStorage,按[from, to)范围读取 shard
读取 rocksdb 上的元数据
获取 storage,限制磁盘带宽速度,读取 shard 数据,向上返回
写流程
access 模块接受来自上层的写请求,检查有效性、合法性
access 确定 codemode 和 cluster,申请 blobs 的一段范围 bid
读取 readBody 流到一段 buffer 里,对读到的数据做 EC 编码得到校验块
遍历 blobs 数组,对每个 blob 请求拿到卷信息(本地 cache/Proxy/ClusterMgr),拿到 blobnode 对应的 host
遍历 volume 的 units,组装 putShard 参数,依次调用 blobnode 接口把 shards 写入。大于 quorum 则算成功
blobnode 模块接受到 access 模块的 Put 请求,检查有效性
blobnode 拿到 disk,根据 vuid 得到 chunkStorage,判断是否有足够空间且允许写入
取得 storage,先写数据,再写 rocksdb 元数据。写入时会限制磁盘带宽速度。向上返回
删除流程
access 模块接受来自上层的删除请求,检查有效性、合法性
根据 location 封装删除消息,投递给 proxy 模块,proxy 投递给 kafka 对应的删除消息主题
scheduler 模块后台任务异步消费 kafka 对应主题的删除消息,批量删除
scheduler 根据 locations 里携带的 host,找对应 blobnode 发起标删和删除请求,做两阶段删除,对应的所有 blobnode 全部成功后才算本消息删除成功
如果删除失败,则 scheduler 会将失败消息重新投递只 kafka 的 failed 主题中等待下一次消费
blobnode 接收到 scheduler 模块的标删/删除请求,检查有效性
读元数据,检查 flag 已经时标删状态,删除 shard 对应的 rocksdb 元数据。
数据部分通过调用系统 punch hole 的接口,做到非常高效的释放
详细流程介绍
下面结合图文和代码,具体介绍一下几个流程,以及一些特殊场景的优化和处理。
代码做了一定程度的精简,仅作说明
读、写、删操作,都会用到一个比较重要的结构体 Location,location 定义:
type Location struct {
ClusterID proto.ClusterID // 集群 ID
CodeMode codemode.CodeMode // 编码模式
Size uint64 // 对象大小
BlobSize uint32 // 分块大小
Crc uint32 // crc 签名
Blobs []SliceInfo // 分块位置
}
type SliceInfo struct {
Vid proto.Vid // 卷 ID
MinBid proto.BlobID // 开始的 BID
Count uint32 // 连续多少个 Blob
}
读流程
用户数据按照一定大小被切分成多个连续的 blob,打散分布在集群的多个物理节点。在并发场景下,读请求能充分利用全部集群资源数据。
来自于 api 和 sdk 的请求 Access,最终都会调用 access 模块的 StreamHandler 来处理。
另外,该实例会启动后台协程 discard 异常的卷,后台更新 cluster 的容量等信息。
为了提供更高的可用性和容灾能力,blobstore 子系统会将用户数据切分后编码计算并持久化存储于多个 AZ 中。但是多 AZ 情况下网络带宽消耗比较大,access 访问时候会根据机房 IDC 做排序,优先读取本 AZ 的数据,减少跨 A 带宽 Z 流量。
user Get
|-> api/sdk
|->Get (ctx, writer, location, readSize, offset)
|-> readOneBlob(ctx, getTime, serviceController, blob, sortedVuids, shards)
|-> readOneShard(ctx, serviceController, blob, vuid, stopChan)
|-> blobnodeClient.RangeGetShard(ctx, host, &args)
代码调用流程
Get:
接收请求参数,包括读取位置、读取大小和偏移量
生成需要读取的 blob 列表
获取服务控制器
尝试直接读取数据分片,如果失败则进行 EC 重构读取
将读取的数据写入客户端
readOneBlob:
计算需要读取的最小分片数 minShardsRead
使用 shardPipe 生成器函数并行读取分片
将正确偏移的字节写入 writer
收集读取的分片数据,检查是否可以重构数据块
如果数据块可以重构,停止读取并返回成功;否则返回错误
小文件优化
如果 blobs 里只有 1 个对象,并且是小对象,会针对性的做小对象优化直读。
小文件做在线 EC 会有较严重的 IO 放大。比如 1K 大小的文件,做 EC(3+3)切分后的 shard 存储在 3+3 个位置,读取至少需要 3 次网络请求,访问 3 个 blobnode 才能构建出完整数据。在纠删码中小文件的扇入扇出问题尤为突出,极大影响小文件的读写效率;对此我们规定 EC 的每个数据块有最小数据大小,且可根据情况定义数据块大小值(比如 4K)。当实际数据不足 data*4K 时用 0 补齐,编码后写入到各 blobnode。
以上图为例讨论该优化项的优势:
文件系统一般都以 4K 为一个 Block 大小
读取时可直接读第一个 D 数据块;也可从任一个 P 走恢复读流程;减少 IO 路径且不消耗跨 AZ 带宽(假设多个 blobnode 在多个 AZ,即多机房/地域)
数据损坏时,可从任一个校验块恢复数据
数据冗余度提升至 (P+1),提高了可靠性
这样做也带来了缺点:同时也浪费了部分空间。
但是在云计算场景中,小文件的性能要求比存储成本更高,用空间换时间的思想很好地解决了该问题,使存储抽象层更加统一完善。
if int(blob.BlobSize) <= blob.ShardSize || blob.ReadSize < blob.BlobSize/4 {
h.getDataShardOnly(ctx, getTime, w, serviceController, blob)
}
并发优化
例如 access 需要读一个 blob(1,2,3)有 3 个 shard 块,会同时向这 3 个 shard 块所在的 blobndoe 发起读请求,充分利用全部集群资源。
修复读
access 会并发找多个 blobnode,同时读取 N+1 个块。如果有 N 份数据正常,则可以正常拼接为期望的对象。当凑不够 N 份数据,需要继续读取直到 N 份数据成功,可 EC 修复。
EIO 坏盘修复
特殊的,如果读或者写的过程中,遇到了 EIO 错误,会把当前磁盘标记为坏盘,并且上报给 ClusterMgr 模块,然后 ClusterMgr 会通知到 Scheduler 模块,生成修盘任务发给其它 blobnode(worker)。做到坏盘的自动发现,自动修复。
磁盘报告 EIO 之后,状态设置为 broken,对于非 EIO 的坏盘场景,也可以通过接口主动设置坏盘;volume 感知到有坏盘的,会降低卷的健康度。续租卷的时候会失败,分配的时候不会优先分配出去
clusterMgr 收到标记坏盘请求后,scheduler 模块会周期拉取感知到坏盘,并针对该盘下的所有 chunk 生成修盘任务,每个 chunk 为一个任务触发数据重建,进入 reparing 修复状态
scheduler 对 blobnode 提供 AcquireTask 接口,blobnode 会周期获取修盘任务,依次修复每个 chunk,并上报该 chunk 完成修复,并把该 chunk 做 release 释放
scheduler 感知到该坏盘下的所有 chunk 修复完毕,会找 clusterMgr 修改磁盘状态为 repaired
整个进度状态 scheduler 可感知,修复整个盘后,blobnode 则会彻底摘除句柄
换上新盘之后,调用在线注册的接口,注册之后自动发现
blobnode 遇到 EIO 上报坏盘。
func (s *Service) handleDiskIOError(ctx context.Context, diskID proto.DiskID, diskErr error) {
ds, exist := s.Disks[diskID]
ds.SetStatus(proto.DiskStatusBroken)
err := s.ClusterMgrClient.SetDisk(ctx, diskID, proto.DiskStatusBroken)
go s.waitRepairAndClose(ctx, ds)
}
scheduler 后台异步处理坏盘任务的生成,以及任务流转。
func (mgr *DiskRepairMgr) Run() {
go mgr.collectTaskLoop()
go mgr.prepareTaskLoop()
go mgr.finishTaskLoop()
go mgr.checkRepairedAndClearLoop()
go mgr.checkAndClearJunkTasksLoop()
}
写流程
用户数据按照一定大小被切分成多个连续的 blob,打散分布在集群的多个物理节点。在并发场景下,写请求能充分利用全部集群资源数据。
user Put
|-> api/sdk
|-> Put(ctx, reader, args.Size, hasherMap)
|-> writeToBlobnodesWithHystrix(ctx, blobident, shards, func() {...})
|->writeToBlobnodes(ctx, blob, shards, callback)
|->blobnodeClient.PutShard(ctxChild, host, args)
在写流程里,上层通过 api 和 sdk,最终都会走到 access 里 StreamHandler 的 Put 方法。将对参数 Body 里读到的数据做 EC 编码得到校验块,把数据块和校验块一起整个 shards 下发给 blobnode。
Quorum 写加速
并发同时向多个 blobnode 节点写入 shard 分片数据,等待多个节点返回结果。设定一个 quorum 值,满足 quorum 成功个数后则向上返回成功,不必等待所有节点都成功。能一定程度上减少网络波动、长尾效应。
LRC 优化
如上图,编码为 LRC(12,9,3) 或者 RS(12,12)。
若使用 RS 码:以 D1~D12 数据块计算 P1~P12 个校验块。
若使用 LRC 码:以 D1~D4 数据块计算本地校验块 L1,以 D5~D8 计算 L2,D9~D12 计算 L3;以 D1~D12 计算全局校验块 P1~P9。
在一个 EC 组中,损坏一个数据块的几率要远大于损坏多个数据块的几率。如果能够及时发现坏块,那么多数情况下只需要恢复一块就够了。这时启用 LRC 就可以在本 AZ 内完成数据恢复,避免跨 AZ 的带宽和 IO 消耗。
比如在并发写入时候,写入 blobnode chunk1 的 D1 写失败或者其它原因导致损坏,写入其它 blobnode chunk 的 D2~D4 成功,L1 成功;则就可以借助 AZ-1 中的其它几个块来修复 D1,加速写入,提升在线写入可用性。也会用在 AZ 内后台任务修复场景优化。
blobnode 写入
blobnode 是 blobstore 子系统的单机存储引擎,接收接入层模块的数据,持久化到磁盘上,负责用户数据的底层格式组织与落盘操作, 执行卷修补、迁移和回收任务。元数据、数据分离。元数据批量处理,数据尽量顺序处理。
blobnode 内部模块分层如下图
blobnode 收到上层请求后,先解析并判断 args 参数的合法性(DiskID,Size,Bid,Type),判断磁盘状态是否坏盘,是否允许写入,是否有足够空间,然后找到对应的 chunk 处理读请求。先写数据,再写元数据。
func (s *Service) ShardPut(c *rpc.Context) {
ds, exist := s.Disks[args.DiskID]
cs, exist := ds.GetChunkStorage(args.Vuid)
shard := core.NewShardWriter(args.Bid, args.Vuid, uint32(args.Size), c.Request.Body)
err = cs.Write(ctx, shard)
c.RespondJSON(ret)
}
最终执行到底层 datafile.go 里做写入,写入时候会利用令牌桶算法做并发控制和限速。
func (cs *chunk) Write(ctx context.Context, b *core.Shard) (err error) {
stg := cs.GetStg()
return stg.Write(ctx, b)
}
func (stg *storage) Write(ctx context.Context, b *core.Shard) (err error) {
data, meta := stg.data, stg.meta
err = data.Write(ctx, b)
return meta.Write(ctx, b.Bid, core.ShardMeta{...})
}
func (cd *datafile) Write(ctx context.Context, shard *core.Shard) error {
cd.qosAllow()
qoswAt := cd.qosWriterAt(ctx, cd.ef)
_, err = qoswAt.WriteAt(headerbuf, pos)
w = &bncomm.Writer{WriterAt: cd.ef, Offset: pos}
qosw := cd.qosWriter(ctx, w)
tw := bncomm.NewTimeWriter(qosw)
tr := bncomm.NewTimeReader(body)
// 写入数据,并且按 block 分段编码
_, err = encoder.Encode(tr, int64(shard.Size), tw)
_, err = qoswAt.WriteAt(footerbuf, pos)
}
慢盘 panic 优化
在长期运行过程中,由于硬件或软件等原因(硬盘体质、频繁读写等),部分硬盘访问非常慢,会出现延时变大、读写变慢的情况。
在之前版本的 blobnode 进程里,每一个从上层来的读写请求,都会对应新起一个协程来处理该请求,最终处理完返回。
在正常情况下的 IO 访问,即使有比较多的请求访问,GMP 模型会保证大部分的协程只会映射绑定到一定量的线程上。每个 G(Goroutine)绑定在 M 上一般来说最多运行 X 毫秒,超时了会主动挂起让出;或者是遇到了一些 sleep/wait 之类的主动让出,等待下一次调度。
而在出现慢盘的时候(硬件故障,性能故障,或 IO 负载过高等情况),IO 处理并没有类似 yield 让出,系统调用阻塞处理 IO。在这种情况下,GMP 模型的 G 和它绑定的 M 会一起从 P 上剥离,唤醒/新创建 线程去承接别的 G 函数任务。慢盘场景下会导致当前的处理 IO 协程几乎都各自映射绑定了一个线程,并且 IO 处理慢会出现堆积。原有的线程还没释放,又新创建线程,导致堆积的 IO 请求越来越多,协程和线程也越来越多,最终会导致 panic。
慢盘场景一开始只是会导致些许性能下降,累积一定程度则会造成 panic 服务不可用。
优化:可以将读写 IO 的协程数控制在一定的范围,即使发生了慢盘,也不会产生过多的线程。
设计一个读写 IO 池,持有固定数量的读写队列,每个队列绑定对应的读写协程。所有上层访问的读写 IO 都入队到读写 IO 池,由后台的读写协程依次消费处理。可以配置写协程的个数,以及队列深度。这样还能带来一些好处:更节省 CPU 负载,减少了内核等待 IO 的一些消耗;线程数稳定且比较少;在消费 IO 队列的时候,做一些聚合操作;减少线程切换开销;IO 在用户态管理和排队。
删除流程
删除流程比较简单一些
access 模块组装删除消息,投递给 proxy
proxy 投递给 kafka 对应的删除消息主题
scheduler 模块异步消费 kafka 对应主题的删除消息,批量删除
找对应 blobnode 发起标删和删除请求,做两阶段删除,对应的所有 blobnode 全部成功后才算本消息删除成功
blobnode 做标删,先读元数据判断 Flag 是否符合预期,最后修改元数据为标删状态
blobnode 删除。先读元数据,检查 flag 已经时标删状态,删除 shard 对应的 rocksdb 元数据。数据部分通过调用系统 punch hole 的接口,做到非常高效的释放
access 模块组装删除消息,投递给 proxy
func (h *Handler) Delete(ctx context.Context, location *access.Location) error {
blobs := location.Spread()
... // 依次组装 deleteArgs
retry.Timed(3, 200).On(func() error {
err = h.proxyClient.SendDeleteMsg(ctx, host, deleteArgs)
})
}
proxy 投递给 kafka 对应的删除消息主题
func (s *Service) SendDeleteMessage(c *rpc.Context) {
err := s.blobDeleteMgr.SendDeleteMsg(ctx, args)
}
func (d *blobDeleteMgr) SendDeleteMsg(ctx context.Context, info *proxy.DeleteArgs) error {
msgs := make([][]byte, 0, len(info.Blobs))
... // 依次组装 msgs
err := d.delMsgSender.SendMessages(d.topic, msgs)
return err
}
blobnode 处理 Delete 请求,markDelete 流程类似
func (s *Service) ShardDelete(c *rpc.Context) {
ds, exist := s.Disks[args.DiskID]
cs, exist := ds.GetChunkStorage(args.Vuid)
err = cs.Delete(ctx, args.Bid)
}
func (cs *chunk) Delete(ctx context.Context, bid proto.BlobID) (err error) {
stg := cs.GetStg()
n, err := stg.Delete(ctx, bid)
}
func (stg *storage) Delete(ctx context.Context, bid proto.BlobID) (n int64, err error) {
meta, data := stg.meta, stg.data
shardMeta, err := meta.Read(ctx, bid)
if shardMeta.Flag != bnapi.ShardStatusMarkDelete {
return n, bloberr.ErrShardNotMarkDelete
}
// 删除元数据
err = meta.Delete(ctx, bid)
shard := &core.Shard{...}
// discard hole
err = data.Delete(ctx, shard)
return int64(shardMeta.Size), err
}
作者介绍
Wei Ma,CubeFS Contributor,当前在 CubeFS 项目参与 Blobstore 的设计与研发。
参考
【1】 CubeFS存储技术揭密(1) — 纠删码引擎系统设计
【2】 CubeFS存储技术揭秘(3)— 均衡、巡检与故障自愈
【3】 CubeFS"源"理解读|纠删码之异步删除与数据修补
CubeFS 简介
CubeFS 于2019年开源并在 SIGMOD 发表工业界论文,目前是云原生计算基金会 (CNCF) 托管的孵化阶段开源项目。作为新一代云原生分布式存储平台,兼容 S3、POSIX、HDFS 等协议,支持多副本和纠删码引擎,提供多租户,多 AZ 部署、跨区域复制等特性;适用于大数据、AI、容器平台、数据库及中间件存算分离,数据共享、数据保护等广泛场景。
►►►
往期推荐
文章转载自CubeFS。点击这里阅读原文了解更多。
CNCF概况(幻灯片)
扫描二维码联系我们!
CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux Foundation,是非营利性组织。
CNCF(云原生计算基金会)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。请关注CNCF微信公众号。