共计 6792 个字符,预计需要花费 17 分钟才能阅读完成。
原文链接:Go 官网设计了一个信号量库
前言
哈喽,大家好,我是
asong
。在写上一篇文章请勿滥用 goroutine 时,发现Go
语言扩大包提供了一个带权重的信号量库 Semaphore,应用信号量咱们能够实现一个 ” 工作池 ” 管制肯定数量的goroutine
并发工作。因为对源码抱有好奇的态度,所以在周末认真看了一下这个库并进行了解析,在这里记录一下。
何为信号量
要想晓得一个货色是什么,我都爱去百度百科上搜一搜,输出 ” 信号量 ”,这答案不就来了。
百度百科解释:
信号量(Semaphore),有时被称为信号灯,是[多线程环境下应用的一种设施,是能够用来保障两个或多个要害代码段不被并发调用。在进入一个要害代码段之前,线程必须获取一个信号量;一旦该要害代码段实现了,那么该线程必须开释信号量。其它想进入该要害代码段的线程必须期待直到第一个线程开释信号量。为了实现这个过程,须要创立一个信号量 VI,而后将 Acquire Semaphore VI 以及 Release Semaphore VI 别离搁置在每个要害代码段的首末端。确认这些信号量 VI 援用的是初始创立的信号量。
通过这段解释咱们能够得悉什么是信号量,其实信号量就是一种变量或者抽象数据类型,用于管制并发零碎中多个过程对公共资源的拜访,拜访具备原子性。信号量次要分为两类:
- 二值信号量:顾名思义,其值只有两种
0
或者1
,相当于互斥量,当值为1
时资源可用,当值为0
时,资源被锁住,过程阻塞无奈继续执行。 - 计数信号量:信号量是一个任意的整数,起始时,如果计数器的计数值为
0
,那么创立进去的信号量就是不可取得的状态,如果计数器的计数值大于0
,那么创立进去的信号量就是可取得的状态,并且总共获取的次数等于计数器的值。
信号量工作原理
信号量是由操作系统来保护的,信号量只能进行两种操作期待和发送信号,操作总结来说,外围就是 PV
操作:
- P 原语:P 是荷兰语 Proberen(测试)的首字母。为阻塞原语,负责把以后过程由运行状态转换为阻塞状态,直到另外一个过程唤醒它。操作为:申请一个闲暇资源(把信号量减 1),若胜利,则退出;若失败,则该过程被阻塞;
- V 原语:V 是荷兰语 Verhogen(减少)的首字母。为唤醒原语,负责把一个被阻塞的过程唤醒,它有一个参数表,寄存着期待被唤醒的过程信息。操作为:开释一个被占用的资源(把信号量加 1),如果发现有被阻塞的过程,则抉择一个唤醒之。
在信号量进行 PV 操作时都为原子操作,并且在 PV 原语执行期间不容许有中断的产生。
PV 原语对信号量的操作能够分为三种状况:
- 把信号量视为时某种类型的共享资源的残余个数,实现对一类共享资源的拜访
- 把信号量用作过程间的同步
- 视信号量为一个加锁标记,实现对一个共享变量的拜访
具体在什么场景应用本文就不在持续剖析,接下来咱们重点来看一下 Go
语言提供的扩大包Semaphore
,看看它是怎么实现的。
官网扩大包Semaphore
咱们之前在剖析 Go
语言源码时总会看到这几个函数:
func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
这几个函数就是信号量的 PV
操作,不过他们都是给 Go
外部应用的,如果想应用信号量,那就能够应用官网的扩大包:Semaphore,这是一个带权重的信号量,接下来咱们就重点剖析一下这个库。
装置办法:go get -u golang.org/x/sync
数据结构
type Weighted struct {
size int64 // 设置一个最大权值
cur int64 // 标识以后已被应用的资源数
mu sync.Mutex // 提供临界区爱护
waiters list.List // 阻塞期待的调用者列表
}
semaphore
库外围构造就是 Weighted
,次要有4
个字段:
size
:这个代表的是最大权值,在创立Weighted
对象指定cur
:相当于一个游标,来记录以后已应用的权值mu
:互斥锁,并发状况下做临界区爱护waiters
:阻塞期待的调用者列表,应用链表数据结构保障先进先出的程序,存储的数据是waiter
对象,waiter
数据结构如下:
type waiter struct {
n int64 // 期待调用者权重值
ready chan<- struct{} // close channel 就是唤醒}
这里只有两个字段:
n
:这个就是期待调用者的权重值ready
:这就是一个channel
,利用channel
的close
机制实现唤醒
semaphore
还提供了一个创立 Weighted
对象的办法,在初始化时须要给定最大权值:
// NewWeighted 为并发拜访创立一个新的加权信号量,该信号量具备给定的最大权值。func NewWeighted(n int64) *Weighted {w := &Weighted{size: n}
return w
}
阻塞获取权值的办法 – Acquire
先间接看代码吧:
func (s *Weighted) Acquire(ctx context.Context, n int64) error {s.mu.Lock() // 加锁爱护临界区
// 有资源可用并且没有期待获取权值的 goroutine
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n // 加权
s.mu.Unlock() // 开释锁
return nil
}
// 要获取的权值 n 大于最大的权值了
if n > s.size {
// 先开释锁,确保其余 goroutine 调用 Acquire 的中央不被阻塞
s.mu.Unlock()
// 阻塞期待 context 的返回
<-ctx.Done()
return ctx.Err()}
// 走到这里就阐明当初没有资源可用了
// 创立一个 channel 用来做告诉唤醒
ready := make(chan struct{})
// 创立 waiter 对象
w := waiter{n: n, ready: ready}
// waiter 按程序入队
elem := s.waiters.PushBack(w)
// 开释锁,期待唤醒,别阻塞其余 goroutine
s.mu.Unlock()
// 阻塞期待唤醒
select {
// context 敞开
case <-ctx.Done():
err := ctx.Err() // 先获取 context 的错误信息
s.mu.Lock()
select {
case <-ready:
// 在 context 被敞开后被唤醒了,那么试图修复队列,伪装咱们没有勾销
err = nil
default:
// 判断是否是第一个元素
isFront := s.waiters.Front() == elem
// 移除第一个元素
s.waiters.Remove(elem)
// 如果是第一个元素且有资源可用告诉其余 waiter
if isFront && s.size > s.cur {s.notifyWaiters()
}
}
s.mu.Unlock()
return err
// 被唤醒了
case <-ready:
return nil
}
}
正文曾经加到代码中了,总结一下这个办法次要有三个流程:
- 流程一:有资源可用时并且没有期待权值的
goroutine
,走失常加权流程; - 流程二:想要获取的权值
n
大于初始化时设置最大的权值了,这个goroutine
永远不会获取到信号量,所以阻塞期待context
的敞开; -
流程三:前两步都没问题的话,就阐明当初零碎没有资源可用了,这时就须要阻塞期待唤醒,在阻塞期待唤醒这里有非凡逻辑;
-
非凡逻辑一:如果在
context
被敞开后被唤醒了,那么就先疏忽掉这个cancel
,试图修复队列。- 非凡逻辑二:
context
敞开后,则依据是否有可用资源决定告诉前面期待唤醒的调用者,这样做的目标其实是为了防止当不同的context
管制不同的goroutine
时,未敞开的goroutine
不会被阻塞住,仍然执行,来看这样一个例子(因为goroutine
的抢占式调度,所以这个例子也会具备必然性):
func main() {s := semaphore.NewWeighted(3) ctx,cancel := context.WithTimeout(context.Background(), time.Second * 2) defer cancel() for i :=0; i < 3; i++{ if i != 0{go func(num int) {if err := s.Acquire(ctx,3); err != nil{fmt.Printf("goroutine:%d, err is %s\n", num, err.Error()) return } time.Sleep(2 * time.Second) fmt.Printf("goroutine:%d run over\n",num) s.Release(3) }(i) }else {go func(num int) {ct,cancel := context.WithTimeout(context.Background(), time.Second * 3) defer cancel() if err := s.Acquire(ct,3); err != nil{fmt.Printf("goroutine:%d, err is %s\n", num, err.Error()) return } time.Sleep(3 * time.Second) fmt.Printf("goroutine:%d run over\n",num) s.Release(3) }(i) } } time.Sleep(10 * time.Second) }
下面的例子中
goroutine:0
应用ct
对象来做管制,超时工夫为3s
,goroutine:1
和goroutine:2
对象应用ctx
对象来做管制,超时工夫为2s
,这三个goroutine
占用的资源都等于最大资源数,也就是说只能有一个goruotine
运行胜利,另外两个goroutine
都会被阻塞,因为goroutine
是抢占式调度,所以咱们不能确定哪个gouroutine
会第一个被执行,这里咱们假如第一个获取到信号量的是gouroutine:2
,阻塞期待的调用者列表程序是:goroutine:1
->goroutine:0
,因为在goroutine:2
中有一个2s
的延时,所以会触发ctx
的超时,ctx
会下发Done
信号,因为goroutine:2
和goroutine:1
都是被ctx
管制的,所以就会把goroutine:1
从期待者队列中勾销,然而因为goroutine:1
属于队列的第一个队员,并且因为goroutine:2
曾经开释资源,那么就会唤醒goroutine:0
继续执行,画个图示意一下:应用这种形式能够防止
goroutine
永恒失眠。 - 非凡逻辑二:
-
不阻塞获取权值的办法 – TryAcquire
func (s *Weighted) TryAcquire(n int64) bool {s.mu.Lock() // 加锁
// 有资源可用并且没有期待获取资源的 goroutine
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {s.cur += n}
s.mu.Unlock()
return success
}
这个办法就简略很多了,不阻塞地获取权重为 n
的信号量,胜利时返回 true
,失败时返回false
并放弃信号量不变。
开释权重
func (s *Weighted) Release(n int64) {s.mu.Lock()
// 开释资源
s.cur -= n
// 开释资源大于持有的资源,则会产生 panic
if s.cur < 0 {s.mu.Unlock()
panic("semaphore: released more than held")
}
// 告诉其余期待的调用者
s.notifyWaiters()
s.mu.Unlock()}
这里就是很惯例的操作,次要就是资源开释,同时进行安全性判断,如果开释资源大于持有的资源,则会产生 panic。
唤醒waiter
在 Acquire
和Release
办法中都调用了notifyWaiters
,咱们来剖析一下这个办法:
func (s *Weighted) notifyWaiters() {
for {
// 获取期待调用者队列中的队员
next := s.waiters.Front()
// 没有要告诉的调用者了
if next == nil {break // No more waiters blocked.}
// 断言出 waiter 信息
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// 没有足够资源为下一个调用者应用时,持续阻塞该调用者,遵循先进先出的准则,// 防止须要资源数比拟大的 waiter 被饿死
//
// 思考一个场景,应用信号量作为读写锁,现有 N 个令牌,N 个 reader 和一个 writer
// 每个 reader 都能够通过 Acquire(1)获取读锁,writer 写入能够通过 Acquire(N)取得写锁定
// 但不包含所有的 reader,如果咱们容许 reader 在队列中后退,writer 将会饿死 - 总是有一个令牌可供每个 reader
break
}
// 获取资源
s.cur += w.n
// 从 waiter 列表中移除
s.waiters.Remove(next)
// 应用 channel 的 close 机制唤醒 waiter
close(w.ready)
}
}
这里只须要留神一个点:唤醒 waiter
采纳先进先出的准则,防止须要资源数比拟大的 waiter 被饿死。
何时应用Semaphore
到这里咱们就把 Semaphore
的源代码看了一篇,代码行数不多,封装的也很奇妙,那么咱们该什么时候选在应用它呢?
目前能想到一个场景就是 Semaphore
配合上 errgroup
实现一个 ” 工作池 ”,应用 Semaphore
限度 goroutine
的数量,配合上 errgroup
做并发管制,示例如下:
const (limit = 2)
func main() {serviceName := []string{
"cart",
"order",
"account",
"item",
"menu",
}
eg,ctx := errgroup.WithContext(context.Background())
s := semaphore.NewWeighted(limit)
for index := range serviceName{name := serviceName[index]
if err := s.Acquire(ctx,1); err != nil{fmt.Printf("Acquire failed and err is %s\n", err.Error())
break
}
eg.Go(func() error {defer s.Release(1)
return callService(name)
})
}
if err := eg.Wait(); err != nil{fmt.Printf("err is %s\n", err.Error())
return
}
fmt.Printf("run success\n")
}
func callService(name string) error {fmt.Println("call",name)
time.Sleep(1 * time.Second)
return nil
}
后果如下:
call order
call cart
call account
call item
call menu
run success
总结
本文咱们次要赏析了 Go
官网扩大库 Semaphore
的实现,他的设计思路简略,仅仅用几十行就实现了完满的封装,值得咱们借鉴学习。不过在理论业务场景中,咱们应用信号量的场景并不多,大多数场景咱们都能够应用 channel
来代替,然而有些场景应用 Semaphore
来实现会更好,比方上篇文章【[[警觉] 请勿滥用 goroutine](https://mp.weixin.qq.com/s/JC…)】咱们应用 channel+sync
来管制 goroutine
数量,这种实现形式并不好,因为理论曾经起来了多个 goroutine
,只不过管制了工作的goroutine
数量,如果改用 semaphore
实现才是真正的管制了 goroutine
数量。
文中代码已上传github
:https://github.com/asong2020/…,欢送star
。
欢送关注公众号:【Golang 梦工厂】
举荐往期文章:
- 学习 channel 设计:从入门到放弃
- 详解内存对齐
- Go 语言中 new 和 make 你应用哪个来分配内存?
- 源码分析 panic 与 recover,看不懂你打我好了!
- 面试官:小松子来聊一聊内存逃逸
- 面试官:两个 nil 比拟后果是什么?
- 并发编程包之 errgroup