互斥锁Mutex

应用锁爱护共享资源

Locker()是一个interface

type Locker interface {    Lock()    Unlock()}

mutex构造体

type Mutex struct {    state int32    sema  uint32}

state:

state 是一个复合型的字段,一个字段蕴含多个意义,这样能够通过尽可能少的内存来实现
互斥锁。这个字段的第一位(最小的一位)来示意这个锁是否被持有,第二位代表是否有
唤醒的 goroutine,第三位代表锁开释处于饥饿状态,残余的位数代表的是期待此锁的 goroutine 数。

sema:

sema是个信号量变量,用来管制期待 goroutine 的阻塞休眠和唤醒。

Lock()

Mutex 能够处于 2 种操作模式:失常模式和饥饿模式。
在失常模式下,waiters按 FIFO 程序排队,然而被唤醒的waiters不领有mutex并与新达到的 goroutines 竞争mutex所有权。新达到的 goroutine 有一个劣势——它们是曾经在 CPU 上运行并且可能有很多,所以被唤醒waiter可能会失败。在这种状况下,唤醒的waiter会在排在期待队列的后面。如果waiter超过 1ms 未能获取mutex,它将mutex切换到饥饿模式。
在饥饿模式下,mutex的所有权间接从解锁 goroutine 移交给队列后面的waiter。
新达到的 goroutine 不会尝试获取互斥锁,即便它看起来已解锁,也不会尝试自旋。相同,他们将本人排在期待队列的尾部。
如果waiter收到mutex的所有权并看到

(1) 它是队列中的最初一个期待者(2) 它期待的工夫不到 1ms,

它将互斥锁切换回失常操作模式。
失常模式具备更好的性能,因为即便有阻塞的waiter,goroutine 能够间断屡次取得mutex。
饥饿模式对于避免尾部提早的病理状况很重要。

func (m *Mutex) Lock() {    // Fast path: grab unlocked mutex.    // cas获取到锁    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {        if race.Enabled {            race.Acquire(unsafe.Pointer(m))        }        return    }    // Slow path (outlined so that the fast path can be inlined)    // 尝试自旋竞争或饥饿状态下饥饿goroutine竞争    m.lockSlow()}func (m *Mutex) lockSlow() {    // 记录此 goroutine 申请锁的初始工夫    var waitStartTime int64    // 此goroutine的饥饿标记    starving := false    // 唤醒标记    awoke := false    // 自旋次数    iter := 0    // 以后锁的状态    old := m.state    for {        // Don't spin in starvation mode, ownership is handed off to waiters        // so we won't be able to acquire the mutex anyway.        // 锁是非饥饿状态,锁未开释,自旋尝试获取锁        // 如果无奈间接获取锁,进行屡次自旋尝试;屡次尝试失败,进入sema队列休眠        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {            // Active spinning makes sense.            // Try to set mutexWoken flag to inform Unlock            // to not wake other blocked goroutines.            // 尝试设置 mutexWoken 标记以告诉 Unlock 不要唤醒其余阻塞的 goroutine。            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {                awoke = true            }            runtime_doSpin()            iter++             // 再次获取锁的状态,之后会查看是否锁被开释了            old = m.state            continue        }        new := old        // Don't try to acquire starving mutex, new arriving goroutines must queue.        // 不要尝试获取饥饿的互斥锁,新达到的 goroutine 必须排队。        if old&mutexStarving == 0 {            // 非饥饿状态,加锁            new |= mutexLocked        }        if old&(mutexLocked|mutexStarving) != 0 {            // waiter数量加1            new += 1 << mutexWaiterShift        }        // The current goroutine switches mutex to starvation mode.        // But if the mutex is currently unlocked, don't do the switch.        // Unlock expects that starving mutex has waiters, which will not        // be true in this case.         // 以后的 goroutine 将 mutex 切换到饥饿模式。        if starving && old&mutexLocked != 0 {             // 设置饥饿状态            new |= mutexStarving        }        if awoke {            // The goroutine has been woken from sleep,            // so we need to reset the flag in either case.             // goroutine 曾经从睡眠中唤醒,            // 所以咱们须要在任何一种状况下重置标记。            if new&mutexWoken == 0 {                throw("sync: inconsistent mutex state")            }             // 新状态革除唤醒标记            new &^= mutexWoken        }         // cas胜利设置新状态        if atomic.CompareAndSwapInt32(&m.state, old, new) {             // 原来锁的状态已开释,并且不是饥饿状态,失常申请到了锁,返回            if old&(mutexLocked|mutexStarving) == 0 {                break // locked the mutex with CAS            }            // If we were already waiting before, queue at the front of the queue.             // 如果咱们之前曾经在期待,请在队列的后面排队。             // 判断是否第一次退出到 waiter 队列            queueLifo := waitStartTime != 0            if waitStartTime == 0 {                waitStartTime = runtime_nanotime()            }             // 阻塞期待             // 将此 waiter 退出到队列,如果是首次,退出到队尾,先进先出。如果不是首次,那么退出到队首,这样期待最久的 goroutine 优先可能获取到锁。此 goroutine 会进行休眠。            runtime_SemacquireMutex(&m.sema, queueLifo, 1)             // 唤醒之后查看锁是否应该处于饥饿状态,此时曾经被唤醒             // 查看等待时间是否大于1ms            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs            old = m.state             // 如果锁曾经处于饥饿状态,间接抢到锁,返回            if old&mutexStarving != 0 {                // If this goroutine was woken and mutex is in starvation mode,                // ownership was handed off to us but mutex is in somewhat                // inconsistent state: mutexLocked is not set and we are still                // accounted as waiter. Fix that.                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {                    throw("sync: inconsistent mutex state")                }                  // 加锁,waiter数减1                delta := int32(mutexLocked - 1<<mutexWaiterShift)                if !starving || old>>mutexWaiterShift == 1 {                    // Exit starvation mode.                    // Critical to do it here and consider wait time.                    // Starvation mode is so inefficient, that two goroutines                    // can go lock-step infinitely once they switch mutex                    // to starvation mode.                      // 退出饥饿模式。                      // 此 waiter 曾经是队列中的最初一个 waiter 了,没有其它的期待锁的 goroutine 了;                    // 此 waiter 的等待时间小于 1 毫秒。                    delta -= mutexStarving                }                atomic.AddInt32(&m.state, delta)                break            }            awoke = true            iter = 0        } else {            old = m.state        }    }    if race.Enabled {        race.Acquire(unsafe.Pointer(m))    }}

Unlock()

func (m *Mutex) Unlock() {    if race.Enabled {        _ = m.state        race.Release(unsafe.Pointer(m))    }    // Fast path: drop lock bit.    new := atomic.AddInt32(&m.state, -mutexLocked)    if new != 0 {        // Outlined slow path to allow inlining the fast path.        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.        m.unlockSlow(new)    }}func (m *Mutex) unlockSlow(new int32) {    if (new+mutexLocked)&mutexLocked == 0 {        fatal("sync: unlock of unlocked mutex")    }    if new&mutexStarving == 0 {        old := new        for {            // If there are no waiters or a goroutine has already            // been woken or grabbed the lock, no need to wake anyone.            // In starvation mode ownership is directly handed off from unlocking            // goroutine to the next waiter. We are not part of this chain,            // since we did not observe mutexStarving when we unlocked the mutex above.            // So get off the way.             // 如果 Mutex 处于失常状态,如果没有 waiter,或者曾经有在解决的状况了,那么开释就好,不做额定的解决。            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {                return            }            // Grab the right to wake someone.             // waiter 数减 1,设置mutexWoken 标记,通过 CAS 更新 state 的值。            new = (old - 1<<mutexWaiterShift) | mutexWoken            if atomic.CompareAndSwapInt32(&m.state, old, new) {                runtime_Semrelease(&m.sema, false, 1)                return            }            old = m.state        }    } else {        // Starving mode: handoff mutex ownership to the next waiter, and yield        // our time slice so that the next waiter can start to run immediately.        // Note: mutexLocked is not set, the waiter will set it after wakeup.        // But mutex is still considered locked if mutexStarving is set,        // so new coming goroutines won't acquire it.         // 如果 Mutex 处于饥饿状态,间接唤醒期待队列中的 waiter。        runtime_Semrelease(&m.sema, true, 1)    }}

TryLock()

当一个 goroutine 调用这个TryLock 办法申请锁的时候,如果这把锁没有被其余 goroutine 所持有,那么,这个goroutine 就持有了这把锁,并返回 true;如果这把锁曾经被其余 goroutine 所持有,或者是正在筹备交给某个被唤醒的 goroutine,那么,这个申请锁的 goroutine 就间接返回
false,不会阻塞在办法调用上。

func (m *Mutex) TryLock() bool {    old := m.state    if old&(mutexLocked|mutexStarving) != 0 {        return false    }    // There may be a goroutine waiting for the mutex, but we are    // running now and can try to grab the mutex before that    // goroutine wakes up.    if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {        return false    }    if race.Enabled {        race.Acquire(unsafe.Pointer(m))    }    return true}