乐趣区

关于godailylib:Go-每日一库之-tunny

简介

之前写过一篇文章介绍了 ants 这个 goroutine 池实现。过后在网上查看相干材料的时候,发现了另外一个实现tunny。趁着工夫相近,正好钻研一番。也好比拟一下这两个库。那就让咱们开始吧。

疾速开始

本文代码应用 Go Modules。

创立目录并初始化:

$ mkdir tunny && cd tunny
$ go mod init github.com/darjun/go-daily-lib/tunny

应用 go get 从 GitHub 获取 tunny 库:

$ go get -u github.com/Jeffail/tunny

为了不便地和 ants 做一个比照,咱们将 ants 中的示例从新用 tunny 实现一遍:还是那个分段求和的例子:

const (
  DataSize    = 10000
  DataPerTask = 100
)

func main() {numCPUs := runtime.NumCPU()
  p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
    var sum int
    for _, n := range payload.([]int) {sum += n}
    return sum
  })
  defer p.Close()
  // ...
}

应用也非常简单,首先创立一个Pool,这里应用tunny.NewFunc()

第一个参数为池子大小,即同时有多少个 worker(也即 goroutine)在工作,这里设置成逻辑 CPU 个数,对于 CPU 密集型工作,这个值设置太大无意义,反而有可能导致 goroutine 切换频繁而升高性能。

第二个参数传入一个 func(interface{})interface{} 的参数作为工作处理函数。后续传入数据就会调用这个函数解决。

池子应用完须要敞开,这里应用 defer p.Close() 在程序退出前敞开。

而后,生成测试数据,还是 10000 个随机数,分成 100 组:

nums := make([]int, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}

解决每组数据:

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partialSums := make([]int, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {go func(i int) {partialSums[i] = p.Process(nums[i*DataPerTask : (i+1)*DataPerTask]).(int)
    wg.Done()}(i)
}

wg.Wait()

调用 p.Process() 办法,传入工作数据,池子中会抉择闲暇的 goroutine 来解决这个数据。因为咱们下面设置了处理函数,goroutine 会间接调用该函数,将这个切片作为参数传入。

tunnyants 不同的是,tunny的工作解决是同步的,即调用 p.Process() 办法之后,以后 goroutine 会挂起,直到工作解决实现之后才会被唤醒。因为是同步的,所以 p.Process() 办法能够间接返回处理结果。这也是下面程序在散发工作的时候,启动多个 goroutine 的起因。如果不是每个工作都启动一个 goroutine,p.Process()办法会始终期待工作实现,那么前面的工作要等到后面的工作全副执行完之后能力执行。这样就施展不了并发的劣势了。

这里留神一个小细节,我将 for 循环变量作为参数传给 goroutine 函数了。如果不这样做,所有 goroutine 都共用外层的 i,而且 goroutine 开始运行时,for 循环大概率曾经完结了,这时 i = DataSize/DataPerTask,索引nums[i*DataPerTask : (i+1)*DataPerTask] 会越界触发 panic。

最初统计数据,验证后果:

var sum int
for _, s := range partialSums {sum += s}

var expect int
for _, num := range nums {expect += num}
fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)

运行:

$ go run main.go
finish all tasks, result is 5010172 expect:5010172

超时

默认状况下,p.Process()会始终阻塞直到工作实现,即便以后没有闲暇 worker 也会阻塞。咱们也能够应用带超时的 Process() 办法:ProcessTimed()。传入一个超时工夫距离,如果超过这个工夫还没有闲暇 worker,或者工作还没有解决实现,就会终止,并返回一个谬误。

超时有 2 种状况:

  • 等不到闲暇的 worker:所有 worker 始终解决忙碌状态,正在解决的工作比拟耗时,无奈短时间内实现;
  • 工作自身比拟耗时。

上面咱们编写一个计算斐波那契的函数,应用递归这种低效的实现办法:

func fib(n int) int {
  if n <= 1 {return 1}

  return fib(n-1) + fib(n-2)
}

咱们先看工作比拟耗时的状况,创立 Pool 对象。为了察看更显著,在处理函数中增加了 time.Sleep() 语句:

p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {n := payload.(int)
  result := fib(n)
  time.Sleep(5 * time.Second)
  return result
})
defer p.Close()

生成与池容量相等的工作数,调用 p.ProcessTimed() 办法,设置超时为 1s:

var wg sync.WaitGroup
wg.Add(numCPUs)
for i := 0; i < numCPUs; i++ {go func(i int) {n := rand.Intn(30)
    result, err := p.ProcessTimed(n, time.Second)
    nowStr := time.Now().Format("2006-01-02 15:04:05")
    if err != nil {fmt.Printf("[%s]task(%d) failed:%v\n", nowStr, i, err)
    } else {fmt.Printf("[%s]fib(%d) = %d\n", nowStr, n, result)
    }
    wg.Done()}(i)
}

wg.Wait()

因为处理函数中 sleep 5s,所以工作在执行过程中就超时了。运行:

$ go run main.go 
[2021-06-10 16:36:26]task(7) failed:job request timed out
[2021-06-10 16:36:26]task(4) failed:job request timed out
[2021-06-10 16:36:26]task(1) failed:job request timed out
[2021-06-10 16:36:26]task(6) failed:job request timed out
[2021-06-10 16:36:26]task(5) failed:job request timed out
[2021-06-10 16:36:26]task(0) failed:job request timed out
[2021-06-10 16:36:26]task(3) failed:job request timed out
[2021-06-10 16:36:26]task(2) failed:job request timed out

都在同一秒中超时。

咱们将工作数量翻倍,再将处理函数中的 sleep 改为 990ms,保障前一批工作能顺利完成,后续工作或者因为等不到闲暇 worker,或者因为执行工夫过长而超时返回。运行:

$ go run main.go
[2021-06-10 16:42:46]fib(11) = 144
[2021-06-10 16:42:46]fib(25) = 121393
[2021-06-10 16:42:46]fib(27) = 317811
[2021-06-10 16:42:46]fib(1) = 1
[2021-06-10 16:42:46]fib(18) = 4181
[2021-06-10 16:42:46]fib(29) = 832040
[2021-06-10 16:42:46]fib(17) = 2584
[2021-06-10 16:42:46]fib(20) = 10946
[2021-06-10 16:42:46]task(5) failed:job request timed out
[2021-06-10 16:42:46]task(14) failed:job request timed out
[2021-06-10 16:42:46]task(8) failed:job request timed out
[2021-06-10 16:42:46]task(7) failed:job request timed out
[2021-06-10 16:42:46]task(13) failed:job request timed out
[2021-06-10 16:42:46]task(12) failed:job request timed out
[2021-06-10 16:42:46]task(11) failed:job request timed out
[2021-06-10 16:42:46]task(6) failed:job request timed out

context

context 是协调 goroutine 的工具。tunny反对带 context.Context 参数的办法:ProcessCtx()。以后 context 状态变为 Done 之后,工作也会进行执行。context 会因为超时、勾销等起因切换为 Done 状态。还是拿下面的例子:

go func(i int) {n := rand.Intn(30)
  ctx, cancel := context.WithCancel(context.Background())
  if i%2 == 0 {go func() {time.Sleep(500 * time.Millisecond)
      cancel()}()}

  result, err := p.ProcessCtx(ctx, n)
  if err != nil {fmt.Printf("task(%d) failed:%v\n", i, err)
  } else {fmt.Printf("fib(%d) = %d\n", n, result)
  }
  wg.Done()}(i)

其余代码都一样,咱们调用 p.ProcessCtx() 办法来执行工作。参数是一个可勾销的 Context。对于序号为偶数的工作,咱们启动一个 goroutine 在 500ms 之后cancel() 掉这个Context。代码运行后果如下:

$ go run main.go
task(4) failed:context canceled
task(6) failed:context canceled
task(0) failed:context canceled
task(2) failed:context canceled
fib(27) = 317811
fib(25) = 121393
fib(1) = 1
fib(18) = 4181

咱们看到偶数序号的工作都被勾销了。

源码

tunny的源码更少,除去测试代码和正文,连 500 行都不到。那么就一起来看一下吧。Pool构造如下:

// src/github.com/Jeffail/tunny.go
type Pool struct {
  queuedJobs int64

  ctor    func() Worker
  workers []*workerWrapper
  reqChan chan workRequest

  workerMut sync.Mutex
}

Pool构造中有一个 ctor 字段,这是一个函数对象,用于返回一个实现 Worker 接口的值:

type Worker interface {Process(interface{}) interface{}
  BlockUntilReady()
  Interrupt()
  Terminate()}

这个接口不同的办法在工作执行的不同阶段调用。最重要的当属 Process(interface{}) interface{} 办法了。这个就是执行工作的函数。tunny提供 New() 办法创立 Pool 对象,这个办法须要咱们本人结构 ctor 函数对象,应用多有不便。tunny提供了另外两个默认实现 closureWorkercallbackWorker

type closureWorker struct {processor func(interface{}) interface{}}

func (w *closureWorker) Process(payload interface{}) interface{} {return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}

type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {f, ok := payload.(func())
  if !ok {return ErrJobNotFunc}
  f()
  return nil
}

func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt()       {}
func (w *callbackWorker) Terminate()       {}

tunny.NewFunc()办法应用的就是closureWorker

func NewFunc(n int, f func(interface{}) interface{}) *Pool {return New(n, func() Worker {
    return &closureWorker{processor: f,}
  })
}

创立的 closureWorker 间接将参数 f 作为工作处理函数。

tunny.NewCallback()办法应用callbackWorker

func NewCallback(n int) *Pool {return New(n, func() Worker {return &callbackWorker{}
  })
}

callbackWorker构造中没有处理函数,只能给它发送无参无返回值的函数对象作为工作,它的 Process() 办法就是执行这个函数。

创立 Pool 对象后,都是调用它的 SetSize() 办法,设置 worker 数量。在这个办法中会启动相应数量的 goroutine:

func (p *Pool) SetSize(n int) {p.workerMut.Lock()
  defer p.workerMut.Unlock()

  lWorkers := len(p.workers)
  if lWorkers == n {return}

  for i := lWorkers; i < n; i++ {p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
  }

  // 进行过多的 worker
  for i := n; i < lWorkers; i++ {p.workers[i].stop()}

  // 期待 worker 进行
  for i := n; i < lWorkers; i++ {p.workers[i].join()
    // -----------------
  }
  p.workers = p.workers[:n]
}

SetSize()其实在扩容和缩容的时候也会调用。对于扩容,它会创立相应数量的 worker。对于缩容,它会将多余的 worker 停掉。与 ants 不同,tunny的扩容缩容都是即时失效的。

代码中,我用 ----------------- 标出来的中央我感觉有点问题。对于缩容,因为底层的数组没有变动,workers切片长度放大之后,数组中前面的元素实际上就拜访不到了,然而数组还持有它的援用,算是一种内存透露吧。所以稳当起见最好加上p.workers[i] = nil

这里创立的 worker 实际上是包装了一层的 workerWrapper 构造:

// src/github.com/Jeffail/worker.go
type workerWrapper struct {
  worker        Worker
  interruptChan chan struct{}
  reqChan chan<- workRequest
  closeChan chan struct{}
  closedChan chan struct{}}

func newWorkerWrapper(
  reqChan chan<- workRequest,
  worker Worker,
) *workerWrapper {
  w := workerWrapper{
    worker:        worker,
    interruptChan: make(chan struct{}),
    reqChan:       reqChan,
    closeChan:     make(chan struct{}),
    closedChan:    make(chan struct{}),
  }

  go w.run()
  return &w
}

workerWrapper构造创立之后会立即调用 run() 办法启动一个 goroutine:

func (w *workerWrapper) run() {jobChan, retChan := make(chan interface{}), make(chan interface{})
  defer func() {w.worker.Terminate()
    close(retChan)
    close(w.closedChan)
  }()

  for {w.worker.BlockUntilReady()
    select {
    case w.reqChan <- workRequest{
      jobChan:       jobChan,
      retChan:       retChan,
      interruptFunc: w.interrupt,
    }:
      select {
      case payload := <-jobChan:
        result := w.worker.Process(payload)
        select {
        case retChan <- result:
        case <-w.interruptChan:
          w.interruptChan = make(chan struct{})
        }
      case _, _ = <-w.interruptChan:
        w.interruptChan = make(chan struct{})
      }
    case <-w.closeChan:
      return
    }
  }
}

每个 worker goroutine 都在尝试向 w.reqChan 通道中发送一个 workRequest 构造数据,发送胜利之后,从 jobChan 中获取工作数据,而后调用 Worker.Process() 办法执行工作,最初将后果发送到 retChan 通道中。这里其实有好几个交互。须要联合 Process() 办法来看才更清晰:

func (p *Pool) Process(payload interface{}) interface{} {
  request, open := <-p.reqChan
  request.jobChan <- payload
  payload, open = <-request.retChan
  return payload
}

删掉无相干的代码,最初就是下面这样。咱们在调用池对象的 Process() 办法时,尝试从通道 reqChan 中接收数据,而后将工作数据发送到 jobChan 通道中,最初从 retChan 通道中接管后果。与下面的 run 流程联合来看,实际上在失常执行一个工作时,PoolworkerWrapper 有 3 次交互。

察看 Pool 创立到 workerWrapper 创立的流程,咱们能够看到实际上 Pool 构造中的 reqChanworkerWrapper构造中的 reqChan 是同一个通道。即 workerWrapper 启动后,会阻塞在向 reqChan 通道发送数据上,直到调用了 PoolProcess*()办法,从通道 reqChan 取出数据。Process()办法失去 workRequest 会向它的 jobChan 通道中发送工作数据。而 workerWrapper.run() 办法胜利发送数据到 reqChan 之后就开始期待从 jobChan 通道中接收数据,这时接管到 Process() 办法发送过去的数据。开始执行 w.worker.Process() 办法,而后向 retChan 通道发送后果数据,Process()办法在胜利发送数据到 jobChan 之后,就开始期待从 retChan 通道中接收数据。接管胜利之后,Process()办法返回,workerWrapper.run()持续阻塞在 w.reqChan <- 这条语句上,期待解决下一个工作。留神 jobChanretChan都是 workerWrapper.run() 办法中创立的通道。

那么超时是怎么实现的呢?看办法 ProcessTimed() 的实现:

func (p *Pool) ProcessTimed(payload interface{},
  timeout time.Duration,
) (interface{}, error) {tout := time.NewTimer(timeout)
  var request workRequest
  select {
  case request, open = <-p.reqChan:
  case <-tout.C:
    return nil, ErrJobTimedOut
  }

  select {
  case request.jobChan <- payload:
  case <-tout.C:
    request.interruptFunc()
    return nil, ErrJobTimedOut
  }

  select {
  case payload, open = <-request.retChan:
  case <-tout.C:
    request.interruptFunc()
    return nil, ErrJobTimedOut
  }

  tout.Stop()
  return payload, nil
}

同样地,删除不相干的代码。首先,创立一个 timer,超时工夫由传入参数指定。前面有 3 个select 语句:

  • 期待从 p.reqChan 取数据,即期待有 worker 闲暇;
  • 期待发送数据到 jobChan,即期待 worker 从jobChan 取出工作数据;
  • 期待从 retChan 取数据,即期待 worker 将后果发送到retChan

第一种状况,如果超时了,阐明 worker 都处于忙碌状态,间接返回工作超时。前面两种状况实际上是工作曾经开始执行了,然而在规定的工夫内没有实现。这两种状况,须要终止工作的执行。咱们看到下面调用了 workerRequest.interruptFunc() 办法,也就是 workerWrapper.interrupt() 办法:

func (w *workerWrapper) interrupt() {close(w.interruptChan)
  w.worker.Interrupt()}

这个办法就是简略敞开了 interrupteChan 通道,而后调用 worker 对象的 Interrupt() 办法,默认实现中这个办法都是空的。

interruptChan通道敞开后,goroutine 中期待从 jobChan 接收数据和期待向 retChan 发送数据的操作都会勾销:

select {
case payload := <-jobChan:
  result := w.worker.Process(payload)
  select {
  case retChan <- result:
  case <-w.interruptChan:
    w.interruptChan = make(chan struct{})
  }
case _, _ = <-w.interruptChan:
  w.interruptChan = make(chan struct{})
}

ProcessCtx()实现也是相似的。

最初调用 workerWrapper.stop() 会敞开 closeChan 通道,这会导致 workerWrapper.run() 办法中的 for 循环跳出,进而执行 defer 函数中的 close(retChan)close(closedChan)

defer func() {w.worker.Terminate()
  close(retChan)
  close(w.closedChan)
}()

这里须要敞开 retChan 通道是为了避免 Process*() 办法在期待 retChan 数据。

closedChan通道敞开后,workerWrapper.join()办法就返回了。

func (w *workerWrapper) join() {<-w.closedChan}

Worker几个办法的调用机会:

  • Process():执行工作时;
  • Interrupt():工作因为超时会被 context 勾销时;
  • BlockUntilReady():每次执行新工作前,可能须要筹备一些资源;
  • Terminate()workerWrapper.run()中的 defer 函数中,即进行 worker 后。

这些机会在代码中都能清晰地看到。

基于源码,我画了一个流程图:

图中省略了中断的流程。

tunny vs ants

tunny设计的思路与 ants 有较大的区别:

tunny只反对同步的形式执行工作,尽管工作在另一个 goroutine 执行,然而提交工作的 goroutine 必须期待后果返回或超时。不能做其余事件。正是因为这一点,导致 tunny 的设计略微一点简单,而且为了反对超时和勾销,设计了多个通道用于和执行工作的 goroutine 通信。一次工作执行的过程波及屡次通信,性能是有损失的。从另一方面说,同步的编程形式更合乎人类的直觉。

ants齐全是异步的工作执行流程,相比 tunny 性能是稍高一些的。然而也因为它的异步个性,导致没有工作超时、勾销这些机制。而且如果须要收集后果,必须要本人编写额定的代码。

总结

本文介绍了另一个 goroutine 池的实现 tunny。它以同步的形式来解决工作,编写代码更加直观,对工作的执行流程有更强的管制,如超时、勾销等。当然实现也简单一些。tunny 代码不走 500 行,十分倡议读一读。

大家如果发现好玩、好用的 Go 语言库,欢送到 Go 每日一库 GitHub 上提交 issue😄

参考

  1. tunny GitHub:https://github.com/Jeffail/tunny
  2. ants GitHub:github.com/panjf2000/ants
  3. Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib

我的博客:https://darjun.github.io

欢送关注我的微信公众号【GoUpUp】,独特学习,一起提高~

退出移动版