点击上方蓝字,后台回复【合集】获取 Go资料
关于并发
原生线程、Java线程、Goroutine
线程栈默认空间大且不支持动态伸缩,Java 默认最小都是1MB,Linux 默认 8MB; 线程切换创建、销毁以及线程间上下文切换的代价都较大。 线程通过共享内存进行通讯,
POSIX线程(Pthreads)是C函数、类型和常数的集合,用于创建和管理线程。它是POSIX标准的一个子集,提供在BeagleBone Black上使用C/C++应用程序实现线程所需的一切。 原生线程就是操作系统线程或叫系统线程。
资源占用小,每个Goroutine的初始栈大小仅为2KB,且支持动态伸缩,避免内存浪费; 由Go运行时而不是操作系统调度,goroutine上下文切换代价较小; 内置channel作为goroutine间通信原语,为并发设计提供强大支撑。
了解Go调度原理
G-P-M模型
G(goroutine):一个执行单元,这里也就是 goroutine,它包含了执行代码所需的信息,比如栈空间、程序计数器等。
P(processor):P 一个逻辑处理器,它负责执行 goroutine。每个 P 维护了一个 goroutine 队列,它可以将 goroutine 分配到 M(系统线程)上执行。P 的数量由 GOMAXPROCS 环境变量决定,默认值为 CPU 的逻辑核心数。
M(machine):一个系统线程(machine),它负责执行 goroutine 的真正计算工作。M 与操作系统的线程直接绑定,负责实际的计算任务,比如执行 goroutine 的函数、系统调用等。Go 语言的调度器会将多个 goroutine 映射到少量的系统线程上执行。
抢占式调度
优先级调度(Priority Scheduling):
每个线程都有一个优先级,高优先级的线程会比低优先级的线程更容易获得CPU的执行权(注意:设置了优先级不是绝对优先执行,只是概率上高)。 在Java中,线程的优先级范围是从 Thread.MIN_PRIORITY
(1)到Thread.MAX_PRIORITY
(10),默认是Thread.NORM_PRIORITY
(5)。
每个线程被分配一个固定的时间片,当该线程的时间片用完时,操作系统会暂停它的执行,将CPU控制权交给下一个线程。 在Java中,时间片轮转调度通过 yield()
方法来实现。当线程调用yield()
时,它就会主动放弃CPU的执行权,让其他线程有机会执行。
sysmon
,用来进行系统监控任务,如垃圾回收、抢占调度、监视死锁等。这个函数在后台运行,确保 Go 程序的正常运行。func main() {
...
if GOARCH != "wasm" {
// 系统栈上的函数执行
systemstack(func() {
newm(sysmon, nil, -1) // 用于创建新的 M(机器,代表一个操作系统线程)。
})
}
...
}
sysmon
每20us~10ms启动一次,大体工作:释放闲置超过5分钟的span物理内存; 如果超过2分钟没有垃圾回收,强制执行; 将长时间未处理的netpoll结果添加到任务队列; 向长时间运行的G任务发出抢占调度; 收回因syscall长时间阻塞的P。
系统调用:当一个 goroutine 执行系统调用时,调度器会将该 goroutine 暂停,并将处理器分配给其他可运行的 goroutine。一旦系统调用完成,被暂停的 goroutine 可以继续执行。 函数调用:当一个 goroutine 调用一个阻塞的函数(如通道的发送和接收操作、锁的加锁和解锁操作等)时,调度器会将该 goroutine 暂停,并将处理器分配给其他可运行的 goroutine。一旦被阻塞的函数可以继续执行,被暂停的 goroutine 可以继续执行。 时间片耗尽:每个 goroutine 在运行一段时间后都会消耗一个时间片。当时间片耗尽时,调度器会将当前正在运行的 goroutine 暂停,并将处理器分配给其他可运行的 goroutine。被暂停的 goroutine 将会被放入到就绪队列中,等待下一次调度。
GO并发模型
goroutine:对应CSP模型中的P(原意是进程,在这里也就是goroutine),封装了数据的处理逻辑,是Go运行时调度的基本执行单元。 channel:对应CSP模型中的输入/输出原语,用于goroutine之间的通信和同步。 select:用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。
在无缓冲通道上,每一次发送操作都有对应匹配的接收操作。 对于从无缓冲通道进行的接收,发生在对该通道进行的发送完成之前。 对于带缓冲的通道(缓存大小为C),通道中的第K个接收完成操作发生在第K+C个发送操作完成之前。 如果将C=0就是无缓冲的通道,也就是第K个接收完成在第K个发送完成之前。
func sender(ch chan<- int, done chan<- bool) {
fmt.Println("Sending...")
ch <- 42 // 发送数据到无缓冲通道
fmt.Println("Sent")
done <- true // 发送完成信号
}
func receiver(ch <-chan int, done <-chan bool) {
<-done // 等待发送操作完成信号
fmt.Println("Receiving...")
val := <-ch // 从无缓冲通道接收数据
fmt.Println("Received:", val)
}
func main() {
ch := make(chan int) // 创建无缓冲通道
done := make(chan bool) // 用于发送操作完成信号
go sender(ch, done) // 启动发送goroutine
go receiver(ch, done) // 启动接收goroutine
time.Sleep(2 * time.Second) // 等待一段时间以观察结果
}
func sender(ch chan<- int) {
for i := 0; i < 5; i++ {
fmt.Println("Sending:", i)
ch <- i // 发送数据到通道
fmt.Println("Sent:", i)
}
close(ch)
}
func receiver(ch <-chan int) {
for {
val, ok := <-ch // 从通道接收数据
if !ok {
fmt.Println("Channel closed")
return
}
fmt.Println("Received:", val)
time.Sleep(1 * time.Second) // 模拟接收操作耗时
}
}
func main() {
ch := make(chan int, 2) // 创建带缓冲大小为2的通道
go sender(ch) // 启动发送goroutine
go receiver(ch) // 启动接收goroutine
time.Sleep(10 * time.Second) // 等待一段时间以观察结果
}
Go并发场景
并行计算
// calculateSquare 是一个计算数字平方的函数,它模拟了一个耗时的计算过程。
func calculateSquare(num int, resultChan chan<- int) {
time.Sleep(1 * time.Second) // 模拟耗时计算
resultChan <- num * num
}
func main() {
nums := []int{1, 2, 3, 4, 5}
resultChan := make(chan int)
// 启动多个goroutine并发计算数字的平方
for _, num := range nums {
go calculateSquare(num, resultChan)
}
// 从通道中接收计算结果并打印
for range nums {
result := <-resultChan
fmt.Println("Square:", result)
}
close(resultChan)
}
IO密集型任务
// fetchURL 函数用于获取指定URL的内容,并将结果发送到通道resultChan中。
func fetchURL(url string, resultChan chan<- string) {
resp, err := http.Get(url)
if err != nil {
resultChan <- fmt.Sprintf("Error fetching %s: %s", url, err)
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
resultChan <- fmt.Sprintf("Error reading response from %s: %s", url, err)
return
}
resultChan <- string(body)
}
func main() {
urls := []string{"https://example.com", "https://example.org", "https://example.net"}
resultChan := make(chan string)
// 启动多个goroutine并发获取URL的内容
for _, url := range urls {
go fetchURL(url, resultChan)
}
// 从通道中接收结果并打印
for range urls {
result := <-resultChan
fmt.Println("Response:", result)
}
close(resultChan)
}
并发数据处理
// processData 函数用于处理从dataStream中接收的数据,并将处理结果发送到resultChan中。
func processData(dataStream <-chan int, resultChan chan<- int) {
for num := range dataStream {
resultChan <- num * 2 // 假设处理数据是将数据乘以2
}
}
func main() {
dataStream := make(chan int)
resultChan := make(chan int)
// 产生数据并发送到dataStream中
go func() {
for i := 1; i <= 5; i++ {
dataStream <- i
}
close(dataStream)
}()
// 启动goroutine并发处理数据
go processData(dataStream, resultChan)
// 从通道中接收处理结果并打印
for range dataStream {
result := <-resultChan
fmt.Println("Processed Data:", result)
}
close(resultChan)
}
并发网络编程
// handler 是一个HTTP请求处理函数,它会向客户端发送"Hello, World!"的响应。
func handler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, World!")
}
func main() {
// 注册HTTP请求处理函数
http.HandleFunc("/", handler)
// 启动HTTP服务器并监听端口8080
go http.ListenAndServe(":8080", nil)
fmt.Println("Server started on port 8080")
// 使用select{}使主goroutine保持运行状态,以便HTTP服务器能够处理请求
select {}
}
定时任务和周期性任务
// task 是一个需要定时执行的任务函数。
func task() {
fmt.Println("Task executed at:", time.Now())
}
func main() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
// 循环等待定时器的触发并执行任务
for {
select {
case <-ticker.C:
task()
}
}
}
工作池
通过创建一组goroutine来处理任务池中的任务,可以有效地控制并发数量,适用于需要限制并发的情况。
// worker 是一个工作函数,它会从jobs通道中接收任务,并将处理结果发送到results通道中。
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d started job %d\n", id, job)
time.Sleep(1 * time.Second) // 模拟工作时间
fmt.Printf("Worker %d finished job %d\n", id, job)
results <- job * 2 // 假设工作的结果是输入的两倍
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs) // 缓冲channel用于发送任务
results := make(chan int, numJobs) // 用于接收任务结果
// 启动多个worker goroutine
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, jobs, results)
}(i)
}