sync.Cond实现了一个条件变量,用于期待一个或一组 goroutines 满足条件后唤醒的场景。每个Cond 关联一个 Locker 通常是一个 *MutexRWMutex\` 依据需要初始化不同的锁。


老规矩正式分析源码前,先来看看 sync.Cond 如何应用。比方咱们实现一个 FIFO 的队列

`package main`
`import (`
`type FIFO struct {`
 `lock  sync.Mutex`
 `cond  *sync.Cond`
 `queue []int`
`type Queue interface {`
 `Pop() int`
 `Offer(num int) error`
`func (f *FIFO) Offer(num int) error {`
 `defer f.lock.Unlock()`
 `f.queue = append(f.queue, num)`
 `return nil`
`func (f *FIFO) Pop() int {`
 `defer f.lock.Unlock()`
 `for {`
 `for len(f.queue) == 0 {`
 `item := f.queue[0]`
 `f.queue = f.queue[1:]`
 `return item`
`func main() {`
 `l := sync.Mutex{}`
 `fifo := &FIFO{`
 `lock:  l,`
 `cond:  sync.NewCond(&l),`
 `queue: []int{},`
 `go func() {`
 `for {`
 `go func() {`
 `for {`
 `fmt.Println(fmt.Sprintf("goroutine1 pop-->%d", fifo.Pop()))`
 `go func() {`
 `for {`
 `fmt.Println(fmt.Sprintf("goroutine2 pop-->%d", fifo.Pop()))`
 `ch := make(chan os.Signal, 1)`
 `signal.Notify(ch, os.Interrupt)`

咱们定一个 FIFO 队列有OfferPop两个操作,咱们起一个 gorountine 一直向队列投放数据,另外两个 gorountine 一直取拿数据。

  1. Pop操作会判断如果队列里没有数据 len(f.queue) == 0 则调用 f.cond.Wait()goroutine挂起。
  2. 等到 Offer 操作投放数据胜利,外面调用 f.cond.Broadcast() 来唤醒所有挂起在这个 mutex 上的 goroutine。当然sync.Cond 也提供了一个 Signal(), 有点儿相似 Java 中的notify()notifyAll()的意思 次要是唤醒一个和唤醒全副的区别。

总结一下 sync.Mutex 的大抵用法

  1. 首先申明一个 mutex,这里sync.Mutex/sync.RWMutex 可依据理论状况选用
  2. 调用 sync.NewCond(l Locker) *Cond 应用 1 中的mutex 作为入参 留神 这里传入的是指针 为了防止 c.L.Lock()c.L.Unlock() 调用频繁复制锁 导致死锁
  3. 依据业务条件 满足则调用 cond.Wait() 挂起goroutine
  4. cond.Broadcast()唤起所有挂起的 gorotune 另一个办法cond.Signal() 唤醒一个最先挂起的goroutine

须要留神的是 cond.wait() 的应用须要参照如下模版 具体为啥咱们后续剖析

 `for !condition() {`
 `... make use of condition ...`



剖析具体方法前,咱们先来理解下 sync.Cond 的数据结构。具体源码如下:

`type Cond struct {`
 `noCopy noCopy // Cond 应用后不容许拷贝 `
 `// L is held while observing or changing the condition`
 `L Locker`
 `// 告诉列表调用 wait()办法的 goroutine 会被放到 notifyList 中 `
 `notify  notifyList`
 `checker copyChecker // 查看 Cond 实例是否被复制 `

noCopy之前讲过 不分明的能够看下《你真的理解 mutex 吗》,除此之外,Locker是咱们刚刚谈到的 mutexcopyChecker 是用来查看 Cond 实例是否被复制的, 就有一个办法 :

`func (c *copyChecker) check() {`
 `if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&`
 `!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&`
 `uintptr(*c) != uintptr(unsafe.Pointer(c)) {`
 `panic("sync.Cond is copied")`

大抵意思是说,初始 type copyChecker uintptr 默认为 0,当第一次调用 check() 会将 copyChecker 本身的地址复制给本人,至于为什么 uintptr(*c) != uintptr(unsafe.Pointer(c)) 会被调用 2 次,因为期间 goroutine 可能曾经扭转 copyChecker。二次调用如果不相等,则阐明sync.Cond 被复制,重新分配了内存地址。


`type notifyList struct {`
 `// wait is the ticket number of the next waiter. It is atomically`
 `// incremented outside the lock.`
 `wait uint32 // 期待 goroutine 操作的数量 `
 `// notify is the ticket number of the next waiter to be notified. It can`
 `// be read outside the lock, but is only written to with lock held.`
 `// Both wait & notify can wrap around, and such cases will be correctly`
 `// handled as long as their "unwrapped" difference is bounded by 2^31.`
 `// For this not to be the case, we'd need to have 2^31+ goroutines`
 `// blocked on the same condvar, which is currently not possible.`
 `notify uint32 // 唤醒 goroutine 操作的数量 `
 `// List of parked waiters.`
 `lock mutex`
 `head *sudog`
 `tail *sudog`

蕴含了 3 类字段:

  • waitnotify 两个无符号整型,别离示意了 Wait() 操作的次数和 goroutine 被唤醒的次数,wait应该是恒大于等于notify
  • lock mutex 这个跟 sync.Mutex 咱们剖析信号量阻塞队列时 semaRoot 里的 mutex 一样,并不是 Go 提供开发者应用的sync.Mutex, 而是零碎外部运行时实现的一个简略版本的互斥锁。
  • headtail 看名字,咱们就能脑补出跟链表很像 没错这里就是保护了阻塞在以后 sync.Cond 上的 goroutine 形成的链表

整体来讲 sync.Cond 大体构造为:

cond architecture



`func (c *Cond) Wait() {`
 `//1. 查看 cond 是否被拷贝 `
 `//2. notifyList.wait+1`
 `t := runtime_notifyListAdd(&c.notify)`
 `//3. 开释锁 让出资源给其余 goroutine`
 `//4. 挂起 goroutine`
 `runtime_notifyListWait(&c.notify, t)`
 `//5. 尝试取得锁 `

Wait() 办法源码很容易看出它的操作大略分了 5 步:

  1. 调用 copyChecker.check() 保障 sync.Cond 不会被拷贝
  2. 每次调用 Wait() 会将 sync.Cond.notifyList.wait 属性进行加一操作,这也是它实现 FIFO 的基石,依据 wait 来判断 \`goroutine1 期待的程序
`//go:linkname notifyListAdd sync.runtime_notifyListAdd`
`func notifyListAdd(l *notifyList) uint32 {`
 `// This may be called concurrently, for example, when called from`
 `// sync.Cond.Wait while holding a RWMutex in read mode.`
 `return atomic.Xadd(&l.wait, 1) - 1`
  1. 调用 c.L.Unlock() 开释锁,因为以后 goroutine 行将被 gopark,让出锁给其余goroutine 防止死锁
  2. 调用 runtime_notifyListWait(&c.notify, t) 可能略微简单一点儿
`// notifyListWait waits for a notification. If one has been sent since`
`// notifyListAdd was called, it returns immediately. Otherwise, it blocks.`
`//go:linkname notifyListWait sync.runtime_notifyListWait`
`func notifyListWait(l *notifyList, t uint32) {`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `// 如果曾经被唤醒 则立刻返回 `
 `if less(t, l.notify) {`
 `// Enqueue itself.`
 `s := acquireSudog()`
 `s.g = getg()`
 `// 把期待递增序号赋值给 s.ticket 为 FIFO 打基础 `
 `s.ticket = t`
 `s.releasetime = 0`
 `t0 := int64(0)`
 `if blockprofilerate > 0 {`
 `t0 = cputicks()`
 `s.releasetime = -1`
 `// 将以后 goroutine 插入到 notifyList 链表中 `
 `if l.tail == nil {`
 `l.head = s`
 `} else {`
 `l.tail.next = s`
 `l.tail = s`
 `// 最终调用 gopark 挂起以后 goroutine`
 `goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)`
 `if t0 != 0 {`
 `blockevent(s.releasetime-t0, 2)`
 `// goroutine 被唤醒后开释 sudog`


  • 将以后 goroutine 插入到 notifyList 链表中
  • 调用 gopark 将以后 goroutine 挂起
  1. 当其余 goroutine 调用了 SignalBroadcast办法,以后 goroutine 被唤醒后 再次尝试取得锁

Signal 操作

Signal唤醒一个等待时间最长的goroutine, 调用时不要求持有锁。

`func (c *Cond) Signal() {`

具体实现也不简单,先判断 sync.Cond 是否被复制,而后调用runtime_notifyListNotifyOne

`//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne`
`func notifyListNotifyOne(l *notifyList) {`
 `// wait==notify 阐明没有期待的 goroutine 了 `
 `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `// 锁下二次查看 `
 `t := l.notify`
 `if t == atomic.Load(&l.wait) {`
 `// 更新下一个须要被唤醒的 ticket number`
 `atomic.Store(&l.notify, t+1)`
 `// Try to find the g that needs to be notified.`
 `// If it hasn't made it to the list yet we won't find it,`
 `// but it won't park itself once it sees the new notify number.`
 `// This scan looks linear but essentially always stops quickly.`
 `// Because g's queue separately from taking numbers,`
 `// there may be minor reorderings in the list, but we`
 `// expect the g we're looking for to be near the front.`
 `// The g has others in front of it on the list only to the`
 `// extent that it lost the race, so the iteration will not`
 `// be too long. This applies even when the g is missing:`
 `// it hasn't yet gotten to sleep and has lost the race to`
 `// the (few) other g's that we find on the list.`
 `// 这里是 FIFO 实现的外围 其实就是遍历链表 sudog.ticket 查找指定须要唤醒的节点 `
 `for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {`
 `if s.ticket == t {`
 `n := s.next`
 `if p != nil {`
 `p.next = n`
 `} else {`
 `l.head = n`
 `if n == nil {`
 `l.tail = p`
 `s.next = nil`
 `readyWithTime(s, 4)`


  1. 判断是否存在期待须要被唤醒的 goroutine 没有间接返回
  2. 递增 notify 属性,因为是依据 notifysudog.ticket匹配来查找须要唤醒的 goroutine, 因为其是递增生成的,故而有了FIFO 语义。
  3. 遍历 notifyList 持有的链表,从 head 开始根据 next 指针顺次遍历。这个过程是线性的,故而工夫复杂度为 O(n), 不过官网说法这个过程理论比拟快This scan looks linear but essentially always stops quickly.

有个小细节: 还记得咱们 Wait() 操作中,wait属性原子更新和 goroutine 插入期待链表是两个独自的步骤,所以存在竞争的状况下,链表中的节点可能会轻微的乱序产生。然而不要放心,因为 ticket 是原子递增的 所以唤醒程序不会乱。

Broadcast 操作

Broadcast()Singal() 区别次要是它能够唤醒全副期待的 goroutine,并间接将wait 属性的值赋值给notify

`func (c *Cond) Broadcast() {`
`// notifyListNotifyAll notifies all entries in the list.`
`//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll`
`func notifyListNotifyAll(l *notifyList) {`
 `// Fast-path 无期待 goroutine 间接返回 `
 `if atomic.Load(&l.wait) == atomic.Load(&l.notify) {`
 `lockWithRank(&l.lock, lockRankNotifyList)`
 `s := l.head`
 `l.head = nil`
 `l.tail = nil`
 `// 间接更新 notify=wait`
 `atomic.Store(&l.notify, atomic.Load(&l.wait))`
 `// 顺次调用 goready 唤醒 goroutine`
 `for s != nil {`
 `next := s.next`
 `s.next = nil`
 `readyWithTime(s, 4)`
 `s = next`



  1. sync.Cond一旦创立应用 不容许被拷贝,由 noCopycopyChecker来限度爱护。
  2. Wait()操作先是递增 notifyList.wait 属性 而后将 goroutine 封装进 sudog,将notifyList.wait 赋值给 sudog.ticket, 而后将sudog 插入 notifyList 链表中
  3. Singal()理论是依照 notifyList.notifynotifyList链表中节点的 ticket 匹配 来确定唤醒的 goroutine,因为 notifyList.notifynotifyList.wait都是原子递增的,故而有了 FIFO 的语义
  4. Broadcast()绝对简略 就是唤醒全副期待的goroutine

