共计 3390 个字符,预计需要花费 9 分钟才能阅读完成。
概述
cond 是 go 语言 sync 提供的条件变量,通过 cond 可以让一系列的 goroutine 在触发某个条件时才被唤醒。每一个 cond 结构体都包含一个锁 L。cond 提供了三个方法:
- Signal:调用 Signal 之后可以唤醒单个 goroutine。
- Broadcast:唤醒等待队列中所有的 goroutine。
- Wait:会把当前 goroutine 放入到队列中等待获取通知,调用此方法必须先 Lock, 不然方法里会调用 Unlock() 报错。
简单使用
创建 40 个 goroutine 都 wait 阻塞住。调用 Signal 则唤醒第一个 goroutine。调用 Broadcast 则唤醒所有等待的 goroutine。
package main
import (
"fmt"
"sync"
"time"
)
var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)
func test(x int) {cond.L.Lock() // 获取锁
cond.Wait() // 等待通知 暂时阻塞
fmt.Println(x)
time.Sleep(time.Second * 1)
cond.L.Unlock() // 释放锁}
func main() {
for i := 0; i < 40; i++ {go test(i)
}
fmt.Println("start all")
time.Sleep(time.Second * 3)
fmt.Println("broadcast")
cond.Signal() // 下发一个通知给已经获取锁的 goroutine
time.Sleep(time.Second * 3)
cond.Signal() // 3 秒之后 下发一个通知给已经获取锁的 goroutine
time.Sleep(time.Second * 3)
cond.Broadcast() // 3 秒之后 下发广播给所有等待的 goroutine
time.Sleep(time.Second * 60)
}
源码分析
Cond
type Cond struct {
noCopy noCopy
// 锁的具体实现,通常为 mutex 或者 rwmutex
L Locker
// notifyList 对象,维护等待唤醒的 goroutine 队列, 使用链表实现
notify notifyList
checker copyChecker
}
// 新建 cond 初始化 cond 对象
func NewCond(l Locker) *Cond {return &Cond{L: l}
}
type notifyList struct {
// 等待数量
wait uint32
// 通知数量
notify uint32
// 锁对象
lock mutex
// 链表头
head *sudog
// 链表尾
tail *sudog
}
Wait
// 等待函数
func (c *Cond) Wait() {c.checker.check()
// 等待计数器加 1 看下面具体实现
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
//
runtime_notifyListWait(&c.notify, t)
c.L.Lock()}
// 此函数在 sema.go 中控制计数器加 1
func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return atomic.Xadd(&l.wait, 1) - 1
}
// 此函数在 sema.go 中
// 获取当前 goroutine 添加到链表末端,然后 goparkunlock 函数休眠阻塞当前 goroutine
// goparkunlock 函数会让出当前处理器的使用权并等待调度器的唤醒
func notifyListWait(l *notifyList, t uint32) {lock(&l.lock)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {unlock(&l.lock)
return
}
// Enqueue itself.
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {t0 = cputicks()
s.releasetime = -1
}
if l.tail == nil {l.head = s} else {l.tail.next = s}
l.tail = s
goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
if t0 != 0 {blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}
Broadcast
唤醒链表中所有的阻塞中的 goroutine,还是使用 readyWithTime 来实现这个功能
func (c *Cond) Broadcast() {c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
// 源代码在 sema.go 中
func notifyListNotifyAll(l *notifyList) {
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock.
if atomic.Load(&l.wait) == atomic.Load(&l.notify) {return}
// Pull the list out into a local variable, waiters will be readied
// outside the lock.
lock(&l.lock)
s := l.head
l.head = nil
l.tail = nil
// Update the next ticket to be notified. We can set it to the current
// value of wait because any previous waiters are already in the list
// or will notice that they have already been notified when trying to
// add themselves to the list.
atomic.Store(&l.notify, atomic.Load(&l.wait))
unlock(&l.lock)
// Go through the local list and ready all waiters.
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
Signal
// 调用 runtime_notifyListNotifyOne 方法唤醒链表头的 goroutine
func (c *Cond) Signal() {c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
// runtime_notifyListNotifyOne 具体实现 获取链表头部的 G,然后调用 readyWithTime 唤醒 goroutine
// 源代码在 sema.go 中
func notifyListNotifyOne(l *notifyList) {if atomic.Load(&l.wait) == atomic.Load(&l.notify) {return}
lock(&l.lock)
t := l.notify
if t == atomic.Load(&l.wait) {unlock(&l.lock)
return
}
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {p.next = n} else {l.head = n}
if n == nil {l.tail = p}
unlock(&l.lock)
s.next = nil
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}
正文完