关于后端:Semaphore带权重的信号量

7次阅读

共计 6186 个字符,预计需要花费 16 分钟才能阅读完成。

<font color=”#CD853F”> 初入门径 </font>

Java 中有 Semaphore 类, 用来限度拜访特定资源的并发线程的数量. 绝对于内置锁 synchronized 和重入锁 ReentrantLock 的互斥性来说,Semaphore能够容许多个线程同时访问共享资源

参考自 Go 并发编程 - 信号量的应用办法和其实现原理

信号量的概念由 Dijkstra 提出, 广泛应用在不同的操作系统中。零碎会给每一个过程一个信号量, 代表每个过程以后的状态, 未失去控制权的过程, 会在特定的中央被迫停下来, 期待能够持续进行的信号到来。

PV 操作

个别用信号量来爱护一组资源, 如数据库连接池、一组客户端的连贯等等。每次获取资源时, 都会将信号量中的计数器减去对应的数值, 在开释资源时从新加回来。当遇到信号量资源不够时, 尝试获取的线程就会进入休眠, 期待其余线程开释偿还信号量。如果信号量是只有 0 和 1 的二进位信号量, 那么其 P/V 就和互斥锁的 Lock/Unlock 一样了。

Go 外部应用信号量来管制 goroutine 的阻塞和唤醒, 如互斥锁 sync.Mutex 构造体的第二个字段, 就是一个信号量:

type Mutex struct {
    state int32
    sema  uint32
}

信号量的 PV 操作在 Go 外部是通过上面这几个底层函数实现的. 这几个函数都仅供 Go 语言外部调用, 不能在编程时间接应用。

func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

更多可参考 sync 包 -Mutex

不过 Go 的 扩大并发原语包 中提供了带权重的信号量 semaphore.Weighted

<font color=”#CD853F”> 应用场景 </font>

在理论开发中, 当须要 管制拜访资源的线程数量 时, 就会须要信号量.

假如有一组要抓取的网站, 资源无限, 最多容许同时执行三个抓取工作. 当同时有三个抓取工作在执行时, 在执行完一个抓取工作后能力执行下一个排队期待的工作.

这个问题用 Channel 也能解决, 在此应用 Go 提供的信号量原语来解决该问题:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/semaphore"
)

func doSomething(u string) { // 模仿抓取工作的执行
    fmt.Printf("开始抓取 %s 网站 \n", u)
    time.Sleep(5 * time.Second)
}

const (
    Limit  = 3 // 同時并行运行的 goroutine 下限
    Weight = 1 // 每个 goroutine 获取信号量资源的权重
)

func main() {urls := []string{
        "http://www.apple.com",
        "http://www.baidu.net",
        "http://www.c.com",
        "http://www.d.com",
        "http://www.ebay.com",
    }
    s := semaphore.NewWeighted(Limit)
    var w sync.WaitGroup

    for _, u := range urls {

        //for 的速度远远快过起协程, 所以到这里并往下执行时, 可能是 5 个字符串元素工夫差不多
        w.Add(1)
        
        go func(u string) {s.Acquire(context.Background(), Weight)
            doSomething(u)
            s.Release(Weight)
            w.Done()}(u)
        
    }
    w.Wait()

    fmt.Println("All Done")
}

输入为:

开始抓取 http://www.ebay.com 网站
开始抓取 http://www.c.com 网站
开始抓取 http://www.apple.com 网站

(期待 5s 后)
开始抓取 http://www.baidu.net 网站
开始抓取 http://www.d.com 网站
All Done


<font color=”#CD853F”> 源码实现 </font>

源码行数不多, 加上正文一共 136 行.

<details>

<summary> 点击查看 golang.org/x/sync/semaphore/semaphore.go 源码:</summary>

// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package semaphore provides a weighted semaphore implementation.
package semaphore // import "golang.org/x/sync/semaphore"

import (
    "container/list"
    "context"
    "sync"
)


// 如果调用者申请不到信号量的资源就会被退出期待者列表里
type waiter struct {
    n     int64  // 调用者申请的资源数

    // ready 通道会在调用者能够被从新唤醒的时候被 close 掉, 从而起到告诉正在阻塞读取 ready 通道的期待者的作用
    ready chan<- struct{} // Closed when semaphore acquired. // 当调用者能够获取到信号量资源时, close 这个 chan}

// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {w := &Weighted{size: n}
    return w
}

// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
    size    int64  // 字段用来记录信号量领有的最大资源数
    cur     int64  // 标识以后已被应用的资源数
    mu      sync.Mutex  // 互斥锁, 用来提供对其余字段的临界区爱护
    waiters list.List  // 示意申请资源时因为可应用资源不够而陷入阻塞期待的调用者列表
}

// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.

// Acquire 办法会监控资源是否可用, 且还会检测传递进来的 context.Context 对象是否发送了超时过期或者勾销的信号
func (s *Weighted) Acquire(ctx context.Context, n int64) error {s.mu.Lock()

    // 如果恰好有足够的资源, 也没有排队期待获取资源的 goroutine, 则将 cur 加上 n 后间接返回
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
    }

    // 申请的资源数 > 能提供的最大资源数, 则该工作解决不了, 走错误处理逻辑
    if n > s.size {
        // Don't make other Acquire calls block on one that's doomed to fail.
        s.mu.Unlock()
        // 依赖 ctx 的状态返回, 否则始终期待
        <-ctx.Done()
        return ctx.Err()}

    // 现存资源不够, 须要把调用者退出到期待队列中
    // 创立了一个 ready chan, 以便被告诉唤醒
    ready := make(chan struct{})
    // 如果调用者申请不到信号量的资源就会被退出期待者列表里
    w := waiter{n: n, ready: ready}
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()


     // 期待
    select {case <-ctx.Done():  // context 的 Done 被敞开
        err := ctx.Err()
        s.mu.Lock()
        select {
        case <-ready: // 如果被唤醒了, 则疏忽 ctx 的状态
            // Acquired the semaphore after we were canceled.  Rather than trying to
            // fix up the queue, just pretend we didn't notice the cancelation.
            err = nil
        default: // 告诉 waiter
            isFront := s.waiters.Front() == elem
            s.waiters.Remove(elem)
            // If we're at the front and there're extra tokens left, notify other waiters.
            // 告诉其它的 waiters, 查看是否有足够的资源
            if isFront && s.size > s.cur {s.notifyWaiters()
            }
        }
        s.mu.Unlock()
        return err

    case <-ready: // 期待者被唤醒
        return nil
    }
}

// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {s.mu.Lock()
    success := s.size-s.cur >= n && s.waiters.Len() == 0
    if success {s.cur += n}
    s.mu.Unlock()
    return success
}


// Release releases the semaphore with a weight of n.
//Release 办法很简略, 它将以后计数值减去开释的资源数 n, 并调用 notifyWaiters 办法, 尝试唤醒期待队列中的调用者, 看是否有足够的资源被获取
func (s *Weighted) Release(n int64) {s.mu.Lock()
    s.cur -= n
    if s.cur < 0 {s.mu.Unlock()
        panic("semaphore: released more than held")
    }
    s.notifyWaiters()
    s.mu.Unlock()}


// notifyWaiters 办法 会一一查看队列里期待的调用者, 如果现存资源 够期待者申请的数量 n, 或者是没有期待者了, 就返回
func (s *Weighted) notifyWaiters() {
    for {next := s.waiters.Front()
        if next == nil {break // No more waiters blocked. // 没有期待者了, 间接返回}

        w := next.Value.(waiter)
        if s.size-s.cur < w.n {
            // 如果现有资源不够队列头调用者申请的资源数, 就退出所有期待者会持续期待
            // 这里还是依照先入先出的形式解决是为了防止饥饿

            // Not enough tokens for the next waiter.  We could keep going (to try to
            // find a waiter with a smaller request), but under load that could cause
            // starvation for large requests; instead, we leave all remaining waiters
            // blocked.
            //
            // Consider a semaphore used as a read-write lock, with N tokens, N
            // readers, and one writer.  Each reader can Acquire(1) to obtain a read
            // lock.  The writer can Acquire(N) to obtain a write lock, excluding all
            // of the readers.  If we allow the readers to jump ahead in the queue,
            // the writer will starve — there is always one token available for every
            // reader.
            break
        }

        s.cur += w.n
        s.waiters.Remove(next)
        close(w.ready)
    }
}
// notifyWaiters 办法 是依照先入先出的形式唤醒调用者。当开释 100 个资源时,如果第一个期待者须要 101 个资源,那么,队列中的所有期待者都会持续期待,即便队列前面有的期待者只须要 1 个资源。这样做的目标是防止饥饿,否则的话,资源可能总是被那些申请资源数小的调用者获取,这样一来,申请资源数微小的调用者,就没有机会取得资源了。

</details>


<font color=”#CD853F”> 注意事项 </font>

Go 语言中应用到信号量的场景, 个别会被 channel 所取代,因为一个 buffered chan(带缓冲的 channel) 也能够代表 n 个资源.semaphore.Weight其实理论应用得不多, 但如果用到须要留神:

  • AcquireTryAcquire办法都能够用于获取资源, 前者会 阻塞 的获取信号量, 后者会 非阻塞 的获取信号量, 如果获取不到就返回 false
  • Release偿还信号量后, 会以 先进先出 的程序唤醒期待队列中的调用者. 如果现有资源不够处于期待队列后面的调用者申请的资源数, 所有期待者会持续期待。
  • 如果一个 goroutine 申请较多的资源, 因为下面说的偿还后唤醒期待者的策略, 它可能会期待比拟长的工夫。


<font color=”#CD853F”> 官网库或出名我的项目中的应用 </font>

在 docker 我的项目中有应用:

<img src=”Semaphore- 带权重的信号量 /4.png” width = 90% height = 50% />

而在大多数我的项目中, 都用 channel 来代替 semaphore.Weight 作为信号量,

如 Go 的官网库net:

golang.org/x/net/netutil/listen.go:

如 go/pkg/mod/gopkg.in/gographics/imagick.v2@v2.6.0/imagick/magick_wand_env.go:

这里有另一种实现

本文由 mdnice 多平台公布

正文完
 0