1. 简介

本文将介绍 Go 语言中的 Weighted 并发原语,包含 Weighted 的根本应用办法、实现原理、应用注意事项等内容。可能更好地了解和利用 Weighted 来实现资源的治理,从而进步程序的稳定性。

2. 问题引入

在微服务架构中,咱们的服务节点负责接管其余节点的申请,并提供相应的性能和数据。比方账户服务,其余服务须要获取账户信息,都会通过rpc申请向账户服务发动申请。

这些服务节点通常以集群的形式部署在服务器上,用于解决大量的并发申请。每个服务器都有其解决能力的下限,超过该下限可能导致性能降落甚至解体。

在部署服务时,通常会评估服务的并发量,并为其调配适当的资源以解决预期的申请负载。然而,在微服务架构中,存在着上游服务申请上游服务的场景。如果上游服务在某些状况下没有正确思考并发量,或者因为某些异常情况导致大量申请发送给上游服务,那么上游服务可能面临超过其解决能力的问题。这可能导致上游服务的响应工夫减少,甚至无奈失常解决申请,进而影响整个零碎的稳定性和可用性。上面用一个简略的代码来阐明一下:

package mainimport (        "fmt"        "net/http"        "sync")func main() {        // 启动上游服务,用于解决申请        http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {                // 模仿上游服务的解决逻辑                // ...                // 实现申请解决后,从期待组中删除一个期待                wg.Done()        })        // 启动上游服务的 HTTP 服务器        http.ListenAndServe(":8080", nil)}

这里启动一个简略的HTTP服务器,由其来模仿上游服务,来接管上游服务的申请。上面咱们启动一个简略的程序,由其来模仿上游服务发送申请:

func main() {        // 创立一个期待组,用于期待所有申请实现        var wg sync.WaitGroup        // 模仿上游服务发送大量申请给上游服务        go func() {                for i := 0; i < 1000000; i++ {                        wg.Add(1)                        go sendRequest(&wg)                }        }()        // 期待所有申请实现        wg.Wait()}func sendRequest(wg *sync.WaitGroup) {        // 模仿上游服务发送申请给上游服务        resp, err := http.Get("http://localhost:8080/")        if err != nil {                fmt.Println("申请失败:", err)        } else {                fmt.Println("申请胜利:", resp.Status)        }        // 申请实现后,告诉期待组        wg.Done()}

这里,咱们同时启动了1000000个协程同时往HTTP服务器发送申请,如果服务器配置不够高,亦或者是申请量更多的状况下,曾经超过了服务器的解决下限,服务器没有主够的资源去解决这些申请,此时将有可能间接将服务器打挂掉,服务间接不可用。在这种状况下,如果因为上游服务的问题,导致上游服务,甚至整个链路的零碎都间接解体,这个是不合理的,此时须要有一些伎俩爱护上游服务因为异样流量导致整个零碎的解体。

这里对下面的场景进行剖析,能够发现,此时是因为上游服务大量申请的过去,而以后服务并没有足够的资源去解决这些申请,然而并没有对其加以限度,而是持续解决,最终导致了整个零碎的不可用。那么此时就应该进行限流,对并发申请量进行管制,对服务器可能解决的并发数进行正当评估,当并发申请数超过了限度,此时应该间接回绝其拜访,防止整个零碎的不可用。

那问题来了,go语言中,有什么办法可能实现资源的治理,如果没有足够的资源,此时将间接返回,不对申请进行解决呢?其实go语言中有Weighted类型,在这种场景还挺适合的。上面咱们将对其进行介绍。

3. 根本应用

3.1 根本介绍

Weighted 是 Go 语言中 golang.org/x/sync包中的一种类型,用于限度并发拜访某个资源的数量。它提供了一种机制,容许调用者以不同的权重申请拜访资源,并在资源可用时进行授予。

Weighted的定义如下,提供了Acquire,TryAcquire,Release三个办法:

type Weighted struct {   size    int64   cur     int64   mu      sync.Mutex   waiters list.List}func (s *Weighted) Acquire(ctx context.Context, n int64) error{}func (s *Weighted) TryAcquire(n int64) bool{}func (s *Weighted) Release(n int64) {} 
  • Acquire: 以权重 n 申请获取资源,阻塞直到资源可用或上下文 ctx 完结。
  • TryAcquire: 尝试以权重 n 获取信号量,如果胜利则返回 true,否则返回 false,并放弃信号量不变。
  • Release:开释具备权重 n 的信号量。

3.2 权重阐明

有时候,不同申请对资源的耗费是不同的。通过设置权重,你能够更好地管制不同申请对资源的应用状况。例如,某些申请可能须要更多的计算资源或更长的解决工夫,你能够设置较高的权重来确保它们可能获取到足够的资源。

其次就是权重大只是代表着申请须要应用到的资源多,对于优先级并不会有作用。在Weighted 中,资源的许可是以先进先出(FIFO)的程序调配的,而不是依据权重来决定获取的优先级。当有多个申请同时期待获取资源时,它们会依照先后顺序顺次获取资源的许可。

假如先请求权重为 1 的资源,而后再请求权重为 2 的资源。如果以后可用的资源许可足够满足两个申请的总权重,那么先申请的权重为 1 的资源会先获取到许可,而后是后续申请的权重为 2 的资源。

w.Acquire(context.Background(), 1) // 权重为 1 的申请先获取到资源许可w.Acquire(context.Background(), 2) // 权重为 2 的申请在权重为 1 的申请之后获取到资源许可

3.3 根本应用

当应用Weighted来管制资源的并发拜访时,通常须要以下几个步骤:

  • 创立Weighted实例,定义好最大资源数
  • 当须要资源时,调用Acquire办法占据资源
  • 当解决实现之后,调用Release办法开释资源

上面是一个简略的代码的示例,展现了如何应用Weighted实现资源管制:

func main() {   // 1. 创立一个信号量实例,设置最大并发数   sem := semaphore.NewWeighted(10)   // 具体解决申请的函数   handleRequest := func(id int) {      // 2. 调用Acquire尝试获取资源      err := sem.Acquire(context.Background(), 1)      if err != nil {         fmt.Printf("Goroutine %d failed to acquire resource\n", id)      }      // 3. 胜利获取资源,应用defer,在工作执行完之后,主动开释资源      defer sem.Release(1)      // 执行业务逻辑      return   }   // 模仿并发申请   for i := 0; i < 20; i++ {      go handleRequest(i)   }   time.Sleep(20 * time.Second)}

首先,调用NewWeighted办法创立一个信号量实例,设置最大并发数为10。而后在每次申请解决前调用Acquire办法尝试获取资源,胜利获取资源后,应用defer关键字,在工作执行完后主动开释资源,调用Release办法开释一个资源。

保障最多同时有10个协程获取资源。如果有更多的协程尝试获取资源,它们会期待其余协程开释资源后再进行获取。

4. 实现原理

4.1 设计初衷

Weighted类型的设计初衷是为了在并发环境中实现对资源的管制和限度。它提供了一种简略而无效的机制,容许在同一时间内只有肯定数量的并发操作能够拜访或应用特定的资源。

4.2 基本原理

Weighted类型的根本实现原理是基于计数信号量的概念。计数信号量是一种用于管制并发拜访的同步原语,它保护一个可用资源的计数器。在Weighted中,该计数器示意可用的资源数量。

当一个工作须要获取资源时,它会调用Acquire办法。该办法首先会查看以后可用资源的数量,如果大于零,则示意有可用资源,并将计数器减一,工作获取到资源,并继续执行。如果以后可用资源的数量为零,则工作会被阻塞,直到有其余工作开释资源。

当一个工作实现对资源的应用后,它会调用Release办法来开释资源。该办法会将计数器加一,示意资源曾经可用,其余被阻塞的工作能够持续获取资源并执行。

通过这种形式,Weighted实现了对资源的限度和管制。它确保在同一时间内只有肯定数量的并发工作能够拜访资源,超过限度的工作会被阻塞,直到有其余工作开释资源。这样能够无效地防止资源适度应用和竞争,保证系统的稳定性和性能。

4.3 代码实现

4.3.1 构造体定义

Weighted的构造体定义如下:

type Weighted struct {   size    int64   cur     int64   mu      sync.Mutex   waiters list.List}
  • size:示意资源的总数量,即能够同时获取的最大资源数量。
  • cur:示意以后曾经被获取的资源数量。
  • mu:用于爱护Weighted类型的互斥锁,确保并发安全性。
  • waiters:应用双向链表来存储期待获取资源的工作。
4.3.2 Acquire办法

Acquire办法将获取指定数量的资源。如果以后可用资源数量有余,调用此办法的工作将被阻塞,并退出到期待队列中。

func (s *Weighted) Acquire(ctx context.Context, n int64) error {   // 1. 应用互斥锁s.mu对Weighted类型进行加锁,确保并发安全性。   s.mu.Lock()   // size - cur 代表残余可用资源数,如果大于申请资源数n, 此时代表残余可用资源 大于 须要的资源数   // 其次,Weighted资源分配的程序是FIFO,如果期待队列不为空,以后申请就须要主动放到队列最初面   if s.size-s.cur >= n && s.waiters.Len() == 0 {      s.cur += n      s.mu.Unlock()      return nil   }    // s.size 代表最大资源数,如果须要的资源数 大于 最大资源数,此时间接返回谬误   if n > s.size {      // Don't make other Acquire calls block on one that's doomed to fail.      s.mu.Unlock()      <-ctx.Done()      return ctx.Err()   }   // 这里代表着以后临时获取不到资源,此时将创立一个waiter对象放到期待队列最初   ready := make(chan struct{})   // waiter对象中蕴含须要获取的资源数量n和告诉通道ready。   w := waiter{n: n, ready: ready}   // 将waiter对象放到队列最初   elem := s.waiters.PushBack(w)   // 开释锁,让其余申请进来   s.mu.Unlock()   select {   // 如果ctx.Done()通道被敞开,示意上下文已勾销,工作须要返回谬误。   case <-ctx.Done():      err := ctx.Err()      // 新获取锁,查看是否曾经胜利获取资源。如果胜利获取资源,将谬误置为nil,示意获取胜利;      s.mu.Lock()      select {      // 通过判断ready channel是否接管到信号,从而来判断是否胜利获取资源      case <-ready:         err = nil      default:         // 判断是否是期待队列中第一个元素         isFront := s.waiters.Front() == elem         // 将该申请从期待队列中移除         s.waiters.Remove(elem)         // 如果是第一个期待对象,同时还有残余资源,唤醒前面的waiter。说不定前面的waiter刚好符合条件         if isFront && s.size > s.cur {            s.notifyWaiters()         }      }      s.mu.Unlock()      return err   // ready通道接管到数据,代表此时曾经胜利占据到资源了   case <-ready:      return nil   }}

Weighted对象用来管制可用资源的数量。它有两个重要的字段,cur和size,别离示意以后可用的资源数量和总共可用的资源数量。

当一个申请通过Acquire办法申请资源时,首先会查看残余资源数量是否足够,并且期待队列中没有其余申请在期待资源。如果满足这两个条件,申请就能够胜利获取到资源。

如果残余资源数量不足以满足申请,那么一个waiter的对象会被创立并放入期待队列中。waiter对象蕴含了申请须要的资源数量n和一个用于告诉的通道ready。当其余申请调用Release办法开释资源时,它们会查看期待队列中的waiter对象是否满足资源需要,如果满足,就会将资源分配给该waiter对象,并通过ready通道来告诉它能够执行业务逻辑了。

即便残余资源数量大于申请所需数量,如果期待队列中存在期待的申请,新的申请也会被放入期待队列中,而不论资源是否足够。这可能导致一些申请长时间期待资源,导致资源的节约和提早。因而,在应用Weighted进行资源管制时,须要审慎评估资源配额,并防止资源饥饿的状况产生,免得影响零碎的性能和响应能力。

4.3.3 Release办法

Release办法将开释指定数量的资源。当资源被开释时,会查看期待队列中的工作。它从队头开始一一查看期待的元素,并尝试为它们分配资源,直到最初一个不满足资源条件的元素为止。

func (s *Weighted) Release(n int64) {   // 1. 应用互斥锁s.mu对Weighted类型进行加锁,确保并发安全性。   s.mu.Lock()   // 2. 开释资源   s.cur -= n   // 3. 异常情况解决   if s.cur < 0 {      s.mu.Unlock()      panic("semaphore: released more than held")   }   // 4. 唤醒期待工作   s.notifyWaiters()   s.mu.Unlock()}

能够看到,Release办法实现绝对比较简单,开释资源后,便间接调用notifyWaiters办法唤醒处于期待状态的工作。上面来看看notifyWaiters办法的具体实现:

func (s *Weighted) notifyWaiters() {   for {      // 获取队头元素      next := s.waiters.Front()      // 曾经没有处于期待状态的协程,此时间接返回      if next == nil {         break // No more waiters blocked.      }      w := next.Value.(waiter)      // 如果资源不满足要求 以后waiter的要求,此时间接返回      if s.size-s.cur < w.n {         break      }      // 否则占据waiter须要的资源数      s.cur += w.n      // 移除期待元素      s.waiters.Remove(next)      // 唤醒处于期待状态的工作,Acquire办法会 <- ready 来期待信号的到来      close(w.ready)   }}

notifyWaiters办法会从队头开始获取元素,判断以后资源的残余数,是否满足waiter的要求,如果满足的话,此时先占据该waiter须要的资源,之后再将其从期待队列中移除,最初调用close办法,唤醒处于期待状态的工作。 之后,再持续队列中取出元素,判断是否满足条件,循环反复,直到不满足waiter的条件为止。

4.3.4 TryAcquire办法

TryAcquire办法将尝试获取指定数量的资源,但不会阻塞。如果可用资源有余,它会立刻返回一个谬误,而不是阻塞期待。实现比较简单,只是简略查看以后资源数是否满足要求而已,具体如下:

func (s *Weighted) TryAcquire(n int64) bool {   s.mu.Lock()   success := s.size-s.cur >= n && s.waiters.Len() == 0   if success {      s.cur += n   }   s.mu.Unlock()   return success}

5. 注意事项

5.1 及时开释资源

当应用Weighted来治理资源时,确保在应用完资源后,及时调用Release办法开释资源。如果不这样做,将会导致资源透露,最终导致所有的申请都将无奈被解决。上面展现一个简略的代码阐明:

package mainimport (        "fmt"        "sync"        "time"        "golang.org/x/sync/semaphore")func main() {        sem := semaphore.NewWeighted(5) // 创立一个最大并发数为5的Weighted实例        // 模仿应用资源的工作        task := func(id int) {                //1. 胜利获取资源                if err := sem.Acquire(context.Background(), 1); err != nil {                        fmt.Printf("Task %d failed to acquire resource: %s\n", id, err)                        return                }                // 2. 工作解决实现之后,资源没有被开释                // defer sem.Release(1) // 应用defer确保在工作实现后开释资源                       }        // 启动多个工作并发执行        var wg sync.WaitGroup        for i := 0; i < 10; i++ {                wg.Add(1)                go func(id int) {                        defer wg.Done()                        task(id)                }(i)        }        wg.Wait() // 期待所有工作实现}

在下面的代码中,咱们应用Weighted来管制最大并发数为5。咱们在工作中没有调用sem.Release(1)开释资源,这些资源将始终被占用,前面启动的5个工作将永远无奈获取到资源,此时将永远不会继续执行上来。因而,务必在应用完资源后及时调用Release办法开释资源,以确保资源的正确回收和开释,保证系统的稳定性和性能。

而且这里最好应用defer语句来实现资源的开释,防止Release函数在某些异样场景下无奈被执行到。

5.2 正当设置并发数

Weighted只是提供了一种治理资源的伎俩,具体的并发数还须要开发人员自行依据零碎的理论需要和资源限度,正当设置Weighted实例的最大并发数。过大的并发数可能导致资源适度竞争,而过小的并发数可能限度了零碎的吞吐量。

具体操作能够到线上预公布环境,一直调整察看,获取到一个最合适的并发数。

5.3 思考Weighted是否实用于以后场景

Weighted 类型能够用于限度并发拜访资源的数量,但它也存在一些潜在的毛病,须要依据具体的利用场景和需要权衡利弊。

首先是内存开销,Weighted 类型应用一个 sync.Mutex 以及一个 list.List 来治理期待队列,这可能会占用肯定的内存开销。对于大规模的并发解决,特地是在限度极高的状况下,可能会影响零碎的内存耗费。

其次是Weighted 类型一旦初始化,最大并发数是固定的,无奈在运行时动静调整。如果你的应用程序须要依据负载状况动静调整并发限度,可能须要应用其余机制或实现。

而且Weighted是严格依照FIFO申请程序来分配资源的,当某些申请的权重过大时,可能会导致其余申请饥饿,即长时间期待资源。

最初,则是因为 Weighted 类型应用了互斥锁来爱护共享状态,因而在高并发状况下,抢夺锁可能成为性能瓶颈,影响零碎的吞吐量。

因而,在应用 Weighted 类型时,须要依据具体的利用场景和需要权衡利弊,从而来决定是否应用Weighted来实现资源的管理控制。

6. 总结

本文介绍了一种解决零碎中资源管理问题的解决方案Weighted。本文从问题引出,具体介绍了Weighted的特点和应用办法。通过理解Weighted的设计初衷和实现原理,读者能够更好地了解其工作原理。

同时,文章提供了应用Weighted时须要留神的事项,如及时开释资源、正当设置并发数等,从而帮忙读者防止潜在的问题,以及可能在比拟适合的场景下应用到Weighted类型实现资源管理。基于此,咱们实现了对Weighted的介绍,心愿对你有所帮忙。你的点赞和珍藏将是我最大的能源,比心~