前言
golang 的 sync
包下有种锁,一种是 sync.RWMutex
, 另一种是sync.Mutex
, 本文将解说下sync.Mutex
是如何实现的?如何防止 读 / 写 饥饿 问题?就让咱们带着这些问题来看源码是如何实现的
概述
// Mutex fairness. // // Mutex can be in 2 modes of operations: normal and starvation. // In normal mode waiters are queued in FIFO order, but a woken up waiter // does not own the mutex and competes with new arriving goroutines over // the ownership. New arriving goroutines have an advantage -- they are // already running on CPU and there can be lots of them, so a woken up // waiter has good chances of losing. In such case it is queued at front // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, // it switches mutex to the starvation mode. // // In starvation mode ownership of the mutex is directly handed off from // the unlocking goroutine to the waiter at the front of the queue. // New arriving goroutines don't try to acquire the mutex even if it appears // to be unlocked, and don't try to spin. Instead they queue themselves at // the tail of the wait queue. // // If a waiter receives ownership of the mutex and sees that either // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, // it switches mutex back to normal operation mode. // // Normal mode has considerably better performance as a goroutine can acquire // a mutex several times in a row even if there are blocked waiters. // Starvation mode is important to prevent pathological cases of tail latency.
以上摘自 golang 源码中对 mutex
的正文,我感觉用来概括解释十分清晰
Mutex
作为并发原语中的锁,波及锁的公平性(即偏心锁和非偏心锁,通常非偏心锁性能更加),go 中叫做两种模式:失常
和饥饿
。
失常模式 下,未获取到锁的 goroutine 会在 waiter 中依照 FIFO 形式在队列中排队。当锁被开释,会唤醒 waiter 中的 goroutine,它会和新来的 goroutine(如果开释锁时,刚好有新的协程来获取锁)进行竞争锁,新来的 goroutine 有更大的劣势获取到锁,因为他们正在 CPU 执行。那么刚刚在 waiter 中唤醒的 goroutine 因为没有获取到锁(白跑一趟), 那么它就会被放到 waiter 的队列头. 当 waiter 中的 goroutine 超过 1s 没有获取到锁,会将 mutex 置为饥饿模式。
饥饿模式 下, 在开释锁的过程,新来的 goroutine 不会参加竞争锁,间接由 waiter 中队头的 goroutine 获取锁,如果队头的 goroutine 的等待时间小于 1ms, 阐明此时曾经没有协程处于饥饿, 将切换回失常模式。
源码
const(
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
)
type Mutex struct {
state int32
sema uint32
}
state
状态中低三位用于标识锁的状态其余高位用于记录 waiter 的数量,state 能够示意为:waiterNum|mutexStarving|mutexWoken|mutexLocked
sema
是个 FIFO 队列,用于 goroutine 作为 waiter 在这里排队.
获取锁
没有竞争,间接通过 CAS 获取锁
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()}
有竞争,走 lockSlow
失常模式
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}
runtime_doSpin()
iter++
old = m.state
continue
}
失常模式下,这里通过自旋期待锁的开释,同时会将 state 置为mutexWoken
, 用于锁在开释是是否将锁资源移交给自旋锁的协程竞争锁
if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {
// 这里是失常模式下,线程唤醒后获取到锁的进口
break // locked the mutex with CAS // 线程自旋后,原来持有锁的线程开释锁后,state 的 mutexLocked 或置于 0。而后,本次 CAS 胜利,获取到锁
}
// If we were already waiting before, queue at the front of the queue. // 没有获取到锁,若是之前曾经在 waiter 中,则放入队首,否则放入队尾
queueLifo := waitStartTime != 0
if waitStartTime == 0 {waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 每次有锁开释,会唤醒 waiter 协程,唤醒点在这里
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
...
awoke = true
iter = 0
}
- 线程自旋后,原来持有锁的线程开释锁后,state 的 mutexLocked 或置于 0。而后,本次 CAS 胜利,获取到锁
- 若本人是 waiter 唤醒后,然而由没有获取到锁,则放入 waiter 队首,否则放入队尾
- 若等待时间超过 1s , 将 mutex 切换为饥饿模式
饥饿模式
new := old
...
if atomic.CompareAndSwapInt32(&m.state, old, new) {
...
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
...
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break // 这里是锁在饥饿条件下,协程被唤醒后(获取到锁)的进口
}
awoke = true
iter = 0
} else {old = m.state}
- 若协程曾经超过 1ms 没有获取到锁,则切换到饥饿模式(
runtime_nanotime()-waitStartTime > starvationThresholdNs
). - 若 waiter 队列只剩本协程,那么退出饥饿模式(
old>>mutexWaiterShift == 1
)
开释锁
没有锁竞争,间接 CAS 开释锁资源
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
有竞争,走 unlockSlow
失常模式
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
从 waiter 中唤起一个,与新来的 goroutine 一起竞争锁资源
饥饿模式
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
间接从 waiter 中取出期待队列的第一个饥饿的协程来获取锁
参考文献
- https://juejin.cn/post/695897…
- https://segmentfault.com/a/11…