<font color="#CD853F">初入门径</font>


Java中有Semaphore类,用来限度拜访特定资源的并发线程的数量.绝对于内置锁synchronized和重入锁ReentrantLock的互斥性来说,Semaphore能够容许多个线程同时访问共享资源

参考自 Go 并发编程-信号量的应用办法和其实现原理

信号量的概念由 Dijkstra 提出,广泛应用在不同的操作系统中。零碎会给每一个过程一个信号量,代表每个过程以后的状态,未失去控制权的过程,会在特定的中央被迫停下来,期待能够持续进行的信号到来。

PV操作

个别用信号量来爱护一组资源, 如数据库连接池、一组客户端的连贯等等。每次获取资源时,都会将信号量中的计数器减去对应的数值,在开释资源时从新加回来。当遇到信号量资源不够时,尝试获取的线程就会进入休眠,期待其余线程开释偿还信号量。如果信号量是只有0和1的二进位信号量,那么其 P/V 就和互斥锁的 Lock/Unlock 一样了。


Go 外部应用信号量来管制goroutine的阻塞和唤醒,如互斥锁sync.Mutex构造体的第二个字段,就是一个信号量:

type Mutex struct {    state int32    sema  uint32}


信号量的PV操作在Go外部是通过上面这几个底层函数实现的. 这几个函数都仅供Go语言外部调用,不能在编程时间接应用。

func runtime_Semacquire(s *uint32)func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

更多可参考 sync包-Mutex

不过Go的 扩大并发原语包 中提供了带权重的信号量 semaphore.Weighted


<font color="#CD853F">应用场景</font>


在理论开发中,当须要管制拜访资源的线程数量时,就会须要信号量.

假如有一组要抓取的网站, 资源无限,最多容许同时执行三个抓取工作. 当同时有三个抓取工作在执行时,在执行完一个抓取工作后能力执行下一个排队期待的工作.

这个问题用Channel也能解决,在此应用Go提供的信号量原语来解决该问题:

package mainimport (    "context"    "fmt"    "sync"    "time"    "golang.org/x/sync/semaphore")func doSomething(u string) { // 模仿抓取工作的执行    fmt.Printf("开始抓取%s网站\n", u)    time.Sleep(5 * time.Second)}const (    Limit  = 3 // 同時并行运行的goroutine下限    Weight = 1 // 每个goroutine获取信号量资源的权重)func main() {    urls := []string{        "http://www.apple.com",        "http://www.baidu.net",        "http://www.c.com",        "http://www.d.com",        "http://www.ebay.com",    }    s := semaphore.NewWeighted(Limit)    var w sync.WaitGroup    for _, u := range urls {        //for的速度远远快过起协程,所以到这里并往下执行时,可能是5个字符串元素工夫差不多        w.Add(1)                go func(u string) {            s.Acquire(context.Background(), Weight)            doSomething(u)            s.Release(Weight)            w.Done()        }(u)            }    w.Wait()    fmt.Println("All Done")}

输入为:

开始抓取http://www.ebay.com网站开始抓取http://www.c.com网站开始抓取http://www.apple.com网站(期待5s后)开始抓取http://www.baidu.net网站开始抓取http://www.d.com网站All Done




<font color="#CD853F">源码实现</font>


源码行数不多,加上正文一共136行.


<details>

<summary>点击查看 golang.org/x/sync/semaphore/semaphore.go源码:</summary>

// Copyright 2017 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.// Package semaphore provides a weighted semaphore implementation.package semaphore // import "golang.org/x/sync/semaphore"import (    "container/list"    "context"    "sync")// 如果调用者申请不到信号量的资源就会被退出期待者列表里type waiter struct {    n     int64  // 调用者申请的资源数    // ready通道会在调用者能够被从新唤醒的时候被close掉,从而起到告诉正在阻塞读取ready通道的期待者的作用    ready chan<- struct{} // Closed when semaphore acquired. // 当调用者能够获取到信号量资源时, close这个chan}// NewWeighted creates a new weighted semaphore with the given// maximum combined weight for concurrent access.func NewWeighted(n int64) *Weighted {    w := &Weighted{size: n}    return w}// Weighted provides a way to bound concurrent access to a resource.// The callers can request access with a given weight.type Weighted struct {    size    int64  // 字段用来记录信号量领有的最大资源数    cur     int64  // 标识以后已被应用的资源数    mu      sync.Mutex  // 互斥锁,用来提供对其余字段的临界区爱护    waiters list.List  // 示意申请资源时因为可应用资源不够而陷入阻塞期待的调用者列表}// Acquire acquires the semaphore with a weight of n, blocking until resources// are available or ctx is done. On success, returns nil. On failure, returns// ctx.Err() and leaves the semaphore unchanged.//// If ctx is already done, Acquire may still succeed without blocking.// Acquire办法会监控资源是否可用,且还会检测传递进来的context.Context对象是否发送了超时过期或者勾销的信号func (s *Weighted) Acquire(ctx context.Context, n int64) error {    s.mu.Lock()    // 如果恰好有足够的资源,也没有排队期待获取资源的goroutine,则将cur加上n后间接返回    if s.size-s.cur >= n && s.waiters.Len() == 0 {        s.cur += n        s.mu.Unlock()        return nil    }    // 申请的资源数 > 能提供的最大资源数, 则该工作解决不了,走错误处理逻辑    if n > s.size {        // Don't make other Acquire calls block on one that's doomed to fail.        s.mu.Unlock()        // 依赖ctx的状态返回,否则始终期待        <-ctx.Done()        return ctx.Err()    }    // 现存资源不够, 须要把调用者退出到期待队列中    // 创立了一个ready chan,以便被告诉唤醒    ready := make(chan struct{})    //如果调用者申请不到信号量的资源就会被退出期待者列表里    w := waiter{n: n, ready: ready}    elem := s.waiters.PushBack(w)    s.mu.Unlock()     // 期待    select {    case <-ctx.Done():  // context的Done被敞开        err := ctx.Err()        s.mu.Lock()        select {        case <-ready: // 如果被唤醒了,则疏忽ctx的状态            // Acquired the semaphore after we were canceled.  Rather than trying to            // fix up the queue, just pretend we didn't notice the cancelation.            err = nil        default: // 告诉waiter            isFront := s.waiters.Front() == elem            s.waiters.Remove(elem)            // If we're at the front and there're extra tokens left, notify other waiters.            // 告诉其它的waiters,查看是否有足够的资源            if isFront && s.size > s.cur {                s.notifyWaiters()            }        }        s.mu.Unlock()        return err    case <-ready: // 期待者被唤醒        return nil    }}// TryAcquire acquires the semaphore with a weight of n without blocking.// On success, returns true. On failure, returns false and leaves the semaphore unchanged.func (s *Weighted) TryAcquire(n int64) bool {    s.mu.Lock()    success := s.size-s.cur >= n && s.waiters.Len() == 0    if success {        s.cur += n    }    s.mu.Unlock()    return success}// Release releases the semaphore with a weight of n.//Release办法很简略, 它将以后计数值减去开释的资源数 n, 并调用notifyWaiters办法,尝试唤醒期待队列中的调用者,看是否有足够的资源被获取func (s *Weighted) Release(n int64) {    s.mu.Lock()    s.cur -= n    if s.cur < 0 {        s.mu.Unlock()        panic("semaphore: released more than held")    }    s.notifyWaiters()    s.mu.Unlock()}// notifyWaiters办法 会一一查看队列里期待的调用者,如果现存资源 够期待者申请的数量n,或者是没有期待者了,就返回func (s *Weighted) notifyWaiters() {    for {        next := s.waiters.Front()        if next == nil {            break // No more waiters blocked. // 没有期待者了,间接返回        }        w := next.Value.(waiter)        if s.size-s.cur < w.n {            // 如果现有资源不够队列头调用者申请的资源数,就退出所有期待者会持续期待            // 这里还是依照先入先出的形式解决是为了防止饥饿            // Not enough tokens for the next waiter.  We could keep going (to try to            // find a waiter with a smaller request), but under load that could cause            // starvation for large requests; instead, we leave all remaining waiters            // blocked.            //            // Consider a semaphore used as a read-write lock, with N tokens, N            // readers, and one writer.  Each reader can Acquire(1) to obtain a read            // lock.  The writer can Acquire(N) to obtain a write lock, excluding all            // of the readers.  If we allow the readers to jump ahead in the queue,            // the writer will starve — there is always one token available for every            // reader.            break        }        s.cur += w.n        s.waiters.Remove(next)        close(w.ready)    }}// notifyWaiters办法 是依照先入先出的形式唤醒调用者。当开释 100 个资源时,如果第一个期待者须要 101 个资源,那么,队列中的所有期待者都会持续期待,即便队列前面有的期待者只须要 1 个资源。这样做的目标是防止饥饿,否则的话,资源可能总是被那些申请资源数小的调用者获取,这样一来,申请资源数微小的调用者,就没有机会取得资源了。

</details>




<font color="#CD853F">注意事项</font>


Go语言中应用到信号量的场景,个别会被channel所取代,因为一个buffered chan(带缓冲的channel)也能够代表 n 个资源.semaphore.Weight其实理论应用得不多,但如果用到须要留神:

  • AcquireTryAcquire办法都能够用于获取资源,前者会阻塞的获取信号量,后者会非阻塞的获取信号量,如果获取不到就返回false
  • Release偿还信号量后,会以先进先出的程序唤醒期待队列中的调用者.如果现有资源不够处于期待队列后面的调用者申请的资源数,所有期待者会持续期待。
  • 如果一个goroutine申请较多的资源,因为下面说的偿还后唤醒期待者的策略,它可能会期待比拟长的工夫。




<font color="#CD853F">官网库或出名我的项目中的应用</font>


在docker我的项目中有应用:

<img src="Semaphore-带权重的信号量/4.png" width = 90% height = 50% />


而在大多数我的项目中,都用channel来代替semaphore.Weight作为信号量,

如Go的官网库net:


golang.org/x/net/netutil/listen.go:


如go/pkg/mod/gopkg.in/gographics/imagick.v2@v2.6.0/imagick/magick_wand_env.go:


这里有另一种实现

本文由mdnice多平台公布