Go-cond-源码学习

53次阅读

共计 3390 个字符,预计需要花费 9 分钟才能阅读完成。

概述

cond 是 go 语言 sync 提供的条件变量,通过 cond 可以让一系列的 goroutine 在触发某个条件时才被唤醒。每一个 cond 结构体都包含一个锁 L。cond 提供了三个方法:

  • Signal:调用 Signal 之后可以唤醒单个 goroutine。
  • Broadcast:唤醒等待队列中所有的 goroutine。
  • Wait:会把当前 goroutine 放入到队列中等待获取通知,调用此方法必须先 Lock, 不然方法里会调用 Unlock() 报错。

简单使用

创建 40 个 goroutine 都 wait 阻塞住。调用 Signal 则唤醒第一个 goroutine。调用 Broadcast 则唤醒所有等待的 goroutine。

package main

import (
    "fmt"
    "sync"
    "time"
)

var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)

func test(x int) {cond.L.Lock() // 获取锁
    cond.Wait()   // 等待通知  暂时阻塞
    fmt.Println(x)
    time.Sleep(time.Second * 1)
    cond.L.Unlock() // 释放锁}
func main() {
    for i := 0; i < 40; i++ {go test(i)
    }
    fmt.Println("start all")
    time.Sleep(time.Second * 3)
    fmt.Println("broadcast")
    cond.Signal() // 下发一个通知给已经获取锁的 goroutine
    time.Sleep(time.Second * 3)
    cond.Signal() // 3 秒之后 下发一个通知给已经获取锁的 goroutine
    time.Sleep(time.Second * 3)
    cond.Broadcast() // 3 秒之后 下发广播给所有等待的 goroutine
    time.Sleep(time.Second * 60)
}

源码分析

Cond

type Cond struct {
    noCopy noCopy

    // 锁的具体实现,通常为 mutex 或者 rwmutex
    L Locker
    // notifyList 对象,维护等待唤醒的 goroutine 队列, 使用链表实现
    notify  notifyList
    checker copyChecker
}

// 新建 cond 初始化 cond 对象
func NewCond(l Locker) *Cond {return &Cond{L: l}
}

type notifyList struct {
    // 等待数量
    wait uint32

    // 通知数量
    notify uint32

    // 锁对象
    lock mutex
    // 链表头
    head *sudog
    // 链表尾
    tail *sudog
}

Wait

// 等待函数
func (c *Cond) Wait() {c.checker.check()
    // 等待计数器加 1 看下面具体实现
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    // 
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()}

// 此函数在 sema.go 中控制计数器加 1
func notifyListAdd(l *notifyList) uint32 {
    // This may be called concurrently, for example, when called from
    // sync.Cond.Wait while holding a RWMutex in read mode.
    return atomic.Xadd(&l.wait, 1) - 1
}

// 此函数在 sema.go 中
// 获取当前 goroutine 添加到链表末端,然后 goparkunlock 函数休眠阻塞当前 goroutine
// goparkunlock 函数会让出当前处理器的使用权并等待调度器的唤醒
func notifyListWait(l *notifyList, t uint32) {lock(&l.lock)

    // Return right away if this ticket has already been notified.
    if less(t, l.notify) {unlock(&l.lock)
        return
    }

    // Enqueue itself.
    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    s.releasetime = 0
    t0 := int64(0)
    if blockprofilerate > 0 {t0 = cputicks()
        s.releasetime = -1
    }
    if l.tail == nil {l.head = s} else {l.tail.next = s}
    l.tail = s
    goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
    if t0 != 0 {blockevent(s.releasetime-t0, 2)
    }
    releaseSudog(s)
}

Broadcast

唤醒链表中所有的阻塞中的 goroutine,还是使用 readyWithTime 来实现这个功能

func (c *Cond) Broadcast() {c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

// 源代码在 sema.go 中
func notifyListNotifyAll(l *notifyList) {
    // Fast-path: if there are no new waiters since the last notification
    // we don't need to acquire the lock.
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {return}

    // Pull the list out into a local variable, waiters will be readied
    // outside the lock.
    lock(&l.lock)
    s := l.head
    l.head = nil
    l.tail = nil

    // Update the next ticket to be notified. We can set it to the current
    // value of wait because any previous waiters are already in the list
    // or will notice that they have already been notified when trying to
    // add themselves to the list.
    atomic.Store(&l.notify, atomic.Load(&l.wait))
    unlock(&l.lock)

    // Go through the local list and ready all waiters.
    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

Signal

// 调用 runtime_notifyListNotifyOne 方法唤醒链表头的 goroutine
func (c *Cond) Signal() {c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

// runtime_notifyListNotifyOne 具体实现 获取链表头部的 G,然后调用 readyWithTime 唤醒 goroutine
// 源代码在 sema.go 中
func notifyListNotifyOne(l *notifyList) {if atomic.Load(&l.wait) == atomic.Load(&l.notify) {return}

    lock(&l.lock)

    t := l.notify
    if t == atomic.Load(&l.wait) {unlock(&l.lock)
        return
    }

    atomic.Store(&l.notify, t+1)
    
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {p.next = n} else {l.head = n}
            if n == nil {l.tail = p}
            unlock(&l.lock)
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
    unlock(&l.lock)
}

正文完
 0