在谈论 goroutines 时,我们需要记住 Go 语言的简洁性,并且它非常强调并发处理。并发是指能够独立地处理多个任务,一项接一项地进行,这与并行不同,后者是指任务同时执行。我在之前的文章中已经讨论过并发和并行的区别,所以这里不再深入探讨。
Goroutines 的基本概念
让我们直接进入 Go 中 goroutines 的主题,这是在代码中实现并发处理的一种方式。它们非常适合在提高性能的同时不牺牲数据安全性和完整性的场景。这就是 goroutines 和 channels 结合使用的意义所在。
在这篇文章中,我们将更侧重于技术层面,通过实际代码示例来进行探索,而不是理论解释。
简而言之,当我们在 Go 中使用关键字 go
时,我们是在指示某个函数的执行应该是并发的。例如,如果我们有三个互不依赖的函数,我们可以同时执行它们,而不是等待一个完成后再开始下一个。
示例代码
以下是一个使用 goroutines 同时执行三个操作的示例:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// 模拟一个耗时的计算
func performCalculation(id int, wg *sync.WaitGroup) {
defer wg.Done() // 表示 goroutine 完成
fmt.Printf("Goroutine %d: 开始计算...\n", id)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second) // 模拟处理时间
fmt.Printf("Goroutine %d: 计算完成。\n", id)
}
// 模拟读取外部 API
func fetchData(id int, wg *sync.WaitGroup) {
defer wg.Done() // 表示 goroutine 完成
fmt.Printf("Goroutine %d: 开始读取 API...\n", id)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second) // 模拟 API 响应时间
fmt.Printf("Goroutine %d: 接收到 API 数据。\n", id)
}
// 模拟日志记录
func writeLog(id int, wg *sync.WaitGroup) {
defer wg.Done() // 表示 goroutine 完成
fmt.Printf("Goroutine %d: 开始记录日志...\n", id)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second) // 模拟写入时间
fmt.Printf("Goroutine %d: 日志记录完成。\n", id)
}
func main() {
rand.Seed(time.Now().UnixNano()) // 确保随机时间
var wg sync.WaitGroup // 创建一个 WaitGroup
// 增加 WaitGroup 计数器
wg.Add(3)
// 启动三个并发的 goroutines
go performCalculation(1, &wg)
go fetchData(2, &wg)
go writeLog(3, &wg)
// 等待所有 goroutines 完成
wg.Wait()
fmt.Println("所有任务已完成。")
}
代码解析
在 main
函数中,我们首先创建了一个 WaitGroup
变量 wg
,用于控制和同步 goroutines。接着,我们使用 wg.Add(3)
来设置我们将控制的 goroutines 数量。使用 go
关键字启动 goroutines,以并发方式执行。
这三个函数只是模拟了可以并发执行的场景,使用 Done
方法和 defer
来指示 goroutine 的完成状态。从上面的输出可以看出,三个 goroutines 是并发执行的,这种方式比逐个调用函数并等待其完成更高效。
使用 Channels 进行通信
然而,在某些情况下,我们需要在保持代码性能和数据完整性的同时处理相互依赖的函数。我们可以创建两个 goroutines 并在它们之间使用 channel。以下是一个示例:
package main
import (
"context"
"fmt"
"sync"
"time"
)
func process(ctx context.Context, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 10; i++ {
select {
case <-ctx.Done():
fmt.Println("process canceled")
return
case ch <- i:
fmt.Println("sending", i)
time.Sleep(500 * time.Millisecond)
}
}
close(ch)
}
func consumer(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for v := range ch {
fmt.Println("received", v)
time.Sleep(1 * time.Second)
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ch := make(chan int)
wg := sync.WaitGroup{}
wg.Add(2)
go process(ctx, ch, &wg)
go consumer(ch, &wg)
wg.Wait()
}
代码解析
在 main
函数中,我们首先创建了一个上下文 ctx
,用于演示 context.WithTimeout
的使用,它允许我们定义操作的最长持续时间。在函数执行结束时,通过 defer
确保调用 cancel
方法以释放与上下文关联的资源。
我们还创建了一个 channel,并使用 make
关键字指定它是一个通道,并推断其传输的数据类型。与前面的示例一样,我们使用 WaitGroup
控制 goroutines 的执行流,确保主程序在所有 goroutines 完成之前不会继续执行。
接下来,我们声明了 process
函数,负责处理数据。该函数接收三个参数:上下文(ctx
)、WaitGroup
(wg
)和通道(ch
)。在 process
函数中,有一个 for
循环迭代直到 i
小于 10。然而,我们使用了一个 select
语句来处理两个 case
:
当上下文到期(达到 main
函数中定义的 5 秒超时时间)时,执行中断。当处理完成时,处理过的值被发送到通道。
之后,我们使用 consumer
函数,遍历通道中的值,模拟队列行为。这样,我们在两个相互依赖的函数之间创建了一个并发流:一个生产数据,另一个消费数据。
总结
总之,尽可能分析你的代码并识别可以进行性能改进的区域。使用 goroutines 进行并发处理,并通过 channels 在它们之间进行通信,可以显著提高处理性能,同时确保数据的完整性和安全性。