关于golang:一文读懂-Go-syncCond-设计

14次阅读

共计 5883 个字符,预计需要花费 15 分钟才能阅读完成。

Go 语言通过 go 关键字开启 goroutine 让开发者能够轻松地实现并发编程,而并发程序的无效运行,往往离不开 sync 包的保驾护航。目前,sync 包的赋能列表包含:sync.atomic 下的原子操作、sync.Map 并发平安 map、sync.Mutexsync.RWMutex 提供的互斥锁与读写锁、sync.Pool 复用对象池、sync.Once 单例模式、sync.Waitgroup 的多任务合作模式、sync.Cond 的监视器模式。当然,除了 sync 包,还有封装层面更高的 channelcontext

要想写出合格的 Go 程序,以上的这些并发原语是必须要把握的。对于大多数 Gopher 而言,sync.Cond 应该是最为生疏,本文将一探到底。

初识 sync.Cond

sync.Cond 字面意义就是同步条件变量,它实现的是一种监视器(Monitor)模式。

In concurrent programming(also known as parallel programming), a monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become false.

对于 Cond 而言,它实现一个条件变量,是 goroutine 间期待和告诉的点。条件变量与共享的数据隔离,它能够同时阻塞多个 goroutine,直到另外的 goroutine 更改了条件变量,并告诉唤醒阻塞着的一个或多个 goroutine。

首次接触的读者,可能会不太明确,那么上面咱们看一下 GopherCon 2018 上《Rethinking Classical Concurrency Patterns》中的演示代码例子。

type Item = int

type Queue struct {items     []Item
   itemAdded sync.Cond
}

func NewQueue() *Queue {q := new(Queue)
   q.itemAdded.L = &sync.Mutex{} // 为 Cond 绑定锁
   return q
}

func (q *Queue) Put(item Item) {q.itemAdded.L.Lock()
   defer q.itemAdded.L.Unlock()
   q.items = append(q.items, item)
   q.itemAdded.Signal()        // 当 Queue 中退出数据胜利,调用 Singal 发送告诉}

func (q *Queue) GetMany(n int) []Item {q.itemAdded.L.Lock()
   defer q.itemAdded.L.Unlock()
   for len(q.items) < n {     // 期待 Queue 中有 n 个数据
      q.itemAdded.Wait()      // 阻塞期待 Singal 发送告诉}
   items := q.items[:n:n]
   q.items = q.items[n:]
   return items
}

func main() {q := NewQueue()

   var wg sync.WaitGroup
   for n := 10; n > 0; n-- {wg.Add(1)
      go func(n int) {items := q.GetMany(n)
         fmt.Printf("%2d: %2d\n", n, items)
         wg.Done()}(n)
   }

   for i := 0; i < 100; i++ {q.Put(i)
   }

   wg.Wait()}

在这个例子中,Queue 是存储数据 Item 的构造体,它通过 Cond 类型的 itemAdded 来控制数据的输出与输入。能够留神到,这里通过 10 个 goroutine 来生产数据,但它们所需的数据量并不相等,咱们能够称之为 batch,顺次在 1-10 之间。之后,逐渐增加 100 个数据至 Queue 中。最初,咱们可能看到 10 个 gotoutine 都能被唤醒,失去它想要的数据。

程序运行后果如下

 6: [7  8  9 10 11 12]
 5: [50 51 52 53 54]
 9: [14 15 16 17 18 19 20 21 22]
 1: [13]
 2: [33 34]
 4: [35 36 37 38]
 3: [39 40 41]
 7: [0  1  2  3  4  5  6]
 8: [42 43 44 45 46 47 48 49]
10: [23 24 25 26 27 28 29 30 31 32]

当然,程序每次运行后果都不会雷同,以上输入只是某一种状况。

sync.Cond 实现

$GOPATH/src/sync/cond.go 中,Cond 的构造体定义如下

type Cond struct {
   noCopy noCopy
   L Locker
   notify  notifyList
   checker copyChecker
}

其中,noCopychecker 字段均是为了防止 Cond 在应用过程中被复制,详见小菜刀的《no copy 机制》一文。

L 是 Locker 接口,个别该字段的理论对象是 *RWmutex 或者 *Mutex

type Locker interface {Lock()
   Unlock()}

notifyList 记录的是一个基于票号的告诉列表,这里首次看正文看不懂没关系,和下文来回连贯着看。

type notifyList struct {
   wait   uint32         // 用于记录下一个期待者 waiter 的票号
   notify uint32         // 用于记录下一个应该被告诉的 waiter 的票号
   lock   uintptr        // 外部锁
   head   unsafe.Pointer // 指向期待者 waiter 的队列队头
   tail   unsafe.Pointer // 指向期待者 waiter 的队列队尾
}

其中,headtail 是指向 sudog 构造体的指针,sudog 是代表的处于期待列表的 goroutine,它自身就是双向链表。值得一提的是,在 sudog 中有一个字段 ticket 就是用于给以后 goroutine 记录票号应用的。

Cond 实现的外围模式为 票务零碎(ticket system),每一个想要来买票的 goroutine(调用 Cond.Wait())咱们称之为 waiter,票务零碎会给每个 waiter 调配一个取票码,等供票方有该取票码的号时,就会唤醒 waiter。卖票的 goroutine 有两种,第一种是调用 Cond.Signal() 的,它会依照票号唤醒一个买票的 waiter(如果有的话),第二种是调用 Cond.Broadcast() 的,它会告诉唤醒所有的阻塞 waiter。为了不便读者可能比拟轻松地了解票务零碎,上面咱们给出图解示例。

在 上文中,咱们晓得 Cond 字段中 notifyList 构造体是一个记录票号的告诉列表。这里将 notifyList 比作排队取票买电影票,当 G1 通过 Wait 来买票时,发现此时并没有票可买,因而他只能阻塞期待有票之后的告诉,此时他手上曾经获得了专属取票码 0。同样的,G2 和 G3 也同样无票可买,它们别离取到了本人的取票码 1 和 2。而 G4 是电影票提供商,它是卖票的,它通过两次 Signal 先后带来了两张票,依照票号程序顺次告诉了 G1 和 G2 来取票,并把 notify 更新为了最新的 1。G5 也是买票的,它发现此时曾经无票可买了,拿了本人的取票码 3,就阻塞期待了。G6 是个大票商,它通过 Broadcast 能够满足所有正在期待的买票者都买到票,此时期待的是 G3 和 G5,因而他间接唤醒了 G3 和 G5,并将 notify 更新到和 wait 值相等。

了解了上述取票零碎的运作原理后,咱们上面来看 Cond 包下四个理论对外办法函数的实现。

  • NewCond 办法
func NewCond(l Locker) *Cond {return &Cond{L: l}
}

用于初始化 Cond 对象,就是初始化管制锁。

  • Cond.Wait 办法
func (c *Cond) Wait() {c.checker.check()
   t := runtime_notifyListAdd(&c.notify)
   c.L.Unlock()
   runtime_notifyListWait(&c.notify, t)
   c.L.Lock()}

runtime_notifyListAdd 的实现在 runtime/sema.go 的 notifyListAdd,它用于原子性地减少期待者的 waiter 票号,并返回以后 goroutine 应该取的票号值 t。runtime_notifyListWait 的实现在 runtime/sema.go 的 notifyListWait,它会尝试去比拟此时 goroutine 的应取票号 tnotify 中记录的以后应该被告诉的票号。如果 t 小于以后票号,那么间接能失去返回,否则将会则塞期待,告诉取号。

同时,这里须要留神的是,因为在进入 runtime_notifyListWait 时,以后 goroutine 通过 c.L.Unlock() 将锁解了,这就意味着有可能会有多个 goroutine 来让条件发生变化。那么,以后 goroutine 是不能保障在 runtime_notifyListWait 返回后,条件就肯定是真的,因而须要循环判断条件。正确的 Wait 应用姿态如下:

//    c.L.Lock()
//    for !condition() {//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
  • Cond.Signal 办法
func (c *Cond) Signal() {   c.checker.check()   runtime_notifyListNotifyOne(&c.notify)}

runtime_notifyListNotifyOne 的具体实现在 runtime/sema.go 的 notifyListNotifyOne,它的目标就是告诉 waiter 取票。具体操作是:如果在上一次告诉取票之后没有新的 waiter 取票者,那么该函数会间接返回。否则,它会将取票号 +1,并告诉唤醒期待取票的 waiter。

须要留神的是,调用 Signal 办法时,并不需要持有 c.L 锁。

  • Cond.Broadcast 办法
func (c *Cond) Broadcast() {   c.checker.check()   runtime_notifyListNotifyAll(&c.notify)}

runtime_notifyListNotifyAll 的具体实现在 runtime/sema.go 的 notifyListNotifyAll,它会告诉唤醒所有的 waiter,并将 notify 值置为 和 wait 值相等。调用 Broadcast 办法时,也不须要持有 c.L 锁。

探讨

$GOPATH/src/sync/cond.go 下,咱们能够发现其代码量十分之少,但它出现的只是外围逻辑,其实现细节位于 runtime/sema.go 之中,依赖的是 runtime 层的调度原语,对细节感兴趣的读者能够深刻学习。

问题来了,为什么在日常开发中,咱们很少会应用到 sync.Cond?

  • 有效唤醒

前文中咱们提到,应用 Cond.Wait 正确姿态如下

    c.L.Lock()    for !condition() {c.Wait()    }    ... make use of condition ...    c.L.Unlock()

以文章结尾的例子而言,如果在每次调用 Put 办法时,应用 Broadcast 办法唤醒所有的 waiter,那么很大概率上被唤醒的 waiter 醒来发现条件并不满足,又会从新进入期待。只管是调用 Signal 办法唤醒指定的 waiter,然而它也不能保障唤醒的 waiter 条件肯定满足。因而,在理论的应用中,咱们须要尽量保障唤醒操作是无效地,为了做到这点,代码的复杂度难免会减少。

  • 饥饿问题

还是以文章结尾例子为例,如果同时有多个 goroutine 执行 GetMany(3) 和 GetMany(3000),执行 GetMany(3) 与执行 GetMany(3000) 的 goroutine 被唤醒的概率是一样的,然而因为 GetMany(3) 只须要 3 个数据就能满足条件,那么如果始终存在 GetMany(3) 的 goroutine,执行 GetMany(3000) 的 goroutine 将永远拿不到数据,始终被有效唤醒。

  • 不能响应其余事件

条件变量的意义在于让 goroutine 期待某种条件产生时进入睡眠状态。然而这会让 goroutine 在期待条件时,可能会错过一些须要留神的其余事件。例如,调用 Cond.Wait 的函数中蕴含了 context 上下文,当 context 传来勾销信号时,它并不能像咱们冀望的一样,获取到勾销信号并退出。Cond 的应用,让咱们不能同时抉择(select)条件和其余事件。

  • 可替代性

通过对 sync.Cond 几个对外办法的剖析,咱们不难看到,它的应用场景是能够被 channel 所代替的,然而这也会减少代码的复杂性。上文中的例子,能够应用 channel 改写如下。

type Item = inttype waiter struct {n int    c chan []Item}type state struct {items []Item    wait  []waiter}type Queue struct {s chan state}func NewQueue() *Queue {    s := make(chan state, 1)    s <- state{}    return &Queue{s}}func (q *Queue) Put(item Item) {s := <-q.s    s.items = append(s.items, item)    for len(s.wait) > 0 {w := s.wait[0]        if len(s.items) < w.n {break}        w.c <- s.items[:w.n:w.n]        s.items = s.items[w.n:]        s.wait = s.wait[1:]    }    q.s <- s}func (q *Queue) GetMany(n int) []Item {    s := <-q.s    if len(s.wait) == 0 && len(s.items) >= n {items := s.items[:n:n]        s.items = s.items[n:]        q.s <- s        return items    }    c := make(chan []Item)    s.wait = append(s.wait, waiter{n, c})    q.s <- s    return <-c}

最初,尽管在上文的探讨中都是列出的 sync.Cond 潜在问题,然而如果开发者可能在应用中思考到以上的几点问题,对于监视器模型的实现而言,在代码的语义逻辑上,sync.Cond 的应用会比 channel 的模式更易了解和保护。记住一点,通俗易懂的代码模型总是比深奥的炫技要接地气。

正文完
 0