WaitGroup底层

回顾一下Go的并发管制,其实大略有以下几种管制伎俩

  • 全局变量(不优雅)
  • wg 只有三个办法,不便简略
  • channel 有三种panic的状况,且操作比较复杂
  • context 官网举荐,在带层级的goroutine中有奇效,比方goroutine嵌套着goroutine;而且可能在不同的goroutine之间进行值传递,不便

sync.WaitGroup()就是解决go中并发时,多个goroutine同步的问题
用法如下

  1. 主goroutine通过wg.Add(i)让设置须要启动的goroutine数量
  2. 子goroutine之间通过wg.Done()来示意子goroutine执行结束,Add(-1)
  3. 主goroutine之间通过Wait()来期待所有的子goroutine执行结束

    func main() { var wg sync.WaitGroup  wg.Add(2)  go func() { defer wg.Done()   fmt.Println("here1")   }() go func() {   defer wg.Done()    fmt.Println("here2")  }()   wg.Wait()  fmt.Println("finish")  }//输入// here2// here1 // finish因为这里用了defer 所以答案是确定的 先进后出 必先打印here2

    底层

    //https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go// A WaitGroup waits for a collection of goroutines to finish.// The main goroutine calls Add to set the number of// goroutines to wait for. Then each of the goroutines// runs and calls Done when finished. At the same time,// Wait can be used to block until all goroutines have finished.//// A WaitGroup must not be copied after first use.type WaitGroup struct { noCopy noCopy // 64-bit value: high 32 bits are counter, low 32 bits are waiter count. // 64-bit atomic operations require 64-bit alignment, but 32-bit // compilers do not ensure it. So we allocate 12 bytes and then use // the aligned 8 bytes in them as state, and the other 4 as storage // for the sema. state1 [3]uint32}

第一个字段:noCopy

Go中检测禁止复制的技术,如果有复制行为,那么就认为违规,在编译阶段禁止赋值和复制wg实例,因为wg中有一个计数器,多个goroutine会并发的批改该计数器。在应用wg的时候,倡议依照官网文档的要求,应用指针传递wg实例,同时如果wg作为构造体的成员,也须要显式地定义一个nocopy字段,以防止在构造体复制时导致竞态问题

第二个字段:state1数组

这是一个长度为3的数组,蕴含了wg应用到的三种数据:counter、waiter、semaphore
三个元素的具体作用如下

  • counter:计数器,用来计算要执行的协程g的数量,示意以后要执行的goroutine个数,wg.Add(i)时,counter+=i,wg.Done()时,count-=1
  • waiter:计数器,示意曾经调用Wait()函数的goroutine-group个数,也就是须要完结的goroutine组数,当wg.Wait()的时候,waiter+=1,并且挂起以后的goroutine
  • semaphore:go runtime外部的信号量实现,会用到以下两个函数

1.runtime_Semacquire 减少一个信号量,并且挂起以后goroutine
2.runtime_Semrelease 缩小一个信号量,并唤醒semaphore上其中一个正在期待的字段

Add

// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L53func (wg *WaitGroup) Add(delta int) {    statep, semap := wg.state() // 获取state(counter+waiter)和semaphore信号量的指针        ... ...    // uint64(delta)<<32 把 delta 左移32位,因为counter在statep的高32位    // 而后把delta原子的减少到counter中    state := atomic.AddUint64(statep, uint64(delta)<<32)    // v => counter, w => waiter    v := int32(state >> 32)//获取counter值    w := uint32(state)     //获取waiter值        ... ...    //counter变为负值了,panic报错    if v < 0 {        panic("sync: negative WaitGroup counter")    }    //waiter不等于0,阐明曾经执行了waiter,这时你又调用Add(),是不容许的    if w != 0 && delta > 0 && v == int32(delta) {        panic("sync: WaitGroup misuse: Add called concurrently with Wait")    }    //v->counter,counter>0,阐明还有goroutine没执行完,不须要开释信号量,间接返回    //w->waiter, waiter=0,没有期待的goroutine,不须要开释信号量,间接返回    if v > 0 || w == 0 {        return    }        // This goroutine has set counter to 0 when waiters > 0.    // Now there can't be concurrent mutations of state:    // - Adds must not happen concurrently with Wait,    // - Wait does not increment waiters if it sees counter == 0.    // Still do a cheap sanity check to detect WaitGroup misuse.    // Add()和Wait()不能并行操作    // counter==0,也不能执行Wait()操作    if *statep != state {        panic("sync: WaitGroup misuse: Add called concurrently with Wait")    }        *statep = 0 // 完结了将counter清零,上面在开释waiter数的信号量    for ; w != 0; w-- {// 循环开释waiter个数的信号量        runtime_Semrelease(semap, false, 0)// 一次开释一个信号量,唤醒一个期待者    }}

能看进去Add次要干了两件事
1.将delta值累加到counter计数器中
2.当counter=0的时候,waiter开释相应的信号量,把期待的goroutine全副唤醒,如果<0,则panic

Done

// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L97// Done decrements the WaitGroup counter by one.func (wg *WaitGroup) Done() {    wg.Add(-1)}

间接调用wg.Add(-1),让counter的值减一

Wait

// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L103func (wg *WaitGroup) Wait() {    statep, semap := wg.state() //获取state(counter+waiter)和semaphore信号量的指针    ... ...    for {// 死循环        state := atomic.LoadUint64(statep) //原子的获取state值        v := int32(state >> 32) // 获取counter值        w := uint32(state)      //获取waiter值        if v == 0 {// counter=0,不须要wait间接返回            // Counter is 0, no need to wait.            if race.Enabled {                race.Enable()                race.Acquire(unsafe.Pointer(wg))            }            return        }        ... ...        // Increment waiters count.        if atomic.CompareAndSwapUint64(statep, state, state+1) {// 应用CAS累加wiater            ... ...            runtime_Semacquire(semap) //减少信号量,期待信号量唤醒            // 这时 *statep 还不等于 0,那么应用过程必定有误,间接 panic            if *statep != 0 {                panic("sync: WaitGroup is reused before previous Wait has returned")            }            ... ...            return        }    }}

累加waiter计数器的值,减少信号量,期待唤醒

注意事项

  1. Add()操作必须早于Wait()操作
  2. Done()的次数必须和Add的值相等
  3. 不能让counter的值<0,也就是Done不能大于Add(i)
  4. Add和Wait不能并行调用,必须一个在子协程一个在主携程
  5. 如果想反复调用wg,必须得期待Wait()执行完后能力进行下一轮调用

    锁底层

    go中的sync包提供了两种锁的类型,别离是互斥锁sync.Mutex和读写锁sync.RWMutex,这两种锁都属于乐观锁
    锁的应用场景是解决多协程下数据竞态的问题,为了保证数据的平安,锁住一些共享资源。以避免并发拜访这些共享数据时可能导致的数据不统一问题,获取锁的线程能够失常拜访临界区,未获取到锁的线程期待锁开释之后能够尝试获取锁
    注:当你想让一个构造体是并发平安的,能够加一个锁字段,比方channel就是这么做的,要留神的是,这个锁字段必须小写,不然调用方也能够进行lock和unlock操作,相当于你把钥匙和锁都交给了他人,锁就失去了应有的作用

    mutex

    提供了三个办法

  • Lock() 进行加锁操作,在同一个goroutine中必须在锁开释之后能力进行再次上锁,不然会panic
  • Unlock() 进行解锁操作,如果这个时候未加锁会panic,mutex和goroutine不关联,也就是说对于mutex的加锁解锁操作能够产生在多个goroutine间
  • tryLock() 尝试获取锁,当锁被其余goroutine占有,或者锁处于饥饿模式,将立即返回false,当锁可用时尝试获取锁,获取失败也返回false

实现如下

type Mutex struct {    state int32    sema  uint32}

Mutex只有两个字段

  • state 示意以后互斥锁的状态,复合型字段
  • sema 信号量变量,用来管制期待goroutine的阻塞休眠和唤醒

state的不同位标识了不同的状态,以此实现了用最小的内存来示意更多的意义

// 前三个字段标识了锁的状态  剩下的位来标识以后共有多少个goroutine在期待锁const (   mutexLocked = 1 << iota // 示意互斥锁的锁定状态   mutexWoken // 示意从失常模式被从唤醒   mutexStarving // 以后的互斥锁进入饥饿状态   mutexWaiterShift = iota // 以后互斥锁上期待者的数量)

mutex的最开始实现只有失常模式,在失常模式下期待的线程依照先进先出的形式获取锁,然而新创建的goroutine会与刚被唤醒的goroutine竞争,导致刚被唤起的goroutine拿不到锁,从而长期被阻塞。
因而Go在1.9版本中引入了饥饿模式,当goroutine超过1ms没有获取锁,那么就将以后的互斥锁切换到饥饿模式,在该模式下,互斥锁会间接交给期待队列最后面的g,新的g在该状态下既不能获取锁,也不会进入自旋状态,只会在队列的开端期待。如果一个g获取了互斥锁,并且它在队列的开端或者期待的工夫少于1ms,那么就回到失常模式

加锁

func (m *Mutex) Lock() {    // 判断以后锁的状态,如果锁是齐全闲暇的,即m.state为0,则对其加锁,将m.state的值赋为1    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {        if race.Enabled {            race.Acquire(unsafe.Pointer(m))        }        return    }    // Slow path (outlined so that the fast path can be inlined)    m.lockSlow()}func (m *Mutex) lockSlow() {    var waitStartTime int64     starving := false    awoke := false    iter := 0    old := m.state    ........}
  1. 通过CAS零碎调用判断以后锁的状态,如果是闲暇则m.state为0,这个时候对其加锁,将m.state设为1
  2. 如果以后锁已被占用,通过lockSlow办法尝试自旋或者饥饿状态下的竞争,期待锁的开释

lockSlow:
初始化五个字段

  • waitStartTime 用来计算waiter的等待时间
  • starving 饥饿模式标记,如果等待时间超过1ms,则为true
  • awoke 协程是否唤醒,当g在自旋的时候,相当于CPU上曾经有正在等锁的协程,为了防止mutex解锁时再唤醒其余协程,自旋时要尝试把mutex设为唤醒状态
  • iter 用来记录协程的自旋次数
  • old 记录以后锁的状态

    判断自旋

    for {  // 判断是否容许进入自旋 两个条件,条件1是以后锁不能处于饥饿状态  // 条件2是在runtime_canSpin内实现,其逻辑是在多核CPU运行,自旋的次数小于4      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {    // !awoke 判断以后goroutine不是在唤醒状态    // old&mutexWoken == 0 示意没有其余正在唤醒的goroutine    // old>>mutexWaiterShift != 0 示意期待队列中有正在期待的goroutine    // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 尝试将以后锁的低2位的Woken状态位设置为1,示意已被唤醒, 这是为了告诉在解锁Unlock()中不要再唤醒其余的waiter了          if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&              atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {                  // 设置以后goroutine唤醒胜利        awoke = true          }    // 进行自旋          runtime_doSpin()    // 自旋次数          iter++    // 记录以后锁的状态          old = m.state          continue      }}const active_spin_cnt = 30func sync_runtime_doSpin() {  procyield(active_spin_cnt)}// asm_amd64.sTEXT runtime·procyield(SB),NOSPLIT,$0-0  MOVL    cycles+0(FP), AXagain:  PAUSE  SUBL    $1, AX  JNZ    again  RET

    进入自旋的起因:乐观的认为以后正在持有锁的g能在短时间内偿还锁,所以须要一些条件来判断:到底能不能短时间偿还
    条件如下

  • 自旋的次数<=4
  • cpu必须为多核
  • gomaxprocs>1,最大被同时执行的CPU数目大于1
  • 以后机器上至多存在一个正在运行的P并且解决队列为空

满足条件之后进行循环,次数为30次,也就是执行30次PAUSE指令来占据CPU,进行自旋

解锁

func (m *Mutex) Unlock() {    // Fast path: drop lock bit.    new := atomic.AddInt32(&m.state, -mutexLocked)    if new != 0 {        // Outlined slow path to allow inlining the fast path.        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.        m.unlockSlow(new)    }}func (m *Mutex) unlockSlow(new int32) {  // 这里示意解锁了一个没有上锁的锁,则间接产生panic    if (new+mutexLocked)&mutexLocked == 0 {        throw("sync: unlock of unlocked mutex")    }  // 失常模式的开释锁逻辑    if new&mutexStarving == 0 {        old := new        for {      // 如果没有期待者则间接返回即可      // 如果锁处于加锁的状态,示意曾经有goroutine获取到了锁,能够返回      // 如果锁处于唤醒状态,这表明有期待的goroutine被唤醒了,不必尝试获取其余goroutine了      // 如果锁处于饥饿模式,锁之后会间接给期待队头goroutine            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {                return            }            // 抢占唤醒标记位,这里是想要把锁的状态设置为被唤醒,而后waiter队列-1            new = (old - 1<<mutexWaiterShift) | mutexWoken            if atomic.CompareAndSwapInt32(&m.state, old, new) {        // 抢占胜利唤醒一个goroutine                runtime_Semrelease(&m.sema, false, 1)                return            }      // 执行抢占不胜利时从新更新一下状态信息,下次for循环持续解决            old = m.state        }    } else {    // 饥饿模式开释锁逻辑,间接唤醒期待队列goroutine        runtime_Semrelease(&m.sema, true, 1)    }}func (m *Mutex) unlockSlow(new int32) {  // 这里示意解锁了一个没有上锁的锁,则间接产生panic    if (new+mutexLocked)&mutexLocked == 0 {        throw("sync: unlock of unlocked mutex")    }  // 失常模式的开释锁逻辑    if new&mutexStarving == 0 {        old := new        for {      // 如果没有期待者则间接返回即可      // 如果锁处于加锁的状态,示意曾经有goroutine获取到了锁,能够返回      // 如果锁处于唤醒状态,这表明有期待的goroutine被唤醒了,不必尝试获取其余goroutine了      // 如果锁处于饥饿模式,锁之后会间接给期待队头goroutine            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {                return            }            // 抢占唤醒标记位,这里是想要把锁的状态设置为被唤醒,而后waiter队列-1            new = (old - 1<<mutexWaiterShift) | mutexWoken            if atomic.CompareAndSwapInt32(&m.state, old, new) {        // 抢占胜利唤醒一个goroutine                runtime_Semrelease(&m.sema, false, 1)                return            }      // 执行抢占不胜利时从新更新一下状态信息,下次for循环持续解决            old = m.state        }    } else {    // 饥饿模式开释锁逻辑,间接唤醒期待队列goroutine        runtime_Semrelease(&m.sema, true, 1)    }}

解锁对于加锁来说简略很多,通过AddInt32办法进行疾速解锁,将m.state低地位为0,而后判断值,如果为0,那么就齐全闲暇了,完结解锁。如果不为0阐明以后锁未被占用,不过有期待的g未被唤醒,须要进行一系列唤醒操作,唤醒判断锁的状态,而后进行具体的goroutine唤醒

非阻塞加锁

func (m *Mutex) TryLock() bool {  // 记录以后状态    old := m.state  //  处于加锁状态/饥饿状态间接获取锁失败    if old&(mutexLocked|mutexStarving) != 0 {        return false    }    // 尝试获取锁,获取失败间接获取失败    if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {        return false    }    return true}

TryLock是Go 1.18新退出的办法,不被激励应用,次要是两个判断逻辑

  • 判断以后锁的状态,如果锁处于加锁状态或者饥饿状态就间接获取锁失败
  • 尝试获取锁,如果失败则间接失败

本文由mdnice多平台公布