TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE          // spin-wait hint instruction; does no useful work
	SUBL	$1, AX
	JNZ	again
	RET
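The assembly above is runtime·procyield. On the Go side the spin is driven by sync_runtime_doSpin, which simply asks procyield for 30 PAUSE iterations. A rough sketch (based on runtime/proc.go; constant names and exact wiring may differ between Go versions):

// Sketch of the runtime side of spinning (approximate, from runtime/proc.go).
const (
	active_spin     = 4  // at most 4 spin attempts per lock acquisition
	active_spin_cnt = 30 // each attempt executes 30 PAUSE instructions
)

// Exposed to the sync package as sync.runtime_doSpin.
func sync_runtime_doSpin() {
	procyield(active_spin_cnt) // runs the assembly loop above with cycles = 30
}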
Lock
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// ...race detection
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}
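The fast path and the slow paths below manipulate m.state through a handful of flag bits. For reference, a sketch of the relevant constants from sync/mutex.go (the explanatory comments are mine):

const (
	mutexLocked      = 1 << iota // bit 0: the mutex is locked
	mutexWoken                   // bit 1: a goroutine has been woken or is spinning
	mutexStarving                // bit 2: the mutex is in starvation mode
	mutexWaiterShift = iota      // the remaining bits count the waiting goroutines
)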
func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		// new is not in the locked state: unlocking an unlocked mutex
		throw("sync: unlock of unlocked mutex")
	}
	// Not in starvation mode
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters, or any of the three flag bits is set, return directly
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Try to set the woken flag; if that succeeds, wake a goroutine via the semaphore
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		// In starvation mode, hand the lock off directly to the next waiter
		runtime_Semrelease(&m.sema, true, 1)
	}
}
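unlockSlow is only reached from Unlock, whose fast path first drops the locked bit and passes the resulting state in as new. Roughly (race-detection calls elided):

func (m *Mutex) Unlock() {
	// Fast path: drop the locked bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path so that the fast path can be inlined.
		m.unlockSlow(new)
	}
}

This is also why the first check in unlockSlow adds mutexLocked back: if the bit is still clear after adding it, Unlock was called on a mutex that was never locked.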
Summary
A mutex can only spin while it is in normal mode.
Spinning is only attempted when: a. the program is running on a multi-CPU machine; b. the current goroutine has spun fewer than four times trying to acquire this lock; c. at least one other processor P on the machine is running and the current P's run queue is empty.
Once the current goroutine is allowed to spin, it calls runtime.sync_runtime_doSpin and runtime.procyield and executes the PAUSE instruction 30 times; the instruction does no useful work and simply consumes CPU time.
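The spin-permission check itself lives in the runtime as sync_runtime_canSpin. A sketch of its logic (approximate, from runtime/proc.go; scheduler field names and types vary across Go versions):

// Reports whether spin attempt number i is still allowed (sketch).
func sync_runtime_canSpin(i int) bool {
	// Spin only a few times, only on a multicore machine, and only if
	// at least one other P is running and the local run queue is empty.
	if i >= active_spin || ncpu <= 1 ||
		gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}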
func (rw *RWMutex) Lock() {
	// race detection...
	// First acquire the embedded mutex to block any later writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	// race detection...
}
Acquiring the write lock first blocks further write-lock acquisitions and only then blocks read-lock acquisitions. This ordering guarantees that read operations are not starved by a continuous stream of writes.
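The fields used above come from the RWMutex definition. A sketch of the layout in sync/rwmutex.go (field types as in pre-Go 1.20 releases; newer versions use atomic.Int32):

type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait on departing readers
	readerSem   uint32 // semaphore for readers to wait on a finishing writer
	readerCount int32  // number of pending readers (negative while a writer is pending)
	readerWait  int32  // number of readers the pending writer still waits for
}

const rwmutexMaxReaders = 1 << 30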
func (rw *RWMutex) Unlock() {
	// race detection...

	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	// race detection...
}
Read lock
func (rw *RWMutex) RLock() {
	// race detection...
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	// race detection...
}
func (rw *RWMutex) RUnlock() {
	// race detection...
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	// race detection...
}
func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}
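Putting the read and write paths together, a minimal usage sketch (the names cache, get and set are illustrative only): readers share the lock via RLock/RUnlock, while a writer takes exclusive ownership via Lock/Unlock.

package main

import (
	"fmt"
	"sync"
)

var (
	mu    sync.RWMutex
	cache = map[string]int{} // protected by mu
)

// get takes the read lock, so many readers can run concurrently.
func get(key string) (int, bool) {
	mu.RLock()
	defer mu.RUnlock()
	v, ok := cache[key]
	return v, ok
}

// set takes the write lock, excluding both readers and other writers.
func set(key string, v int) {
	mu.Lock()
	defer mu.Unlock()
	cache[key] = v
}

func main() {
	set("a", 1)
	v, ok := get("a")
	fmt.Println(v, ok) // 1 true
}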
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		// state1 is 64-bit aligned: nothing to do.
		return &wg.state1, &wg.state2
	} else {
		// state1 is 32-bit aligned but not 64-bit aligned: this means that
		// (&state1)+4 is 64-bit aligned.
		state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
		return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
	}
}
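state() exists because the counter and waiter count live in a single 64-bit word that must be 8-byte aligned for atomic operations, and 32-bit platforms do not guarantee that alignment for struct fields. A sketch of the layout this code assumes (Go 1.18/1.19 era; other Go versions lay the fields out differently):

type WaitGroup struct {
	noCopy noCopy

	// 64-bit state: the high 32 bits are the counter, the low 32 bits
	// are the number of waiters blocked in Wait.
	state1 uint64
	state2 uint32 // semaphore, used when state1 is 64-bit aligned
}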
func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()
	// race detection...
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32)
	w := uint32(state)
	// race detection...
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}
func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	// race detection...
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		if v == 0 {
			// Counter is 0, no need to wait.
			// race detection...
			return
		}
		// Increment waiters count.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			// race detection...
			runtime_Semacquire(semap)
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			// race detection...
			return
		}
	}
}
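A minimal usage sketch tying Add, Done and Wait together (the worker printout is illustrative): Add bumps the counter in the high 32 bits, Done decrements it, and Wait blocks on the semaphore until the counter returns to zero.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1) // increment the counter before starting the goroutine
		go func(id int) {
			defer wg.Done() // Done is simply Add(-1)
			fmt.Println("worker", id, "done")
		}(i)
	}
	wg.Wait() // blocks until the counter drops back to 0
	fmt.Println("all workers finished")
}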
Once
Once is an object that performs an action exactly once.
Usage example
var d sync.Once
for i := 0; i < 5; i++ {
	d.Do(func() {
		fmt.Println("hello")
	})
}
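Under the hood, Do uses a double-checked pattern: an atomic load on the hot path, and a mutex plus a deferred store on the slow path so that concurrent callers block until the first call of f has returned. A sketch of the implementation (matching the pre-Go 1.21 layout; newer versions use atomic.Uint32):

type Once struct {
	done uint32 // set to 1 after f has run
	m    Mutex
}

func (o *Once) Do(f func()) {
	// Hot path: if done is already set, f has run; return immediately.
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		// Mark done only after f returns, so other callers of Do
		// cannot proceed until the first call has completed.
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

This is why the loop in the usage example prints "hello" only once: after the first Do, done is 1 and the remaining calls return on the hot path.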