WaitGroup底层
回顾一下Go的并发管制,其实大略有以下几种管制伎俩
- 全局变量(不优雅)
- wg 只有三个办法,不便简略
- channel 有三种panic的状况,且操作比较复杂
- context 官网举荐,在带层级的goroutine中有奇效,比方goroutine嵌套着goroutine;而且可能在不同的goroutine之间进行值传递,不便
sync.WaitGroup()就是解决go中并发时,多个goroutine同步的问题
用法如下
- 主goroutine通过wg.Add(i)让设置须要启动的goroutine数量
- 子goroutine之间通过wg.Done()来示意子goroutine执行结束,Add(-1)
主goroutine之间通过Wait()来期待所有的子goroutine执行结束
func main() { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() fmt.Println("here1") }() go func() { defer wg.Done() fmt.Println("here2") }() wg.Wait() fmt.Println("finish") }//输入// here2// here1 // finish因为这里用了defer 所以答案是确定的 先进后出 必先打印here2
底层
//https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go// A WaitGroup waits for a collection of goroutines to finish.// The main goroutine calls Add to set the number of// goroutines to wait for. Then each of the goroutines// runs and calls Done when finished. At the same time,// Wait can be used to block until all goroutines have finished.//// A WaitGroup must not be copied after first use.type WaitGroup struct { noCopy noCopy // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. // 64-bit atomic operations require 64-bit alignment, but 32-bit // compilers do not ensure it. So we allocate 12 bytes and then use // the aligned 8 bytes in them as state, and the other 4 as storage // for the sema. state1 [3]uint32}
第一个字段:noCopy
Go中检测禁止复制的技术,如果有复制行为,那么就认为违规,在编译阶段禁止赋值和复制wg实例,因为wg中有一个计数器,多个goroutine会并发的批改该计数器。在应用wg的时候,倡议依照官网文档的要求,应用指针传递wg实例,同时如果wg作为构造体的成员,也须要显式地定义一个nocopy字段,以防止在构造体复制时导致竞态问题
第二个字段:state1数组
这是一个长度为3的数组,蕴含了wg应用到的三种数据:counter、waiter、semaphore
三个元素的具体作用如下
- counter:计数器,用来计算要执行的协程g的数量,示意以后要执行的goroutine个数,wg.Add(i)时,counter+=i,wg.Done()时,count-=1
- waiter:计数器,示意曾经调用Wait()函数的goroutine-group个数,也就是须要完结的goroutine组数,当wg.Wait()的时候,waiter+=1,并且挂起以后的goroutine
- semaphore:go runtime外部的信号量实现,会用到以下两个函数
1.runtime_Semacquire 减少一个信号量,并且挂起以后goroutine
2.runtime_Semrelease 缩小一个信号量,并唤醒semaphore上其中一个正在期待的字段
Add
// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L53func (wg *WaitGroup) Add(delta int) { statep, semap := wg.state() // 获取state(counter+waiter)和semaphore信号量的指针 ... ... // uint64(delta)<<32 把 delta 左移32位,因为counter在statep的高32位 // 而后把delta原子的减少到counter中 state := atomic.AddUint64(statep, uint64(delta)<<32) // v => counter, w => waiter v := int32(state >> 32)//获取counter值 w := uint32(state) //获取waiter值 ... ... //counter变为负值了,panic报错 if v < 0 { panic("sync: negative WaitGroup counter") } //waiter不等于0,阐明曾经执行了waiter,这时你又调用Add(),是不容许的 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } //v->counter,counter>0,阐明还有goroutine没执行完,不须要开释信号量,间接返回 //w->waiter, waiter=0,没有期待的goroutine,不须要开释信号量,间接返回 if v > 0 || w == 0 { return } // This goroutine has set counter to 0 when waiters > 0. // Now there can't be concurrent mutations of state: // - Adds must not happen concurrently with Wait, // - Wait does not increment waiters if it sees counter == 0. // Still do a cheap sanity check to detect WaitGroup misuse. // Add()和Wait()不能并行操作 // counter==0,也不能执行Wait()操作 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } *statep = 0 // 完结了将counter清零,上面在开释waiter数的信号量 for ; w != 0; w-- {// 循环开释waiter个数的信号量 runtime_Semrelease(semap, false, 0)// 一次开释一个信号量,唤醒一个期待者 }}
能看进去Add次要干了两件事
1.将delta值累加到counter计数器中
2.当counter=0的时候,waiter开释相应的信号量,把期待的goroutine全副唤醒,如果<0,则panic
Done
// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L97// Done decrements the WaitGroup counter by one.func (wg *WaitGroup) Done() { wg.Add(-1)}
间接调用wg.Add(-1),让counter的值减一
Wait
// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L103func (wg *WaitGroup) Wait() { statep, semap := wg.state() //获取state(counter+waiter)和semaphore信号量的指针 ... ... for {// 死循环 state := atomic.LoadUint64(statep) //原子的获取state值 v := int32(state >> 32) // 获取counter值 w := uint32(state) //获取waiter值 if v == 0 {// counter=0,不须要wait间接返回 // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } ... ... // Increment waiters count. if atomic.CompareAndSwapUint64(statep, state, state+1) {// 应用CAS累加wiater ... ... runtime_Semacquire(semap) //减少信号量,期待信号量唤醒 // 这时 *statep 还不等于 0,那么应用过程必定有误,间接 panic if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } ... ... return } }}
累加waiter计数器的值,减少信号量,期待唤醒
注意事项
- Add()操作必须早于Wait()操作
- Done()的次数必须和Add的值相等
- 不能让counter的值<0,也就是Done不能大于Add(i)
- Add和Wait不能并行调用,必须一个在子协程一个在主携程
如果想反复调用wg,必须得期待Wait()执行完后能力进行下一轮调用
锁底层
go中的sync包提供了两种锁的类型,别离是互斥锁
sync.Mutex
和读写锁sync.RWMutex
,这两种锁都属于乐观锁
锁的应用场景是解决多协程下数据竞态的问题,为了保证数据的平安,锁住一些共享资源。以避免并发拜访这些共享数据时可能导致的数据不统一问题,获取锁的线程能够失常拜访临界区,未获取到锁的线程期待锁开释之后能够尝试获取锁
注:当你想让一个构造体是并发平安的,能够加一个锁字段,比方channel就是这么做的,要留神的是,这个锁字段必须小写,不然调用方也能够进行lock和unlock操作,相当于你把钥匙和锁都交给了他人,锁就失去了应有的作用mutex
提供了三个办法
- Lock() 进行加锁操作,在同一个goroutine中必须在锁开释之后能力进行再次上锁,不然会panic
- Unlock() 进行解锁操作,如果这个时候未加锁会panic,mutex和goroutine不关联,也就是说对于mutex的加锁解锁操作能够产生在多个goroutine间
- tryLock() 尝试获取锁,当锁被其余goroutine占有,或者锁处于饥饿模式,将立即返回false,当锁可用时尝试获取锁,获取失败也返回false
实现如下
type Mutex struct { state int32 sema uint32}
Mutex只有两个字段
- state 示意以后互斥锁的状态,复合型字段
- sema 信号量变量,用来管制期待goroutine的阻塞休眠和唤醒
state的不同位标识了不同的状态,以此实现了用最小的内存来示意更多的意义
// 前三个字段标识了锁的状态 剩下的位来标识以后共有多少个goroutine在期待锁const ( mutexLocked = 1 << iota // 示意互斥锁的锁定状态 mutexWoken // 示意从失常模式被从唤醒 mutexStarving // 以后的互斥锁进入饥饿状态 mutexWaiterShift = iota // 以后互斥锁上期待者的数量)
mutex的最开始实现只有失常模式,在失常模式下期待的线程依照先进先出的形式获取锁,然而新创建的goroutine会与刚被唤醒的goroutine竞争,导致刚被唤起的goroutine拿不到锁,从而长期被阻塞。
因而Go在1.9
版本中引入了饥饿模式,当goroutine超过1ms没有获取锁,那么就将以后的互斥锁切换到饥饿模式,在该模式下,互斥锁会间接交给期待队列最后面的g,新的g在该状态下既不能获取锁,也不会进入自旋状态,只会在队列的开端期待。如果一个g获取了互斥锁,并且它在队列的开端或者期待的工夫少于1ms,那么就回到失常模式
加锁
func (m *Mutex) Lock() { // 判断以后锁的状态,如果锁是齐全闲暇的,即m.state为0,则对其加锁,将m.state的值赋为1 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow()}func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state ........}
- 通过CAS零碎调用判断以后锁的状态,如果是闲暇则m.state为0,这个时候对其加锁,将m.state设为1
- 如果以后锁已被占用,通过lockSlow办法尝试自旋或者饥饿状态下的竞争,期待锁的开释
lockSlow:
初始化五个字段
- waitStartTime 用来计算waiter的等待时间
- starving 饥饿模式标记,如果等待时间超过1ms,则为true
- awoke 协程是否唤醒,当g在自旋的时候,相当于CPU上曾经有正在等锁的协程,为了防止mutex解锁时再唤醒其余协程,自旋时要尝试把mutex设为唤醒状态
- iter 用来记录协程的自旋次数
old 记录以后锁的状态
判断自旋
for { // 判断是否容许进入自旋 两个条件,条件1是以后锁不能处于饥饿状态 // 条件2是在runtime_canSpin内实现,其逻辑是在多核CPU运行,自旋的次数小于4 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // !awoke 判断以后goroutine不是在唤醒状态 // old&mutexWoken == 0 示意没有其余正在唤醒的goroutine // old>>mutexWaiterShift != 0 示意期待队列中有正在期待的goroutine // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 尝试将以后锁的低2位的Woken状态位设置为1,示意已被唤醒, 这是为了告诉在解锁Unlock()中不要再唤醒其余的waiter了 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { // 设置以后goroutine唤醒胜利 awoke = true } // 进行自旋 runtime_doSpin() // 自旋次数 iter++ // 记录以后锁的状态 old = m.state continue }}const active_spin_cnt = 30func sync_runtime_doSpin() { procyield(active_spin_cnt)}// asm_amd64.sTEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AXagain: PAUSE SUBL $1, AX JNZ again RET
进入自旋的起因:乐观的认为以后正在持有锁的g能在短时间内偿还锁,所以须要一些条件来判断:到底能不能短时间偿还
条件如下- 自旋的次数<=4
- cpu必须为多核
- gomaxprocs>1,最大被同时执行的CPU数目大于1
- 以后机器上至多存在一个正在运行的P并且解决队列为空
满足条件之后进行循环,次数为30次,也就是执行30次PAUSE指令来占据CPU,进行自旋
解锁
func (m *Mutex) Unlock() { // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) }}func (m *Mutex) unlockSlow(new int32) { // 这里示意解锁了一个没有上锁的锁,则间接产生panic if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 失常模式的开释锁逻辑 if new&mutexStarving == 0 { old := new for { // 如果没有期待者则间接返回即可 // 如果锁处于加锁的状态,示意曾经有goroutine获取到了锁,能够返回 // 如果锁处于唤醒状态,这表明有期待的goroutine被唤醒了,不必尝试获取其余goroutine了 // 如果锁处于饥饿模式,锁之后会间接给期待队头goroutine if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 抢占唤醒标记位,这里是想要把锁的状态设置为被唤醒,而后waiter队列-1 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 抢占胜利唤醒一个goroutine runtime_Semrelease(&m.sema, false, 1) return } // 执行抢占不胜利时从新更新一下状态信息,下次for循环持续解决 old = m.state } } else { // 饥饿模式开释锁逻辑,间接唤醒期待队列goroutine runtime_Semrelease(&m.sema, true, 1) }}func (m *Mutex) unlockSlow(new int32) { // 这里示意解锁了一个没有上锁的锁,则间接产生panic if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 失常模式的开释锁逻辑 if new&mutexStarving == 0 { old := new for { // 如果没有期待者则间接返回即可 // 如果锁处于加锁的状态,示意曾经有goroutine获取到了锁,能够返回 // 如果锁处于唤醒状态,这表明有期待的goroutine被唤醒了,不必尝试获取其余goroutine了 // 如果锁处于饥饿模式,锁之后会间接给期待队头goroutine if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 抢占唤醒标记位,这里是想要把锁的状态设置为被唤醒,而后waiter队列-1 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 抢占胜利唤醒一个goroutine runtime_Semrelease(&m.sema, false, 1) return } // 执行抢占不胜利时从新更新一下状态信息,下次for循环持续解决 old = m.state } } else { // 饥饿模式开释锁逻辑,间接唤醒期待队列goroutine runtime_Semrelease(&m.sema, true, 1) }}
解锁对于加锁来说简略很多,通过AddInt32
办法进行疾速解锁,将m.state低地位为0,而后判断值,如果为0,那么就齐全闲暇了,完结解锁。如果不为0阐明以后锁未被占用,不过有期待的g未被唤醒,须要进行一系列唤醒操作,唤醒判断锁的状态,而后进行具体的goroutine唤醒
非阻塞加锁
func (m *Mutex) TryLock() bool { // 记录以后状态 old := m.state // 处于加锁状态/饥饿状态间接获取锁失败 if old&(mutexLocked|mutexStarving) != 0 { return false } // 尝试获取锁,获取失败间接获取失败 if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) { return false } return true}
TryLock是Go 1.18
新退出的办法,不被激励应用,次要是两个判断逻辑
- 判断以后锁的状态,如果锁处于加锁状态或者饥饿状态就间接获取锁失败
- 尝试获取锁,如果失败则间接失败
本文由mdnice多平台公布