本文将展示如何使用S3实现一个持久化、分布式且高可用的日志系统。这是关于分离式存储(Disaggregated Storage)系列文章的第三部分:
分离式存储简介 零磁盘架构(Zero Disk Architecture) 使用S3构建分布式日志(本文)
什么是日志?
我非常喜欢日志(Log)。日志是数据和事件流系统的核心。数据库本质上就是一种日志,Kafka也是一种日志。简单来说,日志是一个有序的记录集合。日志是追加写入的(append-only),一旦记录被写入,它们就是不可变的。每条插入的记录都会获得一个唯一且递增的标识符。
日志是一种强大的存储抽象。通过日志,你可以构建数据库、消息队列或事件流系统。如果你想深入了解日志的概念,推荐阅读Kafka创始人Jay Kreps的文章:The Log: What every software engineer should know about real-time data’s unifying abstraction。
为什么选择S3?
在上一篇文章中,我解释了零磁盘架构的优势。基于S3的日志存储具有以下吸引力:
弹性与可扩展性:无需本地磁盘。 内置分布式存储:无需自行开发分布式存储服务,S3天然提供持久性、高可用性和数据复制。 运维成本低:无需额外的运维开销。 成本优势:例如,类似WarpStream和BufStream的系统声称其成本是Kafka的十分之一。 客户偏好:许多企业喜欢“自带云存储”(BYOC, Bring Your Own Cloud),这也有助于降低成本。
日志接口设计
我们首先定义日志记录(Record)和日志接口(WAL, Write-Ahead Log)的基本结构:
type Record struct {
Offset uint64
Data []byte
}
type WAL interface {
Append(ctx context.Context, data []byte) (uint64, error)
Read(ctx context.Context, offset uint64) (Record, error)
}
在实现中,每个数据负载将作为一个对象存储在S3中,并确保它在日志中获得唯一的偏移量(offset)。我们需要保证记录编号是唯一且递增的。
Append操作
日志的唯一“写入”操作是Append
。Append
接收一段字节数据,将其写入日志末尾,并返回该记录的偏移量(即记录在日志中的位置)。
我们定义一个结构体S3WAL
来维护计数器length
,每次插入时将计数器递增:
type S3WAL struct {
client *s3.Client
bucketName string
length uint64
}
第一条记录的偏移量为0000000001
,每插入一个新对象,偏移量递增1。插入完成后,将偏移量返回给调用者:
func (w *S3WAL) Append(ctx context.Context, data []byte) (uint64, error) {
nextOffset := w.length + 1
input := &s3.PutObjectInput{
Bucket: aws.String(w.bucketName),
Key: aws.String(fmt.Sprintf("%020d", nextOffset)),
Body: bytes.NewReader(data),
IfNoneMatch: aws.String("*"),
}
if _, err := w.client.PutObject(ctx, input); err != nil {
return 0, fmt.Errorf("failed to put object to S3: %w", err)
}
w.length = nextOffset
return nextOffset, nil
}
防止偏移量冲突
如何防止两个写入者在同一偏移量上写入记录?这是日志系统的关键特性之一。通过S3的条件写入(Conditional Write),这一问题变得非常简单。代码中的IfNoneMatch: aws.String("*")
确保如果已有相同偏移量的对象存在,请求会被拒绝。
以下是一个简单的测试用例,用于验证这一特性:
func TestSameOffset(t *testing.T) {
wal, cleanup := getWAL(t)
defer cleanup()
ctx := context.Background()
data := []byte("threads are evil")
_, err := wal.Append(ctx, data)
if err != nil {
t.Fatalf("failed to append first record: %v", err)
}
// 重置WAL计数器,使其使用相同的偏移量
wal.length = 0
_, err = wal.Append(ctx, data)
if err == nil {
t.Error("expected error when appending at same offset, got nil")
}
}
为什么不使用S3的追加功能?
虽然可以使用S3的最新追加(append)功能将数据写入同一对象,但这种方法存在复杂性。例如,一个“僵尸写入者”(Zombie Writer)可能会在新主节点写入新文件时,回到旧对象并追加数据。与典型的基于Raft的存储系统不同,S3没有“隔离令牌”(fencing token)的概念。因此,这种优化留待后续实现。
校验和(Checksum)
尽管S3提供了99.99999999%的持久性,但为了数据完整性,我们不能完全依赖外部系统。许多数据库不使用校验和,但我们可以做得更好。这里我们使用SHA-256作为校验和(Go标准库支持)。
我们将偏移量、数据和校验和存储在一起,使记录具有自包含性。例如,如果未来进行数据压缩并更改文件名,记录的偏移量仍然保持不变:
func calculateChecksum(buf *bytes.Buffer) [32]byte {
return sha256.Sum256(buf.Bytes())
}
func prepareBody(offset uint64, data []byte) ([]byte, error) {
// 8字节偏移量 + 数据长度 + 32字节校验和
bufferLen := 8 + len(data) + 32
buf := bytes.NewBuffer(make([]byte, 0, bufferLen))
binary.Write(buf, binary.BigEndian, offset)
buf.Write(data)
checksum := calculateChecksum(buf)
_, err := buf.Write(checksum[:])
return buf.Bytes(), err
}
Read操作
日志的读取操作相对简单。给定一个偏移量,我们构造对应的S3对象名称并获取它:
func (w *S3WAL) Read(ctx context.Context, offset uint64) (Record, error) {
key := w.getObjectKey(offset)
input := &s3.GetObjectInput{
Bucket: aws.String(w.bucketName),
Key: aws.String(key),
}
result, _ := w.client.GetObject(ctx, input)
defer result.Body.Close()
data, _ := io.ReadAll(result.Body)
if len(data) < 40 {
return Record{}, fmt.Errorf("invalid record: data too short")
}
if !validateOffset(data, offset) {
return Record{}, fmt.Errorf("offset mismatch")
}
if !validateChecksum(data) {
return Record{}, fmt.Errorf("checksum mismatch")
}
return Record{
Offset: offset,
Data: data[8 : len(data)-32],
}, nil
}
我们进行以下验证:
记录长度必须至少为40字节。 请求中的偏移量必须与记录中的偏移量匹配。 校验和必须匹配。
故障恢复
如果节点崩溃,我们如何恢复?目前,我们总是将WAL初始化为length=0
,这会导致新写入尝试从偏移量0000000001
开始写入。虽然S3的条件写入会保护我们不覆盖已有数据,但新写入将无法继续。
为了解决这个问题,我们添加了一个方法,用于遍历所有键,找到最后插入的对象:
func (w *S3WAL) LastRecord(ctx context.Context) (Record, error) {
input := &s3.ListObjectsV2Input{
Bucket: aws.String(w.bucketName),
}
paginator := s3.NewListObjectsV2Paginator(w.client, input)
var maxOffset uint64 = 0
for paginator.HasMorePages() {
output, _ := paginator.NextPage(ctx)
for _, obj := range output.Contents {
key := *obj.Key
offset, _ := w.getOffsetFromKey(key)
if offset > maxOffset {
maxOffset = offset
}
}
}
if maxOffset == 0 {
return Record{}, fmt.Errorf("WAL is empty")
}
w.length = maxOffset
return w.Read(ctx, maxOffset)
}
总结
通过以上实现,我们成功使用S3构建了一个简单的分布式日志系统。关注公众号并回复 “s3-log” 获取源码。