Skip to content

并发控制(一)

我们考虑这么一种场景,协程 A 在执行过程中需要创建子协程 A1、A2、A3...An, 协程 A 创建完子协程后就等待子协程退出。
针对这种场景,Go 提供了三种解决方案。

  • Channel: 使用 channel 控制子协程
  • WaitGroup: 使用信号量机制控制子协程
  • Context: 使用上下文控制子协程

三种方案各有优劣,比如 Channel 的优点时实现简单,清晰易懂,WaitGroup 的优点是子协程个数可动态调整,Context 的优点是对子协程派生出来的孙子协程的控制。
各种解决方案的缺点时相对而言的,要结合实例应用场景进行选择。

channel

channel 一般用于协程之间的通信,不过 channel 也可以用于并发控制。
比如主协程启动 N 个子协程,主协程等待所有子协程退出后再继续后续流程,这种场景下 channel 也可轻易实现并发控制

场景演示

下面的程序展示了一个使用 channel 控制子协程的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
    "fmt"
    "time"
)

func Process(ch chan int, num int) {
    // Do some work...
    time.Sleep(time.Second)

    fmt.Printf("Process %d finish\n", num)
    ch <- 1 // 在管道中写入一个元素表示当前协程已结束
}

func main() {
    channels := make([]chan int, 10) // 创建一个包含 10 个元素的切片,元素类型为 channel

    for i := 0; i < 10; i++ {
        channels[i] = make(chan int) // 在切片中放入一个 channel
        go Process(channels[i], i)   // 启动协程,传一个管道用于通信
    }

    for i, ch := range channels { // 遍历切片,等待子协程结束
        <-ch
        fmt.Println("routine ", i, " quit!")
    }
}

输出结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
Process 0 finish
Process 2 finish
routine  0  quit!
Process 1 finish
Process 6 finish
Process 7 finish
routine  1  quit!
routine  2  quit!
Process 8 finish
Process 3 finish
routine  3  quit!
Process 5 finish
Process 4 finish
routine  4  quit!
routine  5  quit!
routine  6  quit!
routine  7  quit!
routine  8  quit!
Process 9 finish
routine  9  quit!

上面的程序通过创建 N 个 channel 来管理 N 个协程,每个协程都有一个 channel 用于跟父协程通信,父协程创建完所有协程后等待所有协程结束

在这个例子中,父协程仅仅是等待子协程结束,其实父协程也可以向管道中写入数据通知子协程结束,这时子协程需要定期地探测管道中是否有消息出现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
    "fmt"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- j * 2
    }
}

func main() {

    const numJobs = 5
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)

    for a := 1; a <= numJobs; a++ {
        <-results
    }
}

time go run test.go 输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
worker 3 started  job 1
worker 2 started  job 3
worker 1 started  job 2
worker 2 finished job 3
worker 2 started  job 4
worker 1 finished job 2
worker 1 started  job 5
worker 3 finished job 1
worker 1 finished job 5
worker 2 finished job 4
go run test.go  0.22s user 0.30s system 17% cpu 3.013 total

小结

使用 channel 控制子协程的优点是实现简单,缺点是当需要大量创建协程时就需要有相同数量的 channel,而且对于子协程继续派生出来的协程不方便控制

后面继续介绍的 WaitGroup、Context 看起来比 channel 优雅一些,在各种开源组件中使用的概率比 channel 高很多

WaitGroup

WaitGroup 是 Go 应用开发过程中经常使用的并发控制技术

WaitGroup 可理解为 Wait-Goroutine-Group,即等待一组 goroutine 结束。
比如某个 goroutine 需要等待其他几个 goroutine 全部完成,那么使用 WaitGroup 可以轻松实现

下面的程序展示了一个 goroutine 等待另外两个 goroutine 结束的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    wg.Add(2) // 设置计数器,数值即 goroutine 的个数
    go func() {
        // Do some work
        time.Sleep(1 * time.Second)

        fmt.Println("Goroutine 1 finished!")
        wg.Done() // goroutine 执行结束后将计数器减 1
    }()

    go func() {
        // Do some work
        time.Sleep(2 * time.Second)

        fmt.Println("Goroutine 2 finished!")
        wg.Done() // goroutine 执行结束后将计数器减 1
    }()

    wg.Wait() // 主 goroutine 阻塞等待计数器变为 0
    fmt.Printf("All Goroutine finished!")
}

输出结果:

1
2
3
Goroutine 1 finished!
Goroutine 2 finished!
All Goroutine finished!

简单地说,上面的程序中 wg 内部维护了一个计数器:

  • 启动 goroutine 前通过 Add(2) 将计数器设置为待启动的 goroutine 个数
  • 启动 goroutine 后,使用 Wait() 方法阻塞自己,等待计数器变为 0
  • 每个 goroutine 执行结束后通过 Done() 方法将计数器减 1
  • 计数器变为 0 后,阻塞的 goroutine 被唤醒

其实 WaitGroup 也可以实现一组 goroutine 等待另一组 goroutine,这有点像 "玩杂技",很容易出错,如果不了解其实现原理更是如此。
实际上,WaitGroup 的实现源码非常简单

基础知识

信号量是 UNIX 系统提供的一种保护共享资源的机制,用于防止多个线程同时访问某个资源

信号量可简单理解为一个数值:

  • 当信号量 > 0 时,表示资源可用,获取信号量时系统自动将信号减 1
  • 当信号量 == 0 时,表示资源暂不可用,获取信号量时,当前线程会进入睡眠,当信号量为正时被唤醒

由于 WaitGroup 实现中也使用了信号量,在此做一个简单的介绍

WaitGroup

(1) 数据结构

源码包中 src/sync/waitgroup.go:WaitGroup 定义了其数据结构:

1
2
3
type WaitGroup struct {
    state1 [3]uint32
}

state1 是一个长度为 3 的数组,其中包含 state 和一个信号量,而 state 实际上是两个计数器。

  • counter: 当前还未执行结束的 goroutine 计数器
  • waiter count: 等待 goroutine-group 结束的 goroutine 数量,即有多少个等候者
  • semaphore: 信号量

WaitGroup 对外提供了三个接口:

  • Add(delta int): 将 delta 值加到 counter 中
  • Wait(): waiter 递增 1,并阻塞等待信号量 semaphore
  • Done(): counter 递减 1,按照 waiter 数值释放相应次数的信号量

下面分别介绍这三个函数的实现细节:

(2) Add(delta int):

Add() 做了两件事,一是把 delta 值累加到 counter 中,因为 delta 可以为负值,也就是说 counter 有可能变成 0 或负值,
所以第二件事就是当 counter 的值变为 0 时,根据 waiter 数值释放等量的信号量,把等待的 goroutine 全部唤醒,如果 counter 变为负值,则触发 panic

Add() 的伪代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state() // 获取 state 和 semaphore 地址指针

    state := atomic.AddUint64(statep, uint64(delta)<<32) // 把 delta 左移 32 位

    v := int32(state >> 32) // 获取 counter 值
    w := uint32(state)      // 获取 waiter 值

    if v < 0 { // 经过累加后 counter 值变为负值,触发 panic
        panic("sync: negative WaitGroup counter")
    }

    // 经过累加后,此时 counter >= 0
    // 如果 counter 为正,则说明不需要释放信号量,直接退出
    // 如果 waiter 为 0,则说明没有等待者,也不需要释放信号量,直接退出
    if v > 0 || w == 0 {
        return
    }

    // 此时 counter 一定等于 0,而 waiter 一定大于 0(内部维护 waiter,不会出现小于 0 的情况)
    // 先把 counter 置为 0,再释放 waiter 个数的信号量
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false) // 释放信号量,执行一次释放一个,唤醒一个等待着
    }
}

(3) Wait()

Wait() 方法也做了两件事,一是累加 waiter,二是阻塞等待信号量

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (wg *WaitGroup) Wait() {
    statep, semap := wg.state() // 获取 state 和 semaphore 地址指针
    for {
        state := atomic.LoadUint64(statep) // 获取 state 值
        v := int32(state >> 32)            // 获取 counter 值
        w := uint32(state)                 // 获取 waiter 值
        if v == 0 {                        // 如果 counter 值为 0,则说明所有 goroutine 都退出了,不需要等待,直接返回
            return
        }
        // 使用 CAS(比较交换算法)累加 waiter,累加可能会失败,失败后通过 for loop 下次重试
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap) // 累加成功后,等待信号量唤醒自己
            return
        }
    }
}

这里用到了 CAS 算法,保证有多个 goroutine 同时执行 Wait() 时也能正确累加 waiter

(4) Done()

Done() 只做一件事,即把 counter 减 1,我们知道 Add() 可以接受负值,所以 Done 实际上只是调用了 Add(-1)。源码如下:

1
2
3
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

Done() 的执行逻辑就转到了 Add(),实际上也正是最后一个完成的 goroutine 把等待者唤醒的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int) {
    fmt.Printf("Worker %d starting\n", id)

    time.Sleep(time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        i := i
        go func() {
            defer wg.Done()
            worker(i)
        }()
    }

    wg.Wait()
}

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
Worker 5 starting
Worker 2 starting
Worker 4 starting
Worker 1 starting
Worker 3 starting
Worker 3 done
Worker 5 done
Worker 1 done
Worker 4 done
Worker 2 done
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var ops uint64

    var wg sync.WaitGroup

    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func() {
            for c := 0; c < 1000; c++ {
                atomic.AddUint64(&ops, 1)
            }
            wg.Done()
        }()
    }

    wg.Wait()
    fmt.Println("ops:", ops)
}

输出:

1
ops: 50000

编程 Tips

  • Add() 操作必须早于 Wait(),否则会触发 panic
  • Add() 设置的值必须与实际等待的 goroutine 的个数一致,否则会触发 panic