Skip to content

并发

通常程序会被编写为一个顺序执行并完成一个独立任务的代码。
如果没有特别的需求,最好总是这样写代码,因为这种类型的程序通常很容易写,也很容易维护。
不过也有一些情况下,并行执行多个任务会有更大的好处。
一个例子是,Web 服务需要在各自独立的套接字(socket)上同时接收多个数据请求。
每个套接字请求都是独立的,可以完全独立于其他套接字进行处理。
具有并行执行多个请求的能力可以显著提高这类系统的性能。
考虑到这一点,Go 语言的语法和运行时直接内置了对并发的支持。

Go 语言里的并发指的是能让某个函数独立于其他函数运行的能力。
当一个函数创建为 goroutine 时,Go 会将其视为一个独立的工作单元。这个单元会被调度到可用的逻辑处理器上执行。
Go 语言运行时的调度器是一个复杂的软件,能管理被创建的所有 goroutine 并为其分配执行时间。
这个调度器在操作系统之上,将操作系统的线程于语言运行时的逻辑处理器绑定,并在逻辑处理器上运行 goroutine。
调度器在任何给定的时间,都会全面控制哪个 goroutine 要在哪个逻辑处理器上运行。

并发与并行

并行是并发设计的理想执行模式

多线程或多进程是并行的基本条件,但单线程也可用协程(coroutine)做到并发。
尽管协程在单个线程上通过主动切换来实现多任务并发,但它也有自己的优势。
除了将因阻塞而浪费的事件找回来外,还免去了线程切换开销,有着不错的执行效率。
协程上运行的多个任务本质上是依旧串行的,加上可控自主调度,所以并不需要做同步处理。

即便采用多线程也未必就能并行。
Python 就因 GIL 限制,默认只能并发而不能并行,所以很多时候转而使用“多进程+协程”架构

很难说哦哪种方式更好一些,它们有各自适用的场景。
通常情况下,用多进程来实现分布式和负载平衡,减轻单线程垃圾回收压力;
用多线程(LWP)抢夺更多的处理器资源;
用协程来提高处理器时间片利用率

简单将 goroutine 归纳为协程并不合适。
运行时会创建多个线程来执行并发任务,且任务单元可被调度到其他线程并行执行。
这更像是多线程和协程的综合体,能最大限度提升执行效率,发挥多核处理能力。

goroutine

Go 语言的异步操作引入了协程(原单词为 goroutines) 的概念,启动新的协程后会异步执行指定的函数。
协程类似于线程,但它并非鱼线程一一对应,而是由 Go 运行时内部进行调度,有可能一个线程会运行多个协程,也有可能一个协程在不同线程间切换,这一切都是 Go 运行时自动分配和控制的

Go 协程没有名称,也没有 ID 值,程序代码无法获取协程的唯一标识。

应用程序在运行时至少会启动一个协程--执行 main 函数的协程,此协程可以称为主协程,当该协程执行完毕后,整个程序就会退出

只须在函数调用前添加 go 关键字即可创建并发任务

1
2
3
4
5
go println("hello,world!")

go func(s string) {
    println(s)
}("hello,world!")

注意是函数调用,所以必须提供相应的参数

关键字 go 并非执行并发操作,而是创建一个并发任务单元。
新建任务被放置在系统队列中,等待调度器安排合适系统线程去获取执行权。
当前流程不会阻塞,不会等待该任务启动,且运行时也不保证并发任务的执行次序。

每个任务单元除保存函数指针、调用参数外,还会分配执行所需的栈内存空间。
相比系统默认 MB 级别的线程栈,goroutine 自定义栈初始仅须 2KB,所以才能创建成千上万的并发任务。
自定义栈采取按需分配策略,在需要时进行扩容,最大能到 GB 规模

与 defer 一样,goroutine 也会因“延迟执行”而立即计算并复制执行参数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import "time"

var c int

func counter() int {
    c++
    return c
}

func main() {
    a := 100

    go func(x, y int) {
        time.Sleep(time.Second) // 让 goroutine 在 main 逻辑之后执行
        println("go:", x, y)
    }(a, counter()) // 立即计算并复制参数

    a += 100
    println("main:", a, counter())

    time.Sleep(time.Second * 3) // 等待 goroutine 结束
}

输出:

1
2
main: 200 2
go: 100 1
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
    "fmt"
    "time"
)

func f(from string) {
    for i := 0; i < 3; i++ {
        fmt.Println(from, ":", i)
    }
}

func main() {

    f("direct")

    go f("goroutine")

    go func(msg string) {
        fmt.Println(msg)
    }("going")

    time.Sleep(time.Second)
    fmt.Println("done")
}

输出:

1
2
3
4
5
6
7
8
direct : 0
direct : 1
direct : 2
going
goroutine : 0
goroutine : 1
goroutine : 2
done

Wait

进程退出时不会等待并发任务结束,可用通道(channel)阻塞,然后发出退出信号

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import "time"

func main() {
    exit := make(chan struct{}) // 创建通道。因为仅是通知,数据并没有实际意义

    go func() {
        time.Sleep(time.Second)
        println("goroutine done.")

        close(exit) // 关闭通道,发出信号
    }()

    println("main...")
    <-exit // 如果通道关闭,立即解除阻塞
    println("main exit.")
}

输出:

1
2
3
main...
goroutine done.
main exit.

除关闭通道外,写入数据也可解除阻塞

如果等待多个任务结束,推荐使用 sync.WaitGroup
通过设定计数器,让每个 goroutine 在退出前递减,直至归零时解除阻塞。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1) // 累加计数

        go func(id int) {
            defer wg.Done() //递减计数

            time.Sleep(time.Second)
            println("goroutine", id, "done.")
        }(i)
    }

    println("main...")
    wg.Wait() // 阻塞, 直到计数归零
    println("main exit.")
}

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
main...
goroutine 8 done.
goroutine 9 done.
goroutine 0 done.
goroutine 1 done.
goroutine 2 done.
goroutine 3 done.
goroutine 4 done.
goroutine 5 done.
goroutine 6 done.
goroutine 7 done.
main exit.

尽管 WaitGroup.Add 实现了原子操作,但建议在 goroutine 外累加计数器,以免 Add 未执行,Wait 已经退出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package main

import (
    "sync"
)

func main() {
    var wg sync.WaitGroup

    go func() {
        wg.Add(1) // 来不及设置
        println("hi!")
    }()

    wg.Wait()
    println("exit.")
}

输出:

1
exit.

可在多处使用 Wait 阻塞,它们都能接收到通知

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)

    go func() {
        wg.Wait() // 等待归零,解除阻塞
        println("wait exit.")
    }()

    go func() {
        time.Sleep(time.Second)
        println("done.")
        wg.Done() // 递减计数
    }()

    wg.Wait() // 等待归零,解除阻塞
    println("main exit.")

}

输出:

1
2
3
done.
wait exit.
main exit.

GOMAXPROCS

运行时可能会创建很多线程,但任何时候仅有限的几个线程参与并发任务执行。
该数量默认与处理器核数相等,可用 runtime.GOMAXPROCS 函数(或环境变量)修改。

如参数小于 1,GOMAXPROCS 仅返回当前设置值,不做任何调整

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
    "math"
    "runtime"
    "sync"
)

// 测试目标函数
func count() {
    x := 0
    for i := 0; i < math.MaxUint32; i++ {
        x += i
    }
    println(x)
}

// 循环执行
func test(n int) {
    for i := 0; i < n; i++ {
        count()
    }
}

// 并发执行
func test2(n int) {
    var wg sync.WaitGroup
    wg.Add(n)

    for i := 0; i < n; i++ {
        go func() {
            count()
            wg.Done()
        }()
    }

    wg.Wait()
}

func main() {
    n := runtime.GOMAXPROCS(0)
    test(n)
    // test2(n)
}
1
2
3
4
5
6
$ time go run test.go
9223372030412324865
9223372030412324865
9223372030412324865
9223372030412324865
go run test.go  9.79s user 0.34s system 65% cpu 15.502 total

执行test2:

1
2
3
4
5
func main() {
    n := runtime.GOMAXPROCS(0)
    // test(n)
    test2(n)
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
$ time go run test.go
9223372030412324865
9223372030412324865
9223372030412324865
9223372030412324865
go run test.go  
10.94s user   // 多核执行时间累加
0.54s system 
117% cpu 
9.799 total  // 程序实际执行事件

Local Storage

与线程不同,goroutine 任务无法设置优先级,无法获取编号,没有局部存储(TLS),甚至连返回值都会被抛弃。
但除优先级外,其他功能都很容易实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    var gs [5]struct { // 用于实现类似 TLS 功能
        id     int // 编号
        result int // 返回值
    }

    for i := 0; i < len(gs); i++ {
        wg.Add(1)

        go func(id int) {
            defer wg.Done()

            gs[id].id = id
            gs[id].result = (id + 1) * 100
        }(i)
    }

    wg.Wait()
    fmt.Printf("%+v\n", gs)
}

输出:

1
[{id:0 result:100} {id:1 result:200} {id:2 result:300} {id:3 result:400} {id:4 result:500}]

如使用 map 作为局部存储容器,建议做同步处理,因为运行时会对其做并发读写检查

Gosched

暂停,释放线程去执行其他任务。当前任务被放回队列,等待下次调度时恢复执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(1)
    exit := make(chan struct{})

    go func() { // 任务a
        defer close(exit)

        go func() { // 任务 b。放在此处,是为了确保 a 优先执行
            println("b")
        }()

        for i := 0; i < 4; i++ {
            println("a:", i)

            if i == 1 { // 让出当前线程,调度执行 b
                runtime.Gosched()
            }
        }
    }()

    <-exit
}

输出:

1
2
3
4
5
a: 0
a: 1
b
a: 2
a: 3

该函数很少被使用,因为运行时会主动向长时间运行(10ms)的任务发出抢占调度。

Goexit

Goexit 立即终止当前任务,运行时确保所有已注册延迟调用被执行。
该函数不会影响其他并发任务,不会引发 panic,自然也就无法捕获。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
    "runtime"
)

func main() {
    exit := make(chan struct{})

    go func() {
        defer close(exit)  // 执行
        defer println("a") // 执行

        func() {
            defer func() {
                println("b", recover() == nil) // 执行, recover 返回 nil
            }()

            func() { // 在多层调用中执行 Goexit
                println("c")
                runtime.Goexit()   // 立即终止整个调用堆栈
                println("c done.") // 不会执行
            }()

            println("b done.") // 不会执行
        }()

        println("a done.") // 不会执行
    }()

    <-exit
    println("main exit.")
}

输出:

1
2
3
4
c
b true
a
main exit.

如果在 main.main 里调用 Goexit,它会等待其他任务结束,然后让进程直接崩溃

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    for i := 0; i < 2; i++ {
        go func(x int) {
            for n := 0; n < 2; n++ {
                fmt.Printf("%c: %d\n", 'a'+x, n)
                time.Sleep(time.Millisecond)
            }
        }(i)
    }

    runtime.Goexit() // 等待所有任务结束
    println("main exit.")
}

输出:

1
2
3
4
5
a: 0
b: 0
a: 1
b: 1
fatal error: no goroutines (main called runtime.Goexit) - deadlock!

无论身处哪一层,Goexit 都能立即终止整个调用堆栈,这与 return 仅退出当前函数不同

标准库函数 Os.Exit 可终止进程,但不会执行延迟调用

竞争状态

锁住共享资源

通道

相比 Erlang,Go 并未实现严格的并发安全

允许全局变量、指针、引用类型这些非安全内存共享操作,就需要开发人员自行维护数据一致和完整性。
Go 鼓励使用 CSP 通道,以通信来代替内存共享,实现并发安全。

Don't communicate by sharing memory, share memory by communicating

CSP: Communicating Sequential Process

通过消息来避免竞态的模型除了 CSP,还有 Actor。但两者有较大区别。

作为 CSP 核心,通道(channel)是显式的,要求操作双方必须知道数据类型和具体通道,并不关心另一端操作者身份和数量。
可如果另一端未准备妥当,或取消未能及时处理时,会阻塞当前端。

相比起来,Actor 是透明的,它不在乎数据类型及通道,只要知道接收者信箱即可。
默认就是异步方式,发送发对消息是否接收和处理并不关心。

从底层实现上来说,通道只是一个队列。
同步模式下,发送和接收双方配对,然后直接复制数据给对方。
如配对失败,则置入等待队列,直到另一方出现后才被唤醒。
异步模式抢夺的则是数据缓冲槽。发送方要求有空槽可供写入,而接收方则要求有缓冲数据可读。
需求不符时,同样加入等待队列,直到另一方写入数据或腾出空槽后被唤醒。

除传递消息(数据)外,通道还常被用作事件通知。

在 Go 的异步编程中,通道类型(channel,类型名称为 chan)既可以用于协程之间的数据通信,也可以用于协程之间的同步:

通道类型有以下几种表示方式:

1
2
3
chan T    // 双向通道,既可以发送数据,也可以接收数据
chan<- T   // 只能向通道发送数据
<-chan T   // 只能从通道接收数据

其中,T 是通道中可存放的数据类型。例如

1
chan int

上述格式表示双向通道,通道可以存放 int 类型的值

通道数据的输入输出是通过 "<-" 运算符(称作"接收运算符")来完成的。"<-"位于通道变量之前表示从通道中接收数据;"<-" 位于通道变量后面表示向通道发送数据

1
2
3
var ch = ......
ch<- 5   // 向通道发送数据
var n = <-ch  // 从通道接收数据     

注意上述代码中,<-ch表达式仅表示从通道 ch 中读出数据,若要将读出的值赋值给变量 n,则必须使用赋值运算符(=)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package main

func main() {
    done := make(chan struct{}) // 结束事件
    c := make(chan string)      // 数据传输通道

    go func() {
        s := <-c // 接收消息
        println(s)
        close(done) // 关闭通道,作为结束通知
    }()

    c <- "hi!" // 发送消息
    <-done     // 阻塞,直到有数据或管道关闭
}

输出:

1
hi!

同步模式必须有配对操作的 goroutine 出现,否则会一直阻塞。
而异步模式在缓冲区未满或数据未读完前,不会阻塞。

1
2
3
4
5
6
7
8
9
package main

func main() {
    c := make(chan int, 3) // 创建带 3 个缓冲槽的异步通道
    c <- 1                 // 缓冲区未满,不会阻塞
    c <- 2
    println(<-c) // 缓冲区上有数据不会阻塞
    println(<-c)
}

输出:

1
2
1
2

多数时候,异步通道有助于提升性能,减少排队阻塞。

缓冲区大小仅是内部属性,不属于类型组成部分。
另外通道变量本身就是指针,可用相等操作符判断是否为同一对象或 nil

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
package main

import (
    "fmt"
    "unsafe"
)

func main() {
    var a, b chan int = make(chan int, 3), make(chan int)
    var c chan bool

    println(a == b)
    println(c == nil)

    fmt.Printf("%p, %d\n", a, unsafe.Sizeof(a))
}

输出:

1
2
3
false
true
0xc000022080, 8

虽然可传递指针来避免数据复制,但须额外注意数据并发安全。

内置函数 cap 和 len 返回缓冲区大小和当前已缓冲数量;
而对于同步通道则都返回 0,据此可判断通道是同步还是异步。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
package main

func main() {
    a, b := make(chan int), make(chan int, 3)

    b <- 1
    b <- 2

    println("a:", len(a), cap(a))
    println("b:", len(b), cap(b))
}

输出:

1
2
a: 0 0
b: 2 3

收发

除使用简单的发送和接收操作符外,还可用 ok-idom 或 range 模式处理数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

func main() {
    done := make(chan struct{})
    c := make(chan int)

    go func() {
        defer close(done) // 确保发出结束通知

        for {
            x, ok := <-c
            if !ok { // 据此判断通道是否被关闭
                return
            }

            println(x)
        }
    }()

    c <- 1
    c <- 2
    c <- 3
    close(c)

    <-done
}

输出:

1
2
3
1
2
3

对于循环接收数据,range 模式更简洁一些

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
package main

func main() {
    done := make(chan struct{})
    c := make(chan int)

    go func() {
        defer close(done) // 确保发出结束通知

        for x := range c { // 循环获取消息,直到通道被关闭
            println(x)
        }
    }()

    c <- 1
    c <- 2
    c <- 3
    close(c)

    <-done
}

及时用 close 函数关闭通道引发结束通知,否则可能会导致死锁

通知可以是群体性的。也未必就是通知结束,可以是任何需要表达的事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    ready := make(chan struct{})

    for i := 0; i < 3; i++ {
        wg.Add(1)

        go func(id int) {
            defer wg.Done()

            println(id, ":ready.") // 运动员准备就绪
            <-ready                // 等待发令
            println(id, ":running...")
        }(i)
    }

    time.Sleep(time.Second)
    println("Ready?Go!")

    close(ready) // 砰!

    wg.Wait()
}

输出:

1
2
3
4
5
6
7
0 :ready.
1 :ready.
2 :ready.
Ready?Go!
0 :running...
2 :running...
1 :running...

一次性事件用 close 效率更好,没有多余开销。
连续或多样性事件,可传递不同数据标志实现。还可使用 sync.Cond 实现单播或广播事件。

对于 closed 或 nil 通道,发送和接收操作都有相应规则:

  • 向已关闭通道发送数据,引发 panic
  • 从已关闭接收数据,返回已缓冲数据或零值
  • 无论收发,nil 通道都会阻塞
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
package main

func main() {
    c := make(chan int, 3)
    c <- 10
    c <- 20
    close(c)

    for i := 0; i < cap(c)+1; i++ {
        x, ok := <-c
        println(i, ":", ok, x)
    }
}

输出:

1
2
3
4
0 : true 10
1 : true 20
2 : false 0
3 : false 0

重复关闭,或关闭 nil 通道都会引发 panic 错误

1
2
panic:close of closed channel
panic:close of nil channel

实例化通道

通道对象的实例是通过 make 函数创建的,此函数可以创建切片、映射、通道类型的实例

下面的代码创建一个可存储 string 类型数据的通道类型

1
var c = make(chan string)

也可以这样:

1
var c = make(chan string, 0)

make 函数的第二个参数(size)表示通道对象的缓冲值,忽略此参数或者设置为 0 表示所创建的通道实例不使用缓冲。

下面的语句向通道发送数据

1
c <- "hello"

然后可以从通道接收数据:

1
<- c

当不再使用通道的实例时,可以调用 close 函数将其关闭

1
close(c)

通道常用于不同协程之间的通信,在同一个协程中使用通道意义不大

数据缓冲

无缓冲的通道要求发送与接收操作同时进行——向通道发送数据的同时必须有另一个协程在接收。
请看下面的例子。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
    "fmt"
    "time"
)

func main() {
    var mych = make(chan uint)
    go func() {
        fmt.Println("开始执行新协程")
        mych <- 350
        fmt.Println("新协程执行完毕")
    }()

    time.Sleep(time.Second)
    fmt.Println("主协程即将退出")
}

输出:

1
2
开始执行新协程
主协程即将退出

main协程等待了1秒钟后退出,从输出结果可以看出,新启动的协程并没有完全被执行。
程序在向通道mych发送数据后就被阻塞,无法继续执行,这是因为整数350发送到通道后没有被及时被读取所致,解决方法是在main函数中接收通道中的数据。修改后的代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
    "fmt"
    "time"
)

func main() {
    var mych = make(chan uint)
    go func() {
        fmt.Println("开始执行新协程")
        mych <- 350
        fmt.Println("新协程执行完毕")
    }()
    <-mych
    time.Sleep(time.Second)
    fmt.Println("主协程即将退出")
}

由于新创建的协程与主协程是异步执行的,使得通道mych的发送与接收行为可以同时完成,程序不会被阻塞,最终两个协程都顺利执行。

带缓冲的通道的读与写可以不同时进行,举个例子说明。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package main

import (
    "fmt"
    "time"
)

func main() {
    var mych = make(chan string, 1)
    go func() {
        fmt.Println("开始执行新协程")
        mych <- "hello"
        fmt.Println("新协程执行完毕")
    }()
    time.Sleep(time.Second)
    fmt.Println("主协程即将退出")
}

这一次,虽然在主协程上没有接收通道中的数据,但程序可以正常完成执行,输出结果如下:

1
2
3
开始执行新协程
新协程执行完毕
主协程即将退出

这是因为此次创建的通道实例是带缓冲的,缓冲的元素个数为1。
所以,当新的协程代码向通道发送了一次数据后,数据会缓存在通道中,不要求立即被取出,代码就不会被阻塞——哪怕main函数中未接收通道的数据也不会阻塞。

不过,若是通道中缓存的数据量已满,再次向通道发送数据就会被阻塞,直到数据被接收为止。就像下面这样:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
    "fmt"
    "time"
)

func main() {
    var mych = make(chan string, 1)
    go func() {
        fmt.Println("开始执行新协程")
        mych <- "hello"
        mych <- "world"
        fmt.Println("新协程执行完毕")
    }()
    time.Sleep(time.Second)
    fmt.Println("主协程即将退出")
}

如果将make函数的调用修改为:

1
var mych = make(chan string, 2)

那么此时的缓存容量为2,发送两次数据不会被阻塞,当第三次向通道发送数据时就会阻塞。

单向通道

1
2
var ch1 = make(<-chan bool)
varr ch2 = make(chan<- bool)

ch1为单向通道实例,只能从通道接收数据,不能向通道发送数据;ch2也是单向通道实例,只能向通道发送数据,不能接收数据。

直接在代码中使用单向通道没有意义,因为数据无法完成输入和输出。不过,要是用于代码封装,作为数据进出的间接通道,单向通道就很合适。

同步

通道并非用来取代锁的,它们有各自不同的使用场景。
通道倾向于解决逻辑层次的并发处理架构,而锁则用来保护局部范围内的数据安全。

互斥锁

当多个Go协程同时访问某一段代码时,会出现逻辑混乱的现象。举个例子,定义一个throw函数,假设用于模拟抛球机工作。当小球的总数为0时,停止抛球。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
    "fmt"
    "time"
)

var Total int

func throw() {
    for {
        if Total <= 0 {
            break
        }
        time.Sleep(time.Millisecond * 300)
        Total--
        fmt.Printf("剩余 %d 个球\n", Total)
    }
}

func main() {

}

在main函数中启动四个新协程,表示四台抛球机在抛球,小球总数为20。

1
2
3
4
5
6
7
func main() {
    Total = 20
    for i := 0; i < 4; i++ {
        go throw()
    }
    time.Sleep(time.Second * 8)
}

然而,运行后会发现存在逻辑错误——剩余的小球总数可能会变为负数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
剩余 19 个球
剩余 18 个球
剩余 17 个球
剩余 16 个球
剩余 14 个球
剩余 12 个球
剩余 15 个球
剩余 13 个球
剩余 11 个球
剩余 10 个球
剩余 9 个球
剩余 8 个球
剩余 7 个球
剩余 5 个球
剩余 4 个球
剩余 6 个球
剩余 3 个球
剩余 0 个球
剩余 1 个球
剩余 2 个球
剩余 -1 个球

这是因为四个协程是相互独立的,它们同时执行throw函数,当协程A判断还有剩余的球后,即将抛出一个球。
正在此时协程B却把球抛出去了,而A根本不知道,于是它继续执行。也就是说,A并没有抛球,却把Total减掉1。
四个协程一起运行,这种情况会不断地发生,最终导致状态不统一,引发逻辑错误,就会出现剩余的小球总数为负数的结果。

要解决此问题,需要加一把“锁”,把抛一次球的整个过程锁定,只允许一个协程进行操作,其他协程“原地待命”。
当这个协程抛完一次球,解除锁定,然后其他协程再去抛球。

接下来对上述例子进行修改,在throw函数中加上互斥锁(sync包公开的Mutex类型)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
    "fmt"
    "sync"
    "time"
)

var Total int
var locker = new(sync.Mutex)

func throw() {
    for {
        locker.Lock()
        if Total <= 0 {
            locker.Unlock()
            break
        }
        time.Sleep(time.Millisecond * 300)
        Total--
        fmt.Printf("剩余 %d 个球\n", Total)
        locker.Unlock()
    }
}

func main() {
    Total = 20
    for i := 0; i < 4; i++ {
        go throw()
    }
    time.Sleep(time.Second * 8)
}

互斥锁的锁定范围应覆盖从对Total变量进行判断到将Total变量减去1这个过程,在此过程中,始终只允许一个协程访问代码,防止Total变量被意外更改。

Lock方法与Unlock方法的调用必须成对出现,即锁定资源后,要记得将其解锁,否则其他协程将永远无法访问资源。

经过修改后,就能得到正确的结果。

WaitGroup 类型

sync.WaitGroup类型内部维护一个计数器,某个Go协程调用Wait方法后会被阻塞,直到WaitGroup对象的计数器变为0。

调用Add方法可以增加计数器的值,调用Done方法会使计数器的值减1。
实际上,Done方法内部也调用了Add方法,传递的参数值为-1。下面是Done方法的源代码。

1
2
3
func (wg *waitGroup) Done() {
    wg.Add(-1)
}

所以,调用Add方法并向参数传递负值,也可以减少计数器的值。

在前面,有多个示例代码都会在main函数结束之前调用time.Sleep函数来让主协程暂停,用以等待其他协程执行完毕。就像下面这样:

1
2
3
func main() {
    time.Sleep(time.Second * 8)
}

使用接下来介绍的WaitGroup类型就不需要用Sleep函数来暂停了,只要在主协程上调用其Wait方法,主协程就会阻塞并且等到计数器为0时才会继续运行。

下面代码演示执行三个新的协程,计数器增加3,每个协程在执行完成时调用Done方法让计数器减1。
主协程上调用Wait方法后会一直处于等待状态,直到三个协程都顺利完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(3)

    for i := 1; i <= 3; i++ {
        go func(n int) {
            defer wg.Done()
            fmt.Printf("开始执行第 %d 个协程\n", n)
            time.Sleep(time.Second * 2)
            fmt.Printf("第 %d 个协程执行完毕\n", n)
        }(i)
    }

    wg.Wait()
    fmt.Println("所有协程已完成")
}

输出结果:

1
2
3
4
5
6
7
开始执行第 2 个协程
开始执行第 3 个协程
开始执行第 1 个协程
第 1 个协程执行完毕
第 3 个协程执行完毕
第 2 个协程执行完毕
所有协程已完成