CubeFS之Blobstore子系统IO流程介绍

文摘   2024-11-27 10:12   中国香港  

CubeFS 在 3.0.0 版本推出了纠删码特性[1],在保障数据耐久性的前提下提供更低的数据冗余度,也是大数据降冷、数据库备份等业务场景的首选。为了让社区用户更加深入的理解纠删码设计细节,我们将从源码入手逐步讲解各个模块。


本文结合图文和源码,阐述 CubeFS 的纠删码存储子系统——Blobstore 的IO流程设计和实现,从顶层到底层介绍一下基本的读、写、删除流程在access、proxy、clusterMgr、scheduler、blobnode几个模块之间的交互和流转。



访问blobstore




模块简介


  • Access:请求接入网关,提供数据读、写、删等基本接口;

  • BlobNode:单机存储引擎,管理整机的磁盘数据,负责数据的持久化存储,  执行卷修补、迁移和回收任务;

  • ClusterManager:元数据管理模块, 负责集群资源(如磁盘、节点、存储空间单元)的管理;

  • Proxy:ClusterManager 与异步消息代理模块,提供数据写入空间的分配、删除与修补消息转发等;

  • Scheduler:异步任务调度中心,负责磁盘修复、磁盘下线、数据均衡、数据巡检、数据修补以及数据删除等任务的生成和调度。


服务注册与发现


在搭建一个 blobstore 集群的时候,会利用 consul 服务注册与发现,注册多个集群,并将可以访问这些集群的一篮子 access 注册至该 consul 地址。上层应用在访问 blobstore 子系统时,会解析 consul 地址,通过一定策略(随机、轮询等)拿到相应的 access。


用户只会直接访问 Access,通过 Access 为入口来访问整个 blobstore 子系统。Access 内部会通过一些策略(随机、容量、地域就近等)选择某个集群,或者由上层记录了 ClusterID 指定访问某个集群。




名词解释



为方便理解源码和文章内容,下面先对 Blobstore 的一些相关名词进行解释:


  • Volume:卷是用户数据的逻辑存储单元,通过 vid 唯一标识;

  • Blob:进行一次 EC 计算的数据大小,用户数据先按照 Slice大小进行拆分,然后进行计算,如图在 4+4 的纠删码模式下,产生 4 个数据块和 4 个校验块,写入到 chunk 中,这 8 份数据落在 8 个不同节点的磁盘上, 通过 bid 唯一标识;

  • Chunk:组成 volume 的基本单元,对应实际的物理存储空间,每个 chunk 只归属于一个卷,由 blobnode 节点管理;

  • Shard:组成 SliceUnit的小数据块,对应上述 单个Slice EC计算得到的 8 个数据块和校验块,每个Slice Unit追加写入到物理Chunk中,这些 shard 组成对应用户数据的一条 EC 条带。




基本流程



下面我们将以 access 模块作为入口,介绍一下基本的 IO 流程。


上层用户访问 blobstore 子系统,一般会通过 access 模块访问,它对外暴露两种形式的 client:


access 作为进程提供的 http API 接口,access 封装的 SDK 集成到上层使用者,以下统称为 access,需要区分这两种形式时简称 api 和 sdk。sdk 和 api 都提供了基本的读、写、删等操作。


读流程



  • 上层通过一定策略(负载均衡等)挑选一个 access,发起读请求

  • access 模块接受来自上层的读请求,检查有效性、合法性

  • access 模块拿到卷信息(卷信息、磁盘信息,都有本地缓存,本地缓存失效,读 Proxy 缓存,再读 ClusterMgr)。然后生成更具体的 Blob 对象信息

  • 如果是小对象(对应 1 个 blob,并且读的 blobsize 比较小),会优化为直读,根据 diskID 找到对应 blobnode 的 host,依次只读取该 blob 下所有 shard 数据块,不读校验块

  • 否则普通对象,遍历 blob 数组,依次读取 blob。根据 volume 的 codemode 获取 N,根据 idc 排序 vuid,并行读取 N+1 个块。如果有 N 份数据正常,则可以正常拼接为期望的对象,也就是这 N+1 里最多允许有 1 个失败,如果有 2 个及以上失败则继续读剩余的 vuid 对应数据修复读

  • blobnode 模块接受 access 模块的 Get 请求,检查有效性

  • blobnode 拿到 disk,根据 vuid 得到 chunkStorage,按[from, to)范围读取 shard

  • 读取 rocksdb 上的元数据

  • 获取 storage,限制磁盘带宽速度,读取 shard 数据,向上返回


写流程



  • access 模块接受来自上层的写请求,检查有效性、合法性

  • access 确定 codemode 和 cluster,申请 blobs 的一段范围 bid

  • 读取 readBody 流到一段 buffer 里,对读到的数据做 EC 编码得到校验块

  • 遍历 blobs 数组,对每个 blob 请求拿到卷信息(本地 cache/Proxy/ClusterMgr),拿到 blobnode 对应的 host

  • 遍历 volume 的 units,组装 putShard 参数,依次调用 blobnode 接口把 shards 写入。大于 quorum 则算成功

  • blobnode 模块接受到 access 模块的 Put 请求,检查有效性

  • blobnode 拿到 disk,根据 vuid 得到 chunkStorage,判断是否有足够空间且允许写入

  • 取得 storage,先写数据,再写 rocksdb 元数据。写入时会限制磁盘带宽速度。向上返回


删除流程



  • access 模块接受来自上层的删除请求,检查有效性、合法性

  • 根据 location 封装删除消息,投递给 proxy 模块,proxy 投递给 kafka 对应的删除消息主题

  • scheduler 模块后台任务异步消费 kafka 对应主题的删除消息,批量删除

  • scheduler 根据 locations 里携带的 host,找对应 blobnode 发起标删和删除请求,做两阶段删除,对应的所有 blobnode 全部成功后才算本消息删除成功

  • 如果删除失败,则 scheduler 会将失败消息重新投递只 kafka 的 failed 主题中等待下一次消费

  • blobnode 接收到 scheduler 模块的标删/删除请求,检查有效性

  • 读元数据,检查 flag 已经时标删状态,删除 shard 对应的 rocksdb 元数据。

  • 数据部分通过调用系统 punch hole 的接口,做到非常高效的释放



详细流程介绍



下面结合图文和代码,具体介绍一下几个流程,以及一些特殊场景的优化和处理。


代码做了一定程度的精简,仅作说明


读、写、删操作,都会用到一个比较重要的结构体 Location,location 定义:


type Location struct {  ClusterID proto.ClusterID   // 集群 ID  CodeMode  codemode.CodeMode // 编码模式  Size      uint64            // 对象大小  BlobSize  uint32            // 分块大小  Crc       uint32            // crc 签名  Blobs     []SliceInfo       // 分块位置}
type SliceInfo struct {    Vid    proto.Vid    // 卷 ID  MinBid proto.BlobID // 开始的 BID  Count  uint32       // 连续多少个 Blob}


读流程


用户数据按照一定大小被切分成多个连续的 blob,打散分布在集群的多个物理节点。在并发场景下,读请求能充分利用全部集群资源数据。


来自于 api 和 sdk 的请求 Access,最终都会调用 access 模块的 StreamHandler 来处理。


另外,该实例会启动后台协程 discard 异常的卷,后台更新 cluster 的容量等信息。


为了提供更高的可用性和容灾能力,blobstore 子系统会将用户数据切分后编码计算并持久化存储于多个 AZ 中。但是多 AZ 情况下网络带宽消耗比较大,access 访问时候会根据机房 IDC 做排序,优先读取本 AZ 的数据,减少跨 A 带宽 Z 流量。


user Get|-> api/sdk     |->Get (ctx, writer, location, readSize, offset)           |-> readOneBlob(ctx, getTime, serviceController, blob, sortedVuids, shards)               |-> readOneShard(ctx, serviceController, blob, vuid, stopChan)                  |-> blobnodeClient.RangeGetShard(ctx, host, &args)


代码调用流程


Get:

  • 接收请求参数,包括读取位置、读取大小和偏移量

  • 生成需要读取的 blob 列表

  • 获取服务控制器

  • 尝试直接读取数据分片,如果失败则进行 EC 重构读取

  • 将读取的数据写入客户端


readOneBlob:

  • 计算需要读取的最小分片数 minShardsRead

  • 使用 shardPipe 生成器函数并行读取分片

  • 将正确偏移的字节写入 writer

  • 收集读取的分片数据,检查是否可以重构数据块

  • 如果数据块可以重构,停止读取并返回成功;否则返回错误


小文件优化



如果 blobs 里只有 1 个对象,并且是小对象,会针对性的做小对象优化直读。


小文件做在线 EC 会有较严重的 IO 放大。比如 1K 大小的文件,做 EC(3+3)切分后的 shard 存储在 3+3 个位置,读取至少需要 3 次网络请求,访问 3 个 blobnode 才能构建出完整数据。在纠删码中小文件的扇入扇出问题尤为突出,极大影响小文件的读写效率;对此我们规定 EC 的每个数据块有最小数据大小,且可根据情况定义数据块大小值(比如 4K)。当实际数据不足 data*4K 时用 0 补齐,编码后写入到各 blobnode。


以上图为例讨论该优化项的优势:

  • 文件系统一般都以 4K 为一个 Block 大小

  • 读取时可直接读第一个 D 数据块;也可从任一个 P 走恢复读流程;减少 IO 路径且不消耗跨 AZ 带宽(假设多个 blobnode 在多个 AZ,即多机房/地域)

  • 数据损坏时,可从任一个校验块恢复数据

  • 数据冗余度提升至 (P+1),提高了可靠性

  • 这样做也带来了缺点:同时也浪费了部分空间。

  • 但是在云计算场景中,小文件的性能要求比存储成本更高,用空间换时间的思想很好地解决了该问题,使存储抽象层更加统一完善。


if int(blob.BlobSize) <= blob.ShardSize || blob.ReadSize < blob.BlobSize/4 {  h.getDataShardOnly(ctx, getTime, w, serviceController, blob)}


并发优化



例如 access 需要读一个 blob(1,2,3)有 3 个 shard 块,会同时向这 3 个 shard 块所在的 blobndoe 发起读请求,充分利用全部集群资源。


修复读


access 会并发找多个 blobnode,同时读取 N+1 个块。如果有 N 份数据正常,则可以正常拼接为期望的对象。当凑不够 N 份数据,需要继续读取直到 N 份数据成功,可 EC 修复。


EIO 坏盘修复


特殊的,如果读或者写的过程中,遇到了 EIO 错误,会把当前磁盘标记为坏盘,并且上报给 ClusterMgr 模块,然后 ClusterMgr 会通知到 Scheduler 模块,生成修盘任务发给其它 blobnode(worker)。做到坏盘的自动发现,自动修复。



  • 磁盘报告 EIO 之后,状态设置为 broken,对于非 EIO 的坏盘场景,也可以通过接口主动设置坏盘;volume 感知到有坏盘的,会降低卷的健康度。续租卷的时候会失败,分配的时候不会优先分配出去

  • clusterMgr 收到标记坏盘请求后,scheduler 模块会周期拉取感知到坏盘,并针对该盘下的所有 chunk 生成修盘任务,每个 chunk 为一个任务触发数据重建,进入 reparing 修复状态

  • scheduler 对 blobnode 提供 AcquireTask 接口,blobnode 会周期获取修盘任务,依次修复每个 chunk,并上报该 chunk 完成修复,并把该 chunk 做 release 释放

  • scheduler 感知到该坏盘下的所有 chunk 修复完毕,会找 clusterMgr 修改磁盘状态为 repaired

  • 整个进度状态 scheduler 可感知,修复整个盘后,blobnode 则会彻底摘除句柄

  • 换上新盘之后,调用在线注册的接口,注册之后自动发现


blobnode 遇到 EIO 上报坏盘。


func (s *Service) handleDiskIOError(ctx context.Context, diskID proto.DiskID, diskErr error) {  ds, exist := s.Disks[diskID]  ds.SetStatus(proto.DiskStatusBroken)    err := s.ClusterMgrClient.SetDisk(ctx, diskID, proto.DiskStatusBroken)  go s.waitRepairAndClose(ctx, ds)}


scheduler 后台异步处理坏盘任务的生成,以及任务流转。


func (mgr *DiskRepairMgr) Run() {  go mgr.collectTaskLoop()  go mgr.prepareTaskLoop()  go mgr.finishTaskLoop()  go mgr.checkRepairedAndClearLoop()  go mgr.checkAndClearJunkTasksLoop()}


写流程


用户数据按照一定大小被切分成多个连续的 blob,打散分布在集群的多个物理节点。在并发场景下,写请求能充分利用全部集群资源数据。


user Put|-> api/sdk     |-> Put(ctx, reader, args.Size, hasherMap)          |-> writeToBlobnodesWithHystrix(ctx, blobident, shards, func() {...})               |->writeToBlobnodes(ctx, blob, shards, callback)                  |->blobnodeClient.PutShard(ctxChild, host, args)


在写流程里,上层通过 api 和 sdk,最终都会走到 access 里 StreamHandler 的 Put 方法。将对参数 Body 里读到的数据做 EC 编码得到校验块,把数据块和校验块一起整个 shards 下发给 blobnode。


Quorum 写加速


并发同时向多个 blobnode 节点写入 shard 分片数据,等待多个节点返回结果。设定一个 quorum 值,满足 quorum 成功个数后则向上返回成功,不必等待所有节点都成功。能一定程度上减少网络波动、长尾效应。



LRC 优化



如上图,编码为 LRC(12,9,3) 或者 RS(12,12)。


若使用 RS 码:以 D1~D12 数据块计算 P1~P12 个校验块。


若使用 LRC 码:以 D1~D4 数据块计算本地校验块 L1,以 D5~D8 计算 L2,D9~D12 计算 L3;以 D1~D12 计算全局校验块 P1~P9。


在一个 EC 组中,损坏一个数据块的几率要远大于损坏多个数据块的几率。如果能够及时发现坏块,那么多数情况下只需要恢复一块就够了。这时启用 LRC 就可以在本 AZ 内完成数据恢复,避免跨 AZ 的带宽和 IO 消耗。


比如在并发写入时候,写入 blobnode chunk1 的 D1 写失败或者其它原因导致损坏,写入其它 blobnode chunk 的 D2~D4 成功,L1 成功;则就可以借助 AZ-1 中的其它几个块来修复 D1,加速写入,提升在线写入可用性。也会用在 AZ 内后台任务修复场景优化。


blobnode 写入


blobnode 是 blobstore 子系统的单机存储引擎,接收接入层模块的数据,持久化到磁盘上,负责用户数据的底层格式组织与落盘操作,  执行卷修补、迁移和回收任务。元数据、数据分离。元数据批量处理,数据尽量顺序处理。


blobnode 内部模块分层如下图



blobnode 收到上层请求后,先解析并判断 args 参数的合法性(DiskID,Size,Bid,Type),判断磁盘状态是否坏盘,是否允许写入,是否有足够空间,然后找到对应的 chunk 处理读请求。先写数据,再写元数据。


func (s *Service) ShardPut(c *rpc.Context) {  ds, exist := s.Disks[args.DiskID]  cs, exist := ds.GetChunkStorage(args.Vuid)  shard := core.NewShardWriter(args.Bid, args.Vuid, uint32(args.Size), c.Request.Body)  err = cs.Write(ctx, shard)  c.RespondJSON(ret)}


最终执行到底层 datafile.go 里做写入,写入时候会利用令牌桶算法做并发控制和限速。


func (cs *chunk) Write(ctx context.Context, b *core.Shard) (err error) {  stg := cs.GetStg()  return stg.Write(ctx, b)}
func (stg *storage) Write(ctx context.Context, b *core.Shard) (err error) {  data, meta := stg.data, stg.meta  err = data.Write(ctx, b)  return meta.Write(ctx, b.Bid, core.ShardMeta{...})}
func (cd *datafile) Write(ctx context.Context, shard *core.Shard) error {  cd.qosAllow()  qoswAt := cd.qosWriterAt(ctx, cd.ef)  _, err = qoswAt.WriteAt(headerbuf, pos)    w = &bncomm.Writer{WriterAt: cd.ef, Offset: pos}  qosw := cd.qosWriter(ctx, w)  tw := bncomm.NewTimeWriter(qosw)  tr := bncomm.NewTimeReader(body)    // 写入数据,并且按 block 分段编码  _, err = encoder.Encode(tr, int64(shard.Size), tw)  _, err = qoswAt.WriteAt(footerbuf, pos)}


慢盘 panic 优化


在长期运行过程中,由于硬件或软件等原因(硬盘体质、频繁读写等),部分硬盘访问非常慢,会出现延时变大、读写变慢的情况。


在之前版本的 blobnode 进程里,每一个从上层来的读写请求,都会对应新起一个协程来处理该请求,最终处理完返回。



在正常情况下的 IO 访问,即使有比较多的请求访问,GMP 模型会保证大部分的协程只会映射绑定到一定量的线程上。每个 G(Goroutine)绑定在 M 上一般来说最多运行 X 毫秒,超时了会主动挂起让出;或者是遇到了一些 sleep/wait 之类的主动让出,等待下一次调度。


而在出现慢盘的时候(硬件故障,性能故障,或 IO 负载过高等情况),IO 处理并没有类似 yield 让出,系统调用阻塞处理 IO。在这种情况下,GMP 模型的 G 和它绑定的 M 会一起从 P 上剥离,唤醒/新创建 线程去承接别的 G 函数任务。慢盘场景下会导致当前的处理 IO 协程几乎都各自映射绑定了一个线程,并且 IO 处理慢会出现堆积。原有的线程还没释放,又新创建线程,导致堆积的 IO 请求越来越多,协程和线程也越来越多,最终会导致 panic。


慢盘场景一开始只是会导致些许性能下降,累积一定程度则会造成 panic 服务不可用。



优化:可以将读写 IO 的协程数控制在一定的范围,即使发生了慢盘,也不会产生过多的线程。


设计一个读写 IO 池,持有固定数量的读写队列,每个队列绑定对应的读写协程。所有上层访问的读写 IO 都入队到读写 IO 池,由后台的读写协程依次消费处理。可以配置写协程的个数,以及队列深度。这样还能带来一些好处:更节省 CPU 负载,减少了内核等待 IO 的一些消耗;线程数稳定且比较少;在消费 IO 队列的时候,做一些聚合操作;减少线程切换开销;IO 在用户态管理和排队。


删除流程


删除流程比较简单一些


  • access 模块组装删除消息,投递给 proxy

  • proxy 投递给 kafka 对应的删除消息主题

  • scheduler 模块异步消费 kafka 对应主题的删除消息,批量删除

  • 找对应 blobnode 发起标删和删除请求,做两阶段删除,对应的所有 blobnode 全部成功后才算本消息删除成功

  • blobnode 做标删,先读元数据判断 Flag 是否符合预期,最后修改元数据为标删状态

  • blobnode 删除。先读元数据,检查 flag 已经时标删状态,删除 shard 对应的 rocksdb 元数据。数据部分通过调用系统 punch hole 的接口,做到非常高效的释放



access 模块组装删除消息,投递给 proxy


func (h *Handler) Delete(ctx context.Context, location *access.Location) error {  blobs := location.Spread()  ... // 依次组装 deleteArgs  retry.Timed(3, 200).On(func() error {    err = h.proxyClient.SendDeleteMsg(ctx, host, deleteArgs)  })  }


proxy 投递给 kafka 对应的删除消息主题


func (s *Service) SendDeleteMessage(c *rpc.Context) {  err := s.blobDeleteMgr.SendDeleteMsg(ctx, args)}
func (d *blobDeleteMgr) SendDeleteMsg(ctx context.Context, info *proxy.DeleteArgs) error {  msgs := make([][]byte, 0, len(info.Blobs))  ... // 依次组装 msgs  err := d.delMsgSender.SendMessages(d.topic, msgs)  return err}


blobnode 处理 Delete 请求,markDelete 流程类似


func (s *Service) ShardDelete(c *rpc.Context) {    ds, exist := s.Disks[args.DiskID]    cs, exist := ds.GetChunkStorage(args.Vuid)    err = cs.Delete(ctx, args.Bid)}
func (cs *chunk) Delete(ctx context.Context, bid proto.BlobID) (err error) {    stg := cs.GetStg()    n, err := stg.Delete(ctx, bid)}
func (stg *storage) Delete(ctx context.Context, bid proto.BlobID) (n int64, err error) {  meta, data := stg.meta, stg.data    shardMeta, err := meta.Read(ctx, bid)  if shardMeta.Flag != bnapi.ShardStatusMarkDelete {    return n, bloberr.ErrShardNotMarkDelete  }
 // 删除元数据  err = meta.Delete(ctx, bid)  shard := &core.Shard{...}  // discard hole  err = data.Delete(ctx, shard)  return int64(shardMeta.Size), err}


作者介绍


Wei Ma,CubeFS Contributor,当前在 CubeFS 项目参与 Blobstore 的设计与研发。


参考


【1】 CubeFS存储技术揭密(1) — 纠删码引擎系统设计

【2】 CubeFS存储技术揭秘(3)— 均衡、巡检与故障自愈

【3】 CubeFS"源"理解读|纠删码之异步删除与数据修补





CubeFS 简介




CubeFS 于2019年开源并在 SIGMOD 发表工业界论文,目前是云原生计算基金会 (CNCF) 托管的孵化阶段开源项目。作为新一代云原生分布式存储平台,兼容 S3、POSIX、HDFS 等协议,支持多副本和纠删码引擎,提供多租户,多 AZ 部署、跨区域复制等特性;适用于大数据、AI、容器平台、数据库及中间件存算分离,数据共享、数据保护等广泛场景。



►►►

往期推荐




文章转载自CubeFS点击这里阅读原文了解更多


CNCF概况(幻灯片)

扫描二维码联系我们!




CNCF (Cloud Native Computing Foundation)成立于2015年12月,隶属于Linux  Foundation,是非营利性组织。 

CNCF云原生计算基金会)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。请关注CNCF微信公众号。

CNCF
云原生计算基金会(CNCF)致力于培育和维护一个厂商中立的开源生态系统,来推广云原生技术。我们通过将最前沿的模式民主化,让这些创新为大众所用。
 最新文章