关于go:互斥锁Mutex

1次阅读

共计 5765 个字符,预计需要花费 15 分钟才能阅读完成。

互斥锁 Mutex

应用锁爱护共享资源

Locker()是一个interface

type Locker interface {Lock()
    Unlock()}

mutex 构造体

type Mutex struct {
    state int32
    sema  uint32
}

state:

state 是一个复合型的字段,一个字段蕴含多个意义,这样能够通过尽可能少的内存来实现
互斥锁。这个字段的第一位(最小的一位)来示意这个锁是否被持有,第二位代表是否有
唤醒的 goroutine,第三位代表锁开释处于饥饿状态,残余的位数代表的是期待此锁的 goroutine 数。

sema:

sema 是个信号量变量,用来管制期待 goroutine 的阻塞休眠和唤醒。

Lock()

Mutex 能够处于 2 种操作模式:失常模式和饥饿模式。
在失常模式下,waiters 按 FIFO 程序排队,然而被唤醒的 waiters 不领有 mutex 并与新达到的 goroutines 竞争 mutex 所有权。新达到的 goroutine 有一个劣势——它们是曾经在 CPU 上运行并且可能有很多,所以被唤醒 waiter 可能会失败。在这种状况下,唤醒的 waiter 会在排在期待队列的后面。如果 waiter 超过 1ms 未能获取 mutex,它将 mutex 切换到饥饿模式。
在饥饿模式下,mutex 的所有权间接从解锁 goroutine 移交给队列后面的 waiter。
新达到的 goroutine 不会尝试获取互斥锁,即便它看起来已解锁,也不会尝试自旋。相同,他们将本人排在期待队列的尾部。
如果 waiter 收到 mutex 的所有权并看到

(1) 它是队列中的最初一个期待者
(2) 它期待的工夫不到 1ms,

它将互斥锁切换回失常操作模式。
失常模式具备更好的性能,因为即便有阻塞的 waiter,goroutine 能够间断屡次取得 mutex。
饥饿模式对于避免尾部提早的病理状况很重要。

func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    // cas 获取到锁
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    // 尝试自旋竞争或饥饿状态下饥饿 goroutine 竞争
    m.lockSlow()}

func (m *Mutex) lockSlow() {
    // 记录此 goroutine 申请锁的初始工夫
    var waitStartTime int64
    // 此 goroutine 的饥饿标记
    starving := false
    // 唤醒标记
    awoke := false
    // 自旋次数
    iter := 0
    // 以后锁的状态
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        // 锁是非饥饿状态,锁未开释,自旋尝试获取锁
        // 如果无奈间接获取锁,进行屡次自旋尝试; 屡次尝试失败,进入 sema 队列休眠
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            // 尝试设置 mutexWoken 标记以告诉 Unlock 不要唤醒其余阻塞的 goroutine。if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}
            runtime_doSpin()
            iter++
             // 再次获取锁的状态,之后会查看是否锁被开释了
            old = m.state
            continue
        }
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        // 不要尝试获取饥饿的互斥锁,新达到的 goroutine 必须排队。if old&mutexStarving == 0 {
            // 非饥饿状态,加锁
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            // waiter 数量加 1
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
         // 以后的 goroutine 将 mutex 切换到饥饿模式。if starving && old&mutexLocked != 0 {
             // 设置饥饿状态
            new |= mutexStarving
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
             // goroutine 曾经从睡眠中唤醒,// 所以咱们须要在任何一种状况下重置标记。if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")
            }
             // 新状态革除唤醒标记
            new &^= mutexWoken
        }
         // cas 胜利设置新状态
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
             // 原来锁的状态已开释,并且不是饥饿状态,失常申请到了锁,返回
            if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}
            // If we were already waiting before, queue at the front of the queue.
             // 如果咱们之前曾经在期待,请在队列的后面排队。// 判断是否第一次退出到 waiter 队列
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {waitStartTime = runtime_nanotime()
            }
             // 阻塞期待
             // 将此 waiter 退出到队列,如果是首次,退出到队尾,先进先出。如果不是首次,那么退出到队首,这样期待最久的 goroutine 优先可能获取到锁。此 goroutine 会进行休眠。runtime_SemacquireMutex(&m.sema, queueLifo, 1)
             // 唤醒之后查看锁是否应该处于饥饿状态, 此时曾经被唤醒
             // 查看等待时间是否大于 1ms
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
             // 如果锁曾经处于饥饿状态,间接抢到锁,返回
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")
                }
                  // 加锁,waiter 数减 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                      // 退出饥饿模式。// 此 waiter 曾经是队列中的最初一个 waiter 了,没有其它的期待锁的 goroutine 了;// 此 waiter 的等待时间小于 1 毫秒。delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {old = m.state}
    }

    if race.Enabled {race.Acquire(unsafe.Pointer(m))
    }
}

Unlock()

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

func (m *Mutex) unlockSlow(new int32) {if (new+mutexLocked)&mutexLocked == 0 {fatal("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
             // 如果 Mutex 处于失常状态,如果没有 waiter,或者曾经有在解决的状况了,那么开释就好,不做额定的解决。if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}
            // Grab the right to wake someone.
             // waiter 数减 1,设置 mutexWoken 标记, 通过 CAS 更新 state 的值。new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter, and yield
        // our time slice so that the next waiter can start to run immediately.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
         // 如果 Mutex 处于饥饿状态,间接唤醒期待队列中的 waiter。runtime_Semrelease(&m.sema, true, 1)
    }
}

TryLock()

当一个 goroutine 调用这个 TryLock 办法申请锁的时候,如果这把锁没有被其余 goroutine 所持有,那么,这个 goroutine 就持有了这把锁,并返回 true;如果这把锁曾经被其余 goroutine 所持有,或者是正在筹备交给某个被唤醒的 goroutine,那么,这个申请锁的 goroutine 就间接返回
false,不会阻塞在办法调用上。

func (m *Mutex) TryLock() bool {
    old := m.state
    if old&(mutexLocked|mutexStarving) != 0 {return false}

    // There may be a goroutine waiting for the mutex, but we are
    // running now and can try to grab the mutex before that
    // goroutine wakes up.
    if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {return false}

    if race.Enabled {race.Acquire(unsafe.Pointer(m))
    }
    return true
}
正文完
 0