使用 S3 构建分布式日志(Go 代码不到 150 行)

科技   2024-12-04 10:14   广东  

本文将展示如何使用S3实现一个持久化、分布式且高可用的日志系统。这是关于分离式存储(Disaggregated Storage)系列文章的第三部分:

  1. 分离式存储简介
  2. 零磁盘架构(Zero Disk Architecture)
  3. 使用S3构建分布式日志(本文)

什么是日志?

我非常喜欢日志(Log)。日志是数据和事件流系统的核心。数据库本质上就是一种日志,Kafka也是一种日志。简单来说,日志是一个有序的记录集合。日志是追加写入的(append-only),一旦记录被写入,它们就是不可变的。每条插入的记录都会获得一个唯一且递增的标识符。

日志是一种强大的存储抽象。通过日志,你可以构建数据库、消息队列或事件流系统。如果你想深入了解日志的概念,推荐阅读Kafka创始人Jay Kreps的文章:The Log: What every software engineer should know about real-time data’s unifying abstraction。

为什么选择S3?

在上一篇文章中,我解释了零磁盘架构的优势。基于S3的日志存储具有以下吸引力:

  • 弹性与可扩展性:无需本地磁盘。
  • 内置分布式存储:无需自行开发分布式存储服务,S3天然提供持久性、高可用性和数据复制。
  • 运维成本低:无需额外的运维开销。
  • 成本优势:例如,类似WarpStream和BufStream的系统声称其成本是Kafka的十分之一。
  • 客户偏好:许多企业喜欢“自带云存储”(BYOC, Bring Your Own Cloud),这也有助于降低成本。

日志接口设计

我们首先定义日志记录(Record)和日志接口(WAL, Write-Ahead Log)的基本结构:

type Record struct {
 Offset uint64
 Data   []byte
}

type WAL interface {
 Append(ctx context.Context, data []byte) (uint64, error)
 Read(ctx context.Context, offset uint64) (Record, error)
}

在实现中,每个数据负载将作为一个对象存储在S3中,并确保它在日志中获得唯一的偏移量(offset)。我们需要保证记录编号是唯一且递增的。

Append操作

日志的唯一“写入”操作是AppendAppend接收一段字节数据,将其写入日志末尾,并返回该记录的偏移量(即记录在日志中的位置)。

我们定义一个结构体S3WAL来维护计数器length,每次插入时将计数器递增:

type S3WAL struct {
 client     *s3.Client
 bucketName string
 length     uint64
}

第一条记录的偏移量为0000000001,每插入一个新对象,偏移量递增1。插入完成后,将偏移量返回给调用者:

func (w *S3WAL) Append(ctx context.Context, data []byte) (uint64, error) {
 nextOffset := w.length + 1

 input := &s3.PutObjectInput{
  Bucket:      aws.String(w.bucketName),
  Key:         aws.String(fmt.Sprintf("%020d", nextOffset)),
  Body:        bytes.NewReader(data),
  IfNoneMatch: aws.String("*"),
 }

 if _, err := w.client.PutObject(ctx, input); err != nil {
  return 0, fmt.Errorf("failed to put object to S3: %w", err)
 }
 w.length = nextOffset
 return nextOffset, nil
}

防止偏移量冲突

如何防止两个写入者在同一偏移量上写入记录?这是日志系统的关键特性之一。通过S3的条件写入(Conditional Write),这一问题变得非常简单。代码中的IfNoneMatch: aws.String("*")确保如果已有相同偏移量的对象存在,请求会被拒绝。

以下是一个简单的测试用例,用于验证这一特性:

func TestSameOffset(t *testing.T) {
 wal, cleanup := getWAL(t)
 defer cleanup()
 ctx := context.Background()
 data := []byte("threads are evil")
 _, err := wal.Append(ctx, data)
 if err != nil {
  t.Fatalf("failed to append first record: %v", err)
 }

 // 重置WAL计数器,使其使用相同的偏移量
 wal.length = 0
 _, err = wal.Append(ctx, data)
 if err == nil {
  t.Error("expected error when appending at same offset, got nil")
 }
}

为什么不使用S3的追加功能?

虽然可以使用S3的最新追加(append)功能将数据写入同一对象,但这种方法存在复杂性。例如,一个“僵尸写入者”(Zombie Writer)可能会在新主节点写入新文件时,回到旧对象并追加数据。与典型的基于Raft的存储系统不同,S3没有“隔离令牌”(fencing token)的概念。因此,这种优化留待后续实现。

校验和(Checksum)

尽管S3提供了99.99999999%的持久性,但为了数据完整性,我们不能完全依赖外部系统。许多数据库不使用校验和,但我们可以做得更好。这里我们使用SHA-256作为校验和(Go标准库支持)。

我们将偏移量、数据和校验和存储在一起,使记录具有自包含性。例如,如果未来进行数据压缩并更改文件名,记录的偏移量仍然保持不变:

func calculateChecksum(buf *bytes.Buffer) [32]byte {
 return sha256.Sum256(buf.Bytes())
}

func prepareBody(offset uint64, data []byte) ([]byte, error) {
 // 8字节偏移量 + 数据长度 + 32字节校验和
 bufferLen := 8 + len(data) + 32
 buf := bytes.NewBuffer(make([]byte0, bufferLen))
 binary.Write(buf, binary.BigEndian, offset)
 buf.Write(data)
 checksum := calculateChecksum(buf)
 _, err := buf.Write(checksum[:])
 return buf.Bytes(), err
}

Read操作

日志的读取操作相对简单。给定一个偏移量,我们构造对应的S3对象名称并获取它:

func (w *S3WAL) Read(ctx context.Context, offset uint64) (Record, error) {
 key := w.getObjectKey(offset)
 input := &s3.GetObjectInput{
  Bucket: aws.String(w.bucketName),
  Key:    aws.String(key),
 }
 result, _ := w.client.GetObject(ctx, input)
 defer result.Body.Close()

 data, _ := io.ReadAll(result.Body)
 if len(data) < 40 {
  return Record{}, fmt.Errorf("invalid record: data too short")
 }
 if !validateOffset(data, offset) {
  return Record{}, fmt.Errorf("offset mismatch")
 }
 if !validateChecksum(data) {
  return Record{}, fmt.Errorf("checksum mismatch")
 }
 return Record{
  Offset: offset,
  Data:   data[8 : len(data)-32],
 }, nil
}

我们进行以下验证:

  1. 记录长度必须至少为40字节。
  2. 请求中的偏移量必须与记录中的偏移量匹配。
  3. 校验和必须匹配。

故障恢复

如果节点崩溃,我们如何恢复?目前,我们总是将WAL初始化为length=0,这会导致新写入尝试从偏移量0000000001开始写入。虽然S3的条件写入会保护我们不覆盖已有数据,但新写入将无法继续。

为了解决这个问题,我们添加了一个方法,用于遍历所有键,找到最后插入的对象:

func (w *S3WAL) LastRecord(ctx context.Context) (Record, error) {
 input := &s3.ListObjectsV2Input{
  Bucket: aws.String(w.bucketName),
 }
 paginator := s3.NewListObjectsV2Paginator(w.client, input)

 var maxOffset uint64 = 0
 for paginator.HasMorePages() {
  output, _ := paginator.NextPage(ctx)
  for _, obj := range output.Contents {
   key := *obj.Key
   offset, _ := w.getOffsetFromKey(key)
   if offset > maxOffset {
    maxOffset = offset
   }
  }
 }
 if maxOffset == 0 {
  return Record{}, fmt.Errorf("WAL is empty")
 }
 w.length = maxOffset
 return w.Read(ctx, maxOffset)
}

总结

通过以上实现,我们成功使用S3构建了一个简单的分布式日志系统。关注公众号并回复 “s3-log” 获取源码。

点击关注并扫码添加进交流群
免费领取「Go 语言」学习资料

源自开发者
专注于提供关于Go语言的实用教程、案例分析、最新趋势,以及云原生技术的深度解析和实践经验分享。
 最新文章