Rill 是一个工具包,它为 Go 语言带来了可组合的并发性,使得构建由简单、可重用部分组成的并发程序变得更加容易。它减少了样板代码,同时保留了 Go 语言基于通道的自然模型。
go get -u github.com/destel/rill
目标
简化常见任务:
Rill 提供了一种更清晰、更安全的方法来解决常见的并发问题,如并行作业执行或实时事件处理。它消除了样板代码,抽象了 goroutine、通道和错误管理的复杂性。同时,开发者可以完全控制所有操作的并发级别。使并发代码可组合和清晰:
库中的大多数函数接受 Go 通道作为输入,并返回新的、经过转换的通道作为输出。这允许它们以各种方式链式组合,构建由更简单部分组成的可重用管道,类似于 Unix 管道。结果,并发程序变成了一系列可重用的清晰操作。集中错误处理:
错误会自动通过管道传递并在最后由一个位置统一处理。对于更复杂的场景,Rill 还提供了在管道的任何点截获和处理错误的工具。简化流处理:
得益于 Go 通道,内置函数可以处理潜在无限的流,边接收项目边处理。这使得 Rill 成为实时处理或处理不适合内存的大型数据集的便捷工具。提供高级任务解决方案:
除了基本操作,库还包括用于批处理、有序扇入、map-reduce、流分割、合并等的即用函数。虽然管道通常是线性的,但可以有任何无环拓扑结构(DAG)。支持自定义扩展:
由于 Rill 操作在标准的 Go 通道上,因此很容易编写与库兼容的自定义函数。保持轻量级:
Rill 拥有小巧的、类型安全的、基于通道的 API,并且零依赖,使其易于集成到现有项目中。它在资源使用方面也很轻量级,确保内存分配和 goroutine 的数量不会随着输入大小的增长而增长。
快速开始
让我们看一个实际的例子:从 API 获取用户,激活他们,并将更改保存回去。它展示了如何在每一步控制并发性,同时保持代码清晰和可管理。ForEach 在遇到第一个错误时返回,并通过 defer 取消上下文停止所有剩余的获取。
尝试它
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 将用户 ID 切片转换为通道
ids := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
// 从 API 读取用户。
// 并发性 = 3
users := rill.Map(ids, 3, func(id int) (*mockapi.User, error) {
return mockapi.GetUser(ctx, id)
})
// 激活用户。
// 并发性 = 2
err := rill.ForEach(users, 2, func(u *mockapi.User) error {
if u.IsActive {
fmt.Printf("用户 %d 已经是活跃的\n", u.ID)
return nil
}
u.IsActive = true
err := mockapi.SaveUser(ctx, u)
if err != nil {
return err
}
fmt.Printf("用户保存: %+v\n", u)
return nil
})
// 处理错误
fmt.Println("错误:", err)
}
批处理
以批量而非单独处理项目可以在许多场景中显著提高性能,特别是当涉及到外部服务或数据库时。批处理减少了查询和 API 调用的数量,提高了吞吐量,并通常降低了成本。
为了演示批处理,让我们通过使用 API 的批量获取功能来改进前面的例子。Batch 函数将单个 ID 流转换为 ID 切片流。这使得可以使用GetUsers
API 在一次调用中获取多个用户,而不是进行单独的GetUser
调用。
尝试它
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 将用户 ID 切片转换为通道
ids := rill.FromSlice([]int{
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
}, nil)
// 将 ID 分组为 5 个批次
idBatches := rill.Batch(ids, 5, -1)
// 批量从 API 获取用户
// 并发性 = 3
userBatches := rill.Map(idBatches, 3, func(ids []int) ([]*mockapi.User, error) {
return mockapi.GetUsers(ctx, ids)
})
// 将批次流转换回用户流
users := rill.Unbatch(userBatches)
// 激活用户。
// 并发性 = 2
err := rill.ForEach(users, 2, func(u *mockapi.User) error {
if u.IsActive {
fmt.Printf("用户 %d 已经是活跃的\n", u.ID)
return nil
}
u.IsActive = true
err := mockapi.SaveUser(ctx, u)
if err != nil {
return err
}
fmt.Printf("用户保存: %+v\n", u)
return nil
})
// 处理错误
fmt.Println("错误:", err)
}
实时批处理
现实世界的应用通常需要处理以不可预测速率到达的事件或数据。虽然批处理仍然为了效率而受到青睐,但是等待收集完整的批次可能会在输入流变慢或稀疏时引入不可接受的延迟。
Rill 通过基于超时的批处理来解决这个问题:当批次满或超过指定的超时时,会发出批次,以先到者为准。这种方法确保了在高负载时的最优批次大小,同时在安静期间保持响应性。
考虑一个需要在数据库中更新用户last_active_at 时间戳的应用。负责此功能的UpdateUserTimestamp
可以被并发地、以不可预测的速率调用,并且可以从应用程序的不同部分调用。单独执行所有这些更新可能会创建太多的并发查询,可能会压垮数据库。
以下示例中,更新操作被排队到userIDsToUpdate
通道,然后被分组为最多 5 个项目的批次,每个批次作为单个查询发送到数据库。Batch 函数使用 100ms 的超时,确保在高负载时零延迟,并在安静期间最多有 100ms 的延迟与较小的批次。
尝试它
func main() {
// 启动后台工作进程处理更新
go updateUserTimestampWorker()
// 执行一些更新。它们将自动被分组为
// 批次:[1,2,3,4,5], [6,7], [8]
UpdateUserTimestamp(1)
UpdateUserTimestamp(2)
UpdateUserTimestamp(3)
UpdateUserTimestamp(4)
UpdateUserTimestamp(5)
UpdateUserTimestamp(6)
UpdateUserTimestamp(7)
time.Sleep(500 * time.Millisecond) // 模拟稀疏更新
UpdateUserTimestamp(8)
}
// 这是要更新的用户 ID 队列。
var userIDsToUpdate = make(chan int)
// UpdateUserTimestamp 是更新用户表中 last_active_at 列的公共 API
func UpdateUserTimestamp(userID int) {
userIDsToUpdate <- userID
}
// 这是一个后台工作进程,它将排队的更新分批发送到数据库。
// 为简单起见,没有重试、错误处理和同步
func updateUserTimestampWorker() {
ids := rill.FromChan(userIDsToUpdate, nil)
idBatches := rill.Batch(ids, 5, 100*time.Millisecond)
_ = rill.ForEach(idBatches, 1, func(batch []int) error {
fmt.Printf("执行:UPDATE users SET last_active_at = NOW() WHERE id IN (%v)\n", batch)
return nil
})
}
错误、终止和上下文
在并发应用中,错误处理可能是非平凡的。Rill 通过提供结构化的方法来简化这个问题。 管道通常由一系列非阻塞通道转换组成,后跟一个返回最终结果和错误的阻塞阶段。
一般规则是:管道中任何地方发生的错误都会传递到最终阶段,在那儿被某个阻塞函数捕获并返回给调用者。
Rill 提供了多种阻塞函数。以下是一些常用的:
ForEach: 并发地将用户函数应用于流中的每个项目。示例 ToSlice: 将所有流项目收集到一个切片中。示例 First: 返回流中遇到的第一个项目或错误,并丢弃其余部分示例 Reduce: 并发地将流减少为单个值,使用用户提供的归约函数。示例 All: 并发地检查流中的所有项目是否满足用户提供的条件。示例 Err: 返回流中遇到的第一个错误或 nil,并丢弃其余部分的流。示例
所有阻塞函数共享一个共同的行为。在早期终止(在到达输入流的末尾之前或在出现错误的情况下),这些函数会启动剩余项目的后台排水。这是为了防止 goroutine 泄漏,确保所有为流提供数据的 goroutine 都允许完成。
Rill 是上下文不可知的,这意味着它不强制任何特定的上下文使用。然而,建议使用户定义的管道阶段具有上下文感知能力。这在初始阶段尤为重要,因为它允许在上下文取消后停止向管道中添加新项目。在实践中,第一阶段通常是通过 Go 的标准数据库、HTTP 客户端和其他外部源的 API 自然具有上下文感知的。
以下示例中,CheckAllUsersExist
函数使用多个并发工作检查给定列表中的所有用户是否存在。当出现错误时(如不存在的用户),函数返回该错误并取消上下文,这反过来停止所有剩余的用户获取。
尝试它
func main() {
ctx := context.Background()
// ID 999 不存在,因此在遇到它之后获取将停止。
err := CheckAllUsersExist(ctx, 3, []int{1, 2, 3, 4, 5, 999, 7, 8, 9, 10, 11, 12, 13, 14, 15})
fmt.Printf("检查结果:%v\n", err)
}
// CheckAllUsersExist 使用多个并发工作检查给定 ID 的所有用户是否存在。
func CheckAllUsersExist(ctx context.Context, concurrency int, ids []int) error {
// 创建一个新的上下文,当这个函数返回时将被取消
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// 将切片转换为流
idsStream := rill.FromSlice(ids, nil)
// 并发获取用户。
users := rill.Map(idsStream, concurrency, func(id int) (*mockapi.User, error) {
u, err := mockapi.GetUser(ctx, id)
if err != nil {
return nil, fmt.Errorf("未能获取用户 %d:%w", id, err)
}
fmt.Printf("获取用户 %d\n", id)
return u, nil
})
// 返回第一个错误(如果有)并通过上下文取消剩余获取
return rill.Err(users)
}
在上面的例子中,只有管道的第二阶段(mockapi.GetUser
)具有上下文感知能力。FromSlice 在这里工作得很好,因为输入很小,迭代很快,上下文取消防止了昂贵的 API 调用。 以下代码演示了当完全的上下文感知变得重要时,如何用Generate 替换FromSlice。
idsStream := rill.Generate(func(send func(int), sendErr func(error)) {
for _, id := range ids {
if ctx.Err() != nil {
return
}
send(id)
}
})
顺序保持(有序扇入)
并发处理可以提高性能,但由于任务完成所需的时间不同,结果的顺序通常与输入顺序不同。虽然在许多情况下,无序的结果可以接受,但有些情况需要保持原始顺序。这个看似简单的问题实际上难以正确解决。
为了解决这个问题,Rill 提供了其核心函数的有序版本,如OrderedMap 或OrderedFilter。 这些函数在内部执行额外的同步,以确保如果值x 在输入流中先于值y,则f(x) 将先于f(y) 出现在输出流中。
下面是一个实际的例子:在托管在线的 1000 个大型文件中找到特定字符串的第一个出现。一次性下载所有文件会消耗太多内存,按顺序处理它们又太慢,传统的并发模式不保持文件的顺序,这使得找到第一个匹配变得具有挑战性。
OrderedFilter 和First 函数的组合优雅地解决了这个问题,同时下载并最多保持 5 个文件在内存中。First 在第一个匹配时返回,这通过 defer 触发上下文取消,停止 URL 生成和文件下载。
尝试它
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 在下载的文件中搜索的字符串
needle := []byte("26")
// 从 https://example.com/file-0.txt 生成 URL 流
// 到 https://example.com/file-999.txt
// 如果上下文被取消,则停止生成 URL
urls := rill.Generate(func(send func(string), sendErr func(error)) {
for i := 0; i < 1000 && ctx.Err() == nil; i++ {
send(fmt.Sprintf("https://example.com/file-%d.txt", i))
}
})
// 下载和处理文件
// 最多同时下载 5 个文件
matchedUrls := rill.OrderedFilter(urls, 5, func(url string) (bool, error) {
fmt.Println("下载中:", url)
content, err := mockapi.DownloadFile(ctx, url)
if err != nil {
return false, err
}
// 只保留包含 needle 的文件 URL
return bytes.Contains(content, needle), nil
})
// 查找第一个匹配的 URL
firstMatchedUrl, found, err := rill.First(matchedUrls)
if err != nil {
fmt.Println("错误:", err)
return
}
// 打印结果
if found {
fmt.Println("在以下位置找到:", firstMatchedUrl)
} else {
fmt.Println("未找到")
}
}
流合并和 FlatMap
Rill 带来了Merge 函数,它将多个流合并为一个。另一个经常被忽视的可以合并流的函数是FlatMap。它是一个强大的工具,将每个输入项目转换为它自己的流,然后将所有这些流合并在一起。
以下示例中,FlatMap 将每个部门转换为用户流,然后将这些流合并为一个。 像其他 Rill 函数一样,FlatMap 提供了完全的并发控制。在这种特定情况下,并发级别为 3,意味着最多同时从 3 个部门获取用户。
此外,这个示例演示了如何编写一个可重用的流包装器,用于分页 API 调用 -StreamUsers
函数。 这个包装器可以独立使用,也可以作为更大管道的一部分。
尝试它
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 从部门名称流开始
departments := rill.FromSlice([]string{"IT", "Finance", "Marketing", "Support", "Engineering"}, nil)
// 并发从所有部门流式传输用户。
// 同时最多处理 3 个部门。
users := rill.FlatMap(departments, 3, func(department string) <-chan rill.Try[*mockapi.User] {
return StreamUsers(ctx, &mockapi.UserQuery{Department: department})
})
// 从合并流中打印用户
err := rill.ForEach(users, 1, func(user *mockapi.User) error {
fmt.Printf("%+v\n", user)
return nil
})
fmt.Println("错误:", err)
}
// StreamUsers 是围绕 mockapi.ListUsers 函数的可重用流包装器。
// 它遍历所有列表页面,并使用 [Generate] 简化向结果流发送用户和错误。
// 这个函数独立使用或作为更大管道的一部分都很有用。
func StreamUsers(ctx context.Context, query *mockapi.UserQuery) <-chan rill.Try[*mockapi.User] {
return rill.Generate(func(send func(*mockapi.User), sendErr func(error)) {
var currentQuery mockapi.UserQuery
if query != nil {
currentQuery = *query
}
for page := 0; ; page++ {
currentQuery.Page = page
users, err := mockapi.ListUsers(ctx, ¤tQuery)
if err != nil {
sendErr(err)
return
}
if len(users) == 0 {
break
}
for _, user := range users {
send(user)
}
}
})
}
Go 1.23 迭代器
从 Go 1.23 开始,语言增加了range-over-function 特性,允许用户定义自定义迭代器,用于 for-range 循环。这个特性使得 Rill 能够与标准库和第三方包中现有的基于迭代器的函数无缝集成。
Rill 提供了FromSeq 和FromSeq2 函数将迭代器转换为流,以及ToSeq2 函数将流转换回迭代器。
ToSeq2 可以作为不需要并发时ForEach 的一个很好的替代品。它提供了更多的控制,并执行所有必要的清理和排水,即使循环被break 或return 提前终止。
尝试它
func main() {
// 将数字切片转换为流
numbers := rill.FromSlice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
// 转换每个数字
// 并发性 = 3
squares := rill.Map(numbers, 3, func(x int) (int, error) {
return square(x), nil
})
// 将流转换为迭代器并使用 for-range 打印结果
for val, err := range rill.ToSeq2(squares) {
if err != nil {
fmt.Println("错误:", err)
break // 无论早期退出如何,都会进行清理
}
fmt.Printf("%+v\n", val)
}
}
测试策略
Rill 的测试覆盖率超过 95%,测试重点包括:
正确性: 确保函数在不同并发级别下产生准确的结果 并发性: 确认生成和利用正确数量的 goroutine 顺序: 确保有序版本的函数保持顺序,而基本版本不保持
贡献
欢迎贡献!无论是报告 bug、建议功能还是提交 pull 请求,您的支持都有助于改进 Rill。 请确保您的代码遵循现有的风格并包含相关测试。