关于golang:Go进阶并发编程Mutex

46次阅读

共计 7513 个字符,预计需要花费 19 分钟才能阅读完成。

互斥锁是并发程序中对临界资源进行访问控制的最根本伎俩,Mutex 即为 Go 语言原生的互斥锁实现。

数据结构

源码包 src/sync/mutex.go 中定义了 Mutex 的数据结构:

type Mutex struct {
    state int32
    sema  uint32
}

state 字段示意互斥锁的状态,是 32 位的整型,外部实现时把该变量分成四局部,用于记录四种状态:

  • Locked: 示意该互斥锁是否已被锁定;
  • Woken: 示意是否从失常模式被唤醒;
  • Starving:示意该 Mutex 是否处于饥饿状态;
  • Waiter: 示意互斥锁上阻塞期待的协程个数。

sema 字段示意信号量,加锁失败的协程阻塞期待该信号量,解锁的协程开释信号量从而唤醒期待信号量的协程。

失常模式和饥饿模式

Mutex 有两种模式——失常模式和饥饿模式,饥饿模式是 1.9 版本中引入的优化,目标是保障互斥锁的公平性,避免协程饿死。默认状况下,Mutex 的模式为失常模式。

在失常模式下,协程如果加锁不胜利不会立刻转入期待队列,而是判断是否满足自旋的条件,如果满足则会自旋。

当持有锁的协程开释锁的时候,会开释一个信号量来唤醒期待队列中的一个协程,但如果有协程正处于自旋过程中,锁往往会被该自旋协程获取到。被唤醒的协程只好再次阻塞,不过阻塞前会判断自上次阻塞到本次阻塞通过了多长时间,如果超过 1ms 的话,会将 Mutex 标记为饥饿模式。

在饥饿模式下,新加锁的协程不会进入自旋状态,它们只会在队列的开端期待,互斥锁被开释后会间接交给期待队列最后面的协程。如果一个协程取得了互斥锁并且它在队列的开端或者它期待的工夫少于 1ms,那么互斥锁就会切换回失常模式。

办法

互斥锁 Mutex 就提供两个办法 Lock 和 Unlock:进入临界区之前调用 Lock 办法,退出临界区的时候调用 Unlock 办法。

Lock

func (m *Mutex) Lock() {if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {return}
    m.lockSlow()}

当锁的状态是 0 时,将 mutexLocked 置成 1,这是最简略的状况。如果互斥锁的状态不是 0 时就会调用 lockSlow 办法,这里将它分成几个局部介绍获取锁的过程:

  1. 判断以后 Goroutine 是否进入自旋;
  2. 通过自旋期待互斥锁的开释;
  3. 计算互斥锁的最新状态;
  4. 更新互斥锁的状态并获取锁。
判断以后 Goroutine 是否进入自旋
func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false // 此 goroutine 的饥饿标记
    awoke := false // 唤醒标记
    iter := 0 // 自旋次数
    old := m.state
    for {if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }

Goroutine 进入自旋的条件十分刻薄:

  • 互斥锁只有在一般模式能力进入自旋;
  • runtime.sync_runtime_canSpin 须要返回 true:

    • 运行在多 CPU 的机器上;
    • 以后 Goroutine 为了获取该锁进入自旋的次数小于 4 次;
    • 以后机器上至多存在一个正在运行的处理器 P 并且解决的运行队列为空。
通过自旋期待互斥锁的开释

一旦以后 Goroutine 可能进入自旋就会调用 runtime.sync_runtime_doSpin 和 runtime.procyield 执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并耗费 CPU 工夫:

func sync_runtime_doSpin() {procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ    again
    RET
计算互斥锁的最新状态

解决了自旋相干的逻辑后,会依据上下文计算以后互斥锁最新的状态。几个不同的条件别离会更新 state 字段中存储的不同信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:

        new := old
        if old&mutexStarving == 0 {new |= mutexLocked // 非饥饿状态,加锁}
        if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift // 加锁或饥饿状态,waiter 数量加 1}
        if starving && old&mutexLocked != 0 {new |= mutexStarving // 设置饥饿状态}
        if awoke {
            if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken // 新状态革除唤醒标记
        }
更新互斥锁的状态并获取锁

计算了新的互斥锁状态之后,会通过 CAS 函数更新状态。如果没有取得锁,会调用 runtime.sync_runtime_SemacquireMutex 通过信号量保障资源不会被两个 Goroutine 获取。runtime.sync_runtime_SemacquireMutex 会在办法中一直尝试获取锁并陷入休眠期待信号量的开释,一旦以后 Goroutine 能够获取信号量,它就会立即返回,继续执行残余代码。

  • 在失常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并从新执行获取锁的循环;
  • 在饥饿模式下,以后 Goroutine 会取得互斥锁,如果期待队列中只存在以后 Goroutine,互斥锁还会从饥饿模式中退出。
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 原来锁的状态已开释,且不是饥饿状态,获取到锁而后返回
            if old&(mutexLocked|mutexStarving) == 0 {break}
            
            // 解决饥饿状态
            
            // 如果之前就在队列外面,退出到队列头
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {waitStartTime = runtime_nanotime()
            }
            // 阻塞期待
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // 唤醒之后查看锁是否应该处于饥饿状态
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 如果锁曾经处于饥饿状态,间接抢到锁,返回
            if old&mutexStarving != 0 {if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")
                }
                // 加锁并且将 waiter 数减 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // 革除饥饿标记
                if !starving || old>>mutexWaiterShift == 1 {delta -= mutexStarving}
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {old = m.state}
    }
}

Unlock

    func (m *Mutex) Unlock() {new := atomic.AddInt32(&m.state, -mutexLocked)
        if new != 0 {m.unlockSlow(new)
        }
    }
    
    func (m *Mutex) unlockSlow(new int32) {if (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")
        }
        if new&mutexStarving == 0 {
            old := new
            for {if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}
                new = (old - 1<<mutexWaiterShift) | mutexWoken
                if atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 1)
                    return
                }
                old = m.state
            }
        } else {runtime_Semrelease(&m.sema, true, 1)
        }
    }

互斥锁的解锁过程比较简单,该过程会先应用 sync.atomic.AddInt32 函数疾速解锁,失败后执行慢速解锁过程。unlockSlow 会先校验锁状态的合法性 — 如果以后互斥锁曾经被解锁过了会间接抛出异样。而后依据以后互斥锁的状态,在失常模式和饥饿模式下别离解决:

  • 在失常模式下,上述代码会应用如下所示的处理过程:

    • 如果互斥锁不存在期待者或者互斥锁的 mutexLocked、mutexStarving、mutexWoken 状态不都为 0,那么以后办法能够间接返回,不须要唤醒其余期待者;
    • 如果互斥锁存在期待者,会通过 sync.runtime_Semrelease 唤醒期待者并移交锁的所有权;
  • 在饥饿模式下,上述代码会间接调用 sync.runtime_Semrelease 将以后锁交给下一个正在尝试获取锁的期待者,期待者被唤醒后会失去锁,在这时互斥锁还不会退出饥饿状态。

易错场景

应用 Mutex 常见的谬误场景有 4 类,别离是 Lock/Unlock 不是成对呈现、Copy 已应用的 Mutex、重入和死锁。其余三种比较简单,这里重点介绍一下无关重入的问题。

重入

规范库 Mutex 不是可重入锁,也就是指在一个 goroutine 中不能够屡次获取同一把锁。如果想在 Mutex 的根底上要实现一个可重入锁的话,能够有上面两个计划:

  • 通过 hacker 的形式获取到 goroutine id,记录下获取锁的 goroutine id,它能够实现 Locker 接口。
  • 调用 Lock/Unlock 办法时,由 goroutine 提供一个 token,用来标识它本人,而不是咱们通过 hacker 的形式获取到 goroutine id,然而,这样一来就不满足 Locker 接口。

可重入锁解决了代码重入或者递归调用带来的死锁问题,同时它也带来了另一个益处,就是咱们能够要求,只有持有锁的 goroutine 能力 unlock 这个锁。这也很容易实现,因为在下面这两个计划中,都曾经记录了是哪一个 goroutine 持有这个锁。

计划一:goroutine id

这个计划的要害第一步是获取 goroutine id,形式有两种,别离是简略形式和 hacker 形式。

简略形式,就是通过 runtime.Stack 办法获取栈帧信息,栈帧信息里蕴含 goroutine id。runtime.Stack 办法能够获取以后的 goroutine 信息。

接下来咱们来看 hacker 的形式,咱们获取运行时的 g 指针,反解出对应的 g 的构造。每个运行的 goroutine 构造的 g 指针保留在以后 goroutine 的一个叫做 TLS 对象中。

  1. 咱们先获取到 TLS 对象;
  2. 再从 TLS 中获取 goroutine 构造的 g 指针;
  3. 再从 g 指针中取出 goroutine id。

咱们没有必要反复创造轮子,间接应用第三方的库来获取 goroutine id 就能够了。当初曾经有很多成熟的库了,比方 petermattis/goid。接下来咱们实现一个能够应用的可重入锁:

// RecursiveMutex 包装一个 Mutex, 实现可重入
type RecursiveMutex struct {
    sync.Mutex
    owner     int64 // 以后持有锁的 goroutine id
    recursion int32 // 这个 goroutine 重入的次数
}

func (m *RecursiveMutex) Lock() {gid := goid.Get()
    // 如果以后持有锁的 goroutine 就是这次调用的 goroutine, 阐明是重入
    if atomic.LoadInt64(&m.owner) == gid {
        m.recursion++
        return
    }
    m.Mutex.Lock()
    // 取得锁的 goroutine 第一次调用,记录下它的 goroutine id, 调用次数加 1
    atomic.StoreInt64(&m.owner, gid)
    m.recursion = 1
}

func (m *RecursiveMutex) Unlock() {gid := goid.Get()
    // 非持有锁的 goroutine 尝试开释锁,谬误的应用
    if atomic.LoadInt64(&m.owner) != gid {panic(fmt.Sprintf("wrong the owner(%d): %d!", m.owner, gid))
    }
    // 调用次数减 1
    m.recursion--
    if m.recursion != 0 { // 如果这个 goroutine 还没有齐全开释,则间接返回
        return
    }
    // 此 goroutine 最初一次调用,须要开释锁
    atomic.StoreInt64(&m.owner, -1)
    m.Mutex.Unlock()}
计划二:token

计划一是用 goroutine id 做 goroutine 的标识,咱们也能够让 goroutine 本人来提供标识。不管怎么说,Go 开发者不冀望使用者利用 goroutine id 做一些不确定的货色,所以,他们没有裸露获取 goroutine id 的办法。

咱们能够这么设计,调用者本人提供一个 token,获取锁的时候把这个 token 传入,开释锁的时候也须要把这个 token 传入。通过用户传入的 token 替换计划一中 goroutine id,其它逻辑和计划一统一。

拓展

TryLock

咱们能够为 Mutex 增加一个 TryLock 的办法,也就是尝试获取锁。当一个 goroutine 调用这个 TryLock 办法申请锁的时候,如果这把锁没有被其余 goroutine 所持有,那么,这个 goroutine 就持有了这把锁,并返回 true。如果这把锁曾经被其余 goroutine 所持有,或者是正在筹备交给某个被唤醒的 goroutine,那么就间接返回 false,不会阻塞在办法调用上。

// 复制 Mutex 定义的常量
const (
    mutexLocked = 1 << iota // 加锁标识地位
    mutexWoken              // 唤醒标识地位
    mutexStarving           // 锁饥饿标识地位
    mutexWaiterShift = iota // 标识 waiter 的起始 bit 地位
)

// 扩大一个 Mutex 构造
type Mutex struct {sync.Mutex}

// 尝试获取锁
func (m *Mutex) TryLock() bool {
    // 如果能胜利抢到锁
    if atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), 0, mutexLocked) {return true}
    // 如果处于唤醒、加锁或者饥饿状态,这次申请就不参加竞争了,返回 false
    old := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    if old&(mutexLocked|mutexStarving|mutexWoken) != 0 {return false}
    // 尝试在竞争的状态下申请锁
    new := old | mutexLocked
    return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.Mutex)), old, new)
}

第 17 行是一个 fast path,如果侥幸,没有其余 goroutine 争这把锁,那么,这把锁就会被这个申请的 goroutine 获取,间接返回。

如果锁曾经被其余 goroutine 所持有,或者被其余唤醒的 goroutine 筹备持有,那么,就间接返回 false,不再申请,代码逻辑在第 23 行。

如果没有被持有,也没有其它唤醒的 goroutine 来竞争锁,锁也不处于饥饿状态,就尝试获取这把锁(第 29 行),不管是否胜利都将后果返回。因为,这个时候,可能还有其余的 goroutine 也在竞争这把锁,所以,不能保障胜利获取这把锁。

获取期待者的数量等指标

Mutex 的数据结构蕴含两个字段:state 和 sema。前四个字节(int32)就是 state 字段。Mutex 构造中的 state 字段有很多个含意,通过 state 字段,能够晓得锁是否曾经被某个 goroutine 持有、以后是否处于饥饿状态、是否有期待的 goroutine 被唤醒、期待者的数量等信息。然而,state 这个字段并没有裸露进去,怎么获取未裸露的字段呢?很简略,咱们能够通过 unsafe 的形式实现。

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
)

type Mutex struct {sync.Mutex}

func (m *Mutex) Count() int {
    // 获取 state 字段的值
    v := atomic.LoadInt32((*int32)(unsafe.Pointer(&m.Mutex)))
    v = v >> mutexWaiterShift // 失去期待者的数值
    v = v + (v & mutexLocked) // 再加上锁持有者的数量,0 或者 1
    return int(v)
}

这个例子的第 14 行通过 unsafe 操作,咱们能够失去 state 字段的值。在第 15 行通过右移三位(这里的常量 mutexWaiterShift 的值为 3),就失去了以后期待者的数量。如果以后的锁曾经被其余 goroutine 持有,那么,咱们就略微调整一下这个值,加上一个 1(第 16 行),基本上能够把它看作是以后持有和期待这把锁的 goroutine 的总数。

正文完
 0