Skip to content

并发控制(二)

Go 语言的 context 是应用开发常用的并发控制技术,它与 WaitGroup 最大的不同点是 context 对于派生 goroutine 有更强的控制力,它可以控制多级的 goroutine

context 翻译成中文是 "上下文",即它可以控制一组呈树状结构的 goroutine,每个 goroutine 拥有相同的上下文

由于 goroutine 派生出子 goroutine,而子 goroutine 又继续派生新的 goroutine,这种情况下使用 WaitGroup 就不太容易,因为子 goroutine 的个数不容易确定。而使用 context 就可以很容易实现

介绍

在 Go 语言中,每个独立调用一般都会被单独的协程处理。
但在处理一个请求时,往往可能需要在多个协程之间进行信息传递,甚至包括一层层地递进顺序传递,而且这种信息往往具有一定的场景状态。
如一个请求可能衍生出各个协程之间需要满足一定的约束关系,如登录状态、前一个协程的计算结果、传递请求全局变量等功能。

Go 语言为开发人员提供了一个解决方案,即标准库的 context 包,有的地方也称为上下文。
使用上下文可以在多个协程之间传递请求相关的数据、主动取消上下文或按照时间自动取消上下文等

每个协程在执行之前,一般都需要了解当前的执行状态,通常会将这些状态包装在上下文变量中进行传递。
上下文几乎已经成为传递与 Request 同生命周期的变量的标准方法

当程序接收到一个网络请求 Request, 在处理 Request 时,可能需要开启不同的协程来获取数据与逻辑处理,即一个请求 Request,可能需要在多个协程中被处理。
这些协程需要共享 Request 的一些信息,同时当顶层 Context 被取消或者超时的时候,所有从这个顶层 Request 创建的 Context 也应该结束。
这些都可以通过 Context 来实现,Context 就像是 Request 中的全局变量能让大家共享数据,当然实际上它是需要创建并传递的。

ccontext 包实现了在程序协程之间共享状态变量的方法,在被调用程序单元的外部,通过设置上下文变量 ctx,将过期或取消信号传递给被调用的程序单元

Context 包中定义的 Context 结构如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Context 包含过期、取消信号、request 值传递等,方法在多个协程中线程安全
type Context interface {
    // Done 方法在 context 被取消或者超时返回一个 close 的 channel
    Done() <-chan struct {}

    Err() error

    // Deadline 返回 context 超时时间
    Deadline() (deadline time.Time, ok bool)

    // Value 返回 context 相关 key 对应的值
    Value(key interface {}) interface{}
}
  • Deadline 会返回一个超时时间,超时后 Context 无效
  • Done 方法返回一个通道(channel),当 Context 被取消或过期时,该通道关闭,即它是一个表示是否已关闭的信号
  • 当 Done 通道关闭后,Err 方法表明 Context 被取消的原因
  • Value 是可以共享的数据

Context 的创建和调用关系总是层层递进的,就像社会组织的层次一样,Context 创建者的协程可以主动关闭其下层的 Context 的执行。
为了实现这种关系,Context 结构就像一棵树,叶子节点总是由根节点衍生出来的

要创建上下文树,第一步就是要得到根节点,context.Background 函数的返回值就是根节点:

1
func Background() Context

Background() 函数返回空的上下文,该上下文一般由接收请求的第一个协程创建,作为进入请求的上下文根节点,它不能被取消,没有值,也没有过期时间。它常常作为处理 Request 的顶层上下文存在。

有了根节点,又该怎么创建其他的子节点、孙节点呢?context 包提供了四类函数来创建它们。

1
2
3
4
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key interface{}, val interface{}) Context

这些函数都接收一个 Context 类型的参数 parent,并返回一个 Context 类型的值,这样就层层创建出不同的节点。父节点创建 Context,并传递给子节点。

怎么通过 Context 传递改变的状态呢?使用 Context 的协程师无法取消某个操作的,只有父协程可以取消操作。
在父协程中可以通过 WithCancel 函数获得一个 CancelFunc 函数类型变量,从而可以手工取消这个 Context

WithCancel 函数将父节点 Context 复制到子节点,并且返回一个额外的 CancelFunc 函数类型变量,该函数类型的定义为:

1
type CancelFunc func()

调用 CancelFunc 对象 cancel 将取消对应的 Context 对象,这就是主动取消 Context 的方法。
在父节点的 Context 所对应的环境中,通过 WithCancel 函数不仅可以创建该节点的 Context,同时也获得了该节点 Context 的控制权,一旦执行该函数取消,该节点 Context 就结束了,而子节点则需要根据 context.Done() 来判断是否结束。例如:

1
2
3
select {
    case <-ctx.Done():
}

WithDeadline 函数的作用也差不多,它返回的 Context 类型值同样是 parent 的副本,但其过期时间由 deadline 和 parent 的过期时间共同决定。
这是因为父节点过期时,其所有的子孙节点必须同时关闭;反之,返回的父节点的过期时间则为 deadline

WithTimeout 函数与 WithDeadline 类似,不过它传入的是上下文从现在开始剩余的生命时长。
它们同样也都返回了所创建的子上下文的控制权及一个 CancelFunc 类型的函数变量。

当顶层的 Request 请求函数结束后,就可以取消某个上下文,从而再在对应协程中根据 ctx.Done() 来决定是否结束协程本身。

WithValue 函数返回 parent 的一个副本,调用该副本的 Value(key) 方法将得到对应 key 的值。
不光可以将根节点原有的值保留,还可以在子孙节点中加入新的值,注意若存在新、旧 Key 相同的情况,则旧 key 的值会被覆盖。

Context 对象的生存周期一般仅为一个请求的处理周期,即针对一个请求创建一个 Context 变量(它是上下文树结构的根)。
在请求处理结束后,撤销此变量,释放资源

每次创建一个协程时,可以将原有的上下文传递给这个子协程,或者新创建一个子上下文传递给这个协程

上下文能灵活地存储不同类型、不同数目的值,并且使多个协程安全地读写其中的值

当通过父 Context 对象创建子上下文对象时,即可获得子上下文的一个取消函数,这样父上下文对象的创建环境就获得了对子上下文的撤销权。

使用上下文时需遵循以下规则:
(1) 上下文变量需要作为第一个参数使用,一般命名为 ctx
(2) 不要传入一个 nil 的上下文,不确定 Context 时可传一个 context.TODO
(3) 使用上下文的 Value 相关方法只传递请求相关的元数据,不要传递可选参数
(4) 同样的上下文可以用来传递到不同的协程中,上下文在多个协程中时安全的

在子上下文被传递到的协程中,应该对该子上下文的 Done 通道(channel) 进行监控,一旦该通道被关闭(即上层运行环境撤销了本协程的执行),应主动终止对当前请求信息的处理,释放资源并返回

上下文应用

通道、上下文以及 sync 包,通过这三者,完全可以达到完美控制协程运行的目的

通过 go 关键字很容易就能启动一个协程,但很好地管理和控制它们的运行却比较难。
因此可以根据场景使用以下几种方法。

(1) 使用 sync.WaitGroup,它用于线程总同步,会等待一组线程集合完成,才会继续向下执行,这对监控所有子协程全部完成的情况特别有用,但要控制某个协程就无能为力了

(2) 使用通道来传递消息,一个协程发送通道信号,另一个协程通过 select 得到通道信息,这种方式可以满足协程之间的通信,控制协程运行。
但如果协程数量达到一定程序,就很难把控了。或者这两个协程还和其他协程也有类似通信,例如 A 与 B,B 与 C,如果 A 发信号 B 退出了,C 有可能等不到 B 的通道信号而被遗忘。

(3) 使用上下文来传递消息,上下文是层层传递机制,根节点完全控制了子节点,根节点(父节点)可以根据需要选择自动还是手动结束子节点。
而每层节点所在的协程就可以根据信息来决定下一步的操作

下面来看看怎样使用上下文控制协程的运行

这里用上下文同时控制两个协程,这两个协程都可以收到 cancel() 发出的信息,甚至 doNothing 不结束协程可反复接收取消信息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main

import (
    "context"
    "log"
    "os"
    "time"
)

var logs *log.Logger

func doClearn(ctx context.Context) {
    // for 循环每 1 秒检查一下,判断 ctx 是否被取消了,如果是就退出
    for {
        time.Sleep(1 * time.Second)
        select {
        case <-ctx.Done():
            logs.Println("doClearn:收到 Cancel, 做好收尾工作后马上退出。")
            return
        default:
            logs.Println("doClearn:每隔 1 秒观察信号,继续观察...")
        }
    }
}

func doNothing(ctx context.Context) {
    for {
        time.Sleep(3 * time.Second)
        select {
        case <-ctx.Done():
            logs.Println("doNothing:收到 Cancel,但不退出......")

            // 注释 return 可以观察到,ctx.Done() 信号时可以一直接收到的,return 不注释意味退出协程
            // return
        default:
            logs.Println("doNothing:每隔 3 秒观察信号,一直运行")
        }
    }
}

func main() {
    logs = log.New(os.Stdout, "", log.Ltime)

    // 新建一个 ctx
    ctx, cancel := context.WithCancel(context.Background())
    // 传递 ctx
    go doClearn(ctx)
    go doNothing(ctx)

    // 主程序阻塞 20 秒,留给协程来演示
    time.Sleep(20 * time.Second)
    logs.Println("cancel")

    // 调用 cancel: context.WithCancel 返回的 CancelFunc
    cancel()

    // 发出 cancel 命令后,主程序阻塞 10 秒,再看协程的运行情况
    time.Sleep(10 * time.Second)
}

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
23:10:55 doClearn:每隔 1 秒观察信号,继续观察...
23:10:56 doClearn:每隔 1 秒观察信号,继续观察...
23:10:57 doNothing:每隔 3 秒观察信号,一直运行
23:10:57 doClearn:每隔 1 秒观察信号,继续观察...
23:10:58 doClearn:每隔 1 秒观察信号,继续观察...
23:10:59 doClearn:每隔 1 秒观察信号,继续观察...
23:11:00 doNothing:每隔 3 秒观察信号,一直运行
23:11:00 doClearn:每隔 1 秒观察信号,继续观察...
23:11:01 doClearn:每隔 1 秒观察信号,继续观察...
23:11:02 doClearn:每隔 1 秒观察信号,继续观察...
23:11:03 doNothing:每隔 3 秒观察信号,一直运行
23:11:03 doClearn:每隔 1 秒观察信号,继续观察...
23:11:04 doClearn:每隔 1 秒观察信号,继续观察...
23:11:05 doClearn:每隔 1 秒观察信号,继续观察...
23:11:06 doNothing:每隔 3 秒观察信号,一直运行
23:11:06 doClearn:每隔 1 秒观察信号,继续观察...
23:11:07 doClearn:每隔 1 秒观察信号,继续观察...
23:11:08 doClearn:每隔 1 秒观察信号,继续观察...
23:11:09 doNothing:每隔 3 秒观察信号,一直运行
23:11:09 doClearn:每隔 1 秒观察信号,继续观察...
23:11:10 doClearn:每隔 1 秒观察信号,继续观察...
23:11:11 doClearn:每隔 1 秒观察信号,继续观察...
23:11:12 doNothing:每隔 3 秒观察信号,一直运行
23:11:12 doClearn:每隔 1 秒观察信号,继续观察...
23:11:13 doClearn:每隔 1 秒观察信号,继续观察...
23:11:14 cancel
23:11:14 doClearn:收到 Cancel, 做好收尾工作后马上退出。
23:11:15 doNothing:收到 Cancel,但不退出......
23:11:18 doNothing:收到 Cancel,但不退出......
23:11:21 doNothing:收到 Cancel,但不退出......

下面代码用上下文嵌套控制 3 个协程 A,B,C。在主程序发出 cancel 信号后,每个协程都能接收根上下文的 Done() 信号而退出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main

import (
    "context"
    "fmt"
    "time"
)

func B(ctx context.Context) int {
    fmt.Println("A value in B:", ctx.Value("AFunction"))
    ctx = context.WithValue(ctx, "BFunction", 999)

    go C(ctx)

    select {
    // 监测自己上层的 ctx
    case <-ctx.Done():
        fmt.Println("B Done")
        return -2
    }
    return 2
}

func C(ctx context.Context) int {
    fmt.Println("A value in C:", ctx.Value("AFunction"))
    fmt.Println("B value in C:", ctx.Value("BFunction"))
    select {
    // 结束时候做点什么
    case <-ctx.Done():
        fmt.Println("C Done")
        return -3
    }
    return 3
}

func A(ctx context.Context) int {
    ctx = context.WithValue(ctx, "AFunction", "Great")

    go B(ctx)
    select {
    // 监测自己上层的 ctx
    case <-ctx.Done():
        fmt.Println("A Done")
        return -1
    }
    return 1
}

func main() {
    // 自动取消(定时取消)
    {
        timeout := 10 * time.Second
        ctx, _ := context.WithTimeout(context.Background(), timeout)

        fmt.Println("A 执行完成,返回: ", A(ctx))
        select {
        case <-ctx.Done():
            fmt.Println("context Done")
            break
        }
    }
    time.Sleep(20 * time.Second)
}

输出:

1
2
3
4
5
6
7
8
A value in B: Great
A value in C: Great
B value in C: 999
C Done
A Done
A 执行完成,返回:  -1
context Done
B Done

最后看看上下文在 http 中是怎么传递的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
    "context"
    "net/http"
    "time"
)

func ContextMiddle(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        cookie, _ := r.Cookie("Check")
        if cookie != nil {
            ctx := context.WithValue(r.Context(), "Check", cookie.Value)
            next.ServeHTTP(w, r.WithContext(ctx))
        } else {
            next.ServeHTTP(w, r)
        }
    })
}

// 强制设置通行 cookie
func CheckHandler(w http.ResponseWriter, r *http.Request) {
    expitation := time.Now().Add(24 * time.Hour)
    cookie := http.Cookie{Name: "Check", Value: "42", Expires: expitation}
    http.SetCookie(w, &cookie)
}

func indexHandler(w http.ResponseWriter, r *http.Request) {
    // 通过取中间件传过来的 context 值来判断是否放行通过
    if chk := r.Context().Value("Check"); chk == "42" {
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("Let's go!\n"))
    } else {
        w.WriteHeader(http.StatusNotFound)
        w.Write([]byte("No Pass!"))
    }
}

func main() {
    mux := http.NewServeMux()

    mux.HandleFunc("/", indexHandler)

    // 人为设置通行 cookie
    mux.HandleFunc("/chk", CheckHandler)

    ctxMux := ContextMiddle(mux)
    http.ListenAndServe(":8080", ctxMux)
}

打开浏览器访问 http://localhost:8080/chk,然后访问 http://localhost:8080/,由于 cookie 已经设置,会看到正常通行的结果,否则将会看到无法正常通行时的信息。
Context 信息的传递主要靠中间件 ContextMiddle 来进行

context 的实现原理

context 实际上只定义了接口,凡是实现该接口的类都可称为是一种 context,官方包中实现了几个常用的 context,分别用于不同的场景

接口定义

源码包中 src/context/context.go:Context 定义了该接口:

1
2
3
4
5
6
7
8
9
type Context interface {
    Deadline() (deadline time.Time, ok bool)

    Done() <-chan struct {}

    Err() error

    Value(key interface{}) interface{}
}

基础的 context 接口只定义了 4 个方法,下面分别简要说明一下

(1) Deadline()

该方法返回一个 deadline 和标识是否已设置 deadline 的 bool 值,如果没有设置 deadline,则 ok == false,此时 deadline 为一个初始值的 time.Time 值

(2) Done()

该方法返回一个 channel,需要在 select-case 语句中使用,如 case <-context.Done():

当 context 关闭后,Done() 返回一个被关闭的管道,关闭的管道仍然是可读,据此 goroutine 可以收到关闭请求;当 context 还未关闭时,Done() 返回 nil

(3) Err()

该方法描述 context 关闭的原因。关闭原因由 context 实现控制,不需要用户设置。
比如 Deadline context,关闭原因可能是因为 deadline,也可能提前被主动关闭

  • 因 deadline 关闭: context deadline exceeded
  • 因主动关闭: context canceled

当 context 关闭后,Err() 返回 context 的关闭原因;当 context 还未关闭时,Err() 返回 nil

(4) Value()

有一种 context,它不是用于控制呈树状分布的 goroutine,而是用于在树状分布的 goroutine 之间传递信息

Value() 方法就是用于此种类型的 context,该方法根据 key 值查询 map 中的 value。具体使用在后面示例中说明

空 context

context 包中定义了一个空的 context,名为 emptyCtx,用于 context 的根节点,空的 context 只是简单地实现了 Context,本身不包含任何值,仅用于其他 context 的父节点

emptyCtx 类型的定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type emptyCtx int

func (*emptyCtx) Deadline() (dealine time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
    return nil
}

context 包中定义了一个公用的 emptyCtx 全局变量,名为 background,可以使用 context.Background() 获取它,实现代码如下:

1
2
3
4
var background = new(emptyCtx)
func Background() Context {
    return background
}

context 包提供了四个方法创建不同类型的 context,使用这四个方法时如果没有父 context, 则都需要传入 background,即将 background 作为其父节点:

  • WithCancel()
  • WithDeadline()
  • WithTimeout()
  • WithValue()

context 包中实现 Context 接口的 struct,除了 emptyCtx,还有 cancelCtx、timeCtx 和 valueCtx 三种,正是基于这三种 context 实例,实现了上述四种类型的 context

struct cancelCtxtimeCtxvalueCtx 都继承于 Context,下面分别介绍这三个 struct

cancelCtx

源码包中 src/context/context.go:cancelCtx 定义了该类型的 context:

1
2
3
4
5
6
7
8
type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     chan struct{}         // created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}

children 中记录了由此 context 派生的所有 child,此 context 被 "cancel" 时会把其中的所有 child 都 "cancel" 掉

cancelCtx 与 deadline 和 value 无关,所以只需要实现 Done() 和 Err() 外露接口即可