The today you are wasting is the unattainable tomorrow of someone who died yesterday. This very moment you detest is an experience your future self can never return to.
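sync.Cond implements a condition variable for scenarios in which one goroutine or a group of goroutines waits for a condition to become true and is woken once it does. Each Cond is associated with a Locker, usually a *Mutex or *RWMutex, chosen according to your needs.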
Basic usage
As usual, before analyzing the source code, let's look at how sync.Cond is used. For example, let's implement a FIFO queue:
```go
package main

import (
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"time"
)

// FIFO is a queue whose consumers block until data is available.
type FIFO struct {
	lock  sync.Mutex
	cond  *sync.Cond // must be bound to &lock itself, not to a copy of it
	queue []int
}

type Queue interface {
	Pop() int
	Offer(num int) error
}

// NewFIFO binds the condition variable to the queue's own mutex.
func NewFIFO() *FIFO {
	f := &FIFO{queue: []int{}}
	f.cond = sync.NewCond(&f.lock)
	return f
}

// Offer appends an item and wakes every waiting consumer.
func (f *FIFO) Offer(num int) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.queue = append(f.queue, num)
	f.cond.Broadcast()
	return nil
}

// Pop blocks while the queue is empty, then removes the head item.
func (f *FIFO) Pop() int {
	f.lock.Lock()
	defer f.lock.Unlock()
	// Re-check the condition after every wake-up.
	for len(f.queue) == 0 {
		f.cond.Wait()
	}
	item := f.queue[0]
	f.queue = f.queue[1:]
	return item
}

func main() {
	fifo := NewFIFO()

	// Producer: keeps offering random numbers.
	go func() {
		for {
			fifo.Offer(rand.Int())
		}
	}()

	time.Sleep(time.Second)

	// Two consumers compete for items.
	go func() {
		for {
			fmt.Printf("goroutine1 pop-->%d\n", fifo.Pop())
		}
	}()
	go func() {
		for {
			fmt.Printf("goroutine2 pop-->%d\n", fifo.Pop())
		}
	}()

	// Run until Ctrl+C.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	<-ch
}
```
We define a FIFO queue with two operations, Offer and Pop. One goroutine keeps putting data into the queue, while two other goroutines keep taking data out.
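- Pop checks whether the queue is empty (len(f.queue) == 0); if it is, it calls f.cond.Wait() to suspend the goroutine.
- Once Offer has successfully added data, it calls f.cond.Broadcast() to wake every goroutine waiting on this Cond. sync.Cond also provides Signal(); the pair is somewhat like notify() and notifyAll() in Java, the difference being whether one waiter or all waiters are woken.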
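The difference between Signal() and Broadcast() can be observed directly. The sketch below is illustrative only: wakeCount is a hypothetical helper, and the 100 ms quiet period is an arbitrary assumption used to decide that no further goroutines are going to wake. Under normal scheduling it should report 1 wake-up for Signal and 5 for Broadcast.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// wakeCount is a hypothetical helper: it parks n goroutines on one Cond,
// fires Signal or Broadcast exactly once, and counts the wake-ups.
func wakeCount(n int, broadcast bool) int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	waiting := 0
	woke := make(chan struct{}, n) // buffered, so wakers never block on send

	for i := 0; i < n; i++ {
		go func() {
			mu.Lock()
			waiting++
			cond.Wait() // sync.Cond does not wake spuriously
			mu.Unlock()
			woke <- struct{}{}
		}()
	}

	// Seeing waiting == n under the lock means every goroutine has already
	// released the lock, which only happens inside Wait after taking a ticket.
	for {
		mu.Lock()
		ready := waiting == n
		mu.Unlock()
		if ready {
			break
		}
		runtime.Gosched()
	}

	if broadcast {
		cond.Broadcast()
	} else {
		cond.Signal()
	}

	<-woke // at least one waiter always wakes up
	count := 1
	for {
		select {
		case <-woke:
			count++ // keep counting while wake-ups keep arriving
		case <-time.After(100 * time.Millisecond):
			cond.Broadcast() // release any leftover waiters so they do not leak
			return count
		}
	}
}

func main() {
	fmt.Println("Signal woke:", wakeCount(5, false))
	fmt.Println("Broadcast woke:", wakeCount(5, true))
}
```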
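To summarize the general usage of sync.Cond:

- First declare a mutex; choose sync.Mutex or sync.RWMutex according to your actual needs.
- Call sync.NewCond(l Locker) *Cond, passing the mutex from step 1. Note that a pointer is passed: Lock and Unlock have pointer receivers, and a copied mutex would be a different lock, so c.L.Lock() and c.L.Unlock() would no longer protect the shared state and could deadlock or fail with "unlock of unlocked mutex".
- Depending on the business condition, call cond.Wait() to suspend the goroutine, cond.Broadcast() to wake all suspended goroutines, or cond.Signal() to wake the goroutine that has been waiting the longest.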
Note that cond.Wait() must be used according to the following template; we will see why later:

```go
c.L.Lock()
for !condition() {
	c.Wait()
}
// ... make use of condition ...
c.L.Unlock()
```
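To make the template concrete, here is a minimal sketch, assuming a hypothetical one-shot Gate type (not part of the sync package): waiters block until Open flips a boolean under the lock. Because the condition is checked in a for loop, it does not matter whether a goroutine reaches Wait before or after Open.

```go
package main

import (
	"fmt"
	"sync"
)

// Gate is a hypothetical one-shot event: Wait blocks until Open is called.
type Gate struct {
	mu   sync.Mutex
	cond *sync.Cond
	open bool
}

func NewGate() *Gate {
	g := &Gate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// Wait follows the template: lock, loop on the condition, use it, unlock.
func (g *Gate) Wait() {
	g.mu.Lock()
	for !g.open { // re-check after every wake-up
		g.cond.Wait()
	}
	g.mu.Unlock()
}

// Open changes the condition under the lock, then wakes every waiter.
// Holding the lock while calling Broadcast is allowed but not required.
func (g *Gate) Open() {
	g.mu.Lock()
	g.open = true
	g.mu.Unlock()
	g.cond.Broadcast()
}

func main() {
	g := NewGate()
	var wg sync.WaitGroup
	var mu sync.Mutex
	passed := 0
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.Wait()
			mu.Lock()
			passed++
			mu.Unlock()
		}()
	}
	g.Open()
	wg.Wait()
	fmt.Println("passed:", passed) // passed: 3
}
```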
Source code analysis
Data structure
Before looking at the individual methods, let's first understand the data structure of sync.Cond. The source is as follows:
```go
type Cond struct {
	noCopy noCopy // a Cond must not be copied after first use

	// L is held while observing or changing the condition
	L Locker

	// wait list: goroutines that call Wait() are placed in notifyList
	notify  notifyList
	checker copyChecker // detects whether the Cond instance has been copied
}
```
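noCopy was covered earlier; if it is unfamiliar, see *Do You Really Understand Mutex?*. Beyond that, L (a Locker) is the mutex we just discussed, and copyChecker checks whether the Cond instance has been copied. It has just one method: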
```go
func (c *copyChecker) check() {
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}
```
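In short: type copyChecker uintptr starts out as 0, and the first call to check() stores copyChecker's own address into it. Why is uintptr(*c) != uintptr(unsafe.Pointer(c)) evaluated twice? The first comparison is a cheap fast path. If it fails, the CAS tries to record the address, assuming this is the first use. If the CAS also fails, the value was not 0, but another goroutine may have stored the address concurrently between the first comparison and the CAS, so the comparison is repeated. If it still does not match, the stored address belongs to a different memory location, which means the sync.Cond has been copied.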
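The self-address trick can be tried outside the sync package. This is a sketch that mirrors the logic of check() but returns a bool instead of panicking; selfAddr, guarded and copied are hypothetical names, not standard-library API. The original value is a package-level variable so that its address cannot change between calls.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// selfAddr mirrors the idea behind sync's unexported copyChecker:
// it records its own address on first use. (Hypothetical type.)
type selfAddr uintptr

// copied reports whether the value now lives at a different address
// than the one recorded on first use, using the same three-step test.
func (c *selfAddr) copied() bool {
	self := uintptr(unsafe.Pointer(c))
	return uintptr(*c) != self &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, self) &&
		uintptr(*c) != self
}

type guarded struct {
	checker selfAddr
}

var original guarded // package-level, so its address never changes

func main() {
	fmt.Println(original.checker.copied()) // false: first use records the address
	fmt.Println(original.checker.copied()) // false: same address as recorded

	dup := original // copies the recorded address along with the value
	fmt.Println(dup.checker.copied()) // true: dup lives somewhere else
}
```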
The most interesting part of sync.Cond is notifyList:
```go
type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait uint32 // number of tickets handed out to Wait() callers

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32 // number of waiters notified so far

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}
```
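It contains three kinds of fields:

- wait and notify: two unsigned integers recording, respectively, how many Wait() calls have taken a ticket and how many goroutines have been notified. Ignoring wraparound, wait is always greater than or equal to notify.
- lock mutex: like the mutex inside semaRoot that we met when analyzing the semaphore wait queue, this is not the sync.Mutex that Go offers to developers but a simple mutex implemented internally by the runtime.
- head and tail: as the names suggest, these form a linked list, here the list of goroutines blocked on this sync.Cond.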
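The comment on notify says wraparound is handled as long as the unwrapped difference stays below 2^31. The comparison that makes this work (used as less(t, l.notify) in notifyListWait below) subtracts the counters and reads the result as a signed number. A small sketch of that helper, copied in spirit from the runtime:

```go
package main

import (
	"fmt"
	"math"
)

// less reports whether ticket a comes before ticket b, treating the
// uint32 counters as circular. Correct while the true gap is < 2^31.
func less(a, b uint32) bool {
	return int32(a-b) < 0
}

func main() {
	var last uint32 = math.MaxUint32 // the final ticket before wraparound
	var first uint32 = 0             // the ticket handed out right after it

	fmt.Println(less(1, 2))         // true
	fmt.Println(less(2, 1))         // false
	fmt.Println(last < first)       // false: a plain comparison gets the order wrong
	fmt.Println(less(last, first))  // true: the circular comparison gets it right
}
```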
Overall, the structure of sync.Cond looks roughly like this:

[Figure: cond architecture]
Operations

Wait()
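```go
func (c *Cond) Wait() {
	// 1. Check whether the Cond has been copied.
	c.checker.check()
	// 2. Take a ticket: notifyList.wait += 1.
	t := runtime_notifyListAdd(&c.notify)
	// 3. Release the lock so other goroutines can make progress.
	c.L.Unlock()
	// 4. Park the goroutine until its ticket is notified.
	runtime_notifyListWait(&c.notify, t)
	// 5. Reacquire the lock before returning.
	c.L.Lock()
}
```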
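The Wait() source shows that it proceeds in roughly five steps:

- Call copyChecker.check() to make sure the sync.Cond has not been copied.
- Increment sync.Cond.notifyList.wait on every call to Wait(). This is the cornerstone of the FIFO behavior: the ticket taken from wait records the order in which goroutines started waiting. notifyListAdd returns the value wait had before the increment: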
```go
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return atomic.Xadd(&l.wait, 1) - 1
}
```
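- Call c.L.Unlock() to release the lock. The current goroutine is about to be parked by gopark, so it must hand the lock to other goroutines; otherwise nobody could change the condition and wake it, and the program would deadlock.
- Call runtime_notifyListWait(&c.notify, t), which is a little more involved: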
```go
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// Already notified: return immediately.
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// Enqueue itself.
	s := acquireSudog()
	s.g = getg()
	// Store the ticket in s.ticket; this underpins the FIFO ordering.
	s.ticket = t
	s.releasetime = 0
	t0 := int64(0)
	if blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	// Append the current goroutine to the notifyList linked list.
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	// Finally park the current goroutine via gopark.
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	if t0 != 0 {
		blockevent(s.releasetime-t0, 2)
	}
	// Release the sudog once the goroutine has been woken.
	releaseSudog(s)
}
```
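runtime_notifyListWait does two main things:

- Append the current goroutine to the notifyList linked list (unless its ticket has already been notified, in which case it returns immediately).
- Call gopark to park the current goroutine.

The fifth step of Wait() then runs: once another goroutine calls Signal or Broadcast and this goroutine is woken, it calls c.L.Lock() to reacquire the lock. Because the lock is reacquired only after waking, another goroutine may grab it first and change the condition in between; that is why Wait() has to sit inside a for loop that re-checks the condition, as in the template above.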
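The ticket taken in step 2 relies only on an atomic fetch-and-add returning the pre-increment value. A minimal sketch, using sync/atomic's AddUint32 in place of the runtime-internal atomic.Xadd (takeTicket is a hypothetical name), shows that concurrent callers always receive distinct, gap-free tickets:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
)

// takeTicket mimics notifyListAdd: atomically bump the counter and return
// the value it had before, so every caller gets a distinct ticket.
func takeTicket(wait *uint32) uint32 {
	return atomic.AddUint32(wait, 1) - 1
}

func main() {
	var wait uint32
	const n = 100
	tickets := make([]uint32, n)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			tickets[i] = takeTicket(&wait) // each goroutine writes its own slot
		}(i)
	}
	wg.Wait()

	// Sorted, the tickets must be exactly 0, 1, ..., n-1.
	sort.Slice(tickets, func(a, b int) bool { return tickets[a] < tickets[b] })
	ok := true
	for i, t := range tickets {
		if t != uint32(i) {
			ok = false
		}
	}
	fmt.Println("distinct, gap-free tickets:", ok, "next ticket:", wait)
	// distinct, gap-free tickets: true next ticket: 100
}
```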
Signal
Signal wakes the goroutine that has been waiting the longest; the caller is not required to hold the lock.
```go
func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}
```
The implementation is not complicated either: it first checks whether the sync.Cond has been copied, then calls runtime_notifyListNotifyOne:
```go
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// wait == notify means no goroutine is waiting.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// Re-check under the lock.
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// Advance the ticket number of the next waiter to notify.
	atomic.Store(&l.notify, t+1)

	// Try to find the g that needs to be notified.
	// If it hasn't made it to the list yet we won't find it,
	// but it won't park itself once it sees the new notify number.
	//
	// This scan looks linear but essentially always stops quickly.
	// Because g's queue separately from taking numbers,
	// there may be minor reorderings in the list, but we
	// expect the g we're looking for to be near the front.
	// The g has others in front of it on the list only to the
	// extent that it lost the race, so the iteration will not
	// be too long. This applies even when the g is missing:
	// it hasn't yet gotten to sleep and has lost the race to
	// the (few) other g's that we find on the list.
	//
	// This is the core of the FIFO behavior: walk the list and match
	// sudog.ticket to find the waiter that must be woken.
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}
```
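Main logic:

- Check whether any goroutine is waiting to be woken; if not, return immediately.
- Increment the notify field. The goroutine to wake is located by matching notify against sudog.ticket, and since tickets are generated in increasing order, this gives FIFO semantics.
- Walk the linked list held by notifyList, starting at head and following the next pointers. The walk is linear, so its time complexity is O(n), but according to the official comment it is fast in practice: "This scan looks linear but essentially always stops quickly."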
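One small detail: recall that in Wait(), atomically updating wait and inserting the goroutine into the wait list are two separate steps, so under contention the nodes in the list may end up slightly out of order. This is harmless: tickets are incremented atomically and Signal matches on the ticket rather than on list position, so goroutines are still woken in ticket order. If the target goroutine has not reached the list yet, Signal simply does not find it; when that goroutine later calls notifyListWait, it sees that its ticket is below notify and returns without parking.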
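The ticket matching can be modeled in user space. This is a simplified sketch, not the runtime code: ticketList, park and notifyOne are hypothetical names, a map keyed by ticket replaces the sudog linked list (so there is no scan), a closed channel stands in for goready, and every field is guarded by one sync.Mutex instead of mixing atomics with the runtime lock. It reproduces the two behaviors described above: out-of-order enqueueing still wakes the lowest ticket first, and a goroutine notified before it parks never blocks.

```go
package main

import (
	"fmt"
	"sync"
)

// less mirrors the runtime's wraparound-safe ticket comparison.
func less(a, b uint32) bool { return int32(a-b) < 0 }

// ticketList is a hypothetical model of notifyList: a parked waiter is a
// channel keyed by its ticket instead of a sudog in a linked list.
type ticketList struct {
	mu     sync.Mutex
	wait   uint32                   // next ticket to hand out
	notify uint32                   // next ticket to notify
	parked map[uint32]chan struct{} // waiters that have "parked"
}

func newTicketList() *ticketList {
	return &ticketList{parked: make(map[uint32]chan struct{})}
}

// add hands out the next ticket, like notifyListAdd.
func (l *ticketList) add() uint32 {
	l.mu.Lock()
	defer l.mu.Unlock()
	t := l.wait
	l.wait++
	return t
}

// park mimics notifyListWait without blocking: it returns a channel that is
// closed once ticket t is notified (already closed if that has happened).
func (l *ticketList) park(t uint32) <-chan struct{} {
	l.mu.Lock()
	defer l.mu.Unlock()
	ch := make(chan struct{})
	if less(t, l.notify) {
		close(ch) // already notified: do not park
		return ch
	}
	l.parked[t] = ch
	return ch
}

// notifyOne mimics notifyListNotifyOne: advance notify and wake the waiter
// holding the old value, if it has parked already.
func (l *ticketList) notifyOne() {
	l.mu.Lock()
	defer l.mu.Unlock()
	t := l.notify
	if t == l.wait {
		return // nobody is waiting
	}
	l.notify = t + 1
	if ch, ok := l.parked[t]; ok {
		delete(l.parked, t)
		close(ch)
	}
}

func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	l := newTicketList()
	t0, t1, t2 := l.add(), l.add(), l.add()
	c1 := l.park(t1) // t1 parks before t0: the "list" is out of order
	c0 := l.park(t0)

	l.notifyOne()
	fmt.Println(isClosed(c0), isClosed(c1)) // true false: ticket 0 goes first
	l.notifyOne()
	fmt.Println(isClosed(c1)) // true

	l.notifyOne()                     // t2 has not parked yet; notify moves past it
	fmt.Println(isClosed(l.park(t2))) // true: it never blocks
}
```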
Broadcast
Broadcast() differs from Signal() mainly in that it wakes every waiting goroutine, and it assigns the value of wait directly to notify.
```go
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// Fast path: no waiting goroutines, return immediately.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)
	s := l.head
	l.head = nil
	l.tail = nil

	// Set notify = wait directly.
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// Wake each goroutine in turn (readyWithTime calls goready).
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}
```
The logic is simple, so I won't go into more detail.
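One consequence of setting notify = wait is worth spelling out: a goroutine that took its ticket before Broadcast but had not parked yet will also pass straight through. The sketch below models this in user space; bcastList, park and notifyAll are hypothetical names, a map of channels stands in for the sudog list, and a single sync.Mutex replaces the runtime's atomics plus internal lock. As in the runtime, the list is detached under the lock and the waiters are woken after unlocking.

```go
package main

import (
	"fmt"
	"sync"
)

// less mirrors the runtime's wraparound-safe ticket comparison.
func less(a, b uint32) bool { return int32(a-b) < 0 }

// bcastList is a hypothetical model of notifyList for illustrating
// notifyListNotifyAll; parked waiters are channels keyed by ticket.
type bcastList struct {
	mu     sync.Mutex
	wait   uint32
	notify uint32
	parked map[uint32]chan struct{}
}

// add hands out the next ticket.
func (l *bcastList) add() uint32 {
	l.mu.Lock()
	defer l.mu.Unlock()
	t := l.wait
	l.wait++
	return t
}

// park returns a channel that is closed once ticket t is notified.
func (l *bcastList) park(t uint32) <-chan struct{} {
	l.mu.Lock()
	defer l.mu.Unlock()
	ch := make(chan struct{})
	if less(t, l.notify) {
		close(ch) // already notified: do not park
		return ch
	}
	l.parked[t] = ch
	return ch
}

// notifyAll detaches every parked waiter and sets notify = wait under the
// lock, then wakes the detached waiters after unlocking. It returns how
// many parked waiters it woke.
func (l *bcastList) notifyAll() int {
	l.mu.Lock()
	if l.notify == l.wait {
		l.mu.Unlock()
		return 0 // fast path: nobody is waiting
	}
	woken := l.parked
	l.parked = make(map[uint32]chan struct{})
	l.notify = l.wait // every ticket handed out so far counts as notified
	l.mu.Unlock()
	for _, ch := range woken {
		close(ch)
	}
	return len(woken)
}

func main() {
	l := &bcastList{parked: make(map[uint32]chan struct{})}
	t0, t1, t2 := l.add(), l.add(), l.add()
	c0, c1 := l.park(t0), l.park(t1)

	fmt.Println(l.notifyAll()) // 2: the two parked waiters
	<-c0
	<-c1         // both channels are closed, so these do not block
	<-l.park(t2) // t2 took its ticket before the broadcast: no blocking either

	fmt.Println(l.notify == l.wait) // true
	fmt.Println(l.notifyAll())      // 0: fast path
}
```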
Summary
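- Once a sync.Cond has been created and used, it must not be copied; noCopy and copyChecker guard against this.
- Wait() first increments notifyList.wait and keeps the pre-increment value as its ticket, wraps the goroutine in a sudog, stores the ticket in sudog.ticket, and appends the sudog to the notifyList linked list.
- Signal() decides which goroutine to wake by matching notifyList.notify against the ticket of each node in the notifyList list. Because notifyList.wait and notifyList.notify only move forward, one ticket at a time, this gives FIFO semantics.
- Broadcast() is comparatively simple: it wakes every waiting goroutine.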