前言
回顾上篇文章《Go 并发编程之传统同步—(1)互斥锁》其中说到,同步最终是为了达到以下两种目标:
- 维持共享数据一致性,并发平安
- 管制流程治理,更好的协同工作
示例程序通过应用互斥锁,达到了数据一致性目标,那么流程治理应该怎么做呢?
传统同步
条件变量
上篇文章的示例程序,仅仅实现了累加性能,但在事实的工作场景中,需要往往不可能这么简略,当初扩大一下这个程序,给它加上累减的性能。
加上了累减的示例程序,能够形象的了解为一个固定容量的“储水池”,能够注水、排水。
仅用互斥锁
当水注满当前,进行注水,开始排水,当水排空当前,开始注水,反反复复 …
func TestDemo1(t *testing.T) {
var mut sync.Mutex
maxSize := 10
counter := 0
// 排水口
go func() {
for {mut.Lock()
if counter == maxSize {
for i := 0; i < maxSize; i++ {
counter--
log.Printf("OUTPUT counter = %d", counter)
}
}
mut.Unlock()
time.Sleep(1 * time.Second)
}
}()
// 注水口
for {mut.Lock()
if counter == 0 {
for i := 0; i < maxSize; i++ {
counter++
log.Printf("INPUT counter = %d", counter)
}
}
mut.Unlock()
time.Sleep(1 * time.Second)
}
}
后果
=== RUN TestDemo1
···
2020/10/06 13:52:50 INPUT counter = 8
2020/10/06 13:52:50 INPUT counter = 9
2020/10/06 13:52:50 INPUT counter = 10
2020/10/06 13:52:50 OUTPUT counter = 9
2020/10/06 13:52:50 OUTPUT counter = 8
2020/10/06 13:52:50 OUTPUT counter = 7
···
看着没有什么问题,一切正常,但就是这样工作的策略效率太低。
优化互斥锁
优化策略,不必等注满水再排水,也不必放空之后,再注水,注水口和排水口一起工作。
func TestDemo2(t *testing.T) {
var mut sync.Mutex
maxSize := 10
counter := 0
// 排水口
go func() {
for {mut.Lock()
if counter != 0 {counter--}
log.Printf("OUTPUT counter = %d", counter)
mut.Unlock()
time.Sleep(5 * time.Second) // 为了演示成果,睡眠 5 秒
}
}()
// 注水口
for {mut.Lock()
if counter != maxSize {counter++}
log.Printf("INPUT counter = %d", counter)
mut.Unlock()
time.Sleep(1 * time.Second) // 为了演示成果,睡眠 1 秒
}
}
后果
=== RUN TestDemo2
···
2020/10/06 14:11:46 INPUT counter = 7
2020/10/06 14:11:47 INPUT counter = 8
2020/10/06 14:11:48 OUTPUT counter = 7
2020/10/06 14:11:48 INPUT counter = 8
2020/10/06 14:11:49 INPUT counter = 9
2020/10/06 14:11:50 INPUT counter = 10
2020/10/06 14:11:51 INPUT counter = 10
2020/10/06 14:11:52 INPUT counter = 10
2020/10/06 14:11:53 OUTPUT counter = 9
2020/10/06 14:11:53 INPUT counter = 10
2020/10/06 14:11:54 INPUT counter = 10
2020/10/06 14:11:55 INPUT counter = 10
2020/10/06 14:11:56 INPUT counter = 10
2020/10/06 14:11:57 INPUT counter = 10
2020/10/06 14:11:58 OUTPUT counter = 9
2020/10/06 14:11:58 INPUT counter = 10
2020/10/06 14:11:59 INPUT counter = 10
···
通过日志输入,能够看到程序达到了需要,运作失常。
然而,通过日志输入发现,当排水口效率低下的时候,注水口始终在轮询,这里频繁的上锁操作造成的开销很是节约。
条件变量:单发告诉
那有没有什么好的方法,省去不必要的轮询?如果注水口和排水口能相互“告诉”就好了!这个性能,条件变量 能够做到。
条件变量总是与互斥锁组合应用,除了能够应用 Lock、Unlock,还有如下三个办法:
- Wait 期待告诉
- Signal 单发告诉
- Broadcast 播送告诉
func TestDemo3(t *testing.T) {cond := sync.NewCond(new(sync.Mutex)) // 初始化条件变量
maxSize := 10
counter := 0
// 排水口
go func() {
for {cond.L.Lock() // 上锁
if counter == 0 { // 没水了
cond.Wait() // 啥时候来水?等告诉!}
counter--
log.Printf("OUTPUT counter = %d", counter)
cond.Signal() // 单发告诉:已排水
cond.L.Unlock() // 解锁
time.Sleep(5 * time.Second) // 为了演示成果,睡眠 5 秒
}
}()
// 注水口
for {cond.L.Lock() // 上锁
if counter == maxSize { // 水满了
cond.Wait() // 啥时候排水?期待告诉!}
counter++
log.Printf("INPUT counter = %d", counter)
cond.Signal() // 单发告诉:已来水
cond.L.Unlock() // 解锁
time.Sleep(1 * time.Second) // 为了演示成果,睡眠 1 秒
}
}
后果
=== RUN TestDemo3
···
2020/10/06 14:51:22 INPUT counter = 7
2020/10/06 14:51:23 INPUT counter = 8
2020/10/06 14:51:24 OUTPUT counter = 7
2020/10/06 14:51:24 INPUT counter = 8
2020/10/06 14:51:25 INPUT counter = 9
2020/10/06 14:51:26 INPUT counter = 10
2020/10/06 14:51:29 OUTPUT counter = 9
2020/10/06 14:51:29 INPUT counter = 10
2020/10/06 14:51:34 OUTPUT counter = 9
2020/10/06 14:51:34 INPUT counter = 10
···
通过日志输入,能够看进去,注水口没有始终轮询了,而是等到排水口发告诉后,再进行注水,注水口始终再等排水口。那么新的问题又来了,如何进步排水口的效率呢?
条件变量:播送告诉
多制作出一个排水口,进步排水效率。
那就不能持续应用单发告诉了(Signal),因为单发告诉只会告诉到一个期待(Wait),针对多期待的这种状况,就须要应用播送告诉(Broadcast)。
func TestDemo4(t *testing.T) {cond := sync.NewCond(new(sync.Mutex)) // 初始化条件变量
maxSize := 10
counter := 0
// 排水口 1
go func() {
for {cond.L.Lock() // 上锁
if counter == 0 { // 没水了
//for counter == 0 { // 没水了
cond.Wait() // 啥时候来水?等告诉!}
counter--
log.Printf("OUTPUT A counter = %d", counter)
cond.Broadcast() // 单发告诉:已排水
cond.L.Unlock() // 解锁
//time.Sleep(2 * time.Second) // 为了演示成果,睡眠 5 秒
}
}()
// 排水口 2
go func() {
for {cond.L.Lock() // 上锁
if counter == 0 { // 没水了
//for counter == 0 { // 没水了
cond.Wait() // 啥时候来水?等告诉!}
counter--
log.Printf("OUTPUT B counter = %d", counter)
cond.Broadcast() // 单发告诉:已排水
cond.L.Unlock() // 解锁
//time.Sleep(2 * time.Second) // 为了演示成果,睡眠 5 秒
}
}()
// 注水口
for {cond.L.Lock() // 上锁
if counter == maxSize { // 水满了
//for counter == maxSize { // 水满了
cond.Wait() // 啥时候排水?期待告诉!}
counter++
log.Printf("INPUT counter = %d", counter)
cond.Broadcast() // 单发告诉:已来水
cond.L.Unlock() // 解锁
//time.Sleep(1 * time.Second) // 为了演示成果,睡眠 1 秒
}
}
后果
=== RUN TestDemo4
···
2020/10/07 20:57:30 OUTPUT B counter = 2
2020/10/07 20:57:30 OUTPUT B counter = 1
2020/10/07 20:57:30 OUTPUT B counter = 0
2020/10/07 20:57:30 OUTPUT A counter = -1
2020/10/07 20:57:30 OUTPUT A counter = -2
2020/10/07 20:57:30 OUTPUT A counter = -3
2020/10/07 20:57:30 OUTPUT A counter = -4
···
2020/10/07 20:57:31 OUTPUT B counter = -7605
2020/10/07 20:57:31 INPUT counter = -7604
2020/10/07 20:57:31 OUTPUT A counter = -7605
2020/10/07 20:57:31 OUTPUT A counter = -7606
···
通过日志输入能够看到,刚开始的时候还很失常,到前面的时候就变成负值了,始终在负增长,What?
在《Go 并发编程之传统同步—(1)互斥锁》文章中,程序因为没有加上互斥锁,呈现过 counter 值异样的状况。
但这次程序这次加了互斥锁,按理说造成了一个临界区应该是没有问题了,所以问题应该不是出在临界区上,难道问题出在 Wait 上?
通过 IDE追踪一下 Wait 的源码
func (c *Cond) Wait() {
// 查看 c 是否是被复制的,如果是就 panic
c.checker.check()
// 将以后 goroutine 退出期待队列
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 期待以后 goroutine 被唤醒
runtime_notifyListWait(&c.notify, t)
c.L.Lock()}
原来 Wait 外部的执行流程是,先执行理解锁,而后进入期待状态,接到告诉之后,再执行加锁操作。
那依照这个代码逻辑联合输入日志,走一程序遍流程,看看能不能复现出 counter 为负值的状况:
- 注水口将 counter 累加到 10 之后,发送播送告诉(Broadcast)。
- goroutine A 在“第 1 步”之前的时候进入了期待告诉(Wait),当初接管到了播送告诉(Broadcast),从 runtime_notifyListWait() 返回,并且胜利执行了加锁(Lock)操作。
- goroutine B 在“第 1 步”之前的时候进入了期待告诉(Wait),当初接管到了播送告诉(Broadcast),从 runtime_notifyListWait() 返回,在执行加锁(Lock)操作的时候,发现 goroutine A 先抢占了临界区,所以始终阻塞在 c.L.Lock()。
- goroutine A 尽管实现工作后会开释锁,然而每次也胜利将锁抢占,所以就这样 始终将 counter 减到了 0,而后发送播送告诉(Broadcast)、解锁(Unlock)。
- goroutine B 在 goroutine A 解锁后,胜利取得锁并从 Lock 办法中返回,接下来跳出 Wait 办法、跳出 if 判断,执行 counter–(0–),这时候 counter 的值是 -1
图示
问题就呈现在第五步,只有 goroutine B 加锁胜利的时候,再判断一下 counter 是否为 0 就好了。
所以将 if counter == 0 改成 for counter == 0,这样下面的“第五步”就变成了
5.goroutine B 在 goroutine A 解锁后,胜利加锁(Lock)并从阻塞总返回,接下来跳出 Wait 办法、再次进入 for 循环,判断 counter == 0 后果为真,再次进入期待(Wait)。
代码做出相应的批改后,再执行看后果,没有问题了。
延长
发送告诉
期待告诉(Wait)必定是要在临界区外面的,那发送告诉(Signal、Broadcast)在哪里更好呢?
Luck()
Wait()
Broadcast()// Signal()
Unlock()
// 或者
Luck()
Wait()
Unlock()
Broadcast()// Signal()
// 两种写法都不会报错
在 go 的发送告诉办法(Broadcast、Signal)上有这么一段话:
// It is allowed but not required for the caller to hold c.L \
// during the call.
在我以往的 C 多线程开发的时候,发送告诉总是在锁中的:
pthread_mutex_lock(&thread->mutex);
// ...
pthread_cond_signal(&thread->cond);
pthread_mutex_unlock(&thread->mutex);
在 man 手册中有写到:
The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that threads calling pthread_cond_wait() or pthread_cond_timedwait() have associated with the condition variable during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().
集体对此并没有什么见解,就不乱下定论了,有想法的小伙伴能够在文章上面留言,一起探讨。
期待告诉
音讯告诉是有即时性的,如果没有 goroutine 在期待告诉,那么这次告诉间接被抛弃。
kubernetes
https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/cache/fifo.go
总结
- Wait() 内会执行解锁、期待、加锁。
- Wait() 必须在 for 循环外面。
- Wait() 办法会把以后的 goroutine 增加到告诉队列的队尾。
- 单发告诉,唤醒告诉队列第一个排队的 goroutine。
- 播送告诉,唤醒告诉队列外面全副的 goroutine。
- 程序示例只是为了演示成果,理论的开发中,生产者和消费者应该是异步生产,不应该应用同一个互斥锁。
文章示例代码