并发控制(一)
我们考虑这么一种场景,协程 A 在执行过程中需要创建子协程 A1、A2、A3...An, 协程 A 创建完子协程后就等待子协程退出。
针对这种场景,Go 提供了三种解决方案。
- Channel: 使用 channel 控制子协程
- WaitGroup: 使用信号量机制控制子协程
- Context: 使用上下文控制子协程
三种方案各有优劣,比如 Channel 的优点时实现简单,清晰易懂,WaitGroup 的优点是子协程个数可动态调整,Context 的优点是对子协程派生出来的孙子协程的控制。
各种解决方案的缺点时相对而言的,要结合实例应用场景进行选择。
channel
channel 一般用于协程之间的通信,不过 channel 也可以用于并发控制。
比如主协程启动 N 个子协程,主协程等待所有子协程退出后再继续后续流程,这种场景下 channel 也可轻易实现并发控制
场景演示
下面的程序展示了一个使用 channel 控制子协程的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 | package main
import (
"fmt"
"time"
)
func Process(ch chan int, num int) {
// Do some work...
time.Sleep(time.Second)
fmt.Printf("Process %d finish\n", num)
ch <- 1 // 在管道中写入一个元素表示当前协程已结束
}
func main() {
channels := make([]chan int, 10) // 创建一个包含 10 个元素的切片,元素类型为 channel
for i := 0; i < 10; i++ {
channels[i] = make(chan int) // 在切片中放入一个 channel
go Process(channels[i], i) // 启动协程,传一个管道用于通信
}
for i, ch := range channels { // 遍历切片,等待子协程结束
<-ch
fmt.Println("routine ", i, " quit!")
}
}
|
输出结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 | Process 0 finish
Process 2 finish
routine 0 quit!
Process 1 finish
Process 6 finish
Process 7 finish
routine 1 quit!
routine 2 quit!
Process 8 finish
Process 3 finish
routine 3 quit!
Process 5 finish
Process 4 finish
routine 4 quit!
routine 5 quit!
routine 6 quit!
routine 7 quit!
routine 8 quit!
Process 9 finish
routine 9 quit!
|
上面的程序通过创建 N 个 channel 来管理 N 个协程,每个协程都有一个 channel 用于跟父协程通信,父协程创建完所有协程后等待所有协程结束
在这个例子中,父协程仅仅是等待子协程结束,其实父协程也可以向管道中写入数据通知子协程结束,这时子协程需要定期地探测管道中是否有消息出现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 | package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
for a := 1; a <= numJobs; a++ {
<-results
}
}
|
time go run test.go
输出:
| worker 3 started job 1
worker 2 started job 3
worker 1 started job 2
worker 2 finished job 3
worker 2 started job 4
worker 1 finished job 2
worker 1 started job 5
worker 3 finished job 1
worker 1 finished job 5
worker 2 finished job 4
go run test.go 0.22s user 0.30s system 17% cpu 3.013 total
|
小结
使用 channel 控制子协程的优点是实现简单,缺点是当需要大量创建协程时就需要有相同数量的 channel,而且对于子协程继续派生出来的协程不方便控制
后面继续介绍的 WaitGroup、Context 看起来比 channel 优雅一些,在各种开源组件中使用的概率比 channel 高很多
WaitGroup
WaitGroup 是 Go 应用开发过程中经常使用的并发控制技术
WaitGroup 可理解为 Wait-Goroutine-Group,即等待一组 goroutine 结束。
比如某个 goroutine 需要等待其他几个 goroutine 全部完成,那么使用 WaitGroup 可以轻松实现
下面的程序展示了一个 goroutine 等待另外两个 goroutine 结束的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 | package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
wg.Add(2) // 设置计数器,数值即 goroutine 的个数
go func() {
// Do some work
time.Sleep(1 * time.Second)
fmt.Println("Goroutine 1 finished!")
wg.Done() // goroutine 执行结束后将计数器减 1
}()
go func() {
// Do some work
time.Sleep(2 * time.Second)
fmt.Println("Goroutine 2 finished!")
wg.Done() // goroutine 执行结束后将计数器减 1
}()
wg.Wait() // 主 goroutine 阻塞等待计数器变为 0
fmt.Printf("All Goroutine finished!")
}
|
输出结果:
| Goroutine 1 finished!
Goroutine 2 finished!
All Goroutine finished!
|
简单地说,上面的程序中 wg 内部维护了一个计数器:
- 启动 goroutine 前通过 Add(2) 将计数器设置为待启动的 goroutine 个数
- 启动 goroutine 后,使用 Wait() 方法阻塞自己,等待计数器变为 0
- 每个 goroutine 执行结束后通过 Done() 方法将计数器减 1
- 计数器变为 0 后,阻塞的 goroutine 被唤醒
其实 WaitGroup 也可以实现一组 goroutine 等待另一组 goroutine,这有点像 "玩杂技",很容易出错,如果不了解其实现原理更是如此。
实际上,WaitGroup 的实现源码非常简单
基础知识
信号量是 UNIX 系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源
信号量可简单理解为一个数值:
- 当信号量 > 0 时,表示资源可用,获取信号量时系统自动将信号减 1
- 当信号量
==
0 时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒
由于 WaitGroup 实现中也使用了信号量,在此做一个简单的介绍
WaitGroup
(1) 数据结构
源码包中 src/sync/waitgroup.go:WaitGroup
定义了其数据结构:
| type WaitGroup struct {
state1 [3]uint32
}
|
state1 是一个长度为 3 的数组,其中包含 state 和一个信号量,而 state 实际上是两个计数器。
- counter: 当前还未执行结束的 goroutine 计数器
- waiter count: 等待 goroutine-group 结束的 goroutine 数量,即有多少个等候者
- semaphore: 信号量
WaitGroup 对外提供了三个接口:
Add(delta int)
: 将 delta 值加到 counter 中
Wait()
: waiter 递增 1,并阻塞等待信号量 semaphore
Done()
: counter 递减 1,按照 waiter 数值释放相应次数的信号量
下面分别介绍这三个函数的实现细节:
(2) Add(delta int)
:
Add() 做了两件事,一是把 delta 值累加到 counter 中,因为 delta 可以为负值,也就是说 counter 有可能变成 0 或负值,
所以第二件事就是当 counter 的值变为 0 时,根据 waiter 数值释放等量的信号量,把等待的 goroutine 全部唤醒,如果 counter 变为负值,则触发 panic
Add() 的伪代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 | func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state() // 获取 state 和 semaphore 地址指针
state := atomic.AddUint64(statep, uint64(delta)<<32) // 把 delta 左移 32 位
v := int32(state >> 32) // 获取 counter 值
w := uint32(state) // 获取 waiter 值
if v < 0 { // 经过累加后 counter 值变为负值,触发 panic
panic("sync: negative WaitGroup counter")
}
// 经过累加后,此时 counter >= 0
// 如果 counter 为正,则说明不需要释放信号量,直接退出
// 如果 waiter 为 0,则说明没有等待者,也不需要释放信号量,直接退出
if v > 0 || w == 0 {
return
}
// 此时 counter 一定等于 0,而 waiter 一定大于 0(内部维护 waiter,不会出现小于 0 的情况)
// 先把 counter 置为 0,再释放 waiter 个数的信号量
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false) // 释放信号量,执行一次释放一个,唤醒一个等待着
}
}
|
(3) Wait()
Wait() 方法也做了两件事,一是累加 waiter,二是阻塞等待信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 | func (wg *WaitGroup) Wait() {
statep, semap := wg.state() // 获取 state 和 semaphore 地址指针
for {
state := atomic.LoadUint64(statep) // 获取 state 值
v := int32(state >> 32) // 获取 counter 值
w := uint32(state) // 获取 waiter 值
if v == 0 { // 如果 counter 值为 0,则说明所有 goroutine 都退出了,不需要等待,直接返回
return
}
// 使用 CAS(比较交换算法)累加 waiter,累加可能会失败,失败后通过 for loop 下次重试
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap) // 累加成功后,等待信号量唤醒自己
return
}
}
}
|
这里用到了 CAS 算法,保证有多个 goroutine 同时执行 Wait() 时也能正确累加 waiter
(4) Done()
Done() 只做一件事,即把 counter 减 1,我们知道 Add() 可以接受负值,所以 Done 实际上只是调用了 Add(-1)。源码如下:
| func (wg *WaitGroup) Done() {
wg.Add(-1)
}
|
Done() 的执行逻辑就转到了 Add(),实际上也正是最后一个完成的 goroutine 把等待者唤醒的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | package main
import (
"fmt"
"sync"
"time"
)
func worker(id int) {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
worker(i)
}()
}
wg.Wait()
}
|
输出:
| Worker 5 starting
Worker 2 starting
Worker 4 starting
Worker 1 starting
Worker 3 starting
Worker 3 done
Worker 5 done
Worker 1 done
Worker 4 done
Worker 2 done
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 | package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var ops uint64
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
for c := 0; c < 1000; c++ {
atomic.AddUint64(&ops, 1)
}
wg.Done()
}()
}
wg.Wait()
fmt.Println("ops:", ops)
}
|
输出:
编程 Tips
- Add() 操作必须早于 Wait(),否则会触发 panic
- Add() 设置的值必须与实际等待的 goroutine 的个数一致,否则会触发 panic