乐趣区

关于后端:waitgroup与互斥锁底层

WaitGroup 底层

回顾一下 Go 的并发管制,其实大略有以下几种管制伎俩

  • 全局变量(不优雅)
  • wg 只有三个办法,不便简略
  • channel 有三种 panic 的状况,且操作比较复杂
  • context 官网举荐,在带层级的 goroutine 中有奇效,比方 goroutine 嵌套着 goroutine;而且可能在不同的 goroutine 之间进行值传递,不便

sync.WaitGroup()就是解决 go 中并发时,多个 goroutine 同步的问题
用法如下

  1. 主 goroutine 通过 wg.Add(i)让设置须要启动的 goroutine 数量
  2. 子 goroutine 之间通过 wg.Done()来示意子 goroutine 执行结束,Add(-1)
  3. 主 goroutine 之间通过 Wait()来期待所有的子 goroutine 执行结束

    func main() {
     var wg sync.WaitGroup 
     wg.Add(2)
      go func() {defer wg.Done()
       fmt.Println("here1")  
     }()
     go func() {defer wg.Done() 
       fmt.Println("here2") 
     }()  
     wg.Wait() 
     fmt.Println("finish")  
    }
    
    // 输入
    // here2
    // here1 
    // finish
    因为这里用了 defer 所以答案是确定的 先进后出 必先打印 here2

    底层

    //https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go
    // A WaitGroup waits for a collection of goroutines to finish.
    // The main goroutine calls Add to set the number of
    // goroutines to wait for. Then each of the goroutines
    // runs and calls Done when finished. At the same time,
    // Wait can be used to block until all goroutines have finished.
    //
    // A WaitGroup must not be copied after first use.
    type WaitGroup struct {
     noCopy noCopy
    
     // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
     // 64-bit atomic operations require 64-bit alignment, but 32-bit
     // compilers do not ensure it. So we allocate 12 bytes and then use
     // the aligned 8 bytes in them as state, and the other 4 as storage
     // for the sema.
     state1 [3]uint32
    }

第一个字段:noCopy

Go 中检测禁止复制的技术,如果有复制行为,那么就认为违规,在编译阶段禁止赋值和复制 wg 实例,因为 wg 中有一个计数器,多个 goroutine 会并发的批改该计数器。在应用 wg 的时候,倡议依照官网文档的要求,应用指针传递 wg 实例,同时如果 wg 作为构造体的成员,也须要显式地定义一个 nocopy 字段,以防止在构造体复制时导致竞态问题

第二个字段:state1 数组

这是一个长度为 3 的数组,蕴含了 wg 应用到的三种数据:counter、waiter、semaphore
三个元素的具体作用如下

  • counter:计数器,用来计算要执行的协程 g 的数量,示意以后要执行的 goroutine 个数,wg.Add(i)时,counter+=i,wg.Done()时,count-=1
  • waiter:计数器,示意曾经调用 Wait()函数的 goroutine-group 个数,也就是须要完结的 goroutine 组数,当 wg.Wait()的时候,waiter+=1,并且挂起以后的 goroutine
  • semaphore:go runtime 外部的信号量实现,会用到以下两个函数

1.runtime_Semacquire 减少一个信号量,并且挂起以后 goroutine
2.runtime_Semrelease 缩小一个信号量,并唤醒 semaphore 上其中一个正在期待的字段

Add

// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L53
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state() // 获取 state(counter+waiter)和 semaphore 信号量的指针
    
    ... ...
    // uint64(delta)<<32 把 delta 左移 32 位,因为 counter 在 statep 的高 32 位
    // 而后把 delta 原子的减少到 counter 中
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    // v => counter, w => waiter
    v := int32(state >> 32)// 获取 counter 值
    w := uint32(state)     // 获取 waiter 值
    
    ... ...
    //counter 变为负值了,panic 报错
    if v < 0 {panic("sync: negative WaitGroup counter")
    }
    //waiter 不等于 0,阐明曾经执行了 waiter,这时你又调用 Add(),是不容许的
    if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    //v->counter,counter>0,阐明还有 goroutine 没执行完,不须要开释信号量,间接返回
    //w->waiter, waiter=0,没有期待的 goroutine,不须要开释信号量,间接返回
    if v > 0 || w == 0 {return}
    
    // This goroutine has set counter to 0 when waiters > 0.
    // Now there can't be concurrent mutations of state:
    // - Adds must not happen concurrently with Wait,
    // - Wait does not increment waiters if it sees counter == 0.
    // Still do a cheap sanity check to detect WaitGroup misuse.
    // Add()和 Wait()不能并行操作
    // counter==0,也不能执行 Wait()操作
    if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    
    *statep = 0 // 完结了将 counter 清零,上面在开释 waiter 数的信号量
    for ; w != 0; w-- {// 循环开释 waiter 个数的信号量
        runtime_Semrelease(semap, false, 0)// 一次开释一个信号量,唤醒一个期待者
    }
}

能看进去 Add 次要干了两件事
1. 将 delta 值累加到 counter 计数器中
2. 当 counter= 0 的时候,waiter 开释相应的信号量,把期待的 goroutine 全副唤醒,如果 <0,则 panic

Done

// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L97
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {wg.Add(-1)
}

间接调用 wg.Add(-1),让 counter 的值减一

Wait

// https://github.com/golang/go/blob/go1.17.10/src/sync/waitgroup.go#L103
func (wg *WaitGroup) Wait() {statep, semap := wg.state() // 获取 state(counter+waiter)和 semaphore 信号量的指针
    ... ...
    for {// 死循环
        state := atomic.LoadUint64(statep) // 原子的获取 state 值
        v := int32(state >> 32) // 获取 counter 值
        w := uint32(state)      // 获取 waiter 值
        if v == 0 {// counter=0,不须要 wait 间接返回
            // Counter is 0, no need to wait.
            if race.Enabled {race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        ... ...
        // Increment waiters count.
        if atomic.CompareAndSwapUint64(statep, state, state+1) {// 应用 CAS 累加 wiater
            ... ...
            runtime_Semacquire(semap) // 减少信号量,期待信号量唤醒
            // 这时 *statep 还不等于 0,那么应用过程必定有误,间接 panic
            if *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            ... ...
            return
        }
    }
}

累加 waiter 计数器的值,减少信号量,期待唤醒

注意事项

  1. Add()操作必须早于 Wait()操作
  2. Done()的次数必须和 Add 的值相等
  3. 不能让 counter 的值 <0,也就是 Done 不能大于 Add(i)
  4. Add 和 Wait 不能并行调用,必须一个在子协程一个在主携程
  5. 如果想反复调用 wg,必须得期待 Wait()执行完后能力进行下一轮调用

    锁底层

    go 中的 sync 包提供了两种锁的类型,别离是互斥锁 sync.Mutex 和读写锁 sync.RWMutex,这两种锁都属于乐观锁
    锁的应用场景是解决多协程下数据竞态的问题,为了保证数据的平安,锁住一些共享资源。以避免并发拜访这些共享数据时可能导致的数据不统一问题,获取锁的线程能够失常拜访临界区,未获取到锁的线程期待锁开释之后能够尝试获取锁
    注:当你想让一个构造体是并发平安的,能够加一个锁字段,比方 channel 就是这么做的,要留神的是,这个锁字段必须小写,不然调用方也能够进行 lock 和 unlock 操作,相当于你把钥匙和锁都交给了他人,锁就失去了应有的作用

    mutex

    提供了三个办法

  • Lock() 进行加锁操作,在同一个 goroutine 中必须在锁开释之后能力进行再次上锁,不然会 panic
  • Unlock() 进行解锁操作,如果这个时候未加锁会 panic,mutex 和 goroutine 不关联,也就是说对于 mutex 的加锁解锁操作能够产生在多个 goroutine 间
  • tryLock() 尝试获取锁,当锁被其余 goroutine 占有,或者锁处于饥饿模式,将立即返回 false,当锁可用时尝试获取锁,获取失败也返回 false

实现如下

type Mutex struct {
    state int32
    sema  uint32
}

Mutex 只有两个字段

  • state 示意以后互斥锁的状态,复合型字段
  • sema 信号量变量,用来管制期待 goroutine 的阻塞休眠和唤醒

state 的不同位标识了不同的状态,以此实现了用最小的内存来示意更多的意义

// 前三个字段标识了锁的状态  剩下的位来标识以后共有多少个 goroutine 在期待锁
const (
   mutexLocked = 1 << iota // 示意互斥锁的锁定状态
   mutexWoken // 示意从失常模式被从唤醒
   mutexStarving // 以后的互斥锁进入饥饿状态
   mutexWaiterShift = iota // 以后互斥锁上期待者的数量
)

mutex 的最开始实现只有失常模式,在失常模式下期待的线程依照先进先出的形式获取锁,然而新创建的 goroutine 会与刚被唤醒的 goroutine 竞争,导致刚被唤起的 goroutine 拿不到锁,从而长期被阻塞。
因而 Go 在 1.9 版本中引入了饥饿模式,当 goroutine 超过 1ms 没有获取锁,那么就将以后的互斥锁切换到饥饿模式,在该模式下,互斥锁会间接交给期待队列最后面的 g,新的 g 在该状态下既不能获取锁,也不会进入自旋状态,只会在队列的开端期待。如果一个 g 获取了互斥锁,并且它在队列的开端或者期待的工夫少于 1ms,那么就回到失常模式

加锁

func (m *Mutex) Lock() {
    // 判断以后锁的状态,如果锁是齐全闲暇的,即 m.state 为 0,则对其加锁,将 m.state 的值赋为 1
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()}

func (m *Mutex) lockSlow() {
    var waitStartTime int64 
    starving := false
    awoke := false
    iter := 0
    old := m.state
    ........
}
  1. 通过 CAS 零碎调用判断以后锁的状态,如果是闲暇则 m.state 为 0,这个时候对其加锁,将 m.state 设为 1
  2. 如果以后锁已被占用,通过 lockSlow 办法尝试自旋或者饥饿状态下的竞争,期待锁的开释

lockSlow:
初始化五个字段

  • waitStartTime 用来计算 waiter 的等待时间
  • starving 饥饿模式标记,如果等待时间超过 1ms,则为 true
  • awoke 协程是否唤醒,当 g 在自旋的时候,相当于 CPU 上曾经有正在等锁的协程,为了防止 mutex 解锁时再唤醒其余协程,自旋时要尝试把 mutex 设为唤醒状态
  • iter 用来记录协程的自旋次数
  • old 记录以后锁的状态

    判断自旋

    for {
      // 判断是否容许进入自旋 两个条件,条件 1 是以后锁不能处于饥饿状态
      // 条件 2 是在 runtime_canSpin 内实现,其逻辑是在多核 CPU 运行,自旋的次数小于 4
          if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // !awoke 判断以后 goroutine 不是在唤醒状态
        // old&mutexWoken == 0 示意没有其余正在唤醒的 goroutine
        // old>>mutexWaiterShift != 0 示意期待队列中有正在期待的 goroutine
        // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 尝试将以后锁的低 2 位的 Woken 状态位设置为 1,示意已被唤醒, 这是为了告诉在解锁 Unlock()中不要再唤醒其余的 waiter 了
              if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                  atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                      // 设置以后 goroutine 唤醒胜利
            awoke = true
              }
        // 进行自旋
              runtime_doSpin()
        // 自旋次数
              iter++
        // 记录以后锁的状态
              old = m.state
              continue
          }
    }
    
    const active_spin_cnt = 30
    func sync_runtime_doSpin() {procyield(active_spin_cnt)
    }
    // asm_amd64.s
    TEXT runtime·procyield(SB),NOSPLIT,$0-0
      MOVL    cycles+0(FP), AX
    again:
      PAUSE
      SUBL    $1, AX
      JNZ    again
      RET

    进入自旋的起因:乐观的认为以后正在持有锁的 g 能在短时间内偿还锁,所以须要一些条件来判断:到底能不能短时间偿还
    条件如下

  • 自旋的次数 <=4
  • cpu 必须为多核
  • gomaxprocs>1,最大被同时执行的 CPU 数目大于 1
  • 以后机器上至多存在一个正在运行的 P 并且解决队列为空

满足条件之后进行循环,次数为 30 次,也就是执行 30 次 PAUSE 指令来占据 CPU,进行自旋

解锁

func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}
func (m *Mutex) unlockSlow(new int32) {
  // 这里示意解锁了一个没有上锁的锁,则间接产生 panic
    if (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")
    }
  // 失常模式的开释锁逻辑
    if new&mutexStarving == 0 {
        old := new
        for {
      // 如果没有期待者则间接返回即可
      // 如果锁处于加锁的状态,示意曾经有 goroutine 获取到了锁,能够返回
      // 如果锁处于唤醒状态,这表明有期待的 goroutine 被唤醒了,不必尝试获取其余 goroutine 了
      // 如果锁处于饥饿模式,锁之后会间接给期待队头 goroutine
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}
            // 抢占唤醒标记位,这里是想要把锁的状态设置为被唤醒,而后 waiter 队列 -1
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 抢占胜利唤醒一个 goroutine
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
      // 执行抢占不胜利时从新更新一下状态信息,下次 for 循环持续解决
            old = m.state
        }
    } else {
    // 饥饿模式开释锁逻辑,间接唤醒期待队列 goroutine
        runtime_Semrelease(&m.sema, true, 1)
    }
}

func (m *Mutex) unlockSlow(new int32) {
  // 这里示意解锁了一个没有上锁的锁,则间接产生 panic
    if (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")
    }
  // 失常模式的开释锁逻辑
    if new&mutexStarving == 0 {
        old := new
        for {
      // 如果没有期待者则间接返回即可
      // 如果锁处于加锁的状态,示意曾经有 goroutine 获取到了锁,能够返回
      // 如果锁处于唤醒状态,这表明有期待的 goroutine 被唤醒了,不必尝试获取其余 goroutine 了
      // 如果锁处于饥饿模式,锁之后会间接给期待队头 goroutine
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}
            // 抢占唤醒标记位,这里是想要把锁的状态设置为被唤醒,而后 waiter 队列 -1
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 抢占胜利唤醒一个 goroutine
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
      // 执行抢占不胜利时从新更新一下状态信息,下次 for 循环持续解决
            old = m.state
        }
    } else {
    // 饥饿模式开释锁逻辑,间接唤醒期待队列 goroutine
        runtime_Semrelease(&m.sema, true, 1)
    }
}

解锁对于加锁来说简略很多,通过 AddInt32 办法进行疾速解锁,将 m.state 低地位为 0,而后判断值,如果为 0,那么就齐全闲暇了,完结解锁。如果不为 0 阐明以后锁未被占用,不过有期待的 g 未被唤醒,须要进行一系列唤醒操作,唤醒判断锁的状态,而后进行具体的 goroutine 唤醒

非阻塞加锁

func (m *Mutex) TryLock() bool {
  // 记录以后状态
    old := m.state
  //  处于加锁状态 / 饥饿状态间接获取锁失败
    if old&(mutexLocked|mutexStarving) != 0 {return false}
    // 尝试获取锁,获取失败间接获取失败
    if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {return false}


    return true
}

TryLock 是 Go 1.18 新退出的办法,不被激励应用,次要是两个判断逻辑

  • 判断以后锁的状态,如果锁处于加锁状态或者饥饿状态就间接获取锁失败
  • 尝试获取锁,如果失败则间接失败

本文由 mdnice 多平台公布

退出移动版