Go-16-Sync

Golang 源码 sync 包提供了同步操作

  1. 互斥锁 sync.Mutext
  2. 读写锁 sync.RWMutex
  3. 等待组 sync.WaitGroup
  4. 单次操作 sync.Once
  5. 内存池 sync.Pool
  6. 安全map sync.Map
  7. 同步条件 sync.Cond

Mutex

  1. Mutex是一把公平锁(Mutex fairness)

  2. Mutex 有两种模式:正常模式饥饿模式

    • 正常模式:在正常模式,请求锁的 goroutines 是按照先进先出的顺序进行排队行程 waiters 的。 调用 Unlock() 方法释放锁资源时,如果发现有等待唤起的 waiter 时,则会将队头的 waiter 唤起。被唤起后调用CAS方法去尝试 修改锁的状态,如果修改成功则表示占有锁的资源成功。

      锁一共有三个状态:锁住状态、唤起状态、饥饿状态

    • 饥饿模式:当调用 Unlock()方法释放锁资源的时候,唤起的 waiter G1 需要通过 CAS 操作获取锁资源,如果此时有新请求锁资源的 goroutine G2,那么它们会一起通过 CAS 方法竞争获取锁资源。当不断有G2来进行锁资源争夺,就有可能导致 G1 一直无法获取到 锁资源而饿死,所以Go采用饥饿模式

      • G1 在超过一定时间获取不到资源之后,会在 Unlock 释放锁资源时,直接将锁的资源交给 G1,并且将当前状态改为 饥饿模式
      • G1 获取到锁的所有权时,发现自己是队列中最后一个waiter或者自己等待时间小于1ms,那么锁将切换回正常模式
  3. 正常模式拥有非常好的性能表现,因为即使存在阻塞的 waiter,一个goroutine也能够多次获取锁。

  4. 饥饿模式对于预防极端的长尾时延(tail latency)

实现原理

代码路径/go/src/sync/mutex.go

1
2
3
4
5
6
7
8
9
10
11
type Locker interface {
Lock()
Unlock()
}

// Mutex 是互斥锁。
// 互斥锁的零值是未锁定的互斥锁。
type Mutex struct {
state int32 //锁的状态
sema uint32 //信号
}

Mutex 结构体有两个字段:

  1. state: 表示当前互斥锁的状态
  2. sema: 是个信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒

注意,首次使用后不要进行Mutex的值拷贝,否则 Mutex 会失效

state 状态字段代表多个意思,mutexWaiterShift = 3低三位记录三种状态,剩下的位置,用来表示可以有1<<(32-3)个 Goroutine 等待互斥锁的释放

Group 1

1
2
3
4
5
6
7
const (
mutexLocked = 1 << iota // 表示当前对象锁的状态 0-未锁住,1-已锁住
mutexWoken //表示当前对象是否被唤醒 0-唤醒,1-未唤醒
mutexStarving //表示当前对象是否为饥饿模式 0-正常模式,1为饥饿模式
mutexWaiterShift = iota //从倒数第四位往前的bit位表示在排队等待的goroutine
starvationThresholdNs = 1e6 //1ms
)

自旋

互斥锁中提到了比较重要的自旋操作

runtime_canSpin:比较保守的自旋,golang中自旋锁并不会一直自旋下去,在runtime包中runtime_canSpin方法做了一些限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//判断能否自旋
func sync_runtime_canSpin(i int) bool {
// 1. i 大于 active_spin = 4 的时候不能自旋
// 2. 机器是单核 的时候不能自旋
// 3. gomaxprocs 小于 (闲置的p + 自旋的m + 1) 的时候不能自旋
// 4. 本地队列不为空 的时候不能自旋
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) { //本地可运行G队列不为空
return false
}
return true
}

**runtime_doSpin:**会调用procyield函数,该函数也是汇编语言实现。函数内部循环调用PAUSE指令。PAUSE指令什么都不做,但是会消耗CPU时间,在执行PAUSE指令时,CPU不会对它做不必要的优化

1
2
3
4
5
6
7
8
9
10
11
func sync_runtime_doSpin() {
procyield(active_spin_cnt) //自旋操作
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE //空命令
SUBL $1, AX
JNZ again
RET

Lock

1
2
3
4
5
6
7
8
9
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
//...竞争检测
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}

加锁首先会使用 CompareAndSwapInt32 看能不能拿到锁,否则进入到 lockSlow 流程

1
2
//如果*addr中的值 与 old 相等,则将 *addr与 new进行交换,并返回true,否则返回false
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
lockSlow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
func (m *Mutex) lockSlow() {
var waitStartTime int64 //当前groutine的等待时间
starving := false //当前goroutine是否是饥饿标记
awoke := false //当前goroutine是否是唤醒标记
iter := 0 //当前goroutine自旋次数自旋次数
old := m.state //copy锁的状态为历史状态
for {

// 在饥饿模式不进行自旋,锁的所有权会直接移交给waiters。

// 当锁是locked状态并且当前goroutine可以自旋时,开始自旋。
// 当锁是starving状态,就直接false,不自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 触发自旋是有意义的。
// 尝试设置woken标志来通知unlock,以便不唤起其他阻塞的goroutines
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
// 如果当前goroutine是未唤醒状态,互斥锁也是未唤醒状态,并且互斥锁的waiter数量不等于0,
// 就比较锁的最新状态(m.state)和历史状态(old),如果未发生改变,将锁的状态更新为woken。
// 并且设置当前goroutine为awoke状态。
awoke = true
}
//自旋
runtime_doSpin()
//自旋次数增加
iter++
// copy锁的状态为历史状态,自旋期间其他goroutine可能修改了state,所以要更新
old = m.state
// 继续尝试自旋
continue
}

//此时,到这里有两个可能:
//1. 锁的状态发生了变化(唤起状态、饥饿状态)
//2. 无法继续进行自旋
new := old // copy锁的历史状态为new状态(为什么不是直接获取m.state)

// 如果锁的历史状态(old)不是starving状态,将锁的新状态(new)更新为locked状态
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果锁的历史状态(old)是locked状态或者是starving状态,将锁的waiter数量加1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift //注意 waiter数量怎样记录数量的
}

// 如果当前goroutine是starving状态且锁的历史状态(old)是locked状态,将锁的新状态(new)更新为starving状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}

// 如果当前goroutine是awoke状态
if awoke {
// 锁的状态是非唤醒状态则直接报错
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// &^ 是 bit clear (AND NOT)
// 取消锁的新状态(new)的woken状态标志。
new &^= mutexWoken
}
// 比较锁的最新状态(m.state)和历史状态(old),如果未发生改变,那么更新为new。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//如果cas更新成功,并且锁的历史状态(old)即不是locked也不是starving,那么结束循环,通过CAS加锁成功。
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 如果之前已经等待,将排在队列前面。
// 当前goroutine是否等待过。
queueLifo := waitStartTime != 0

// 如果开始等待时间为0,更新为当前时间为开始等待时间。
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}

// 通过信号量获取锁
// runtime实现代码:https://github.com/golang/go/blob/go1.15.5/src/runtime/sema.go#L69-L72
// runtime信号量获取:https://github.com/golang/go/blob/go1.15.5/src/runtime/sema.go#L98-L153
runtime_SemacquireMutex(&m.sema, queueLifo, 1)

// 如果当前goroutine是starving状态或者等待时间大于1ms,更新当前goroutine为starving状态。
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

// 更新锁的历史状态(old)
old = m.state

//锁是饥饿状态
if old&mutexStarving != 0 {
// 如果当前goroutine是唤醒状态并且锁在饥饿模式,
// 锁的所有权转移给当前goroutine,但是锁处于不一致的状态中:mutexLocked没有设置
// 并且我们将任然被认为是waiter。这个状态需要被修复。
// 如果锁的历史状态(old)是locked或者woken的,或者waiters的数量不为0,触发锁状态异常。
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 当前goroutine获取锁,waiter数量-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)

// 如果当前goroutine不是starving状态或者锁的历史状态(old)的waiter数量是1,delta减去3。
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式
// 在这里这么做至关重要,还要考虑等待时间。
// 饥饿模式是非常低效率的,一旦两个goroutine将互斥锁切换为饥饿模式,它们便可以无限锁。
delta -= mutexStarving
}
// 更新锁的状态
atomic.AddInt32(&m.state, delta)
break //退出自旋
}
// 当前goroutine更新为awoke状态
awoke = true
// 当前goroutine自旋次数清零
iter = 0
} else {
// 更新锁的历史状态(old) //当在处理 new 状态的时候 锁的状态发生了变化,那么重新复制 old 再进行逻辑判断
old = m.state
}
}//for end
//...竞争判断
}

Unlock

解锁的前提是加锁

1
2
3
4
5
6
7
8
9
func (m *Mutex) Unlock() {
//竞争..

// 如果waiter数量为0,三个标志位去除locked后也为0,那么可以直接解锁了。
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
unlockSlow
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 { // 当new不是锁住状态
throw("sync: unlock of unlocked mutex")
}
//如果不是饥饿模式
if new&mutexStarving == 0 {
old := new
for {
// 如果waiter数量为0,锁的三个标志位任一非0,直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 尝试将锁更新为woken状态,如果成功了,就通过信号量去唤醒goroutine
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饥饿模式直接手把手交接锁的控制权
runtime_Semrelease(&m.sema, true, 1)
}
}

总结

  1. 互斥锁只有在普通模式才能进入自旋
  2. 能够自旋的条件:a. 需要运行在多 CPU 的机器上;b. 当前的Goroutine 为了获取该锁进入自旋的次数小于四次;c. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空
  3. 一旦当前 Goroutine 能够进入自旋就会调用runtime.sync_runtime_doSpin 和 runtime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间

RWMutex

读写互斥锁 sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行

实现原理

代码路径:/go/src/sync/rwmutex.go

1
2
3
4
5
6
7
8
9
type RWMutex struct {
w Mutex // 复制原子锁的能力
writerSem uint32 // 写等待读完成信号量
readerSem uint32 // 读等待写完成信号量
readerCount int32 // 当前正在执行的读操作的数量
readerWait int32 // 当写操作被阻塞时等待的读操作个数
}

const rwmutexMaxReaders = 1 << 30

同理,首次使用后不要进行值拷贝,否则会失效

写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
func (rw *RWMutex) Lock() {
//竞争检查

// 首先调用锁的能力,阻塞后续的写
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
//竞争检查
}

获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作饿死

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (rw *RWMutex) Unlock() {
//竞争检查...

// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
//竞争检查
}

读锁

1
2
3
4
5
6
7
8
func (rw *RWMutex) RLock() {
//竞争检查...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
//竞争检查...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (rw *RWMutex) RUnlock() {
//竞争检查...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
//竞争检查...
}

func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

WaitGroup

WaitGroup 等待一组 goroutine 完成,主 goroutine 调用 Add 设置数量要等待的 goroutine。然后每个 goroutine 完成后运行并调用 Done。同时,等待可用于阻塞,直到所有 goroutine 完成。

代码路径:go/src/sync/waitgroup.go

实现原理

1
2
3
4
5
6
7
8
9
type WaitGroup struct {
noCopy noCopy

// 64 位值:高 32 位是计数器,低 32 位是等待者的数量。
// 64位原子操作需要64位对齐,但是32位编译器只保证 64 位字段是 32 位对齐的。
// 出于这个原因,在 32 位架构上,需要检查 state(),判断 state1 是否对齐并自动“交换”字段顺序
state1 uint64
state2 uint32
}

同理,首次使用后不要进行值拷贝,否则会失效

state

1
2
3
4
5
6
7
8
9
10
11
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// state1 is 64-bit aligned: nothing to do.
return &wg.state1, &wg.state2
} else {
// state1 is 32-bit aligned but not 64-bit aligned: this means that
// (&state1)+4 is 64-bit aligned.
state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
}
}

Add/Done

Add(n) 表示添加一个等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
//竞争检查...
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
//竞争检查...
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}

Done() 操作其实是 减1

1
2
3
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

Wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()

//竞争检查...

for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
//竞争检查...
return
}
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
//竞争检查...
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
//竞争检查...
return
}
}
}

Once

Once 是一个仅执行一次操作的对象

使用例子

1
2
3
4
5
6
7
8
var d sync.Once
for i := 0; i < 5; i++ {
d.Do(func() {
fmt.Println("hello")
})
}

//hello

循环执行打印输出操作,不同的是使用 sync.Once 对象执行打印方法,仅执行一次。并发执行效果一致

实现原理

代码路径:/go/src/sync/once.go

1
2
3
4
type Once struct {
done uint32 //done 表示动作是否已经执行
m Mutex
}

执行函数逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 { //如果不等于0直接退出
// Outlined slow-path to allow inlining of the fast-path.
o.doSlow(f)
}
}

func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock() //通过获取锁来进行done状态修改
if o.done == 0 { //这里直接使用o.done
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
  1. 直接使用 o.done == 0 判断是否执行过的原因是上面获取到了写锁
  2. atomic.StoreUint32(&o.done, 1)放置在 f() 的后面从而保证当 done 变化,f() 已经执行完毕

在注释中,它提到了另外一种错误的实现方式

1
2
3
4
5
func (o *Once) Do(f func()) {
if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
f()
}
}

Do() 保证当它返回时,f 已经完成。此实现不会实现该保证:给定两个同时调用,cas 的获胜者将调用 f,第二个会立即返回,没有等待第一个对 f 的调用完成。

Cond

sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine

  1. 它和互斥锁的区别是:互斥锁 sync.Mutex 通常用来保护临界区和共享资源,条件变量 sync.Cond 用来协调想要访问共享资源的 goroutine
  2. 注意点:每个Cond都会关联一个 Lock(*sync.Mutex or *sync.RWMutex),当修改条件或者调用Wait方法时,必须加锁保护condition

使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
var done = false

func read(name string, c *sync.Cond) {
c.L.Lock()
for !done {
log.Println(name, "starts wait")
c.Wait()
log.Println(name, "end wait")
}
c.L.Unlock()
log.Println(name, "end reading")
}

func write(name string, c *sync.Cond) {
log.Println(name, "starts writing")
time.Sleep(time.Second)
c.L.Lock()
done = true
c.L.Unlock()
log.Println(name, "wakes all")
c.Broadcast()
}

func main() {
cond := sync.NewCond(&sync.Mutex{})

go read("reader1", cond)
go read("reader2", cond)
go read("reader3", cond)
write("writer", cond)

time.Sleep(time.Second * 3)
}
  • done 即互斥锁需要保护的条件变量。
  • read() 调用 Wait() 等待通知,直到 done 为 true。
  • write() 接收数据,接收完成后,将 done 置为 true,调用 Broadcast() 通知所有等待的协程。
  • write() 中的暂停了 1s,一方面是模拟耗时,另一方面是确保前面的 3 个 read 协程都执行到 Wait(),处于等待状态。main 函数最后暂停了 3s,确保所有操作执行完毕

运行结果如下;

1
2
3
4
5
6
7
8
9
10
11
2022/09/10 22:40:00 writer starts writing
2022/09/10 22:40:00 reader1 starts wait
2022/09/10 22:40:00 reader2 starts wait
2022/09/10 22:40:00 reader3 starts wait
2022/09/10 22:40:01 writer wakes all
2022/09/10 22:40:01 reader3 end wait
2022/09/10 22:40:01 reader3 end reading
2022/09/10 22:40:01 reader1 end wait
2022/09/10 22:40:01 reader1 end reading
2022/09/10 22:40:01 reader2 end wait
2022/09/10 22:40:01 reader2 end reading

注意点:

  1. 每次都是 end reading 下一个才会开始执行,原因其实是 c.L.Unlock(),只有释放锁然后其他Wait()中才能获取写锁继续执行
  2. 因为 done = false 所以所有的 reader 都进入 wait,但是在构建条件 sync.Cond的时候传入的是锁,那么它又是怎么跟 done 以及 Broadcast 联系在一起的呢?其实没有关系,done 只是用于判断是否重复进入 Wait()

当去掉 done 的变换

1
2
3
4
5
6
7
8
9
func write(name string, c *sync.Cond) {
log.Println(name, "starts writing")
time.Sleep(time.Second)
// c.L.Lock()
// done = true
// c.L.Unlock()
log.Println(name, "wakes all")
c.Broadcast()
}

结果变成了

1
2
3
4
5
6
7
8
9
10
11
2022/09/10 22:46:14 writer starts writing
2022/09/10 22:46:14 reader3 starts wait
2022/09/10 22:46:14 reader1 starts wait
2022/09/10 22:46:14 reader2 starts wait
2022/09/10 22:46:15 writer wakes all
2022/09/10 22:46:15 reader2 end wait
2022/09/10 22:46:15 reader2 starts wait
2022/09/10 22:46:15 reader3 end wait
2022/09/10 22:46:15 reader3 starts wait
2022/09/10 22:46:15 reader1 end wait
2022/09/10 22:46:15 reader1 starts wait

可以看出来,当 Broadcast的时候 wait 才会返回,由于 done 没有变化,所以reader重新进入了Wait状态

实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Cond struct {
noCopy noCopy

// L is held while observing or changing the condition
L Locker

notify notifyList
checker copyChecker
}

type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}

type copyChecker uintptr

//判断是否复制过
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

func NewCond(l Locker) *Cond {
return &Cond{L: l}
}

注意:

  1. noCopy 只在 go vet 语法检测的时候有效。即使发生拷贝,编译与运行都能正常的运行
Wait

Wait()会自动释放c.L,并挂起调用者的goroutine。之后恢复执行,Wait()会在返回时对c.L加锁。除非被Signal或者Broadcast唤醒,否则Wait()不会返回。由于Wait()第一次恢复时,C.L并没有加锁,所以当Wait返回时,调用者通常并不能假设条件为真。取而代之的是, 调用者应该在循环中调用Wait。(简单来说,只要想使用condition,就必须加锁)

1
2
3
4
5
6
7
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify) //添加通知
c.L.Unlock() //释放锁
runtime_notifyListWait(&c.notify, t) //等待通知
c.L.Lock() //加锁
}

对条件的检查,使用了 for !condition() 而非 if,是因为当前协程被唤醒时,条件不一定符合要求,需要再次 Wait 等待下次被唤醒。为了保险起见,使用 for 能够确保条件符合要求后,再执行后续的代码

1
2
3
4
5
6
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()
Singal

Signal只唤醒1个等待c的goroutine。

调用Signal的时候,可以加锁,也可以不加锁。

1
2
3
4
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
Boardcase

Broadcast会唤醒所有等待c 的 goroutine

1
2
3
4
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}

总结

  1. 这里一直出现的 race 到底是干嘛的后续文章说明
  2. 这里一直出现的 runtime_Semacquire 是干嘛的后续文章说明

参考链接

  1. https://www.modb.pro/db/170131
  2. https://vearne.cc/archives/680