Skip to content

并发控制(三)

互斥锁 sync.Mutex

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

func main() {
    var state = make(map[int]int)

    var mutex = &sync.Mutex{}

    var readOps uint64
    var writeOps uint64

    for r := 0; r < 100; r++ {
        go func() {
            total := 0
            for {

                key := rand.Intn(5)
                mutex.Lock()
                total += state[key]
                mutex.Unlock()
                atomic.AddUint64(&readOps, 1)

                time.Sleep(time.Millisecond)
            }
        }()
    }

    for w := 0; w < 10; w++ {
        go func() {
            for {
                key := rand.Intn(5)
                val := rand.Intn(100)
                mutex.Lock()
                state[key] = val
                mutex.Unlock()
                atomic.AddUint64(&writeOps, 1)
                time.Sleep(time.Millisecond)
            }
        }()
    }

    time.Sleep(time.Second)

    readOpsFinal := atomic.LoadUint64(&readOps)
    fmt.Println("readOps:", readOpsFinal)
    writeOpsFinal := atomic.LoadUint64(&writeOps)
    fmt.Println("writeOps:", writeOpsFinal)

    mutex.Lock()
    fmt.Println("state:", state)
    mutex.Unlock()
}

输出:

1
2
3
readOps: 79800
writeOps: 7984
state: map[0:81 1:23 2:55 3:24 4:42]

Go 语言包中的 sync 包提供了两种锁类型: sync.Mutex 和 sync.RWMutex,前者是互斥锁,后者是读写锁

互斥锁是在并发程序中对共享资源进行访问控制的主要手段,在 Go 语言中,更推崇使用通道来实现资源共享和通信。
对此 Go 语言提供了非常简单易用的 Mutex,Mutex 为一结构类型,对外暴露 Lock() 和 Unlock() 两个方法,分别用于加锁和解锁

  • 同一个协程中同步调用使用 Lock() 加锁后,不能再对其加锁,否则会引发运行时异常。只能在 Unlock() 之后再次 Lock()。多个协程中异步调用 Lock() 没问题,但每个协程之间产生了锁竞争,因此不会有运行时异常。互斥锁适用于只允许有一个读或者写的场景,所以该锁也叫做全局锁
  • Unlock() 用于解锁,如果在使用 Unlock() 前未加锁,就会引起一个运行错误。已经锁定的 Mutex 并不与特定的协程相关联,这样可以利用一个协程对其加锁,再利用其他协程对其解锁
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
    "fmt"
    "sync"
    "time"
)

var mutex sync.Mutex

func LockA() {
    mutex.Lock()
    fmt.Println("Lock in A")
    LockB()
    time.Sleep(5)
    fmt.Println("Wake up in A")
    mutex.Unlock()
    fmt.Println("UnLock in A")
}

func LockB() {
    fmt.Println("B")
    mutex.Lock()
    fmt.Println("Lock in B")
    mutex.Unlock()
    fmt.Println("UnLock in B")
}

func main() {
    LockA()
    time.Sleep(10)
}

LockA 中有 Lock(),LockB 中也有 Lock(),LockB 的 Lock() 运行时,锁还没有 UnLock(),程序发生 panic。
这是在同步调用互斥锁中常见的问题,一般在对一对互斥锁中间不要调用其他函数,即使要用也尽量采用异步的方式

可以试试把 LockA 的 LockB() 调用改为 go LockB()

建议: 同一个互斥锁的成对锁定和解锁操作可以放在同一层次的代码块中。使用互斥锁的经典模式如下所示:

1
2
3
4
5
6
var lck sync.Mutex
func foo() {
    lck.Lock()
    defer lck.Unlock()
    // ...
}

lck.Lock() 会阻塞直到获取锁,然后利用 defer 语句在函数返回时自动释放锁

下面代码通过三个协程来体现 sync.Mutex 对资源的访问控制特征

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    wg := sync.WaitGroup{}

    var mutex sync.Mutex
    fmt.Println("Locking (G0)")
    mutex.Lock()
    fmt.Println("locked (G0)")
    wg.Add(3)

    for i := 1; i < 4; i++ {
        go func(i int) {
            fmt.Printf("Locking (G%d)\n", i)
            mutex.Lock()
            fmt.Printf("locked (G%d)\n", i)

            time.Sleep(time.Second * 2)
            mutex.Unlock()
            fmt.Printf("unlocked (G%d)\n", i)
            wg.Done()
        }(i)
    }

    time.Sleep(time.Second * 5)
    fmt.Println("ready unlock (G0)")
    mutex.Unlock()

    fmt.Println("unlocked (G0)")

    wg.Wait()
}

程序输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
Locking (G0)
locked (G0)
Locking (G3)
Locking (G2)
Locking (G1)
ready unlock (G0)
unlocked (G0)
locked (G3)
unlocked (G3)
locked (G2)
unlocked (G2)
locked (G1)
unlocked (G1)

通过程序执行结果可以看到,当有锁释放时,才能进行加锁动作,当 G0 的锁释放时,等待加锁的 G1,G2,G3 都会竞争加锁的机会,这里是 G3 抢到了加锁的机会

Mutex 也可以作为结构体的一部分,这样结构体在被多线程处理时数据安全才有保障。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
    "fmt"
    "sync"
    "time"
)

type Book struct {
    BookName string
    L        *sync.Mutex
}

func (bk *Book) SetName(wg *sync.WaitGroup, name string) {
    defer func() {
        fmt.Println("Unlock set name:", name)
        bk.L.Unlock()
        wg.Done()
    }()

    bk.L.Lock()
    fmt.Println("Lock set name:", name)
    time.Sleep(1 * time.Second)
    bk.BookName = name
}

func main() {
    bk := Book{}
    bk.L = new(sync.Mutex)
    wg := &sync.WaitGroup{}
    books := []string{"《三国演义》", "《道德经》", "《西游记》"}
    for _, book := range books {
        wg.Add(1)
        go bk.SetName(wg, book)
    }

    wg.Wait()
}

程序输出:

1
2
3
4
5
6
Lock set name: 《西游记》
Unlock set name: 《西游记》
Lock set name: 《三国演义》
Unlock set name: 《三国演义》
Lock set name: 《道德经》
Unlock set name: 《道德经》

读写锁 sync.RWMutex

读写锁是多读单写互斥锁,分别针对读操作和写操作进行锁定和解锁操作,经常用于读次数远远多于写次数的场合。
在 Go 语言中,读写锁由结构体类型 sync.RWMutex 实现。

基本遵循原则:

  • 写锁定情况下,对读写锁进行读锁定或者写锁定,都将阻塞,而且读锁与写锁之间是互斥的
  • 读锁定情况下,对读写锁进行写锁定,将阻塞;加读锁时不会阻塞,即可多读
  • 对未被写锁定的读写锁进行写解锁,会引发运行时异常
  • 对未被读锁定的读写锁进行读解锁时也会引发运行时异常
  • 写解锁在进行的同时会试图唤醒所有因进行读锁定而被阻塞的协程
  • 读解锁在进行的时候会试图唤醒一个因进行写锁定而被阻塞的协程

与互斥锁类似,sync.RWMutex 类型的零值就已经是利己可用的读写锁了。
在此类型的方法集合中提供了四个方法:

1
2
3
4
5
func (*RWMutex) Lock //写锁定
func (*RWMutex) Unlock  // 写解锁

func (*RWMutex) RLock  // 读锁定
func (*RWMutex) RUnlock  // 读解锁

有关读写锁的实际情况,看看下面代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
    "fmt"
    "sync"
    "time"
)

var m *sync.RWMutex

func main() {
    wg := sync.WaitGroup{}
    wg.Add(20)
    var rwMutex sync.RWMutex
    Data := 0
    for i := 0; i < 10; i++ {
        go func(t int) {
            rwMutex.RLock()
            defer rwMutex.RUnlock()
            fmt.Printf("读数据: %v %d\n", Data, i)
            wg.Done()
            time.Sleep(1 * time.Second)
            // 这句代码第一次运行后,读解锁
            // 循环到第二个时,读锁定后,这个 goroutine 就没有阻塞,同时读成功
        }(i)

        go func(t int) {
            rwMutex.Lock()
            defer rwMutex.Unlock()
            Data += t
            fmt.Printf("写数据: %v %d\n", Data, t)
            wg.Done()

            // 对读写锁进行读锁定或者写锁定,都将阻塞。写锁定下是需要解锁后才能写的
            time.Sleep(5 * time.Second)
        }(i)
    }
    wg.Wait()
}

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
写数据: 0 0
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
写数据: 9 9
写数据: 11 2
写数据: 14 3
写数据: 18 4
写数据: 23 5
写数据: 29 6
写数据: 36 7
写数据: 44 8
写数据: 45 1
➜  tmp go run test.go
读数据: 0 1
写数据: 1 1
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
写数据: 10 9
写数据: 12 2
写数据: 15 3
写数据: 19 4
写数据: 24 5
写数据: 30 6
写数据: 37 7
写数据: 45 8
写数据: 45 0

通过程序运行的输出可以看到,在写锁定情况下,对读写锁进行读锁定或者写锁定,都将阻塞。
为了体现这个特性,可以把写数据中的 Sleep 设置更长时间,在第一次写锁定后,读数据也没有进行

再次写锁定是在 rwMutex.Unlock() 完成后,才能进行 rwMutex.lock()。
而读数据时则可以多次读,不一定需要等 rwMutexRUnlock() 完成。

sync.WaitGroup

WaitGroup 用于线程总同步。它等待一组线程集合完成,才会继续向下执行。
主线程调用 Add() 方法来设置等待的协程数量。然后每个协程运行,并在完成后调用 Done() 方法。
同时,Wait() 方法用来阻塞主线程,直到所有协程完成才会向下执行。Add(-1) 和 Done() 效果一致,都表示等待的协程数量减少一个。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(t int) {
            defer wg.Done()
            fmt.Println(t)
        }(i)
    }
    wg.Wait()
}

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
9
0
1
2
3
4
5
6
7
8

sync.Once

sync.Once.Do(f func()) 能保证 Do() 方法只执行一次。
对只需要运行一次的代码,如全局性的初始化操作,或者防止多次重复执行(比如重复提交等)都有很好的作用。
例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
    "fmt"
    "sync"
    "time"
)

var once sync.Once

func onces() {
    fmt.Println("once")
}

func onced() {
    fmt.Println("onced")
}

func main() {
    for i, v := range make([]string, 10) {
        once.Do(onces)
        fmt.Println("v:", v, "--i:", i)
    }

    for i := 0; i < 10; i++ {
        go func(i int) {
            once.Do(onced)
            fmt.Println(i)
        }(i)
    }
    time.Sleep(10)
}

输出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
once
v:  --i: 0
v:  --i: 1
v:  --i: 2
v:  --i: 3
v:  --i: 4
v:  --i: 5
v:  --i: 6
v:  --i: 7
v:  --i: 8
v:  --i: 9
0
4
1
2
3
6
5
7
8

上述代码在第一次循环中,once.Do(onces) 只执行了一次,而同样循环 10 次,once.Do(onced)根本没有执行
所以无论 sync.ONce.Do(f func())里面的 f 函数是否有变化,只要 Once.Do() 运行一次就没有机会再次运行了

Once 是一个结构体,通过判断 done 值来确定是否执行下一步,当 done 为 1 时直接返回,否则锁定后执行 f 函数以及置 done 值为 1
而对 done 的值的修改使用了 atomic.StoreUint32(原子级的操作)。即:

1
2
3
4
type Once struct {
    m    Mutex
    done uint32
}

sync.Map

随着 Go 1.9 的发布,Go 语言增加了一个新的特性,sync.Map,它原生支持并发安全的字典。
原有普通字典并不线程安全(或者说并发安全),一般情况下还可以继续使用它。
只有在涉及线程安全时才考虑 sync.Map,而且 sync.Map 的使用和字典有较大差异,怎么选择还是看情况再做决定。例如:

1