乐趣区

关于godailylib:Go-每日一库之-ants

简介

解决大量并发是 Go 语言的一大劣势。语言内置了不便的并发语法,能够十分不便的创立很多个轻量级的 goroutine 并发解决工作。相比于创立多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。然而受限于资源总量,零碎中可能创立的 goroutine 数量也是受限的。默认每个 goroutine 占用 8KB 内存,一台 8GB 内存的机器满打满算也只能创立 8GB/8KB = 1000000 个 goroutine,更何况零碎还须要保留一部分内存运行日常治理工作,go 运行时须要内存运行 gc、解决 goroutine 切换等。应用的内存超过机器内存容量,零碎会应用替换区(swap),导致性能急速降落。咱们能够简略验证一下创立过多 goroutine 会产生什么:

func main() {
  var wg sync.WaitGroup
  wg.Add(10000000)
  for i := 0; i < 10000000; i++ {go func() {time.Sleep(1 * time.Minute)
    }()}
  wg.Wait()}

在我的机器上(8G 内存)运行下面的程序会报 errno 1455,即Out of Memory 谬误,这很好了解。审慎运行

另一方面,goroutine 的治理也是一个问题。goroutine 只能本人运行完结,内部没有任何伎俩能够强制 j 完结一个 goroutine。如果一个 goroutine 因为某种原因没有自行完结,就会呈现 goroutine 泄露。此外,频繁创立 goroutine 也是一个开销。

鉴于上述起因,天然呈现了与线程池一样的需要,即 goroutine 池。个别的 goroutine 池主动治理 goroutine 的生命周期,能够按需创立,动静缩容。向 goroutine 池提交一个工作,goroutine 池会主动安顿某个 goroutine 来解决。

ants就是其中一个实现 goroutine 池的库。

疾速应用

本文代码应用 Go Modules。

创立目录并初始化:

$ mkdir ants && cd ants
$ go mod init github.com/darjun/go-daily-lib/ants

装置 ants 库,应用 v2 版本:

$ go get -u github.com/panjf2000/ants/v2

咱们接下来要实现一个计算大量整数和的程序。首先创立根底的工作构造,并实现其执行工作办法:

type Task struct {
  index int
  nums  []int
  sum   int
  wg    *sync.WaitGroup
}

func (t *Task) Do() {
  for _, num := range t.nums {t.sum += num}

  t.wg.Done()}

很简略,就是将一个切片中的所有整数相加。

而后咱们创立 goroutine 池,留神池应用完后须要手动敞开,这里应用 defer 敞开:

p, _ := ants.NewPoolWithFunc(10, taskFunc)
defer p.Release()

func taskFunc(data interface{}) {task := data.(*Task)
  task.Do()
  fmt.Printf("task:%d sum:%d\n", task.index, task.sum)
}

下面调用了 ants.NewPoolWithFunc() 创立了一个 goroutine 池。第一个参数是池容量,即池中最多有 10 个 goroutine。第二个参数为每次执行工作的函数。当咱们调用 p.Invoke(data) 的时候,ants池会在其治理的 goroutine 中找出一个闲暇的,让它执行函数 taskFunc,并将data 作为参数。

接着,咱们模仿数据,做数据切分,生成工作,交给 ants 解决:

const (
  DataSize    = 10000
  DataPerTask = 100
)

nums := make([]int, DataSize, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
tasks := make([]*Task, 0, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
  task := &Task{
    index: i + 1,
    nums:  nums[i*DataPerTask : (i+1)*DataPerTask],
    wg:    &wg,
  }

  tasks = append(tasks, task)
  p.Invoke(task)
}

wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())

随机生成 10000 个整数,将这些整数分为 100 份,每份 100 个,生成 Task 构造,调用 p.Invoke(task) 解决。wg.Wait()期待解决实现,而后输入 ants 正在运行的 goroutine 数量,这时应该是 0。

最初咱们将后果汇总,并验证一下后果,与间接相加失去的后果做一个比拟:

var sum int
for _, task := range tasks {sum += task.sum}

var expect int
for _, num := range nums {expect += num}

fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)

运行:

$ go run main.go
...
task:96 sum:53275
task:88 sum:50090
task:62 sum:57114
task:45 sum:48041
task:82 sum:45269
running goroutines: 0
finish all tasks, result is 5010172 expect:5010172

的确,工作实现之后,正在运行的 goroutine 数量变为 0。而且咱们验证了,后果没有偏差。另外须要留神,goroutine 池中工作的执行程序是随机的,与提交工作的先后没有关系。由下面运行打印的工作标识咱们也能发现这一点。

函数作为工作

ants反对将一个不承受任何参数的函数作为工作提交给 goroutine 运行。因为不承受参数,咱们提交的函数要么不须要内部数据,只须要解决本身逻辑,否则就必须用某种形式将须要的数据传递进去,例如闭包。

提交函数作为工作的 goroutine 池应用 ants.NewPool() 创立,它只承受一个参数示意池子的容量。调用池子对象的 Submit() 办法来提交工作,将一个不承受任何参数的函数传入。

最开始的例子能够改写一下。减少一个工作包装函数,将工作须要的参数作为包装函数的参数。包装函数返回理论的工作函数,该工作函数就能够通过闭包拜访它须要的数据了:

type taskFunc func()

func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {return func() {for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {*sum += num}

    fmt.Printf("task:%d sum:%d\n", i+1, *sum)
    wg.Done()}
}

调用 ants.NewPool(10) 创立 goroutine 池,同样池子用完须要开释,这里应用defer

p, _ := ants.NewPool(10)
defer p.Release()

生成模仿数据,切分工作。提交工作给 ants 池执行,这里应用 taskFuncWrapper() 包装函数生成具体的工作,而后调用 p.Submit() 提交:

nums := make([]int, DataSize, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()

汇总后果,验证:

var sum int
for _, partSum := range partSums {sum += partSum}

var expect int
for _, num := range nums {expect += num}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)

这个程序的性能与最开始的完全相同。

执行流程

GitHub 仓库中有个执行流程图,我从新绘制了一下:

执行流程如下:

  • 初始化 goroutine 池;
  • 提交工作给 goroutine 池,查看是否有闲暇的 goroutine:

    • 有,获取闲暇 goroutine
    • 无,查看池中的 goroutine 数量是否已到池容量下限:

      • 已到下限,查看 goroutine 池是否是非阻塞的:

        • 非阻塞,间接返回 nil 示意执行失败
        • 阻塞,期待 goroutine 闲暇
      • 未到下限,创立一个新的 goroutine 解决工作
  • 工作解决实现,将 goroutine 交还给池,以待解决下一个工作

选项

ants提供了一些选项能够定制 goroutine 池的行为。选项应用 Options 构造定义:

// src/github.com/panjf2000/ants/options.go
type Options struct {
  ExpiryDuration time.Duration
  PreAlloc bool
  MaxBlockingTasks int
  Nonblocking bool
  PanicHandler func(interface{})
  Logger Logger
}

各个选项含意如下:

  • ExpiryDuration:过期工夫。示意 goroutine 闲暇多长时间之后会被 ants 池回收
  • PreAlloc:预调配。调用 NewPool()/NewPoolWithFunc() 之后预调配 worker(治理一个工作 goroutine 的构造体)切片。而且应用预调配与否会间接影响池中治理worker 的构造。见上面源码
  • MaxBlockingTasks:最大阻塞工作数量。即池中 goroutine 数量已到池容量,且所有 goroutine 都解决忙碌状态,这时到来的工作会在阻塞列表期待。这个选项设置的是列表的最大长度。阻塞的工作数量达到这个值后,后续工作提交间接返回失败
  • Nonblocking:池是否阻塞,默认阻塞。提交工作时,如果 ants 池中 goroutine 已到下限且全副忙碌,阻塞的池会将工作增加的阻塞列表期待(当然受限于阻塞列表长度,见上一个选项)。非阻塞的池间接返回失败
  • PanicHandler:panic 解决。遇到 panic 会调用这里设置的处理函数
  • Logger:指定日志记录器

NewPool()局部源码:

if p.options.PreAlloc {
  if size == -1 {return nil, ErrInvalidPreAllocSize}
  p.workers = newWorkerArray(loopQueueType, size)
} else {p.workers = newWorkerArray(stackType, 0)
}

应用预调配时,创立 loopQueueType 类型的构造,反之创立 stackType 类型。这是 ants 定义的两种治理 worker 的数据结构。

ants定义了一些 With* 函数来设置这些选项:

func WithOptions(options Options) Option {return func(opts *Options) {*opts = options}
}

func WithExpiryDuration(expiryDuration time.Duration) Option {return func(opts *Options) {opts.ExpiryDuration = expiryDuration}
}

func WithPreAlloc(preAlloc bool) Option {return func(opts *Options) {opts.PreAlloc = preAlloc}
}

func WithMaxBlockingTasks(maxBlockingTasks int) Option {return func(opts *Options) {opts.MaxBlockingTasks = maxBlockingTasks}
}

func WithNonblocking(nonblocking bool) Option {return func(opts *Options) {opts.Nonblocking = nonblocking}
}

func WithPanicHandler(panicHandler func(interface{})) Option {return func(opts *Options) {opts.PanicHandler = panicHandler}
}

func WithLogger(logger Logger) Option {return func(opts *Options) {opts.Logger = logger}
}

这里应用了 Go 语言中十分常见的一种模式,我称之为选项模式,十分不便地结构有大量参数,且大部分有默认值或个别不须要显式设置的对象。

咱们来验证几个选项。

最大期待队列长度

ants池设置容量之后,如果所有的 goroutine 都在解决工作。这时提交的工作默认会进入期待队列,WithMaxBlockingTasks(maxBlockingTasks int)能够设置期待队列的最大长度。超过这个长度,提交工作间接返回谬误:

func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)
    time.Sleep(1 * time.Second)
    wg.Done()}
}

func main() {p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(8)
  for i := 1; i <= 8; i++ {go func(i int) {err := p.Submit(wrapper(i, &wg))
      if err != nil {fmt.Printf("task:%d err:%v\n", i, err)
        wg.Done()}
    }(i)
  }

  wg.Wait()}

下面代码中,咱们设置 goroutine 池的容量为 4,最大阻塞队列长度为 2。而后一个 for 提交 8 个工作,冀望后果是:4 个工作在执行,2 个工作在期待,2 个工作提交失败。运行后果:

hello from task:8
hello from task:5
hello from task:4
hello from task:6
task:7 err:too many goroutines blocked on submit or Nonblocking is set
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
hello from task:2

咱们看到提交工作失败,打印too many goroutines blocked ...

代码中有 4 点须要留神:

  • 提交工作必须并行进行。如果是串行提交,第 5 个工作提交时因为池中没有闲暇的 goroutine 解决该工作,Submit()办法会被阻塞,后续工作就都不能提交了。也就达不到验证的目标了
  • 因为工作可能提交失败,失败的工作不会理论执行,所以实际上 wg.Done() 次数会小于 8。因此在 err != nil 分支中咱们须要调用一次 wg.Done()。否则wg.Wait() 会永远阻塞
  • 为了防止工作执行过快,空出了 goroutine,察看不到景象,每个工作中我应用 time.Sleep(1 * time.Second) 休眠 1s
  • 因为 goroutine 之间的执行程序未显式同步,故每次执行的程序不确定

因为简略起见,后面的例子中 Submit() 办法的返回值都被咱们疏忽了。理论开发中肯定不要疏忽。

非阻塞

ants池默认是阻塞的,咱们能够应用 WithNonblocking(nonblocking bool) 设置其为非阻塞。非阻塞的 ants 池中,在所有 goroutine 都在解决工作时,提交新工作会间接返回谬误:

func main() {p, _ := ants.NewPool(2, ants.WithNonblocking(true))
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(3)
  for i := 1; i <= 3; i++ {err := p.Submit(wrapper(i, &wg))
    if err != nil {fmt.Printf("task:%d err:%v\n", i, err)
      wg.Done()}
  }

  wg.Wait()}

应用上个例子中的 wrapper() 函数,ants池容量设置为 2。间断提交 3 个工作,冀望后果前两个工作失常执行,第 3 个工作提交时返回谬误:

hello from task:2
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1

panic 处理器

一个鲁棒性强的库肯定不会漠视谬误的解决,特地是宕机相干的谬误。在 Go 语言中就是 panic,也被称为运行时恐慌,在程序运行的过程中产生的严重性谬误,例如索引越界,空指针解援用等,都会触发 panic。如果不解决 panic,程序会间接意外退出,可能造成数据失落的严重后果。

ants中如果 goroutine 在执行工作时产生 panic,会终止当前任务的执行,将产生谬误的堆栈输入到os.Stderr 留神,该 goroutine 还是会被放回池中,下次能够取出执行新的工作

func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)
    if i%2 == 0 {panic(fmt.Sprintf("panic from task:%d", i))
    }
    wg.Done()}
}

func main() {p, _ := ants.NewPool(2)
  defer p.Release()

  var wg sync.WaitGroup
  wg.Add(3)
  for i := 1; i <= 2; i++ {p.Submit(wrapper(i, &wg))
  }

  time.Sleep(1 * time.Second)
  p.Submit(wrapper(3, &wg))
  p.Submit(wrapper(5, &wg))
  wg.Wait()}

咱们让偶数个工作触发 panic。提交两个工作,第二个工作肯定会触发panic。触发panic 之后,咱们还能够持续提交工作 3、5。留神这里没有 4,提交工作 4 还是会触发panic

下面的程序须要留神 2 点:

  • 工作函数中 wg.Done() 是在 panic 办法之后,如果触发了 panic,函数中的其余失常逻辑就不会再继续执行了。所以咱们尽管wg.Add(3),然而一共提交了 4 个工作,其中一个工作触发了panicwg.Done() 没有正确执行 。理论开发中,咱们个别应用defer 语句来确保 wg.Done() 肯定会执行
  • 在 for 循环之后,我增加了一行代码 time.Sleep(1 * time.Second)。如果没有这一行,后续的两条Submit() 办法能够间接执行,可能会导致工作很快就实现了,wg.Wait()间接返回了,这时 panic 的堆栈还没有输入。你能够尝试正文掉这行代码运行看看后果

除了 ants 提供的默认 panic 处理器,咱们还能够应用 WithPanicHandler(paincHandler func(interface{})) 指定咱们本人编写的 panic 处理器。处理器的参数就是传给 panic 的值:

func panicHandler(err interface{}) {fmt.Fprintln(os.Stderr, err)
}

p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))
defer p.Release()

其余代码与下面的完全相同,指定了 panicHandler 后触发 panic 就会执行它。运行:

hello from task:2
panic from task:2
hello from task:1
hello from task:5
hello from task:3

看到输入了传给 panic 函数的字符串(第二行输入)。

默认池

为了方便使用,很多 Go 库都喜爱提供其外围性能类型的一个默认实现。能够间接通过库提供的接口调用。例如 net/http,例如antsants 库中定义了一个默认的池,默认容量为 MaxInt32。goroutine 池的各个办法都能够间接通过ants 包间接拜访:

// src/github.com/panjf2000/ants/ants.go
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)

func Submit(task func()) error {return defaultAntsPool.Submit(task)
}

func Running() int {return defaultAntsPool.Running()
}

func Cap() int {return defaultAntsPool.Cap()
}

func Free() int {return defaultAntsPool.Free()
}

func Release() {defaultAntsPool.Release()
}

func Reboot() {defaultAntsPool.Reboot()
}

间接应用:

func main() {defer ants.Release()

  var wg sync.WaitGroup
  wg.Add(2)
  for i := 1; i <= 2; i++ {ants.Submit(wrapper(i, &wg))
  }
  wg.Wait()}

默认池也须要Release()

总结

本文介绍了 goroutine 池的由来,并借由 ants 库介绍了根本的应用办法,和一些细节。ants源码不多,去掉测试的外围代码只有 1k 行左右,倡议有工夫、感兴趣的童鞋深刻浏览。

大家如果发现好玩、好用的 Go 语言库,欢送到 Go 每日一库 GitHub 上提交 issue😄

参考

  1. ants GitHub:github.com/panjf2000/ants
  2. Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib

我的博客:https://darjun.github.io

欢送关注我的微信公众号【GoUpUp】,独特学习,一起提高~

退出移动版