Skip to content

管道

管道是 Go 在语言层面提供的协程间的通信方式,比 UNIX 的管道更易用也更轻便。
我们基于源码来分析管道的实现机制

热身测验

题目一:

下面关于管道的描述正确的是(单选)?
A: 读 nil 管道会触发 panic
B: 写 nil 管道会触发 panic
C: 读关闭的管道会触发 panic
D: 写关闭的管道会触发 panic

解答:
读写 nil 管道均会阻塞。关闭的管道仍然可以读取数据,向关闭的管道写数据会触发 panic。本题选 D

题目二:

下面的函数输出什么?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package main

import "fmt"

func ChanCap() {
    ch := make(chan int, 10)
    ch <- 1
    ch <- 2
    fmt.Println(len(ch))
    fmt.Println(cap(ch))
}

func main() {
    ChanCap()
}

解答:
内置函数 len() 和 cap() 分别用于查询管道缓存中数据的个数及缓存的大小。函数输出:

1
2
2
10

题目三:

以下选项可以实现互斥锁的是(单选)?

A:

1
2
3
4
5
6
7
8
var counter int = 0
var ch = make(chan int, 1)

func Worker() {
    ch <- 1
    counter++
    <-ch
}

B:

1
2
3
4
5
6
7
8
var counter int = 0
var ch = make(chan int)

func Worker() {
    <-ch
    counter++
    ch <- 1
}

C:

1
2
3
4
5
6
7
8
var counter int = 0
var ch = make(chan int, 1)

func Worker() {
    <-ch
    counter++
    ch <- 1
}

D:

1
2
3
4
5
6
7
8
var counter int = 0
var ch = make(chan int)

func Worker() {
    ch <- 1
    counter++
    <-ch
}

解答:
只有一个缓冲区的管道,写入数据类似于加锁,读出数据类似于释放锁。本题选 A

特性速览

初始化

声明和初始化管道的方式有以下两种:

  • 变量声明
  • 使用内置函数 make()

变量声明:

1
var ch chan int   // 声明管道

这种方式声明的管道,值为 nil。每个管道只能存储一种类型的数据

使用内置函数 make():

使用内置函数 make() 可以创建无缓冲管道和带缓冲管道

1
2
ch1 := make(chan string)   // 无缓冲管道
ch2 := make(chan string, 5)   // 带缓冲管道

管道操作:

操作符:

操作符 "<-" 表示数据流向,管道在左表示向管道写入数据,管道在右表示从管道读出数据,如下所示:

1
2
3
4
ch := make(chan int, 10)
ch <- 1    // 数据流入管道
d := <-ch   // 数据流出管道
fmt.Println(d)

默认的管道为双向可读写,管道在函数间传递时可以使用操作符限制管道的读写,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func ChanParamRw(ch chan int) {
    // 管道可读写
}

func ChanParamR(ch <-chan int) {
    // 只能从管道读取数据
}

func ChanParamW(ch chan<- int) {
    // 只能向管道写入数据
}

数据读写:

管道没有缓冲区时,从管道读取数据会阻塞,直到有协程向管道中写入数据。类似地,向管道写入数据也会阻塞,直到有协程从管道读取数据。

管道有缓冲区但缓冲区没有数据时,从管道读取数据也会阻塞,直到有协程写入数据。
类似地,向管道写入数据时,如果缓冲区已满,那么也会阻塞,直到有协程从缓冲区中读出数据。

对于值为 nil 的管道,无论读写都会阻塞,而且是永久阻塞

使用内置函数 close() 可以关闭管道,尝试向关闭的管道写入数据会触发 panic,但关闭的管道仍可读

管道读取表达式最多可以给两个变量赋值:

1
2
v1 := <-ch
x, ok := <-ch

第一个变量表示读出的数据,第二个变量(bool 类型)表示是否成功读取了数据,需要注意的是,第二个变量不用于指示管道的关闭状态

第二个变量常常会被错误地理解成管道的关闭状态,那是因为它的值确实跟管道的关闭状态有关,更确切地说跟管道缓冲区中是否有数据有关

一个已关闭的管道有两种情况

  • 管道缓冲区已没有数据
  • 管道缓冲区还有数据

对于第一种情况,管道已关闭且缓冲区中没有数据,那么管道读取表达式返回的第一个变量为相应类型的零值,第二个变量为 false。

对于第二种情况,管道已关闭但缓冲区中仍有数据,那么管道读取表达式返回的第一个变量为读到的数据,第二个变量为 true。
可以看到,只有管道已关闭且缓冲区中没有数据时,管道读取表达式返回的第二个变量才跟管道关闭状态一致

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import "fmt"

func main() {
    ch := make(chan int, 10)
    ch <- 1
    ch <- 2
    val, hasCache := <-ch
    fmt.Println(val, hasCache)
}
// 1 true
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
package main

import "fmt"

func main() {
    ch := make(chan int)
    go func() {
        ch <- 1
    }()
    val, hasCache := <-ch
    fmt.Println(val, hasCache)
}
// 1 true
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
package main

import "fmt"

func main() {
    ch := make(chan int)
    close(ch)
    val, hasCache := <-ch
    fmt.Println(val, hasCache)
}
// 0 false
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package main

import "fmt"

func main() {
    ch := make(chan int, 10)
    ch <- 2333
    close(ch)
    val, hasCache := <-ch
    fmt.Println(val, hasCache)
}
// 2333 true

小结:

内置函数 len() 和 cap() 作用域管道,分别用于查询缓冲区中数据的个数及缓冲区的大小
管道实现了一种 FIFO(先进先出)的队列,数据总是按照写入的顺序流出管道

协程读取管道时,阻塞的条件有:

  • 管道无缓冲区
  • 管道的缓冲区中无数据
  • 管道的值为 nil

协程写入管道时,阻塞的条件有:

  • 管道无缓冲区
  • 管道的缓冲区已满
  • 管道的值为 nil

实现原理

数据结构

源码包中 src/runtime/chan.go:hchan 定义了管道的数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type hachn struct {
    qcount   uint           // 当前队列中剩余的元素个数
    dataqsiz uint           // 环形队列长度,即可以存放的元素个数
    buf      unsafe.Pointer // 环形队列指针
    elemsize uint16         // 每个元素的大小
    closed   uint32         // 标识关闭状态
    elemtype *_type         // 元素类型
    sendx    uint           // 队列下标,指示元素写入时存放到队列中的位置
    recvx    uint           // 队列下标,指示下一个被读取的元素在队列中的位置
    recvq    waitq          // 等待读消息的协程队列
    sendq    waitq          // 等待写消息的协程队列
    lock     mutex          // 互斥锁,chan 不允许并发读写
}

从数据结构可以看出管道由队列、类型信息、协程等待队列组成

(1) 环形队列

chan 内部实现了一个环形队列作为其缓冲区,队列的长度是在创建 chan 时指定的。

下图展示了一个可缓存 6 个元素的管道

可缓存6个元素的管道

  • dataqsiz 指示了队列长度为 6,即可缓存 6 个元素
  • buf 指向队列的内存
  • qcount 表示队列中还有两个元素
  • sendx 指示后续写入的数据存储的位置,取值为 [0, 6)
  • recvx 指示从该位置读取数据,取值为 [0, 6)

使用数组实现队列是比较常见的操作,sendx 和 recvx 分别表示队尾和队首,sendx 指示数据写入的位置,recvx 指示数据读取的位置

(2) 等待队列

从管道读取数据时,如果管道缓冲区为空或没有缓冲区,则当前协程会被阻塞,并被加入 recvq 队列。
向管道写入数据时,如果管道缓冲区已满或没有缓冲区,则当前协程会被阻塞,并被加入 sendq 队列

下图展示了一个没有缓冲区的管道,有几个协程阻塞等待读数据

等待读数据

处于等待队列中的协程会在其他协程操作管道时被唤醒:

  • 因读阻塞的协程会被向管道写入数据的协程唤醒
  • 因写阻塞的协程会被向管道读取数据的协程唤醒

注意,一般情况下 recvq 和 sendq 至少有一个为空。
只有一个例外,那就是同一个协程使用 select 语句向管道一边写入数据,一边读取数据,此时协程辉分别位于两个等待队列中

(3) 类型信息

一个管道只能传递一种类型的值,类型信息存储在 hchan 数据结构中

  • elemtype 代表类型,用于在数据传递过程中赋值
  • elemsize 代表类型大小,用于在 buf 中定位元素的位置

如果需要管道传递任意类型的数据,则可以使用 interface{} 类型

(4) 互斥锁

一个管道同时仅允许被一个协程读写,为简单起见,后面介绍读写过程时不再涉及加锁和解锁

管道操作

(1) 创建管道

创建管道的过程实际上是初始化 hcahn 结构,其中类型信息和缓冲区长度由内至函数 make() 指定,buf 的大小则由元素大小和缓冲区长度共同决定

创建管道的伪代码如下所示

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func makechan(t *chantype, size int) *hchan {
    var c *hchan
    c = new(hchan)
    c.buf = malloc(元素类型大小*size)
    c.elemsize = 元素类型大小
    c.elemtype = 元素类型
    c.dataqsiz = size

    return c
}

(2) 向管道写数据

向一个管道中写入数据的简单过程如下:

  • 如果缓冲区中有空余位置,则将数据写入缓冲区,结束发送过程
  • 如果缓冲区中没有空余位置,则将当前协程加入 sendq 队列,进入睡眠并等待被读协程唤醒

在实现时有一个小技巧,当接收队列 recvq 不为空时,说明缓冲区中没有数据但有协程在等待数据,此时会把数据直接传递给 recvq 队列中的第一个协程,而不必再写入缓冲区

简单流程如下图所示

写数据流程图

(3) 从管道读数据

从一个管道读取数据的简单过程如下:

  • 如果缓冲区中有数据,则从缓冲区中取出数据,结束读取过程
  • 如果缓冲区中没有数据,则将当前协程加入 recvq 队列,进入睡眠等待被写协程唤醒

类似地,如果等待发送队列 sendq 不为空,且没有缓冲区,那么此时将直接从 sendq 队列的第一个协程中获取数据

简单流程如下图所示:

读数据流程图

(4) 关闭管道

关闭管道时会把 recvq 中的协程全部唤醒,这些协程获取的数据都为对应类型的零值。同时会把 sendq 队列中的协程全部唤醒,但这些协程会触发 panic

除此之外,其他会触发 panic 的操作还有:

  • 关闭值为 nil 的管道
  • 关闭已经被关闭的管道
  • 向已经关闭的管道写入数据

常见用法

(1) 单向管道

顾名思义,单向管道指只能用于发送或接收数据,由管道的数据结构我们知道,实际上并没有单向管道

所谓单向管道只是对管道的一种使用限制,这跟 C 语言使用 const 修饰函数参数为只读是一个道理

  • func readChan(chanName <-chan int): 通过形参限定函数内部只能从管道中读取数据
  • func writeChan(chanName chan<- int): 通过形参限定函数内部只能向管道中写入数据

一个简单的示例程序如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func readChan(chanName <-chan int) {
    <- chanName
}

func writeChan(chanName chan<- int) {
    chanName <- 1
}

func main() {
    var mychan = make(chan int, 10)

    writeChan(mychan)
    readChan(mychan)
}

mychan 是一个正常的管道,而 readChan() 参数限制了传入的管道只能用来读,writeChan() 参数限制了传入的管道只能用来写

(2) select

使用 select 可以监控多个管道,当其中某一个管道可操作时就触发相应的 case 分支

一个简单的示例程序如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
    "fmt"
    "time"
)

func addNumberToChan(chanName chan int) {
    for {
        chanName <- 1
        time.Sleep(1 * time.Second)
    }
}

func main() {
    var chan1 = make(chan int, 10)
    var chan2 = make(chan int, 10)

    go addNumberToChan(chan1)
    go addNumberToChan(chan2)

    for {
        select {
        case e := <-chan1:
            fmt.Printf("Get element from chan1: %d\n", e)
        case e := <-chan2:
            fmt.Printf("Get element from chan2: %d\n", e)
        default:
            fmt.Printf("No element in chan1 and chan2.\n")
            time.Sleep(1 * time.Second)
        }
    }
}

程序中创建了两个管道: chan1 和 chan2。addNumberToChan() 函数会向两个管道中周期性地写入数据。通过 select 可以监控两个管道,任意一个可读时就从中读出数据。

程序输出如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
D:\SourceCode\GoExpert\src>go run main.go
Get element from chan1: 1
Get element from chan2: 1
No element in chan1 and chan2.
Get element from chan2: 1
Get element from chan1: 1
No element in chan1 and chan2.
Get element from chan2: 1
Get element from chan1: 1
No element in chan1 and chan2.

由输出可见,从管道中读出数据的顺序是随机的,事实上 select 语句的多个 case 语句的执行顺序是随机的

通过这个例子可以看出,select 的 case 语句读管道时不会阻塞,尽管管道中没有数据。
这是由于 case 语句编译后调用读管道时会明确传入不阻塞的参数,读不到数据时不会将当前协程加入等待队列,而是直接返回

(3) for-range

通过 for-range 可以持续地从管道中读出数据,好像在遍历一个数组一样,当管道中没有数据时会阻塞当前协程,与读管道时的阻塞处理机制一样。
即便管道被关闭,for-range 也可以优雅地结束,如下所示

1
2
3
4
5
func chanRange(chanName chan int) {
    for e:= range chanName {
        fmt.Printf("Get element form chan: %d\n", e)
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
package main

import "fmt"

func main() {

    queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue)

    for elem := range queue {
        fmt.Println(elem)
    }
}

输出:

1
2
one
two