互斥锁Mutex
应用锁爱护共享资源
Locker()
是一个interface
。
type Locker interface { Lock() Unlock()}
mutex构造体
type Mutex struct { state int32 sema uint32}
state:
state 是一个复合型的字段,一个字段蕴含多个意义,这样能够通过尽可能少的内存来实现
互斥锁。这个字段的第一位(最小的一位)来示意这个锁是否被持有,第二位代表是否有
唤醒的 goroutine,第三位代表锁开释处于饥饿状态,残余的位数代表的是期待此锁的 goroutine 数。
sema:
sema是个信号量变量,用来管制期待 goroutine 的阻塞休眠和唤醒。
Lock()
Mutex 能够处于 2 种操作模式:失常模式和饥饿模式。
在失常模式下,waiters按 FIFO 程序排队,然而被唤醒的waiters不领有mutex并与新达到的 goroutines 竞争mutex所有权。新达到的 goroutine 有一个劣势——它们是曾经在 CPU 上运行并且可能有很多,所以被唤醒waiter可能会失败。在这种状况下,唤醒的waiter会在排在期待队列的后面。如果waiter超过 1ms 未能获取mutex,它将mutex切换到饥饿模式。
在饥饿模式下,mutex的所有权间接从解锁 goroutine 移交给队列后面的waiter。
新达到的 goroutine 不会尝试获取互斥锁,即便它看起来已解锁,也不会尝试自旋。相同,他们将本人排在期待队列的尾部。
如果waiter收到mutex的所有权并看到
(1) 它是队列中的最初一个期待者(2) 它期待的工夫不到 1ms,
它将互斥锁切换回失常操作模式。
失常模式具备更好的性能,因为即便有阻塞的waiter,goroutine 能够间断屡次取得mutex。
饥饿模式对于避免尾部提早的病理状况很重要。
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. // cas获取到锁 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) // 尝试自旋竞争或饥饿状态下饥饿goroutine竞争 m.lockSlow()}func (m *Mutex) lockSlow() { // 记录此 goroutine 申请锁的初始工夫 var waitStartTime int64 // 此goroutine的饥饿标记 starving := false // 唤醒标记 awoke := false // 自旋次数 iter := 0 // 以后锁的状态 old := m.state for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. // 锁是非饥饿状态,锁未开释,自旋尝试获取锁 // 如果无奈间接获取锁,进行屡次自旋尝试;屡次尝试失败,进入sema队列休眠 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines. // 尝试设置 mutexWoken 标记以告诉 Unlock 不要唤醒其余阻塞的 goroutine。 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ // 再次获取锁的状态,之后会查看是否锁被开释了 old = m.state continue } new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. // 不要尝试获取饥饿的互斥锁,新达到的 goroutine 必须排队。 if old&mutexStarving == 0 { // 非饥饿状态,加锁 new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { // waiter数量加1 new += 1 << mutexWaiterShift } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. // 以后的 goroutine 将 mutex 切换到饥饿模式。 if starving && old&mutexLocked != 0 { // 设置饥饿状态 new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. // goroutine 曾经从睡眠中唤醒, // 所以咱们须要在任何一种状况下重置标记。 if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 新状态革除唤醒标记 new &^= mutexWoken } // cas胜利设置新状态 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 原来锁的状态已开释,并且不是饥饿状态,失常申请到了锁,返回 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. // 如果咱们之前曾经在期待,请在队列的后面排队。 // 判断是否第一次退出到 waiter 队列 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 阻塞期待 // 将此 waiter 退出到队列,如果是首次,退出到队尾,先进先出。如果不是首次,那么退出到队首,这样期待最久的 goroutine 优先可能获取到锁。此 goroutine 会进行休眠。 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 唤醒之后查看锁是否应该处于饥饿状态,此时曾经被唤醒 // 查看等待时间是否大于1ms starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // 如果锁曾经处于饥饿状态,间接抢到锁,返回 if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 加锁,waiter数减1 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. // 退出饥饿模式。 // 此 waiter 曾经是队列中的最初一个 waiter 了,没有其它的期待锁的 goroutine 了; // 此 waiter 的等待时间小于 1 毫秒。 delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) }}
Unlock()
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) }}func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. // 如果 Mutex 处于失常状态,如果没有 waiter,或者曾经有在解决的状况了,那么开释就好,不做额定的解决。 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. // waiter 数减 1,设置mutexWoken 标记,通过 CAS 更新 state 的值。 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // Starving mode: handoff mutex ownership to the next waiter, and yield // our time slice so that the next waiter can start to run immediately. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it. // 如果 Mutex 处于饥饿状态,间接唤醒期待队列中的 waiter。 runtime_Semrelease(&m.sema, true, 1) }}
TryLock()
当一个 goroutine 调用这个TryLock 办法申请锁的时候,如果这把锁没有被其余 goroutine 所持有,那么,这个goroutine 就持有了这把锁,并返回 true;如果这把锁曾经被其余 goroutine 所持有,或者是正在筹备交给某个被唤醒的 goroutine,那么,这个申请锁的 goroutine 就间接返回
false,不会阻塞在办法调用上。
func (m *Mutex) TryLock() bool { old := m.state if old&(mutexLocked|mutexStarving) != 0 { return false } // There may be a goroutine waiting for the mutex, but we are // running now and can try to grab the mutex before that // goroutine wakes up. if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) { return false } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return true}