关于golang:手摸手Go-深入理解syncCond

8次阅读

共计 8500 个字符,预计需要花费 22 分钟才能阅读完成。

Today that you are wasting is the unattainable tomorrow to someone who expired yesterday. This very moment that you detest is the unreturnable experience to your future self.

sync.Cond实现了一个条件变量,用于期待一个或一组 goroutines 满足条件后唤醒的场景。每个Cond 关联一个 Locker 通常是一个 *MutexRWMutex\` 依据需要初始化不同的锁。

根本用法

老规矩正式分析源码前,先来看看 sync.Cond 如何应用。比方咱们实现一个 FIFO 的队列

`package main`
`import (`
 `"fmt"`
 `"math/rand"`
 `"os"`
 `"os/signal"`
 `"sync"`
 `"time"`
`)`
`type FIFO struct {`
 `lock  sync.Mutex`
 `cond  *sync.Cond`
 `queue []int`
`}`
`type Queue interface {`
 `Pop() int`
 `Offer(num int) error`
`}`
`func (f *FIFO) Offer(num int) error {`
 `f.lock.Lock()`
 `defer f.lock.Unlock()`
 `f.queue = append(f.queue, num)`
 `f.cond.Broadcast()`
 `return nil`
`}`
`func (f *FIFO) Pop() int {`
 `f.lock.Lock()`
 `defer f.lock.Unlock()`
 `for {`
 `for len(f.queue) == 0 {`
 `f.cond.Wait()`
 `}`
 `item := f.queue[0]`
 `f.queue = f.queue[1:]`
 `return item`
 `}`
`}`
`func main() {`
 `l := sync.Mutex{}`
 `fifo := &FIFO{`
 `lock:  l,`
 `cond:  sync.NewCond(&l),`
 `queue: []int{},`
 `}`
 `go func() {`
 `for {`
 `fifo.Offer(rand.Int())`
 `}`
 `}()`
 `time.Sleep(time.Second)`
 `go func() {`
 `for {`
 `fmt.Println(fmt.Sprintf("goroutine1 pop-->%d", fifo.Pop()))`
 `}`
 `}()`
 `go func() {`
 `for {`
 `fmt.Println(fmt.Sprintf("goroutine2 pop-->%d", fifo.Pop()))`
 `}`
 `}()`
 `ch := make(chan os.Signal, 1)`
 `signal.Notify(ch, os.Interrupt)`
 `<-ch`
`}`

咱们定一个 FIFO 队列有OfferPop两个操作,咱们起一个 gorountine 一直向队列投放数据,另外两个 gorountine 一直取拿数据。

  1. Pop操作会判断如果队列里没有数据 len(f.queue) == 0 则调用 f.cond.Wait()goroutine挂起。
  2. 等到 Offer 操作投放数据胜利,外面调用 f.cond.Broadcast() 来唤醒所有挂起在这个 mutex 上的 goroutine。当然sync.Cond 也提供了一个 Signal(), 有点儿相似 Java 中的notify()notifyAll()的意思 次要是唤醒一个和唤醒全副的区别。

总结一下 sync.Mutex 的大抵用法

  1. 首先申明一个 mutex,这里sync.Mutex/sync.RWMutex 可依据理论状况选用
  2. 调用 sync.NewCond(l Locker) *Cond 应用 1 中的mutex 作为入参 留神 这里传入的是指针 为了防止 c.L.Lock()c.L.Unlock() 调用频繁复制锁 导致死锁
  3. 依据业务条件 满足则调用 cond.Wait() 挂起goroutine
  4. cond.Broadcast()唤起所有挂起的 gorotune 另一个办法cond.Signal() 唤醒一个最先挂起的goroutine

须要留神的是 cond.wait() 的应用须要参照如下模版 具体为啥咱们后续剖析

 `c.L.Lock()`
 `for !condition() {`
 `c.Wait()`
 `}`
 `... make use of condition ...`
 `c.L.Unlock()`

源码剖析

数据结构

剖析具体方法前,咱们先来理解下 sync.Cond 的数据结构。具体源码如下:

`type Cond struct {`
 `noCopy noCopy // Cond 应用后不容许拷贝 `
 `// L is held while observing or changing the condition`
 `L Locker`
 `// 告诉列表调用 wait()办法的 goroutine 会被放到 notifyList 中 `
 `notify  notifyList`
 `checker copyChecker // 查看 Cond 实例是否被复制 `
`}`

noCopy之前讲过 不分明的能够看下《你真的理解 mutex 吗》,除此之外,Locker是咱们刚刚谈到的 mutexcopyChecker 是用来查看 Cond 实例是否被复制的, 就有一个办法 :

`func (c *copyChecker) check() {`
 `if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&`
 `!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&`
 `uintptr(*c) != uintptr(unsafe.Pointer(c)) {`
 `panic("sync.Cond is copied")`
 `}`
`}`

大抵意思是说,初始 type copyChecker uintptr 默认为 0,当第一次调用 check() 会将 copyChecker 本身的地址复制给本人,至于为什么 uintptr(*c) != uintptr(unsafe.Pointer(c)) 会被调用 2 次,因为期间 goroutine 可能曾经扭转 copyChecker。二次调用如果不相等,则阐明sync.Cond 被复制,重新分配了内存地址。

sync.Cond比拟有意思的是notifyList

`type notifyList struct {`
 `// wait is the ticket number of the next waiter. It is atomically`
 `// incremented outside the lock.`
 `wait uint32 // 期待 goroutine 操作的数量 `
 `// notify is the ticket number of the next waiter to be notified. It can`
 `// be read outside the lock, but is only written to with lock held.`
 `//`
 `// Both wait & notify can wrap around, and such cases will be correctly`
 `// handled as long as their "unwrapped" difference is bounded by 2^31.`
 `// For this not to be the case, we'd need to have 2^31+ goroutines`
 `// blocked on the same condvar, which is currently not possible.`
 `notify uint32 // 唤醒 goroutine 操作的数量 `
 `// List of parked waiters.`
 `lock mutex`
 `head *sudog`
 `tail *sudog`
`}`

蕴含了 3 类字段:

  • waitnotify 两个无符号整型,别离示意了 Wait() 操作的次数和 goroutine 被唤醒的次数,wait应该是恒大于等于notify
  • lock mutex 这个跟 sync.Mutex 咱们剖析信号量阻塞队列时 semaRoot 里的 mutex 一样,并不是 Go 提供开发者应用的sync.Mutex, 而是零碎外部运行时实现的一个简略版本的互斥锁。
  • headtail 看名字,咱们就能脑补出跟链表很像 没错这里就是保护了阻塞在以后 sync.Cond 上的 goroutine 形成的链表

整体来讲 sync.Cond 大体构造为:

cond architecture

操作方法

Wait()操作

`func (c *Cond) Wait() {`
 `//1. 查看 cond 是否被拷贝 `
 `c.checker.check()`
 `//2. notifyList.wait+1`
 `t := runtime_notifyListAdd(&c.notify)`
 `//3. 开释锁 让出资源给其余 goroutine`
 `c.L.Unlock()`
 `//4. 挂起 goroutine`
 `runtime_notifyListWait(&c.notify, t)`
 `//5. 尝试取得锁 `
 `c.L.Lock()`
`}`

Wait() 办法源码很容易看出它的操作大略分了 5 步:

  1. 调用 copyChecker.check() 保障 sync.Cond 不会被拷贝
  2. 每次调用 Wait() 会将 sync.Cond.notifyList.wait 属性进行加一操作,这也是它实现 FIFO 的基石,依据 wait 来判断 \`goroutine1 期待的程序
`//go:linkname notifyListAdd sync.runtime_notifyListAdd`
`func notifyListAdd(l *notifyList) uint32 {`
 `// This may be called concurrently, for example, when called from`
 `// sync.Cond.Wait while holding a RWMutex in read mode.`
 `return atomic.Xadd(&l.wait, 1) - 1`
`}`
  1. 调用 c.L.Unlock() 开释锁,因为以后 goroutine 行将被 gopark,让出锁给其余goroutine 防止死锁
  2. 调用 runtime_notifyListWait(&c.notify, t) 可能略微简单一点儿
`// notifyListWait waits for a notification. If one has been sent since`
`// notifyListAdd was called, it returns immediately. Otherwise, it blocks.`
`//go:linkname notifyListWait sync.runtime_notifyListWait`
`func notifyListWait(l *notifyList, t uint32) {`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `// 如果曾经被唤醒 则立刻返回 `
 `if less(t, l.notify) {`
 `unlock(&l.lock)`
 `return`
 `}`
 `// Enqueue itself.`
 `s := acquireSudog()`
 `s.g = getg()`
 `// 把期待递增序号赋值给 s.ticket 为 FIFO 打基础 `
 `s.ticket = t`
 `s.releasetime = 0`
 `t0 := int64(0)`
 `if blockprofilerate > 0 {`
 `t0 = cputicks()`
 `s.releasetime = -1`
 `}`
 `// 将以后 goroutine 插入到 notifyList 链表中 `
 `if l.tail == nil {`
 `l.head = s`
 `} else {`
 `l.tail.next = s`
 `}`
 `l.tail = s`
 `// 最终调用 gopark 挂起以后 goroutine`
 `goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)`
 `if t0 != 0 {`
 `blockevent(s.releasetime-t0, 2)`
 `}`
 `// goroutine 被唤醒后开释 sudog`
 `releaseSudog(s)`
`}`

次要实现两个工作:

  • 将以后 goroutine 插入到 notifyList 链表中
  • 调用 gopark 将以后 goroutine 挂起
  1. 当其余 goroutine 调用了 SignalBroadcast办法,以后 goroutine 被唤醒后 再次尝试取得锁

Signal 操作

Signal唤醒一个等待时间最长的goroutine, 调用时不要求持有锁。

`func (c *Cond) Signal() {`
 `c.checker.check()`
 `runtime_notifyListNotifyOne(&c.notify)`
`}`

具体实现也不简单,先判断 sync.Cond 是否被复制,而后调用runtime_notifyListNotifyOne

`//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne`
`func notifyListNotifyOne(l *notifyList) {`
 `// wait==notify 阐明没有期待的 goroutine 了 `
 `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {`
 `return`
 `}`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `// 锁下二次查看 `
 `t := l.notify`
 `if t == atomic.Load(&l.wait) {`
 `unlock(&l.lock)`
 `return`
 `}`
 `// 更新下一个须要被唤醒的 ticket number`
 `atomic.Store(&l.notify, t+1)`
 `// Try to find the g that needs to be notified.`
 `// If it hasn't made it to the list yet we won't find it,`
 `// but it won't park itself once it sees the new notify number.`
 `//`
 `// This scan looks linear but essentially always stops quickly.`
 `// Because g's queue separately from taking numbers,`
 `// there may be minor reorderings in the list, but we`
 `// expect the g we're looking for to be near the front.`
 `// The g has others in front of it on the list only to the`
 `// extent that it lost the race, so the iteration will not`
 `// be too long. This applies even when the g is missing:`
 `// it hasn't yet gotten to sleep and has lost the race to`
 `// the (few) other g's that we find on the list.`
 `// 这里是 FIFO 实现的外围 其实就是遍历链表 sudog.ticket 查找指定须要唤醒的节点 `
 `for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {`
 `if s.ticket == t {`
 `n := s.next`
 `if p != nil {`
 `p.next = n`
 `} else {`
 `l.head = n`
 `}`
 `if n == nil {`
 `l.tail = p`
 `}`
 `unlock(&l.lock)`
 `s.next = nil`
 `readyWithTime(s, 4)`
 `return`
 `}`
 `}`
 `unlock(&l.lock)`
`}`

次要逻辑:

  1. 判断是否存在期待须要被唤醒的 goroutine 没有间接返回
  2. 递增 notify 属性,因为是依据 notifysudog.ticket匹配来查找须要唤醒的 goroutine, 因为其是递增生成的,故而有了FIFO 语义。
  3. 遍历 notifyList 持有的链表,从 head 开始根据 next 指针顺次遍历。这个过程是线性的,故而工夫复杂度为 O(n), 不过官网说法这个过程理论比拟快This scan looks linear but essentially always stops quickly.

有个小细节: 还记得咱们 Wait() 操作中,wait属性原子更新和 goroutine 插入期待链表是两个独自的步骤,所以存在竞争的状况下,链表中的节点可能会轻微的乱序产生。然而不要放心,因为 ticket 是原子递增的 所以唤醒程序不会乱。

Broadcast 操作

Broadcast()Singal() 区别次要是它能够唤醒全副期待的 goroutine,并间接将wait 属性的值赋值给notify

`func (c *Cond) Broadcast() {`
 `c.checker.check()`
 `runtime_notifyListNotifyAll(&c.notify)`
`}`
`// notifyListNotifyAll notifies all entries in the list.`
`//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll`
`func notifyListNotifyAll(l *notifyList) {`
 `// Fast-path 无期待 goroutine 间接返回 `
 `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {`
 `return`
 `}`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `s := l.head`
 `l.head = nil`
 `l.tail = nil`
 `// 间接更新 notify=wait`
 `atomic.Store(&l.notify, atomic.Load(&l.wait))`
 `unlock(&l.lock)`
 `// 顺次调用 goready 唤醒 goroutine`
 `for s != nil {`
 `next := s.next`
 `s.next = nil`
 `readyWithTime(s, 4)`
 `s = next`
 `}`
`}`

逻辑比较简单不再赘述

总结

  1. sync.Cond一旦创立应用 不容许被拷贝,由 noCopycopyChecker来限度爱护。
  2. Wait()操作先是递增 notifyList.wait 属性 而后将 goroutine 封装进 sudog,将notifyList.wait 赋值给 sudog.ticket, 而后将sudog 插入 notifyList 链表中
  3. Singal()理论是依照 notifyList.notifynotifyList链表中节点的 ticket 匹配 来确定唤醒的 goroutine,因为 notifyList.notifynotifyList.wait都是原子递增的,故而有了 FIFO 的语义
  4. Broadcast()绝对简略 就是唤醒全副期待的goroutine

如果浏览过程中发现本文存疑或谬误的中央,能够关注公众号留言。如果感觉还能够 帮忙点个在看😁

正文完
 0