共计 8803 个字符,预计需要花费 23 分钟才能阅读完成。
WaitGroup 底层
回顾一下 Go 的并发管制,其实大略有以下几种管制伎俩
- 全局变量(不优雅)
- wg 只有三个办法,不便简略
- channel 有三种 panic 的状况,且操作比较复杂
- context 官网举荐,在带层级的 goroutine 中有奇效,比方 goroutine 嵌套着 goroutine;而且可能在不同的 goroutine 之间进行值传递,不便
sync.WaitGroup()就是解决 go 中并发时,多个 goroutine 同步的问题
用法如下
- 主 goroutine 通过 wg.Add(i)让设置须要启动的 goroutine 数量
- 子 goroutine 之间通过 wg.Done()来示意子 goroutine 执行结束,Add(-1)
-
主 goroutine 之间通过 Wait()来期待所有的子 goroutine 执行结束
func main() { var wg sync.WaitGroup wg.Add(2) go func() {defer wg.Done() fmt.Println("here1") }() go func() {defer wg.Done() fmt.Println("here2") }() wg.Wait() fmt.Println("finish") } // 输入 // here2 // here1 // finish 因为这里用了 defer 所以答案是确定的 先进后出 必先打印 here2
底层
//https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go // A WaitGroup waits for a collection of goroutines to finish. // The main goroutine calls Add to set the number of // goroutines to wait for. Then each of the goroutines // runs and calls Done when finished. At the same time, // Wait can be used to block until all goroutines have finished. // // A WaitGroup must not be copied after first use. type WaitGroup struct { noCopy noCopy // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. // 64-bit atomic operations require 64-bit alignment, but 32-bit // compilers do not ensure it. So we allocate 12 bytes and then use // the aligned 8 bytes in them as state, and the other 4 as storage // for the sema. state1 [3]uint32 }
第一个字段:noCopy
Go 中检测禁止复制的技术,如果有复制行为,那么就认为违规,在编译阶段禁止赋值和复制 wg 实例,因为 wg 中有一个计数器,多个 goroutine 会并发的批改该计数器。在应用 wg 的时候,倡议依照官网文档的要求,应用指针传递 wg 实例,同时如果 wg 作为构造体的成员,也须要显式地定义一个 nocopy 字段,以防止在构造体复制时导致竞态问题
第二个字段:state1 数组
这是一个长度为 3 的数组,蕴含了 wg 应用到的三种数据:counter、waiter、semaphore
三个元素的具体作用如下
- counter:计数器,用来计算要执行的协程 g 的数量,示意以后要执行的 goroutine 个数,wg.Add(i)时,counter+=i,wg.Done()时,count-=1
- waiter:计数器,示意曾经调用 Wait()函数的 goroutine-group 个数,也就是须要完结的 goroutine 组数,当 wg.Wait()的时候,waiter+=1,并且挂起以后的 goroutine
- semaphore:go runtime 外部的信号量实现,会用到以下两个函数
1.runtime_Semacquire 减少一个信号量,并且挂起以后 goroutine
2.runtime_Semrelease 缩小一个信号量,并唤醒 semaphore 上其中一个正在期待的字段
Add
// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L53
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state() // 获取 state(counter+waiter)和 semaphore 信号量的指针
... ...
// uint64(delta)<<32 把 delta 左移 32 位,因为 counter 在 statep 的高 32 位
// 而后把 delta 原子的减少到 counter 中
state := atomic.AddUint64(statep, uint64(delta)<<32)
// v => counter, w => waiter
v := int32(state >> 32)// 获取 counter 值
w := uint32(state) // 获取 waiter 值
... ...
//counter 变为负值了,panic 报错
if v < 0 {panic("sync: negative WaitGroup counter")
}
//waiter 不等于 0,阐明曾经执行了 waiter,这时你又调用 Add(),是不容许的
if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
//v->counter,counter>0,阐明还有 goroutine 没执行完,不须要开释信号量,间接返回
//w->waiter, waiter=0,没有期待的 goroutine,不须要开释信号量,间接返回
if v > 0 || w == 0 {return}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
// Add()和 Wait()不能并行操作
// counter==0,也不能执行 Wait()操作
if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
*statep = 0 // 完结了将 counter 清零,上面在开释 waiter 数的信号量
for ; w != 0; w-- {// 循环开释 waiter 个数的信号量
runtime_Semrelease(semap, false, 0)// 一次开释一个信号量,唤醒一个期待者
}
}
能看进去 Add 次要干了两件事
1. 将 delta 值累加到 counter 计数器中
2. 当 counter= 0 的时候,waiter 开释相应的信号量,把期待的 goroutine 全副唤醒,如果 <0,则 panic
Done
// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L97
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {wg.Add(-1)
}
间接调用 wg.Add(-1),让 counter 的值减一
Wait
// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L103
func (wg *WaitGroup) Wait() {statep, semap := wg.state() // 获取 state(counter+waiter)和 semaphore 信号量的指针
... ...
for {// 死循环
state := atomic.LoadUint64(statep) // 原子的获取 state 值
v := int32(state >> 32) // 获取 counter 值
w := uint32(state) // 获取 waiter 值
if v == 0 {// counter=0,不须要 wait 间接返回
// Counter is 0, no need to wait.
if race.Enabled {race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
... ...
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {// 应用 CAS 累加 wiater
... ...
runtime_Semacquire(semap) // 减少信号量,期待信号量唤醒
// 这时 *statep 还不等于 0,那么应用过程必定有误,间接 panic
if *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")
}
... ...
return
}
}
}
累加 waiter 计数器的值,减少信号量,期待唤醒
注意事项
- Add()操作必须早于 Wait()操作
- Done()的次数必须和 Add 的值相等
- 不能让 counter 的值 <0,也就是 Done 不能大于 Add(i)
- Add 和 Wait 不能并行调用,必须一个在子协程一个在主携程
-
如果想反复调用 wg,必须得期待 Wait()执行完后能力进行下一轮调用
锁底层
go 中的 sync 包提供了两种锁的类型,别离是互斥锁
sync.Mutex
和读写锁sync.RWMutex
,这两种锁都属于乐观锁
锁的应用场景是解决多协程下数据竞态的问题,为了保证数据的平安,锁住一些共享资源。以避免并发拜访这些共享数据时可能导致的数据不统一问题,获取锁的线程能够失常拜访临界区,未获取到锁的线程期待锁开释之后能够尝试获取锁
注:当你想让一个构造体是并发平安的,能够加一个锁字段,比方 channel 就是这么做的,要留神的是,这个锁字段必须小写,不然调用方也能够进行 lock 和 unlock 操作,相当于你把钥匙和锁都交给了他人,锁就失去了应有的作用mutex
提供了三个办法
- Lock() 进行加锁操作,在同一个 goroutine 中必须在锁开释之后能力进行再次上锁,不然会 panic
- Unlock() 进行解锁操作,如果这个时候未加锁会 panic,mutex 和 goroutine 不关联,也就是说对于 mutex 的加锁解锁操作能够产生在多个 goroutine 间
- tryLock() 尝试获取锁,当锁被其余 goroutine 占有,或者锁处于饥饿模式,将立即返回 false,当锁可用时尝试获取锁,获取失败也返回 false
实现如下
type Mutex struct {
state int32
sema uint32
}
Mutex 只有两个字段
- state 示意以后互斥锁的状态,复合型字段
- sema 信号量变量,用来管制期待 goroutine 的阻塞休眠和唤醒
state 的不同位标识了不同的状态,以此实现了用最小的内存来示意更多的意义
// 前三个字段标识了锁的状态 剩下的位来标识以后共有多少个 goroutine 在期待锁
const (
mutexLocked = 1 << iota // 示意互斥锁的锁定状态
mutexWoken // 示意从失常模式被从唤醒
mutexStarving // 以后的互斥锁进入饥饿状态
mutexWaiterShift = iota // 以后互斥锁上期待者的数量
)
mutex 的最开始实现只有失常模式,在失常模式下期待的线程依照先进先出的形式获取锁,然而新创建的 goroutine 会与刚被唤醒的 goroutine 竞争,导致刚被唤起的 goroutine 拿不到锁,从而长期被阻塞。
因而 Go 在 1.9
版本中引入了饥饿模式,当 goroutine 超过 1ms 没有获取锁,那么就将以后的互斥锁切换到饥饿模式,在该模式下,互斥锁会间接交给期待队列最后面的 g,新的 g 在该状态下既不能获取锁,也不会进入自旋状态,只会在队列的开端期待。如果一个 g 获取了互斥锁,并且它在队列的开端或者期待的工夫少于 1ms,那么就回到失常模式
加锁
func (m *Mutex) Lock() {
// 判断以后锁的状态,如果锁是齐全闲暇的,即 m.state 为 0,则对其加锁,将 m.state 的值赋为 1
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
........
}
- 通过 CAS 零碎调用判断以后锁的状态,如果是闲暇则 m.state 为 0,这个时候对其加锁,将 m.state 设为 1
- 如果以后锁已被占用,通过 lockSlow 办法尝试自旋或者饥饿状态下的竞争,期待锁的开释
lockSlow:
初始化五个字段
- waitStartTime 用来计算 waiter 的等待时间
- starving 饥饿模式标记,如果等待时间超过 1ms,则为 true
- awoke 协程是否唤醒,当 g 在自旋的时候,相当于 CPU 上曾经有正在等锁的协程,为了防止 mutex 解锁时再唤醒其余协程,自旋时要尝试把 mutex 设为唤醒状态
- iter 用来记录协程的自旋次数
-
old 记录以后锁的状态
判断自旋
for { // 判断是否容许进入自旋 两个条件,条件 1 是以后锁不能处于饥饿状态 // 条件 2 是在 runtime_canSpin 内实现,其逻辑是在多核 CPU 运行,自旋的次数小于 4 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // !awoke 判断以后 goroutine 不是在唤醒状态 // old&mutexWoken == 0 示意没有其余正在唤醒的 goroutine // old>>mutexWaiterShift != 0 示意期待队列中有正在期待的 goroutine // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 尝试将以后锁的低 2 位的 Woken 状态位设置为 1,示意已被唤醒, 这是为了告诉在解锁 Unlock()中不要再唤醒其余的 waiter 了 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { // 设置以后 goroutine 唤醒胜利 awoke = true } // 进行自旋 runtime_doSpin() // 自旋次数 iter++ // 记录以后锁的状态 old = m.state continue } } const active_spin_cnt = 30 func sync_runtime_doSpin() {procyield(active_spin_cnt) } // asm_amd64.s TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
进入自旋的起因:乐观的认为以后正在持有锁的 g 能在短时间内偿还锁,所以须要一些条件来判断:到底能不能短时间偿还
条件如下 - 自旋的次数 <=4
- cpu 必须为多核
- gomaxprocs>1,最大被同时执行的 CPU 数目大于 1
- 以后机器上至多存在一个正在运行的 P 并且解决队列为空
满足条件之后进行循环,次数为 30 次,也就是执行 30 次 PAUSE 指令来占据 CPU,进行自旋
解锁
func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 这里示意解锁了一个没有上锁的锁,则间接产生 panic
if (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")
}
// 失常模式的开释锁逻辑
if new&mutexStarving == 0 {
old := new
for {
// 如果没有期待者则间接返回即可
// 如果锁处于加锁的状态,示意曾经有 goroutine 获取到了锁,能够返回
// 如果锁处于唤醒状态,这表明有期待的 goroutine 被唤醒了,不必尝试获取其余 goroutine 了
// 如果锁处于饥饿模式,锁之后会间接给期待队头 goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}
// 抢占唤醒标记位,这里是想要把锁的状态设置为被唤醒,而后 waiter 队列 -1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 抢占胜利唤醒一个 goroutine
runtime_Semrelease(&m.sema, false, 1)
return
}
// 执行抢占不胜利时从新更新一下状态信息,下次 for 循环持续解决
old = m.state
}
} else {
// 饥饿模式开释锁逻辑,间接唤醒期待队列 goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
func (m *Mutex) unlockSlow(new int32) {
// 这里示意解锁了一个没有上锁的锁,则间接产生 panic
if (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")
}
// 失常模式的开释锁逻辑
if new&mutexStarving == 0 {
old := new
for {
// 如果没有期待者则间接返回即可
// 如果锁处于加锁的状态,示意曾经有 goroutine 获取到了锁,能够返回
// 如果锁处于唤醒状态,这表明有期待的 goroutine 被唤醒了,不必尝试获取其余 goroutine 了
// 如果锁处于饥饿模式,锁之后会间接给期待队头 goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}
// 抢占唤醒标记位,这里是想要把锁的状态设置为被唤醒,而后 waiter 队列 -1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 抢占胜利唤醒一个 goroutine
runtime_Semrelease(&m.sema, false, 1)
return
}
// 执行抢占不胜利时从新更新一下状态信息,下次 for 循环持续解决
old = m.state
}
} else {
// 饥饿模式开释锁逻辑,间接唤醒期待队列 goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
解锁对于加锁来说简略很多,通过 AddInt32
办法进行疾速解锁,将 m.state 低地位为 0,而后判断值,如果为 0,那么就齐全闲暇了,完结解锁。如果不为 0 阐明以后锁未被占用,不过有期待的 g 未被唤醒,须要进行一系列唤醒操作,唤醒判断锁的状态,而后进行具体的 goroutine 唤醒
非阻塞加锁
func (m *Mutex) TryLock() bool {
// 记录以后状态
old := m.state
// 处于加锁状态 / 饥饿状态间接获取锁失败
if old&(mutexLocked|mutexStarving) != 0 {return false}
// 尝试获取锁,获取失败间接获取失败
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {return false}
return true
}
TryLock 是 Go 1.18
新退出的办法,不被激励应用,次要是两个判断逻辑
- 判断以后锁的状态,如果锁处于加锁状态或者饥饿状态就间接获取锁失败
- 尝试获取锁,如果失败则间接失败
本文由 mdnice 多平台公布