并发控制(三)
互斥锁 sync.Mutex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 | package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
func main() {
var state = make(map[int]int)
var mutex = &sync.Mutex{}
var readOps uint64
var writeOps uint64
for r := 0; r < 100; r++ {
go func() {
total := 0
for {
key := rand.Intn(5)
mutex.Lock()
total += state[key]
mutex.Unlock()
atomic.AddUint64(&readOps, 1)
time.Sleep(time.Millisecond)
}
}()
}
for w := 0; w < 10; w++ {
go func() {
for {
key := rand.Intn(5)
val := rand.Intn(100)
mutex.Lock()
state[key] = val
mutex.Unlock()
atomic.AddUint64(&writeOps, 1)
time.Sleep(time.Millisecond)
}
}()
}
time.Sleep(time.Second)
readOpsFinal := atomic.LoadUint64(&readOps)
fmt.Println("readOps:", readOpsFinal)
writeOpsFinal := atomic.LoadUint64(&writeOps)
fmt.Println("writeOps:", writeOpsFinal)
mutex.Lock()
fmt.Println("state:", state)
mutex.Unlock()
}
|
输出:
| readOps: 79800
writeOps: 7984
state: map[0:81 1:23 2:55 3:24 4:42]
|
Go 语言包中的 sync 包提供了两种锁类型: sync.Mutex 和 sync.RWMutex,前者是互斥锁,后者是读写锁
互斥锁是在并发程序中对共享资源进行访问控制的主要手段,在 Go 语言中,更推崇使用通道来实现资源共享和通信。
对此 Go 语言提供了非常简单易用的 Mutex,Mutex 为一结构类型,对外暴露 Lock() 和 Unlock() 两个方法,分别用于加锁和解锁
- 同一个协程中同步调用使用 Lock() 加锁后,不能再对其加锁,否则会引发运行时异常。只能在 Unlock() 之后再次 Lock()。多个协程中异步调用 Lock() 没问题,但每个协程之间产生了锁竞争,因此不会有运行时异常。互斥锁适用于只允许有一个读或者写的场景,所以该锁也叫做全局锁
- Unlock() 用于解锁,如果在使用 Unlock() 前未加锁,就会引起一个运行错误。已经锁定的 Mutex 并不与特定的协程相关联,这样可以利用一个协程对其加锁,再利用其他协程对其解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 | package main
import (
"fmt"
"sync"
"time"
)
var mutex sync.Mutex
func LockA() {
mutex.Lock()
fmt.Println("Lock in A")
LockB()
time.Sleep(5)
fmt.Println("Wake up in A")
mutex.Unlock()
fmt.Println("UnLock in A")
}
func LockB() {
fmt.Println("B")
mutex.Lock()
fmt.Println("Lock in B")
mutex.Unlock()
fmt.Println("UnLock in B")
}
func main() {
LockA()
time.Sleep(10)
}
|
LockA 中有 Lock(),LockB 中也有 Lock(),LockB 的 Lock() 运行时,锁还没有 UnLock(),程序发生 panic。
这是在同步调用互斥锁中常见的问题,一般在对一对互斥锁中间不要调用其他函数,即使要用也尽量采用异步的方式
可以试试把 LockA 的 LockB() 调用改为 go LockB()
建议: 同一个互斥锁的成对锁定和解锁操作可以放在同一层次的代码块中。使用互斥锁的经典模式如下所示:
| var lck sync.Mutex
func foo() {
lck.Lock()
defer lck.Unlock()
// ...
}
|
lck.Lock() 会阻塞直到获取锁,然后利用 defer 语句在函数返回时自动释放锁
下面代码通过三个协程来体现 sync.Mutex 对资源的访问控制特征
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 | package main
import (
"fmt"
"sync"
"time"
)
func main() {
wg := sync.WaitGroup{}
var mutex sync.Mutex
fmt.Println("Locking (G0)")
mutex.Lock()
fmt.Println("locked (G0)")
wg.Add(3)
for i := 1; i < 4; i++ {
go func(i int) {
fmt.Printf("Locking (G%d)\n", i)
mutex.Lock()
fmt.Printf("locked (G%d)\n", i)
time.Sleep(time.Second * 2)
mutex.Unlock()
fmt.Printf("unlocked (G%d)\n", i)
wg.Done()
}(i)
}
time.Sleep(time.Second * 5)
fmt.Println("ready unlock (G0)")
mutex.Unlock()
fmt.Println("unlocked (G0)")
wg.Wait()
}
|
程序输出:
1
2
3
4
5
6
7
8
9
10
11
12
13 | Locking (G0)
locked (G0)
Locking (G3)
Locking (G2)
Locking (G1)
ready unlock (G0)
unlocked (G0)
locked (G3)
unlocked (G3)
locked (G2)
unlocked (G2)
locked (G1)
unlocked (G1)
|
通过程序执行结果可以看到,当有锁释放时,才能进行加锁动作,当 G0 的锁释放时,等待加锁的 G1,G2,G3 都会竞争加锁的机会,这里是 G3 抢到了加锁的机会
Mutex 也可以作为结构体的一部分,这样结构体在被多线程处理时数据安全才有保障。例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 | package main
import (
"fmt"
"sync"
"time"
)
type Book struct {
BookName string
L *sync.Mutex
}
func (bk *Book) SetName(wg *sync.WaitGroup, name string) {
defer func() {
fmt.Println("Unlock set name:", name)
bk.L.Unlock()
wg.Done()
}()
bk.L.Lock()
fmt.Println("Lock set name:", name)
time.Sleep(1 * time.Second)
bk.BookName = name
}
func main() {
bk := Book{}
bk.L = new(sync.Mutex)
wg := &sync.WaitGroup{}
books := []string{"《三国演义》", "《道德经》", "《西游记》"}
for _, book := range books {
wg.Add(1)
go bk.SetName(wg, book)
}
wg.Wait()
}
|
程序输出:
| Lock set name: 《西游记》
Unlock set name: 《西游记》
Lock set name: 《三国演义》
Unlock set name: 《三国演义》
Lock set name: 《道德经》
Unlock set name: 《道德经》
|
读写锁 sync.RWMutex
读写锁是多读单写互斥锁,分别针对读操作和写操作进行锁定和解锁操作,经常用于读次数远远多于写次数的场合。
在 Go 语言中,读写锁由结构体类型 sync.RWMutex 实现。
基本遵循原则:
- 写锁定情况下,对读写锁进行读锁定或者写锁定,都将阻塞,而且读锁与写锁之间是互斥的
- 读锁定情况下,对读写锁进行写锁定,将阻塞;加读锁时不会阻塞,即可多读
- 对未被写锁定的读写锁进行写解锁,会引发运行时异常
- 对未被读锁定的读写锁进行读解锁时也会引发运行时异常
- 写解锁在进行的同时会试图唤醒所有因进行读锁定而被阻塞的协程
- 读解锁在进行的时候会试图唤醒一个因进行写锁定而被阻塞的协程
与互斥锁类似,sync.RWMutex 类型的零值就已经是利己可用的读写锁了。
在此类型的方法集合中提供了四个方法:
| func (*RWMutex) Lock //写锁定
func (*RWMutex) Unlock // 写解锁
func (*RWMutex) RLock // 读锁定
func (*RWMutex) RUnlock // 读解锁
|
有关读写锁的实际情况,看看下面代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 | package main
import (
"fmt"
"sync"
"time"
)
var m *sync.RWMutex
func main() {
wg := sync.WaitGroup{}
wg.Add(20)
var rwMutex sync.RWMutex
Data := 0
for i := 0; i < 10; i++ {
go func(t int) {
rwMutex.RLock()
defer rwMutex.RUnlock()
fmt.Printf("读数据: %v %d\n", Data, i)
wg.Done()
time.Sleep(1 * time.Second)
// 这句代码第一次运行后,读解锁
// 循环到第二个时,读锁定后,这个 goroutine 就没有阻塞,同时读成功
}(i)
go func(t int) {
rwMutex.Lock()
defer rwMutex.Unlock()
Data += t
fmt.Printf("写数据: %v %d\n", Data, t)
wg.Done()
// 对读写锁进行读锁定或者写锁定,都将阻塞。写锁定下是需要解锁后才能写的
time.Sleep(5 * time.Second)
}(i)
}
wg.Wait()
}
|
输出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 | 写数据: 0 0
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
读数据: 0 10
写数据: 9 9
写数据: 11 2
写数据: 14 3
写数据: 18 4
写数据: 23 5
写数据: 29 6
写数据: 36 7
写数据: 44 8
写数据: 45 1
➜ tmp go run test.go
读数据: 0 1
写数据: 1 1
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
读数据: 1 10
写数据: 10 9
写数据: 12 2
写数据: 15 3
写数据: 19 4
写数据: 24 5
写数据: 30 6
写数据: 37 7
写数据: 45 8
写数据: 45 0
|
通过程序运行的输出可以看到,在写锁定情况下,对读写锁进行读锁定或者写锁定,都将阻塞。
为了体现这个特性,可以把写数据中的 Sleep 设置更长时间,在第一次写锁定后,读数据也没有进行
再次写锁定是在 rwMutex.Unlock() 完成后,才能进行 rwMutex.lock()。
而读数据时则可以多次读,不一定需要等 rwMutexRUnlock() 完成。
sync.WaitGroup
WaitGroup 用于线程总同步。它等待一组线程集合完成,才会继续向下执行。
主线程调用 Add() 方法来设置等待的协程数量。然后每个协程运行,并在完成后调用 Done() 方法。
同时,Wait() 方法用来阻塞主线程,直到所有协程完成才会向下执行。Add(-1) 和 Done() 效果一致,都表示等待的协程数量减少一个。例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 | package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(t int) {
defer wg.Done()
fmt.Println(t)
}(i)
}
wg.Wait()
}
|
输出:
sync.Once
sync.Once.Do(f func())
能保证 Do() 方法只执行一次。
对只需要运行一次的代码,如全局性的初始化操作,或者防止多次重复执行(比如重复提交等)都有很好的作用。
例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 | package main
import (
"fmt"
"sync"
"time"
)
var once sync.Once
func onces() {
fmt.Println("once")
}
func onced() {
fmt.Println("onced")
}
func main() {
for i, v := range make([]string, 10) {
once.Do(onces)
fmt.Println("v:", v, "--i:", i)
}
for i := 0; i < 10; i++ {
go func(i int) {
once.Do(onced)
fmt.Println(i)
}(i)
}
time.Sleep(10)
}
|
输出:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 | once
v: --i: 0
v: --i: 1
v: --i: 2
v: --i: 3
v: --i: 4
v: --i: 5
v: --i: 6
v: --i: 7
v: --i: 8
v: --i: 9
0
4
1
2
3
6
5
7
8
|
上述代码在第一次循环中,once.Do(onces) 只执行了一次,而同样循环 10 次,once.Do(onced)
根本没有执行
所以无论 sync.ONce.Do(f func())
里面的 f 函数是否有变化,只要 Once.Do()
运行一次就没有机会再次运行了
Once 是一个结构体,通过判断 done 值来确定是否执行下一步,当 done 为 1 时直接返回,否则锁定后执行 f 函数以及置 done 值为 1
而对 done 的值的修改使用了 atomic.StoreUint32
(原子级的操作)。即:
| type Once struct {
m Mutex
done uint32
}
|
sync.Map
随着 Go 1.9 的发布,Go 语言增加了一个新的特性,sync.Map,它原生支持并发安全的字典。
原有普通字典并不线程安全(或者说并发安全),一般情况下还可以继续使用它。
只有在涉及线程安全时才考虑 sync.Map,而且 sync.Map 的使用和字典有较大差异,怎么选择还是看情况再做决定。例如: