Go 协程与通道

科技   2024-11-27 11:47   广东  

在谈论 goroutines 时,我们需要记住 Go 语言的简洁性,并且它非常强调并发处理。并发是指能够独立地处理多个任务,一项接一项地进行,这与并行不同,后者是指任务同时执行。我在之前的文章中已经讨论过并发和并行的区别,所以这里不再深入探讨。

Goroutines 的基本概念

让我们直接进入 Go 中 goroutines 的主题,这是在代码中实现并发处理的一种方式。它们非常适合在提高性能的同时不牺牲数据安全性和完整性的场景。这就是 goroutines 和 channels 结合使用的意义所在。

在这篇文章中,我们将更侧重于技术层面,通过实际代码示例来进行探索,而不是理论解释。

简而言之,当我们在 Go 中使用关键字 go 时,我们是在指示某个函数的执行应该是并发的。例如,如果我们有三个互不依赖的函数,我们可以同时执行它们,而不是等待一个完成后再开始下一个。

示例代码

以下是一个使用 goroutines 同时执行三个操作的示例:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// 模拟一个耗时的计算
func performCalculation(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 表示 goroutine 完成
    fmt.Printf("Goroutine %d: 开始计算...\n", id)
    time.Sleep(time.Duration(rand.Intn(3)) * time.Second) // 模拟处理时间
    fmt.Printf("Goroutine %d: 计算完成。\n", id)
}

// 模拟读取外部 API
func fetchData(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 表示 goroutine 完成
    fmt.Printf("Goroutine %d: 开始读取 API...\n", id)
    time.Sleep(time.Duration(rand.Intn(3)) * time.Second) // 模拟 API 响应时间
    fmt.Printf("Goroutine %d: 接收到 API 数据。\n", id)
}

// 模拟日志记录
func writeLog(id int, wg *sync.WaitGroup) {
    defer wg.Done() // 表示 goroutine 完成
    fmt.Printf("Goroutine %d: 开始记录日志...\n", id)
    time.Sleep(time.Duration(rand.Intn(3)) * time.Second) // 模拟写入时间
    fmt.Printf("Goroutine %d: 日志记录完成。\n", id)
}

func main() {
    rand.Seed(time.Now().UnixNano()) // 确保随机时间
    var wg sync.WaitGroup // 创建一个 WaitGroup

    // 增加 WaitGroup 计数器
    wg.Add(3)

    // 启动三个并发的 goroutines
    go performCalculation(1, &wg)
    go fetchData(2, &wg)
    go writeLog(3, &wg)

    // 等待所有 goroutines 完成
    wg.Wait()
    fmt.Println("所有任务已完成。")
}

代码解析

main 函数中,我们首先创建了一个 WaitGroup 变量 wg,用于控制和同步 goroutines。接着,我们使用 wg.Add(3) 来设置我们将控制的 goroutines 数量。使用 go 关键字启动 goroutines,以并发方式执行。

这三个函数只是模拟了可以并发执行的场景,使用 Done 方法和 defer 来指示 goroutine 的完成状态。从上面的输出可以看出,三个 goroutines 是并发执行的,这种方式比逐个调用函数并等待其完成更高效。

使用 Channels 进行通信

然而,在某些情况下,我们需要在保持代码性能和数据完整性的同时处理相互依赖的函数。我们可以创建两个 goroutines 并在它们之间使用 channel。以下是一个示例:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func process(ctx context.Context, ch chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 10; i++ {
        select {
        case <-ctx.Done():
            fmt.Println("process canceled")
            return
        case ch <- i:
            fmt.Println("sending", i)
            time.Sleep(500 * time.Millisecond)
        }
    }
    close(ch)
}

func consumer(ch chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for v := range ch {
        fmt.Println("received", v)
        time.Sleep(1 * time.Second)
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    ch := make(chan int)
    wg := sync.WaitGroup{}
    wg.Add(2)

    go process(ctx, ch, &wg)
    go consumer(ch, &wg)

    wg.Wait()
}

代码解析

main 函数中,我们首先创建了一个上下文 ctx,用于演示 context.WithTimeout 的使用,它允许我们定义操作的最长持续时间。在函数执行结束时,通过 defer 确保调用 cancel 方法以释放与上下文关联的资源。

我们还创建了一个 channel,并使用 make 关键字指定它是一个通道,并推断其传输的数据类型。与前面的示例一样,我们使用 WaitGroup 控制 goroutines 的执行流,确保主程序在所有 goroutines 完成之前不会继续执行。

接下来,我们声明了 process 函数,负责处理数据。该函数接收三个参数:上下文(ctx)、WaitGroupwg)和通道(ch)。在 process 函数中,有一个 for 循环迭代直到 i 小于 10。然而,我们使用了一个 select 语句来处理两个 case

  1. 当上下文到期(达到 main 函数中定义的 5 秒超时时间)时,执行中断。
  2. 当处理完成时,处理过的值被发送到通道。

之后,我们使用 consumer 函数,遍历通道中的值,模拟队列行为。这样,我们在两个相互依赖的函数之间创建了一个并发流:一个生产数据,另一个消费数据。

总结

总之,尽可能分析你的代码并识别可以进行性能改进的区域。使用 goroutines 进行并发处理,并通过 channels 在它们之间进行通信,可以显著提高处理性能,同时确保数据的完整性和安全性。

点击关注并扫码添加进交流群
免费领取「Go 语言」学习资料

源自开发者
专注于提供关于Go语言的实用教程、案例分析、最新趋势,以及云原生技术的深度解析和实践经验分享。
 最新文章