SeaweedFS 分布式文件系统源码分析

文摘   2024-09-18 14:38   陕西  

本文基于 seaweedfs 3.46[1]

SeaweedFS 的架构包括 Master Server、Volume Server 和 Filer Server 。

启动 Master Server

启动一个 Master Server 可以使用以下命令:

weed master -ip=127.0.0.1 -ip.bind=0.0.0.0

启动入口以及所有的参数定义在 weed/command/master.go ,默认情况 http 监听端口使用 9333 ,grpc 监听端口则在 http 端口的基础上加 10000 (所有组件的默认规则)即 19333 :

if *masterOption.portGrpc == 0 {
 *masterOption.portGrpc = 10000 + *masterOption.port
}

Master Server 支持多节点(奇数)部署。使用 Raft 一致性算法来选举 Leader 节点,这样可以保证在 Leader 节点宕机的情况下,其他节点可以重新选举出新的 Leader 节点,从而保证系统的高可用性。

如下,启动一个由三个 Master Server 节点所组成的集群:

weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9333 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9334 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"
weed master -ip=127.0.0.1 -ip.bind=0.0.0.0 -port=9335 -peers="127.0.0.1:9333,127.0.0.1:9334,127.0.0.1:9335"

当 Master Server 启动时,它会尝试加入集群并参与 Leader 选举。一旦选举完成,Leader 节点将负责管理整个集群以及 Volume Server 。

首先会创建一个 Master Server 包装的 weed_server.RaftServer 对象:

raftServer, err = weed_server.NewRaftServer(raftServerOption)
if raftServer == nil {
 glog.Fatalf("please verify %s is writable, see https://github.com/seaweedfs/seaweedfs/issues/717: %s", *masterOption.metaFolder, err)
}

weed_server.NewRaftServer() 方法中会创建好 Raft 节点所需的各种参数和对象,然后调用 github.com/seaweedfs/raft[2] 库创建 RaftServer 对象并启动 Raft 节点:

type RaftServer struct {
 // 存储初始节点信息
 peers map[string]pb.ServerAddress
 // Raft 节点
 raftServer raft.Server
 // HashiCorp Raft 节点
 RaftHashicorp *hashicorpRaft.Raft
 // 用于管理 Raft 节点之间的通信
 TransportManager *transport.Manager
 // Raft 节点的数据目录
 dataDir string
 // Raft 节点的地址
 serverAddr pb.ServerAddress
 // Raft 集群的拓扑结构
 topo *topology.Topology
 // Raft 节点的 gRPC 服务
 *raft.GrpcServer
}

func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
 // 通过 option 创建一个 RaftServer 对象 s
 s := &RaftServer{
  peers:      option.Peers,
  serverAddr: option.ServerAddr,
  dataDir:    option.DataDir,
  topo:       option.Topo,
 }

 //...

 // 调用 github.com/seaweedfs/raft 库,创建 RaftServer 对象
 s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")

 //...

 // 启动 Raft 节点
 if err := s.raftServer.Start(); err != nil {
  return nil, err
 }

 // 将节点加入到 Raft 集群中
 for name, peer := range s.peers {
  if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
   return nil, err
  }
 }

 //...

 glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())

 return s, nil
}

最后,会打印出当前的 Leader 节点,如果对 Raft 选举算法的处理细节感兴趣,可以继续深入 s.raftServer.Start() 的实现。

Raft 节点启动成功后,Master Server 会注册一些集群相关的接口,方便查看集群状态:

r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods("GET""HEAD")
if *masterOption.raftHashicorp {
 r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET")
}

请求如下:

$ curl http://127.0.0.1:9333/cluster/status
{"IsLeader":true,"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9334"]}
$ curl http://127.0.0.1:9334/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9335","127.0.0.1:9333"]}
$ curl http://127.0.0.1:9335/cluster/status
{"Leader":"127.0.0.1:9333","Peers":["127.0.0.1:9333","127.0.0.1:9334"]}

启动 Volume Server

启动一个 Volume Server 可以使用以下命令:

weed volume -mserver="127.0.0.1:9333" -dir=data -ip=127.0.0.1 -ip.bind=0.0.0.0

启动入口以及所有的参数定义在 weed/command/volume.go ,默认情况 http 监听端口使用 8080 ,grpc 监听端口使用 18080 。

其中,-mserver 为 Master Server 连接地址,当需要连接的 Master Server 为集群时,可以将多个 Master Server 的连接地址用逗号分隔; -dir 则用来指定 Volume Server 存储数据文件的目录。

和 Master Server 不同,Volume Server 支持横向扩展,其节点数量规模可以随着数据量和性能需求的变化而随时动态调整。

一旦 Volume Server 启动后,就会与 Master Server 保持通信,汇报自身的状态,并根据 Master Server 的指示执行创建、删除、修复等操作。

核心逻辑在 weed/server/volume_grpc_client_to_master.goVolumeServer.doHeartbeat 方法。

首先会创建一个 Master Server 的 gRPC 连接客户端,并使用该客户端调用 SendHeartbeat 方法:

// 创建 Master Server 的 gRPC 连接客户端
client := master_pb.NewSeaweedClient(grpcConnection)
// 调用 SendHeartbeat
stream, err := client.SendHeartbeat(ctx)
if err != nil {
 glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)
 return "", err
}

SendHeartbeat 方法是一个双向流式 RPC ,允许在一次调用中发送多个请求和响应,其 ProtoBuf 定义如下:

rpc SendHeartbeat (stream Heartbeat) returns (stream HeartbeatResponse) {
}

接着创建一个 goroutine 用来处理从 Master Server 发送过来的 Heartbeat 请求:

go func() {
 for {
  // 从输入流中读取 Heartbeat 请求
  in, err := stream.Recv()
  if err != nil {
   doneChan <- err
   return
  }
  // ...

  // 如果 Heartbeat 请求中包含了卷大小限制,并且该限制和当前 Volume Server 中保存的限制不同
  if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
   // 将 Volume Server 中保存的限制更新为 Heartbeat 请求中的限制
   vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
   // 调用 vs.store.MaybeAdjustVolumeMax() 方法重新计算卷的最大容量
   if vs.store.MaybeAdjustVolumeMax() {
    // 如果计算结果发生了变化,则使用 stream.Send() 方法向 Master Server 发送 Heartbeat 响应
    if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
     glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
     return
    }
   }
  }
  // 如果 Heartbeat 请求中包含了新的 Master Server 地址,并且该地址和当前地址不同
  if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() {
   // 通知主函数切换新的 Master Server 地址作为 Leader
   glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
   newLeader = pb.ServerAddress(in.GetLeader())
   doneChan <- nil
   return
  }
 }
}()

最后使用一个 for select 来监听来自 Volume Server 存储层的四个通道:NewVolumesChanNewEcShardsChanDeletedVolumesChanDeletedEcShardsChan。每当有新的卷或 EC 分片被创建或删除时,会生成一个 Heartbeat 消息,并使用 stream.Send() 方法将其发送到 Master Server ,同时也会定期发送心跳消息给 Master Server :

for {
 select {
 // 有新的卷被创建
 case volumeMessage := <-vs.store.NewVolumesChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有新的 EC 分片被创建
 case ecShardMessage := <-vs.store.NewEcShardsChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
   erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有卷被删除
 case volumeMessage := <-vs.store.DeletedVolumesChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 有 EC 分片被删除
 case ecShardMessage := <-vs.store.DeletedEcShardsChan:
  // ...
  // 通知 Master Server
  glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
   erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  if err = stream.Send(deltaBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
 // 发送卷信息的心跳消息
 case <-volumeTickChan.C:
  glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
  vs.store.MaybeAdjustVolumeMax()
  if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
   glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
   return "", err
  }
 // 发送 EC 分片信息的心跳消息
 case <-ecShardTickChan.C:
  glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
  if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
   glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
   return "", err
  }
 // Volume Server 停止,退出监听
 case err = <-doneChan:
  return
 // 用于在 Volume Server 停止时发送最终的心跳消息
 case <-vs.stopChan:
  // ...
  glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
  if err = stream.Send(emptyBeat); err != nil {
   glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
   return "", err
  }
  return
 }
}

启动 Filer Server

启动一个 Filer Server 可以使用以下命令:

weed filer -s3 -master="127.0.0.1:9333" -ip=127.0.0.1 -ip.bind=0.0.0.0

启动入口以及所有的参数定义在 weed/command/filer.go ,默认情况 http 监听端口使用 8888 ,grpc 监听端口使用 18888 。

在这里,-master 为 Master Server 连接地址,同样地,当需要连接的 Master Server 为集群时,可以将多个 Master Server 的连接地址用逗号分隔; -s3 则代表要启动 S3 网关功能,默认监听 8333 端口。

Filer Server 可以理解为一个文件管理器,通过向下对接 Volume Server 与 Master Server,对外提供丰富的功能与特性,除了自身提供的 API 接口,还支持扩展其它比如 POSIX ,WebDAV,S3 等的文件操作接口。

Filer Server 通过外部数据库存储文件的元数据信息。默认情况下,使用的是 leveldb ,支持替换为其它流行的数据库,例如 Sqlite、MySql、Etcd 等,具体可以参考 wiki/Filer-Stores[3]

作为一个 API Server ,Filer Server 在架构上就是一个服务端+数据库模型,其节点的数量和规模可以根据不同的工作负载和使用情况进行优化和调整。

上传文件

首先分析 Filer Server 自身提供的 API 接口,上传文件可以直接调用 :

$ curl -F "file_name=@test.txt" -X POST "http://127.0.0.1:8888"
{"name":"test.txt","size":14}

文件上传的接口定义在 weed/server/filer_server_handlers_write.goPostHandler 方法:

func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
 // 解析请求的目标路径
 // ...
 // 解析请求的查询参数,用于确定文件的存储位置和属性
 // ...
 if query.Has("mv.from") {
  // 若查询参数中出现 mv.from ,则进行文件移动操作
  fs.move(ctx, w, r, so)
 } else {
  // 文件上传操作,自动分块
  fs.autoChunk(ctx, w, r, contentLength, so)
 }

 util.CloseRequest(r)
}

跟踪到 fs.autoChunk 方法:

func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
 //...

 if r.Method == "POST" {
  // 上传文件
  if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
   reply, err = fs.mkdir(ctx, w, r)
  } else {
   // 自动分块上传
   reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so)
  }
 } else {
  // 创建目录
  reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
 }

 //...
}

继续来到 fs.doPostAutoChunk 方法:

func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {

 // 读取上传的文件内容
 multipartReader, multipartReaderErr := r.MultipartReader()
 if multipartReaderErr != nil {
  return nilnil, multipartReaderErr
 }

 // 读取第一个分块,在这里,我们只需要读取第一个分块,即上传文件的内容的分块
 part1, part1Err := multipartReader.NextPart()
 if part1Err != nil {
  return nilnil, part1Err
 }

 // 获取文件名和 Content-Type
 fileName := part1.FileName()
 if fileName != "" {
  fileName = path.Base(fileName)
 }
 contentType := part1.Header.Get("Content-Type")
 if contentType == "application/octet-stream" {
  contentType = ""
 }

 // 核心逻辑
 // 将上传的文件内容转换为文件分块,并返回文件分块的相关信息
 fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
 if err != nil {
  return nilnil, err
 }

 // 计算文件内容的 MD5 值
 md5bytes = md5Hash.Sum(nil)
 // 保存文件元数据信息
 filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
 if replyerr != nil {
  fs.filer.DeleteChunks(fileChunks)
 }

 return
}

这些都比较好读,继续跟踪到核心逻辑处 fs.uploadReaderToChunks ,方法内首先会进行一些正确性校验和必要变量的初始化,然后开启一个循环,不断读取数据并将其转换为一个或多个 Chunk :

func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
 // ...一系列操作
 // 进行一些正确性校验和必要变量的初始化

 for {

  // 使用对象池机制限制 bytes.Buffer 对象的数量,优化内存占用
  bytesBufferLimitCond.L.Lock()
  for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
   glog.V(4).Infof("waiting for byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))
   bytesBufferLimitCond.Wait()
  }
  atomic.AddInt64(&bytesBufferCounter, 1)
  bytesBufferLimitCond.L.Unlock()

  bytesBuffer := bufPool.Get().(*bytes.Buffer)
  glog.V(4).Infof("received byte buffer %d", atomic.LoadInt64(&bytesBufferCounter))

  // 【关键】分块操作,每个块就是一个 bytes.Buffer
  // 根据 chunkSize 从 partReader 中读取数据,并将读取的数据保存到 bytes.Buffer 对象中
  limitedReader := io.LimitReader(partReader, int64(chunkSize))

  bytesBuffer.Reset()

  dataSize, err := bytesBuffer.ReadFrom(limitedReader)

  // 处理读取数据时可能出现的错误,以及在读取完整个文件时的处理
  // ...

  wg.Add(1)
  // 开启 goroutine 处理
  go func(offset int64) {
   defer func() {
    // 将 bytes.Buffer 对象归还对象池
    bufPool.Put(bytesBuffer)
    atomic.AddInt64(&bytesBufferCounter, -1)
    // 通知其他 goroutine 可以使用更多的 bytes.Buffer 对象
    bytesBufferLimitCond.Signal()
    wg.Done()
   }()

   // 【关键】上传数据块
   chunks, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)

   if toChunkErr != nil {
    // 记录上传错误
    uploadErrLock.Lock()
    if uploadErr == nil {
     uploadErr = toChunkErr
    }
    uploadErrLock.Unlock()
   }
   if chunks != nil {
    fileChunksLock.Lock()
    fileChunksSize := len(fileChunks) + len(chunks)
    for _, chunk := range chunks {
     // 【关键】将当前上传的数据块添加到 fileChunks 列表中
     fileChunks = append(fileChunks, chunk)
     glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
    }
    fileChunksLock.Unlock()
   }
  }(chunkOffset)

  // 更新已经读取的数据块的大小
  chunkOffset = chunkOffset + dataSize

  if dataSize < int64(chunkSize) {
   // 已经读取完整个文件
   break
  }
 }

 wg.Wait()

 if uploadErr != nil {
  // 上传出错,删除 fileChunks
  fs.filer.DeleteChunks(fileChunks)
  return nil, md5Hash, 0, uploadErr, nil
 }
 // 【关键】对已经上传的数据块,即 fileChunks 进行排序,以便后续可以正确地进行数据合并
 slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool {
  return a.Offset < b.Offset
 })
 // 返回 fileChunks 给调用方保存
 return fileChunks, md5Hash, chunkOffset, nil, smallContent
}

文件的分块操作都是在 Filer Server 完成的。而其中上传数据块的 fs.dataToChunk 方法会与 Master Server 进行交互。

该方法首先会调用 fs.assignNewFileInfo 向 Master Server 请求分配一个新的文件 ID(fid)以及上传 URL :

fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
if uploadErr != nil {
 // ...
 return uploadErr
}

然后使用分配的 fid 调用上传 URL 上传数据块:

uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
 // ...
 return uploadErr
}

这个由 Master Server 所分配的上传 URL ,实际就是 Volume Server 的上传地址,例 http://127.0.0.1:8080/14,1f343c431d ,其中 14,1f343c431d 就是文件 ID ,其实这个文件 ID 更准确地说应该是代表一个数据块的文件 ID。

SeaweedFS 会根据 maxMB 参数,来把文件拆分成多个块存储,默认大小是 4MB 。即一个 100MB 大小的文件,上传到 SeaweedFS 后会被分成 25 个块存储,也就是申请分配了 25 个文件 ID 。

f.maxMB = cmdFiler.Flag.Int("maxMB"4"split files larger than the limit")

到这里,总算捋清流程了。

那还有一个 S3 接口的文件上传呢?

不用担心,SeaweedFS S3 只是做了一个 API 的代理转发,依旧转发到 Filer Server 自身提供的 API 接口,逻辑依旧和上面一致,代码位置在 weed/s3api/s3api_object_handlers.go

// 这里的 uploadUrl 实际就是 Filer Server 的地址
// 例如在名称为 test 的 S3 Bucket 中上传 test.txt 文件
// 则 uploadUrl 为: http://127.0.0.1:8888/buckets/test/test.txt
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, "")

下载文件

和上传文件一样,SeaweedFS S3 为文件下载做了一个代理转发,转发到 Filer Server 自身提供的 API 接口:

// 这里的 destUrl 实际就是 Filer Server 的地址
// 例如要下载 test Bucket 中的 test.txt 文件
// 则 destUrl 为: http://127.0.0.1:8888/buckets/test/test.txt
s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)

所以,当下载一个文件时:

$ curl http://127.0.0.1:8888/test.txt
hello test.txt

直接来看 weed/server/filer_server_handlers_read.goGetOrHeadHandler 接口:

func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {

 // ...
 // 从 URL 中获取文件或文件夹路径

 // 根据文件或文件夹的完整路径从元数据数据库中查找出 Entry 记录(即文件的元数据信息)

 // 若是文件夹,则列出文件夹下的文件
 // ...

 // 如果指定了 metadata=true 参数,则直接返回文件或文件夹的元数据信息
 if query.Get("metadata") == "true" {
  // ...
  return
 }

 // 减少服务器带宽
 // 通过 Etag 资源标识对比资源是否发生变化
 etag := filer.ETagEntry(entry)
 if checkPreconditions(w, r, entry) {
  // 如果资源未发生改变,则返回 304 Not Modified 响应,不返回具体的资源
  // 客户端可以直接读取缓存中的数据
  return
 }

 // 设置 ETag 标识到响应头
 setEtag(w, etag)

 // ...

 // 这里是用来处理获取图片文件的逻辑
 if rangeReq := r.Header.Get("Range"); rangeReq == "" {
  // ...
 }

 // 获取普通文件核心逻辑
 processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
  // 偏移量从请求头中获取,例 Range: bytes=100-199
  // 若无指定偏移量,默认为 0
  // 判断请求的范围是否在文件的内容大小范围内
  if offset+size <= int64(len(entry.Content)) {
   // ...
   return err
  }
  // 从元数据数据库获取到的chunks信息
  chunks := entry.GetChunks()
  // 判断文件是否只存在于远程存储中,例如 AWS S3 、Google Cloud Storage 等
  if entry.IsInRemoteOnly() {
   // 将远程对象缓存到本地集群,并更新新的chunks
   // ...
  }

  // 【核心】开始读取文件并写入 HTTP 响应
  // MasterClient :Master 节点的客户端
  // chunks :要读取的文件数据块列表
  // offset :请求的文件内容的起始位置
  // size :请求的文件内容的大小
  // DownloadMaxBytesPs :下载速率的限制,单位是字节/秒
  err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
  if err != nil {
   stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
   glog.Errorf("failed to stream content %s: %v", r.URL, err)
  }
  return err
 })
}

根据代码,我们可以直接通过 metadata=true 查询参数查看文件的元数据信息:

$ curl http://127.0.0.1:8888/test.txt?metadata=true
{"FullPath":"/test.txt","Mtime":"2023-04-23T17:18:37+08:00","Crtime":"2023-04-23T17:18:37+08:00","Mode":432,"Uid":4294967295,"Gid":4294967295,"Mime":"text/plain","TtlSec":0,"UserName":"","GroupNames":null,"SymlinkTarget":"","Md5":"wuSNy045Bd4p8mTjIc40cg==","FileSize":14,"Rdev":0,"Inode":0,"Extended":null,"chunks":[{"file_id":"14,1f343c431d","size":14,"modified_ts_ns":1682241517592601300,"e_tag":"wuSNy045Bd4p8mTjIc40cg==","fid":{"volume_id":14,"file_key":31,"cookie":876364573},"is_compressed":true}],"HardLinkId":null,"HardLinkCounter":0,"Content":null,"Remote":null,"Quota":0}

其中最重要的就是 chunks 信息,里面定义了该文件的所有数据块信息,只要把所有数据块拼凑一起,就可以还原出整个文件。文件大小的原因,这里刚好只有一个块,其文件 ID 为 14,1f343c431d

继续解读文件下载的核心方法 filer.StreamContentWithThrottler ,首先获取所有文件 ID 所对应的 URL 列表:

// 将 chunks 转换为视图列表 chunkViews
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)

fileId2Url := make(map[string][]string)

// 通过 chunkViews.Front() 获取 chunkViews 列表的头部元素,然后在每次迭代中将 x 移动到下一个元素,直到遍历完整个列表
for x := chunkViews.Front(); x != nil; x = x.Next {
 // 从 x.Value 中获取 chunkView 对象
 chunkView := x.Value
 var urlStrings []string
 var err error
 // 获取 chunkView 对应的文件 ID 的 URL 列表,并将 URL 列表存储在 urlStrings 变量中
 // 在分布式系统中,网络故障和其他因素可能导致某些请求失败,因此需要多次尝试获取 URL 列表,以提高获取成功的概率
 for _, backoff := range getLookupFileIdBackoffSchedule {
  urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  if err == nil && len(urlStrings) > 0 {
   break
  }
  glog.V(4).Infof("waiting for chunk: %s", chunkView.FileId)
  time.Sleep(backoff)
 }
 // 错误处理
 // ...
 fileId2Url[chunkView.FileId] = urlStrings
}

然后,通过获取到的 URL 列表下载文件的所有 chunk :

// 下载速度限制器
downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
remaining := size
// 通过遍历 chunkViews 列表来下载每个 chunk
for x := chunkViews.Front(); x != nil; x = x.Next {
 chunkView := x.Value
 // 检查文件偏移量
 if offset < chunkView.ViewOffset {
  // ...
 }
 urlStrings := fileId2Url[chunkView.FileId]
 start := time.Now()
 // 【核心】从 URL 列表中读取 chunkView 的数据,并将数据写入到 writer 中给到客户端
 err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
 // 更新文件偏移量
 offset += int64(chunkView.ViewSize)
 // 更新剩余数据大小
 remaining -= int64(chunkView.ViewSize)
 // ...
}
// 检查文件的所有数据是否都已经成功下载
if remaining > 0 {
 glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
 err := writeZero(writer, remaining)
 if err != nil {
  return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
 }
}

可以总结出,下载文件本质也是和 Master Server 交互,通过文件 ID 获取到对应 Volume Server 的数据块下载地址列表,按照列表顺序请求下载数据块,最后重新整合成了一个完整的文件返回给客户端。

最后,附上文件下载的流程:

参考资料

[1]

seaweedfs 3.46: https://github.com/seaweedfs/seaweedfs/tree/3.46

[2]

github.com/seaweedfs/raft: https://github.com/seaweedfs/raft/tree/v1.1.0

[3]

wiki/Filer-Stores: https://github.com/seaweedfs/seaweedfs/wiki/Filer-Stores


Linux内核之旅
Linux内核之旅
 最新文章