WaitGroup 是开发过程中常常应用的并发控制技术,用来在程序中管制期待一组 goroutine 完结。
实现原理
数据结构
WaitGroup 的数据结构包含了一个 noCopy 的辅助字段,一个 state1 记录 WaitGroup 状态的数组:
- noCopy 的辅助字段;
- state1,一个具备复合意义的字段,蕴含 WaitGroup 的计数、阻塞在检查点的 waiter 数和信号量。
type WaitGroup struct {
// 防止复制应用的一个技巧,能够通知 vet 工具违反了复制应用的规定
noCopy noCopy
// 前 64bit(8bytes) 的值分成两段,高 32bit 是计数值,低 32bit 是 waiter 的计数
// 另外 32bit 是用作信号量的
// 因为 64bit 值的原子操作须要 64bit 对齐,然而 32bit 编译器不反对,所以数组中的元素在不同的架构中不一样,具体解决看上面的办法
// 总之,会找到对齐的那 64bit 作为 state,其余的 32bit 做信号量
state1 [3]uint32
}
// 失去 state 的地址和信号量的地址
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
// 如果地址是 64bit 对齐的,数组前两个元素做 state,后一个元素做信号量
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
// 如果地址是 32bit 对齐的,数组后两个元素用来做 state,它能够用来做 64bit 的原子操作,第一个元素 32bit 用来做信号量
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
在 64 位环境下,state1 的第一个元素是 waiter 数,第二个元素是 WaitGroup 的计数值,第三个元素是信号量。
noCopy:辅助 vet 查看
noCopy 字段的作用是批示 vet 工具在做查看的时候,这个数据结构不能做值复制应用。更谨严地说,是不能在第一次应用之后复制应用。
vet 会对实现 Locker 接口的数据类型做动态查看,一旦代码中有复制应用这种数据类型的状况,就会收回正告。然而,WaitGroup 不满足 Locker 接口,这时就能够通过给 WaitGroup 增加一个 noCopy 字段来实现 Locker 接口。而且因为 noCopy 字段是未输入类型,所以 WaitGroup 不会裸露 Lock/Unlock 办法。
如果你想要本人定义的数据结构不被复制应用,或者说,不能通过 vet 工具查看出复制应用的报警,就能够通过嵌入 noCopy 这个数据类型来实现。
办法
Add & Done
Add 办法次要操作的是 state 的计数局部,去除 race 检查和异样查看的代码后,它的实现如下:
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()
// 高 32bit 是计数值 v,所以把 delta 左移 32,减少到计数上
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) // 以后计数值
w := uint32(state) // waiter count
if v > 0 || w == 0 {return}
// 如果计数值 v 为 0 并且 waiter 的数量 w 不为 0,那么 state 的值就是 waiter 的数量
// 将 waiter 的数量设置为 0,因为计数值 v 也是 0, 所以它们俩的组合 *statep 间接设置为 0 即可。此时须要并唤醒所有的 waiter
*statep = 0
for ; w != 0; w-- {runtime_Semrelease(semap, false, 0)
}
}
// Done 办法理论就是计数器减 1
func (wg *WaitGroup) Done() {wg.Add(-1)
}
Wait
Wait 办法的实现逻辑是:一直查看 state 的值。如果其中的计数值变为了 0,那么阐明所有的工作已实现,调用者不用再期待,间接返回。如果计数值大于 0,阐明此时还有工作没实现,那么调用者就变成了期待者,须要退出 waiter 队列,并且阻塞住本人。
其骨干实现代码如下:
func (wg *WaitGroup) Wait() {statep, semap := wg.state()
for {state := atomic.LoadUint64(statep)
v := int32(state >> 32) // 以后计数值
w := uint32(state) // waiter 的数量
if v == 0 {
// 如果计数值为 0, 调用这个办法的 goroutine 不用再期待,继续执行它前面的逻辑即可
return
}
// 否则把 waiter 数量加 1。期间可能有并发调用 Wait 的状况,减少可能会失败,所以最外层应用了一个 for 循环
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 阻塞休眠期待
runtime_Semacquire(semap)
// 被唤醒,不再阻塞,返回
return
}
}
}
常见谬误
计数器设置为负值
WaitGroup 的计数器的值必须大于等于 0。咱们在更改这个计数值的时候,WaitGroup 会先做查看,如果计数值被设置为正数,就会导致 panic。
个别状况下,有两种办法会导致计数器设置为正数:
- 调用 Add 的时候传递一个正数。如果你能保障以后的计数器加上这个正数后还是大于等于 0 的话,也没有问题,否则就会导致 panic。
- 调用 Done 办法的次数过多,超过了 WaitGroup 的计数值。
Add 机会谬误
在应用 WaitGroup 的时候,你肯定要遵循的准则就是, 等所有的 Add 办法调用之后再调用 Wait,否则就可能导致 panic 或者不冀望的后果。
前一个 Wait 还没完结就重用 WaitGroup
只有 WaitGroup 的计数值复原到零值的状态,那么它就能够被看作是新创建的 WaitGroup,被重复使用。然而,如果咱们在 WaitGroup 的计数值还没有复原到零值的时候就重用,就会导致程序 panic。咱们看一个例子,初始设置 WaitGroup 的计数值为 1,启动一个 goroutine 先调用 Done 办法,接着就调用 Add 办法,Add 办法有可能和主 goroutine 并发执行。
func main() {
var wg sync.WaitGroup
wg.Add(1)
go func() {time.Sleep(time.Millisecond)
wg.Done() // 计数器减 1
wg.Add(1) // 计数值加 1
}()
wg.Wait() // 主 goroutine 期待,有可能和第 7 行并发执行}
在这个例子中,第 6 行尽管让 WaitGroup 的计数复原到 0,然而因为第 9 行有个 waiter 在期待,如果期待 Wait 的 goroutine,刚被唤醒就和 Add 调用(第 7 行)有并发执行的抵触,所以就会呈现 panic。
WaitGroup 尽管能够重用,然而是有一个前提的,那就是必须等到上一轮的 Wait 实现之后,能力重用 WaitGroup 执行下一轮的 Add/Wait,如果你在 Wait 还没执行完的时候就调用下一轮 Add 办法,就有可能呈现 panic。