原文链接:Go官网设计了一个信号量库
前言
哈喽,大家好,我是asong
。在写上一篇文章请勿滥用goroutine时,发现Go
语言扩大包提供了一个带权重的信号量库Semaphore,应用信号量咱们能够实现一个"工作池"管制肯定数量的goroutine
并发工作。因为对源码抱有好奇的态度,所以在周末认真看了一下这个库并进行了解析,在这里记录一下。
何为信号量
要想晓得一个货色是什么,我都爱去百度百科上搜一搜,输出"信号量",这答案不就来了。
百度百科解释:
信号量(Semaphore),有时被称为信号灯,是[多线程环境下应用的一种设施,是能够用来保障两个或多个要害代码段不被并发调用。在进入一个要害代码段之前,线程必须获取一个信号量;一旦该要害代码段实现了,那么该线程必须开释信号量。其它想进入该要害代码段的线程必须期待直到第一个线程开释信号量。为了实现这个过程,须要创立一个信号量VI,而后将Acquire Semaphore VI以及Release Semaphore VI别离搁置在每个要害代码段的首末端。确认这些信号量VI援用的是初始创立的信号量。
通过这段解释咱们能够得悉什么是信号量,其实信号量就是一种变量或者抽象数据类型,用于管制并发零碎中多个过程对公共资源的拜访,拜访具备原子性。信号量次要分为两类:
- 二值信号量:顾名思义,其值只有两种
0
或者1
,相当于互斥量,当值为1
时资源可用,当值为0
时,资源被锁住,过程阻塞无奈继续执行。 - 计数信号量:信号量是一个任意的整数,起始时,如果计数器的计数值为
0
,那么创立进去的信号量就是不可取得的状态,如果计数器的计数值大于0
,那么创立进去的信号量就是可取得的状态,并且总共获取的次数等于计数器的值。
信号量工作原理
信号量是由操作系统来保护的,信号量只能进行两种操作期待和发送信号,操作总结来说,外围就是PV
操作:
- P原语:P是荷兰语Proberen(测试)的首字母。为阻塞原语,负责把以后过程由运行状态转换为阻塞状态,直到另外一个过程唤醒它。操作为:申请一个闲暇资源(把信号量减1),若胜利,则退出;若失败,则该过程被阻塞;
- V原语:V是荷兰语Verhogen(减少)的首字母。为唤醒原语,负责把一个被阻塞的过程唤醒,它有一个参数表,寄存着期待被唤醒的过程信息。操作为:开释一个被占用的资源(把信号量加1),如果发现有被阻塞的过程,则抉择一个唤醒之。
在信号量进行PV操作时都为原子操作,并且在PV原语执行期间不容许有中断的产生。
PV原语对信号量的操作能够分为三种状况:
- 把信号量视为时某种类型的共享资源的残余个数,实现对一类共享资源的拜访
- 把信号量用作过程间的同步
- 视信号量为一个加锁标记,实现对一个共享变量的拜访
具体在什么场景应用本文就不在持续剖析,接下来咱们重点来看一下Go
语言提供的扩大包Semaphore
,看看它是怎么实现的。
官网扩大包Semaphore
咱们之前在剖析Go
语言源码时总会看到这几个函数:
func runtime_Semacquire(s *uint32)func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
这几个函数就是信号量的PV
操作,不过他们都是给Go
外部应用的,如果想应用信号量,那就能够应用官网的扩大包:Semaphore,这是一个带权重的信号量,接下来咱们就重点剖析一下这个库。
装置办法:go get -u golang.org/x/sync
数据结构
type Weighted struct { size int64 // 设置一个最大权值 cur int64 // 标识以后已被应用的资源数 mu sync.Mutex // 提供临界区爱护 waiters list.List // 阻塞期待的调用者列表}
semaphore
库外围构造就是Weighted
,次要有4
个字段:
size
:这个代表的是最大权值,在创立Weighted
对象指定cur
:相当于一个游标,来记录以后已应用的权值mu
:互斥锁,并发状况下做临界区爱护waiters
:阻塞期待的调用者列表,应用链表数据结构保障先进先出的程序,存储的数据是waiter
对象,waiter
数据结构如下:
type waiter struct { n int64 // 期待调用者权重值 ready chan<- struct{} // close channel就是唤醒}
这里只有两个字段:
n
:这个就是期待调用者的权重值ready
:这就是一个channel
,利用channel
的close
机制实现唤醒
semaphore
还提供了一个创立Weighted
对象的办法,在初始化时须要给定最大权值:
// NewWeighted为并发拜访创立一个新的加权信号量,该信号量具备给定的最大权值。func NewWeighted(n int64) *Weighted { w := &Weighted{size: n} return w}
阻塞获取权值的办法 - Acquire
先间接看代码吧:
func (s *Weighted) Acquire(ctx context.Context, n int64) error { s.mu.Lock() // 加锁爱护临界区 // 有资源可用并且没有期待获取权值的goroutine if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n // 加权 s.mu.Unlock() // 开释锁 return nil } // 要获取的权值n大于最大的权值了 if n > s.size { // 先开释锁,确保其余goroutine调用Acquire的中央不被阻塞 s.mu.Unlock() // 阻塞期待context的返回 <-ctx.Done() return ctx.Err() } // 走到这里就阐明当初没有资源可用了 // 创立一个channel用来做告诉唤醒 ready := make(chan struct{}) // 创立waiter对象 w := waiter{n: n, ready: ready} // waiter按程序入队 elem := s.waiters.PushBack(w) // 开释锁,期待唤醒,别阻塞其余goroutine s.mu.Unlock() // 阻塞期待唤醒 select { // context敞开 case <-ctx.Done(): err := ctx.Err() // 先获取context的错误信息 s.mu.Lock() select { case <-ready: // 在context被敞开后被唤醒了,那么试图修复队列,伪装咱们没有勾销 err = nil default: // 判断是否是第一个元素 isFront := s.waiters.Front() == elem // 移除第一个元素 s.waiters.Remove(elem) // 如果是第一个元素且有资源可用告诉其余waiter if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return err // 被唤醒了 case <-ready: return nil }}
正文曾经加到代码中了,总结一下这个办法次要有三个流程:
- 流程一:有资源可用时并且没有期待权值的
goroutine
,走失常加权流程; - 流程二:想要获取的权值
n
大于初始化时设置最大的权值了,这个goroutine
永远不会获取到信号量,所以阻塞期待context
的敞开; 流程三:前两步都没问题的话,就阐明当初零碎没有资源可用了,这时就须要阻塞期待唤醒,在阻塞期待唤醒这里有非凡逻辑;
非凡逻辑一:如果在
context
被敞开后被唤醒了,那么就先疏忽掉这个cancel
,试图修复队列。- 非凡逻辑二:
context
敞开后,则依据是否有可用资源决定告诉前面期待唤醒的调用者,这样做的目标其实是为了防止当不同的context
管制不同的goroutine
时,未敞开的goroutine
不会被阻塞住,仍然执行,来看这样一个例子(因为goroutine
的抢占式调度,所以这个例子也会具备必然性):
func main() { s := semaphore.NewWeighted(3) ctx,cancel := context.WithTimeout(context.Background(), time.Second * 2) defer cancel() for i :=0; i < 3; i++{ if i != 0{ go func(num int) { if err := s.Acquire(ctx,3); err != nil{ fmt.Printf("goroutine: %d, err is %s\n", num, err.Error()) return } time.Sleep(2 * time.Second) fmt.Printf("goroutine: %d run over\n",num) s.Release(3) }(i) }else { go func(num int) { ct,cancel := context.WithTimeout(context.Background(), time.Second * 3) defer cancel() if err := s.Acquire(ct,3); err != nil{ fmt.Printf("goroutine: %d, err is %s\n", num, err.Error()) return } time.Sleep(3 * time.Second) fmt.Printf("goroutine: %d run over\n",num) s.Release(3) }(i) } } time.Sleep(10 * time.Second)}
下面的例子中
goroutine:0
应用ct
对象来做管制,超时工夫为3s
,goroutine:1
和goroutine:2
对象应用ctx
对象来做管制,超时工夫为2s
,这三个goroutine
占用的资源都等于最大资源数,也就是说只能有一个goruotine
运行胜利,另外两个goroutine
都会被阻塞,因为goroutine
是抢占式调度,所以咱们不能确定哪个gouroutine
会第一个被执行,这里咱们假如第一个获取到信号量的是gouroutine:2
,阻塞期待的调用者列表程序是:goroutine:1
->goroutine:0
,因为在goroutine:2
中有一个2s
的延时,所以会触发ctx
的超时,ctx
会下发Done
信号,因为goroutine:2
和goroutine:1
都是被ctx
管制的,所以就会把goroutine:1
从期待者队列中勾销,然而因为goroutine:1
属于队列的第一个队员,并且因为goroutine:2
曾经开释资源,那么就会唤醒goroutine:0
继续执行,画个图示意一下:应用这种形式能够防止
goroutine
永恒失眠。- 非凡逻辑二:
不阻塞获取权值的办法 - TryAcquire
func (s *Weighted) TryAcquire(n int64) bool { s.mu.Lock() // 加锁 // 有资源可用并且没有期待获取资源的goroutine success := s.size-s.cur >= n && s.waiters.Len() == 0 if success { s.cur += n } s.mu.Unlock() return success}
这个办法就简略很多了,不阻塞地获取权重为n
的信号量,胜利时返回true
,失败时返回false
并放弃信号量不变。
开释权重
func (s *Weighted) Release(n int64) { s.mu.Lock() // 开释资源 s.cur -= n // 开释资源大于持有的资源,则会产生panic if s.cur < 0 { s.mu.Unlock() panic("semaphore: released more than held") } // 告诉其余期待的调用者 s.notifyWaiters() s.mu.Unlock()}
这里就是很惯例的操作,次要就是资源开释,同时进行安全性判断,如果开释资源大于持有的资源,则会产生panic。
唤醒waiter
在Acquire
和Release
办法中都调用了notifyWaiters
,咱们来剖析一下这个办法:
func (s *Weighted) notifyWaiters() { for { // 获取期待调用者队列中的队员 next := s.waiters.Front() // 没有要告诉的调用者了 if next == nil { break // No more waiters blocked. } // 断言出waiter信息 w := next.Value.(waiter) if s.size-s.cur < w.n { // 没有足够资源为下一个调用者应用时,持续阻塞该调用者,遵循先进先出的准则, // 防止须要资源数比拟大的waiter被饿死 // // 思考一个场景,应用信号量作为读写锁,现有N个令牌,N个reader和一个writer // 每个reader都能够通过Acquire(1)获取读锁,writer写入能够通过Acquire(N)取得写锁定 // 但不包含所有的reader,如果咱们容许reader在队列中后退,writer将会饿死-总是有一个令牌可供每个reader break } // 获取资源 s.cur += w.n // 从waiter列表中移除 s.waiters.Remove(next) // 应用channel的close机制唤醒waiter close(w.ready) }}
这里只须要留神一个点:唤醒waiter
采纳先进先出的准则,防止须要资源数比拟大的waiter被饿死。
何时应用Semaphore
到这里咱们就把Semaphore
的源代码看了一篇,代码行数不多,封装的也很奇妙,那么咱们该什么时候选在应用它呢?
目前能想到一个场景就是Semaphore
配合上errgroup
实现一个"工作池",应用Semaphore
限度goroutine
的数量,配合上errgroup
做并发管制,示例如下:
const ( limit = 2) func main() { serviceName := []string{ "cart", "order", "account", "item", "menu", } eg,ctx := errgroup.WithContext(context.Background()) s := semaphore.NewWeighted(limit) for index := range serviceName{ name := serviceName[index] if err := s.Acquire(ctx,1); err != nil{ fmt.Printf("Acquire failed and err is %s\n", err.Error()) break } eg.Go(func() error { defer s.Release(1) return callService(name) }) } if err := eg.Wait(); err != nil{ fmt.Printf("err is %s\n", err.Error()) return } fmt.Printf("run success\n")}func callService(name string) error { fmt.Println("call ",name) time.Sleep(1 * time.Second) return nil}
后果如下:
call ordercall cartcall accountcall itemcall menurun success
总结
本文咱们次要赏析了Go
官网扩大库Semaphore
的实现,他的设计思路简略,仅仅用几十行就实现了完满的封装,值得咱们借鉴学习。不过在理论业务场景中,咱们应用信号量的场景并不多,大多数场景咱们都能够应用channel
来代替,然而有些场景应用Semaphore
来实现会更好,比方上篇文章【[[警觉] 请勿滥用goroutine](https://mp.weixin.qq.com/s/JC...)】咱们应用channel+sync
来管制goroutine
数量,这种实现形式并不好,因为理论曾经起来了多个goroutine
,只不过管制了工作的goroutine
数量,如果改用semaphore
实现才是真正的管制了goroutine
数量。
文中代码已上传github
:https://github.com/asong2020/...,欢送star
。
欢送关注公众号:【Golang梦工厂】
举荐往期文章:
- 学习channel设计:从入门到放弃
- 详解内存对齐
- Go语言中new和make你应用哪个来分配内存?
- 源码分析panic与recover,看不懂你打我好了!
- 面试官:小松子来聊一聊内存逃逸
- 面试官:两个nil比拟后果是什么?
- 并发编程包之 errgroup