共计 9599 个字符,预计需要花费 24 分钟才能阅读完成。
0 常识背景
在进入注释前,先对 WaitGroup
及其相干背景常识做个简略的介绍,这里次要是 WaitGroup
的根本应用,以及零碎信号量的基础知识。对这些比拟相熟的小伙伴能够间接跳过这一节。
0.1 WaitGroup
WaitGroup
是 Golang 中最常见的并发控制技术之一,它的作用咱们能够简略类比为其余语言中多线程并发管制中的 join()
,实例代码如下:
package main
import (
"fmt"
"sync"
"time"
)
func main() {fmt.Println("Main starts...")
var wg sync.WaitGroup
// 2 指的是上面有两个协程须要期待
wg.Add(2)
go waitFunc(&wg, 3)
go waitFunc(&wg, 1)
// 阻塞期待
wg.Wait()
fmt.Println("Main ends...")
}
func waitFunc(wg *sync.WaitGroup, num int) {
// 函数完结时告知 WaitGroup 本人曾经完结
defer wg.Done()
time.Sleep(time.Duration(num) * time.Second)
fmt.Printf("Hello World from %v\n", num)
}
// 后果输入:Main starts...
Hello World from 1
Hello World from 3
Main ends...
如果这里没有 WaitGroup
,主协程(main 函数)会间接跑到最初的 Main ends...
,而没有两头两个 goroutine 的输入,加了 WaitGroup
后,main 就会在 wg.Wait()
处阻塞期待两个协程都完结后才继续执行。
下面咱们看到的 WaitGroup
的三个办法:Wait()
、Add(int)
和 Done()
也是 WaitGroup
对象仅有的三个办法。
0.2 信号量(Semaphore)
信号量(Semaphore)是一种用于实现多过程或多线程之间同步和互斥的机制,也是 WaitGroup
中所采纳的技术。并且 WaitGroup
本身的同步原理,也与信号量很类似。
因为翻译问题,不相熟的小伙伴常常将信号量(Semaphore)和信号(Signal)搞混,这俩实际上是两个齐全不同的货色。Semaphore 在英文中的本意是 旗语,也就是航海畛域的那个旗语,利用手旗或旗号传递信号的沟通形式。在计算机领域,Semaphore,即信号量,在狭义上也能够了解为一种过程、线程间的通信形式,但它的次要作用,正如后面所说,是用于实现过程、线程间的同步和互斥。
信号量实质上能够简略了解为一个整型数,次要蕴含两种操作:P(Proberen,测试)操作和 V(Verhogen,减少)操作。其中,P 操作会尝试获取一个信号量,如果信号量的值大于 0,则将信号量的值减 1 并继续执行;否则,以后过程或线程就会被阻塞,直到有其余过程或线程开释这个信号量为止。V 操作则是开释一个信号量,将信号量的值加 1。
能够把信号量看作是一种相似锁的货色,P 操作相当于获取锁,而 V 操作相当于开释锁。因为信号量是一种操作系统级别的机制,通常由内核提供反对,因而咱们不必放心上述对信号量的操作自身会产生竞态条件,置信内核能搞定这种货色。
本文的重点不是信号量,因而不会过多开展对于信号量的技术细节,有趣味的小伙伴能够查阅相干材料。
最初提一嘴技术之外的货色,Proberen 和 Verhogen 这俩单词眼熟吧?因为它们是荷兰语,不是英语。为啥是荷兰语嘞?因为创造信号量的人,是上古计算机大神,来自荷兰的计算机先驱 Edsger W. Dijkstra 学生。嗯,对,就是那个 Dijkstra。
1 WaitGroup 底层原理
申明:本文所用源码均基于 Go 1.20.3 版本,不同版本 Go 的 WaitGroup
源码可能略有不同,但设计思维根本是统一的。
WaitGroup
相干源码十分短,加上正文和空行也只有 120 多行,它们全都在 src/sync/waitgroup.go
中。
1.1 定义
先来看 WaitGroup
的定义,这里我把源文件中的正文都简略翻译了一下:
// WaitGroup 期待一组 Goroutine 实现。// 主 Goroutine 调用 Add 办法设置要期待的 Goroutine 数量,// 而后每个 Goroutine 运行并在实现后调用 Done 办法。// 同时,能够应用 Wait 办法阻塞,直到所有 Goroutine 实现。//
// WaitGroup 在第一次应用后不能被复制。//
// 依据 Go 内存模型的术语,Done 调用“同步于”任何它解除阻塞的 Wait 调用的返回。type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // 高 32 位是计数器, 低 32 位是期待者数量(后文解释)。sema uint32
}
WaitGroup
类型是一个构造体,它有三个公有成员,咱们一个一个来看。
1.1.1 noCopy
首先是 noCopy
,这个货色是为了通知编译器,WaitGroup
构造体对象不可复制,即 wg2 := wg
是非法的。之所以禁止复制,是为了避免可能产生的死锁。但实际上如果咱们对 WaitGroup
对象进行复制后,至多在 1.20 版本下,Go 的编译器只是收回正告,没有阻止编译过程,咱们仍然能够编译胜利。正告的内容如下:
assignment copies lock value to wg2: sync.WaitGroup contains sync.noCopy
为什么编译器没有编译失败,我猜应该是 Go 官网想尽量减少编译器对程序的干涉,而更多地交给程序员本人去解决(此时 Rust 收回了一阵笑声)。总之,咱们在应用 WaitGroup
的过程中,不要去复制它就对了,不然非常容易产生死锁(其实构造体正文上也说了,WaitGroup 在第一次应用后不能被复制)。譬如我将文章结尾代码中的 main 函数略微改了改:
func main() {fmt.Println("Main starts...")
var wg sync.WaitGroup
// 2 指的是上面有两个协程须要期待
wg.Add(1)
wg2 := wg
wg2.Add(1)
go waitFunc(&wg, 3)
go waitFunc(&wg2, 1)
// 阻塞期待
wg.Wait()
wg2.Wait()
fmt.Println("Main ends...")
}
// 输入后果
Main starts...
Hello World from 1
Hello World from 3
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc000042060?)
C:/Program Files/Go/src/runtime/sema.go:62 +0x27
sync.(*WaitGroup).Wait(0xe76b28?)
C:/Program Files/Go/src/sync/waitgroup.go:116 +0x4b
main.main()
D:/Codes/Golang/waitgroup/main.go:23 +0x139
exit status 2
为什么会这样?因为 wg 曾经 Add(1)
了,这时咱们复制了 wg 给 wg2,并且是个浅拷贝,意味着 wg2 内实际上曾经是 Add(1)
后的状态了(state 成员保留的状态,即它的值),此时咱们再执行 wg2.Add(1)
,其实相当于执行了两次 wg2.Add(1)
。而前面 waitFunc()
中对 wg2 只进行了一次 Done()
开释操作,main 函数在 wg2.Wait()
时就陷入了有限期待,即 all goroutines are asleep
。等看了前面 Add()
和 Done()
的原理后,再回头来看这段死锁的代码,会更加清晰。
那么这段代码能既复制,又不死锁吗?当然能够,只须要把 wg2 := wg
提到 wg.Add(1)
后面即可。
1.1.2 state atomic.Uint64
state
是 WaitGroup
的外围,它是一个无符号的 64 位整型,并且用的是 atomic
包中的 Uint64
,所以 state
自身是线程平安的。至于 atomic.Uint64
为什么能保障线程平安,因为它应用了 CompareAndSwap(CAS)
操作,而这个操作依赖于 CPU 提供的原子性指令,是 CPU 级的原子操作。
state
的高 32 位是计数器(counter),低 32 位是期待者数量(waiters)。其中计数器其实就是 Add(int)
数量的总和,譬如 Add(1)
后再 Add(2)
,那么这个计数器就是 1 + 2 = 3;而期待数量就是当初有多少 goroutine 在执行 Wait()
期待 WaitGroup
被开释。
1.1.3 sema uint32
这玩意儿就是信号量,它的用法咱们到后文联合代码再讲。
1.2 Add(delta int)
首先是 Add(delta int)
办法。WaitGroup
所有三个办法都没有返回值,并且只有 Add
领有参数,整个设计堪称简洁到了极点。
Add
办法的第一句代码是:
if race.Enabled {
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()}
race.Enabled
是判断以后程序是否开启了竞态条件查看,这个查看是在编译时须要咱们手动指定的:go build -race main.go
,默认状况下并不开启,即 race.Enabled
在默认状况下就是 false
。这段代码里如果程序开启了竞态条件查看,会将其敞开,最初再从新关上。其余无关 race
的细节本文不再探讨,这对咱们了解 WaitGroup
也没有太大影响,将其思考进去反而会减少咱们了解 WaitGroup
外围机制的复杂度,因而后续代码中也会疏忽所有与 race
相干的局部。
Add
办法整顿后的代码如下:
// Add 办法将 delta 值加上计数器,delta 能够为正数。如果计数器变为 0,// 则所有在 Wait 上阻塞的 Goroutine 都会被开释。// 如果计数器变为正数,则 Add 办法会 panic。//
// 留神:当计数器为 0 时调用 delta 值为负数的 Add 办法必须在 Wait 办法之前执行。// 而 delta 值为正数或者 delta 值为负数但计数器大于 0 时,则能够在任何工夫点执行。// 通常状况下,这意味着应该在创立 Goroutine 或其余期待事件的语句之前执行 Add 办法。// 如果一个 WaitGroup 用于期待多组独立的事件,// 那么必须在所有先前的 Wait 调用返回之后再进行新的 Add 调用。// 详见 WaitGroup 示例代码。func (wg *WaitGroup) Add(delta int) {
// 将 int32 的 delta 变成 unint64 后左移 32 位再与 state 累加。// 相当于将 delta 与 state 的高 32 位累加。state := wg.state.Add(uint64(delta) << 32)
// 高 32 位,就是 counter,计数器
v := int32(state >> 32)
// 低 32 位,就是 waiters,期待者数量
w := uint32(state)
// 计数器为正数时间接 panic
if v < 0 {panic("sync: negative WaitGroup counter")
}
// 当 Wait 和 Add 并发执行时,会有概率触发上面的 panic
if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 如果计数器大于 0,或者没有任何期待者,即没有任何 goroutine 在 Wait(),那么就间接返回
if v > 0 || w == 0 {return}
// 当 waiters > 0 时,这个 Goroutine 将计数器设置为 0。// 当初不可能有对状态的并发批改:// - Add 办法不能与 Wait 办法同时执行,// - Wait 不会在看到计数器为 0 时减少期待者。// 依然须要进行简略的健全性查看来检测 WaitGroup 的误用状况。if wg.state.Load() != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 重置 state 为 0
wg.state.Store(0)
// 唤醒所有期待者
for ; w != 0; w-- {
// 应用信号量管制唤醒期待者
runtime_Semrelease(&wg.sema, false, 0)
}
}
这里我将原代码中的正文翻译成了中文,并且本人在每句代码前也都加了正文。
一开始,办法将参数 delta
变成 uint64 后左移 32 位,和 state
相加。因为 state
的高 32 位是这个 WaitGroup
的计数器,所以这里其实就是把计数器进行了累加操作:
state := wg.state.Add(uint64(delta) << 32)
接着,程序会别离取出曾经累加后的计数器 v
,和以后的期待者数量 w
:
v := int32(state >> 32)
w := uint32(state)
而后是几个判断:
// 计数器为正数时间接 panic
if v < 0 {panic("sync: negative WaitGroup counter")
}
// 当 Wait 和 Add 并发执行时,会有概率触发上面的 panic
if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 如果计数器大于 0,或者没有任何期待者,// 即没有任何 goroutine 在 Wait(),那么就间接返回
if v > 0 || w == 0 {return}
正文曾经比拟清晰了,这里次要开展解释一下第二个 if
:if w != 0 && delta > 0 && v == int32(delta)
。
w != 0
意味着以后有 goroutine 在Wait()
;delta > 0
意味着Add()
传入的是正整数,也就是失常调用;v == int32(delta)
意味着累加后的计数器等于传入的delta
,这里最容易想到的合乎这个等式的场景是:原计数器等于 0 时,也就是 wg 第一次应用,或后面的Wait()
曾经全副完结时。
上述三个条件看上去有些抵触:w != 0
示意存在 Wait()
,而 v == int32(delta)
依照剖析应该不存在 Wait()
。再往下剖析,其实应该是 v
在获取的时候不存在 Wait()
,而 w
在获取的时候存在 Wait()
。会有这种可能吗?会!就是并发的时候:以后 goroutine 获取了 v
,而后另一个 goroutine 立即进行了 Wait()
,接着本 goroutine 又获取了 w
,过程如下:
咱们能够用上面这段代码来复现这个 panic
:
func main() {
var wg sync.WaitGroup
// 并发问题不易复现,所以循环屡次
for i := 0; i < 100000; i++ {go addDoneFunc(&wg)
go waitFunc(&wg)
}
wg.Wait()}
func addDoneFunc(wg *sync.WaitGroup) {wg.Add(1)
wg.Done()}
func waitFunc(wg *sync.WaitGroup) {wg.Wait()
}
// 输入后果
panic: sync: WaitGroup misuse: Add called concurrently with Wait
goroutine 71350 [running]:
sync.(*WaitGroup).Add(0x0?, 0xbf8aa5?)
C:/Program Files/Go/src/sync/waitgroup.go:65 +0xce
main.addDoneFunc(0xc1cf66?, 0x0?)
D:/Codes/Golang/waitgroup/main.go:19 +0x1e
created by main.main
D:/Codes/Golang/waitgroup/main.go:11 +0x8f
exit status 2
这段代码可能要多运行几次才会看到上述成果,因为这种并发操作在整个 WaitGroup
的生命周期中会造成好几种 panic
,包含 Wait()
办法中的。
因而,咱们在应用 WaitGroup
的时候该当留神一点:不要在被调用的 goroutine 外部应用 Add
,而该当在里面应用,也就是:
// 正确
wg.Add(1)
go func(wg *sync.WaitGroup) {defer wg.Done()
}(&wg)
wg.Wait()
// 谬误
go func(wg *sync.WaitGroup) {wg.Add(1)
defer wg.Done()}(&wg)
wg.Wait()
从而防止并发导致的异样。
下面三个 if
都完结后,会再次对 state
的一致性进行判断,避免并发异样:
if wg.state.Load() != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
这里 state.Load()
包含前面会呈现的 Store()
都是 atomic.Uint64
的原子操作。
依据后面代码的逻辑,当程序运行到这里时,计数器肯定为 0,而期待者则可能 >= 0,于是代码会执行一次 wg.state.Store(0)
将 state
设为 0,接着执行告诉期待者完结期待的操作:
wg.state.Store(0)
for ; w != 0; w-- {runtime_Semrelease(&wg.sema, false, 0)
}
好了,这里又是让人蛊惑的中央,我第一次看到这段代码时产生了上面几个疑难:
- 为什么
Add
办法会有计数器为 0 的分支逻辑?计数器不是累加的吗? - 为什么要在
Add
中告诉期待者完结,不应该是Done
办法吗? - 那个
runtime_Semrelease(&wg.sema, false, 0)
为什么须要循环w
次?
一个一个来看。
- 为什么
Add
办法会有计数器为 0 的分支逻辑?
首先,依照后面代码的逻辑,只有计数器 v
为 0 的时候,代码才会走到最初两句,而之所以为 0,是因为 Add(delta int)
的参数 delta
是一个 int
,也就是说,delta
能够为正数!那什么时候会传入正数进来呢?Done
的时候。咱们去看 Done()
的代码,会发现它非常简单:
// Done 给 WaitGroup 的计数器减 1。func (wg *WaitGroup) Done() {wg.Add(-1)
}
所以,Done
操作或是咱们手动给 Add
传入正数时,就会进入到 Add
最初几行逻辑,而 Done
自身也意味着以后 goroutine 的 WaitGroup
完结,须要同步给内部的 Wait
让它不再阻塞。
- 为什么要在
Add
中告诉期待者完结,不应该是Done
办法吗?
嗯,这个问题其实在上一个问题曾经一起解决了,因为 Done()
实际上调用了 Add(-1)
。
- 那个
runtime_Semrelease(&wg.sema, false, 0)
为什么须要循环w
次?
这个函数依照字面意思,就是开释信号量。源码在 src/sync/runtime.go
中,函数申明如下:
// Semrelease 函数用于原子地减少 *s 的值,// 并在有期待 Semacquire 函数被阻塞的协程时告诉它们继续执行。// 它旨在作为同步库应用的简略唤醒基元,不应间接应用。// 如果 handoff 参数为 true,则将 count 间接传递给第一个期待者。// skipframes 参数示意在跟踪时要疏忽的帧数,从 runtime_Semrelease 的调用者开始计数。func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
第一个参数就是信号量的值自身,开释时会 +1。
第二个参数 handoff
在我查阅了材料后,依据我的了解,应该是:当 handoff
为 false
时,仅失常唤醒其余期待的协程,然而不会立刻调度被唤醒的协程;而当 handoff
为 true
时,会立即调度被唤醒的协程。
第三个参数 skipframes
,看上去该当也和调度无关,但具体含意我不太确定,这里就不猜了(程度无限,见谅哈)。
依照信号量自身的机制,这里开释时会 +1,同理还存在一个信号量获取函数 runtime_Semacquire(s *uint32)
会在信号量 > 0 时将信号量 -1,否则期待,它会在 Wait()
中被调用。这也是 runtime_Semrelease
须要循环 w
次的起因:因为那 w
个 Wait()
中会调用 runtime_Semacquire
并一直将信号量 -1,也就是减了 w
次,所以两个中央须要对冲一下嘛。
信号量和 WaitGroup
的机制很像,但计数器又是反的,所以这里再多嘴补充几句:
信号量获取时(runtime_Semacquire
),其实就是在阻塞期待,P(Proberen,测试)操作,如果此时信号量 > 0,则获取胜利,并将信号量 -1,否则持续期待;
信号量开释时(runtime_Semrelease
),会把信号量 +1,也就是 V(Verhogen,减少)操作。
1.2 Done()
Done()
办法咱们在下面曾经看到过了:
// Done 给 WaitGroup 的计数器减 1。func (wg *WaitGroup) Done() {wg.Add(-1)
}
1.3 Wait()
同样的,这里我会把与 race
相干的代码都删掉:
// Wait 会阻塞,直到计数器为 0。func (wg *WaitGroup) Wait() {
for {state := wg.state.Load()
v := int32(state >> 32) // 计数器
w := uint32(state) // 期待者数量
if v == 0 {
// 计数器为 0,间接返回。return
}
// 减少期待者数量
if wg.state.CompareAndSwap(state, state+1) {
// 获取信号量
runtime_Semacquire(&wg.sema)
// 这里仍然是为了避免并发问题
if wg.state.Load() != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
比 Add
简略多了,而且有了后面 Add
的简明扼要为根底,Wait
的代码看上去高深莫测。
当计数器为 0,即没有任何 goroutine 调用 Add
时,间接调用 Wait
,没有任何意义,因而间接返回,也不操作信号量。
最初 Wait
也有一个避免并发问题的判断,而这个 panic 同样能够用后面 Add
中的那段并发问题代码复现,大家能够试试。
Wait
中惟一不同的是,它用了一个有限循环 for{}
,为什么?这是因为,wg.state.CompareAndSwap(state, state+1)
这个原子操作因为并发等起因有可能失败,此时就须要从新获取 state
,把整个过程再走一遍。而一旦操作胜利,Wait
会在 runtime_Semacquire(&wg.sema)
处阻塞,直到 Done
操作将计数器减为 0,Add
中开释了信号量。
2 结语
至此,WaitGroup
的源码已全副解析结束。作为 Golang 中最重要的并发组件之一,WaitGroup
的源码竟然只有这么寥寥百行代码,倒是给咱们了解它的原理升高了不少难度。
开文之前我也没想到会写这么多货色,能看到这里的小伙伴们,感激你们的急躁。
自己程度无限,若文中有什么纰漏或谬误,还请大家不吝指出,再次感激!