互斥锁Mutex
应用锁爱护共享资源
Locker()
是一个interface
。
type Locker interface {
Lock()
Unlock()
}
mutex构造体
type Mutex struct {
state int32
sema uint32
}
state:
state 是一个复合型的字段,一个字段蕴含多个意义,这样能够通过尽可能少的内存来实现
互斥锁。这个字段的第一位(最小的一位)来示意这个锁是否被持有,第二位代表是否有
唤醒的 goroutine,第三位代表锁开释处于饥饿状态,残余的位数代表的是期待此锁的 goroutine 数。
sema:
sema是个信号量变量,用来管制期待 goroutine 的阻塞休眠和唤醒。
Lock()
Mutex 能够处于 2 种操作模式:失常模式和饥饿模式。
在失常模式下,waiters按 FIFO 程序排队,然而被唤醒的waiters不领有mutex并与新达到的 goroutines 竞争mutex所有权。新达到的 goroutine 有一个劣势——它们是曾经在 CPU 上运行并且可能有很多,所以被唤醒waiter可能会失败。在这种状况下,唤醒的waiter会在排在期待队列的后面。如果waiter超过 1ms 未能获取mutex,它将mutex切换到饥饿模式。
在饥饿模式下,mutex的所有权间接从解锁 goroutine 移交给队列后面的waiter。
新达到的 goroutine 不会尝试获取互斥锁,即便它看起来已解锁,也不会尝试自旋。相同,他们将本人排在期待队列的尾部。
如果waiter收到mutex的所有权并看到
(1) 它是队列中的最初一个期待者
(2) 它期待的工夫不到 1ms,
它将互斥锁切换回失常操作模式。
失常模式具备更好的性能,因为即便有阻塞的waiter,goroutine 能够间断屡次取得mutex。
饥饿模式对于避免尾部提早的病理状况很重要。
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// cas获取到锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
// 尝试自旋竞争或饥饿状态下饥饿goroutine竞争
m.lockSlow()
}
func (m *Mutex) lockSlow() {
// 记录此 goroutine 申请锁的初始工夫
var waitStartTime int64
// 此goroutine的饥饿标记
starving := false
// 唤醒标记
awoke := false
// 自旋次数
iter := 0
// 以后锁的状态
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 锁是非饥饿状态,锁未开释,自旋尝试获取锁
// 如果无奈间接获取锁,进行屡次自旋尝试;屡次尝试失败,进入sema队列休眠
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// 尝试设置 mutexWoken 标记以告诉 Unlock 不要唤醒其余阻塞的 goroutine。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
// 再次获取锁的状态,之后会查看是否锁被开释了
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 不要尝试获取饥饿的互斥锁,新达到的 goroutine 必须排队。
if old&mutexStarving == 0 {
// 非饥饿状态,加锁
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// waiter数量加1
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 以后的 goroutine 将 mutex 切换到饥饿模式。
if starving && old&mutexLocked != 0 {
// 设置饥饿状态
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
// goroutine 曾经从睡眠中唤醒,
// 所以咱们须要在任何一种状况下重置标记。
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 新状态革除唤醒标记
new &^= mutexWoken
}
// cas胜利设置新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 原来锁的状态已开释,并且不是饥饿状态,失常申请到了锁,返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
// 如果咱们之前曾经在期待,请在队列的后面排队。
// 判断是否第一次退出到 waiter 队列
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞期待
// 将此 waiter 退出到队列,如果是首次,退出到队尾,先进先出。如果不是首次,那么退出到队首,这样期待最久的 goroutine 优先可能获取到锁。此 goroutine 会进行休眠。
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 唤醒之后查看锁是否应该处于饥饿状态,此时曾经被唤醒
// 查看等待时间是否大于1ms
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果锁曾经处于饥饿状态,间接抢到锁,返回
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 加锁,waiter数减1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
// 退出饥饿模式。
// 此 waiter 曾经是队列中的最初一个 waiter 了,没有其它的期待锁的 goroutine 了;
// 此 waiter 的等待时间小于 1 毫秒。
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
Unlock()
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果 Mutex 处于失常状态,如果没有 waiter,或者曾经有在解决的状况了,那么开释就好,不做额定的解决。
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// waiter 数减 1,设置mutexWoken 标记,通过 CAS 更新 state 的值。
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 如果 Mutex 处于饥饿状态,间接唤醒期待队列中的 waiter。
runtime_Semrelease(&m.sema, true, 1)
}
}
TryLock()
当一个 goroutine 调用这个TryLock 办法申请锁的时候,如果这把锁没有被其余 goroutine 所持有,那么,这个goroutine 就持有了这把锁,并返回 true;如果这把锁曾经被其余 goroutine 所持有,或者是正在筹备交给某个被唤醒的 goroutine,那么,这个申请锁的 goroutine 就间接返回
false,不会阻塞在办法调用上。
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// There may be a goroutine waiting for the mutex, but we are
// running now and can try to grab the mutex before that
// goroutine wakes up.
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return true
}
发表回复