简介
解决大量并发是 Go 语言的一大劣势。语言内置了不便的并发语法,能够十分不便的创立很多个轻量级的 goroutine 并发解决工作。相比于创立多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。然而受限于资源总量,零碎中可能创立的 goroutine 数量也是受限的。默认每个 goroutine 占用 8KB 内存,一台 8GB 内存的机器满打满算也只能创立 8GB/8KB = 1000000 个 goroutine,更何况零碎还须要保留一部分内存运行日常治理工作,go 运行时须要内存运行 gc、解决 goroutine 切换等。应用的内存超过机器内存容量,零碎会应用替换区(swap),导致性能急速降落。咱们能够简略验证一下创立过多 goroutine 会产生什么:
func main() { var wg sync.WaitGroup wg.Add(10000000) for i := 0; i < 10000000; i++ { go func() { time.Sleep(1 * time.Minute) }() } wg.Wait()}
在我的机器上(8G内存)运行下面的程序会报errno 1455
,即Out of Memory
谬误,这很好了解。审慎运行。
另一方面,goroutine 的治理也是一个问题。goroutine 只能本人运行完结,内部没有任何伎俩能够强制j完结一个 goroutine。如果一个 goroutine 因为某种原因没有自行完结,就会呈现 goroutine 泄露。此外,频繁创立 goroutine 也是一个开销。
鉴于上述起因,天然呈现了与线程池一样的需要,即 goroutine 池。个别的 goroutine 池主动治理 goroutine 的生命周期,能够按需创立,动静缩容。向 goroutine 池提交一个工作,goroutine 池会主动安顿某个 goroutine 来解决。
ants
就是其中一个实现 goroutine 池的库。
疾速应用
本文代码应用 Go Modules。
创立目录并初始化:
$ mkdir ants && cd ants$ go mod init github.com/darjun/go-daily-lib/ants
装置ants
库,应用v2
版本:
$ go get -u github.com/panjf2000/ants/v2
咱们接下来要实现一个计算大量整数和的程序。首先创立根底的工作构造,并实现其执行工作办法:
type Task struct { index int nums []int sum int wg *sync.WaitGroup}func (t *Task) Do() { for _, num := range t.nums { t.sum += num } t.wg.Done()}
很简略,就是将一个切片中的所有整数相加。
而后咱们创立 goroutine 池,留神池应用完后须要手动敞开,这里应用defer
敞开:
p, _ := ants.NewPoolWithFunc(10, taskFunc)defer p.Release()func taskFunc(data interface{}) { task := data.(*Task) task.Do() fmt.Printf("task:%d sum:%d\n", task.index, task.sum)}
下面调用了ants.NewPoolWithFunc()
创立了一个 goroutine 池。第一个参数是池容量,即池中最多有 10 个 goroutine。第二个参数为每次执行工作的函数。当咱们调用p.Invoke(data)
的时候,ants
池会在其治理的 goroutine 中找出一个闲暇的,让它执行函数taskFunc
,并将data
作为参数。
接着,咱们模仿数据,做数据切分,生成工作,交给 ants
解决:
const ( DataSize = 10000 DataPerTask = 100)nums := make([]int, DataSize, DataSize)for i := range nums { nums[i] = rand.Intn(1000)}var wg sync.WaitGroupwg.Add(DataSize / DataPerTask)tasks := make([]*Task, 0, DataSize/DataPerTask)for i := 0; i < DataSize/DataPerTask; i++ { task := &Task{ index: i + 1, nums: nums[i*DataPerTask : (i+1)*DataPerTask], wg: &wg, } tasks = append(tasks, task) p.Invoke(task)}wg.Wait()fmt.Printf("running goroutines: %d\n", ants.Running())
随机生成 10000 个整数,将这些整数分为 100 份,每份 100 个,生成Task
构造,调用p.Invoke(task)
解决。wg.Wait()
期待解决实现,而后输入ants
正在运行的 goroutine 数量,这时应该是 0。
最初咱们将后果汇总,并验证一下后果,与间接相加失去的后果做一个比拟:
var sum intfor _, task := range tasks { sum += task.sum}var expect intfor _, num := range nums { expect += num}fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)
运行:
$ go run main.go...task:96 sum:53275task:88 sum:50090task:62 sum:57114task:45 sum:48041task:82 sum:45269running goroutines: 0finish all tasks, result is 5010172 expect:5010172
的确,工作实现之后,正在运行的 goroutine 数量变为 0。而且咱们验证了,后果没有偏差。另外须要留神,goroutine 池中工作的执行程序是随机的,与提交工作的先后没有关系。由下面运行打印的工作标识咱们也能发现这一点。
函数作为工作
ants
反对将一个不承受任何参数的函数作为工作提交给 goroutine 运行。因为不承受参数,咱们提交的函数要么不须要内部数据,只须要解决本身逻辑,否则就必须用某种形式将须要的数据传递进去,例如闭包。
提交函数作为工作的 goroutine 池应用ants.NewPool()
创立,它只承受一个参数示意池子的容量。调用池子对象的Submit()
办法来提交工作,将一个不承受任何参数的函数传入。
最开始的例子能够改写一下。减少一个工作包装函数,将工作须要的参数作为包装函数的参数。包装函数返回理论的工作函数,该工作函数就能够通过闭包拜访它须要的数据了:
type taskFunc func()func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc { return func() { for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] { *sum += num } fmt.Printf("task:%d sum:%d\n", i+1, *sum) wg.Done() }}
调用ants.NewPool(10)
创立 goroutine 池,同样池子用完须要开释,这里应用defer
:
p, _ := ants.NewPool(10)defer p.Release()
生成模仿数据,切分工作。提交工作给ants
池执行,这里应用taskFuncWrapper()
包装函数生成具体的工作,而后调用p.Submit()
提交:
nums := make([]int, DataSize, DataSize)for i := range nums { nums[i] = rand.Intn(1000)}var wg sync.WaitGroupwg.Add(DataSize / DataPerTask)partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)for i := 0; i < DataSize/DataPerTask; i++ { p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))}wg.Wait()
汇总后果,验证:
var sum intfor _, partSum := range partSums { sum += partSum}var expect intfor _, num := range nums { expect += num}fmt.Printf("running goroutines: %d\n", ants.Running())fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)
这个程序的性能与最开始的完全相同。
执行流程
GitHub 仓库中有个执行流程图,我从新绘制了一下:
执行流程如下:
- 初始化 goroutine 池;
提交工作给 goroutine 池,查看是否有闲暇的 goroutine:
- 有,获取闲暇 goroutine
无,查看池中的 goroutine 数量是否已到池容量下限:
已到下限,查看 goroutine 池是否是非阻塞的:
- 非阻塞,间接返回
nil
示意执行失败 - 阻塞,期待 goroutine 闲暇
- 非阻塞,间接返回
- 未到下限,创立一个新的 goroutine 解决工作
- 工作解决实现,将 goroutine 交还给池,以待解决下一个工作
选项
ants
提供了一些选项能够定制 goroutine 池的行为。选项应用Options
构造定义:
// src/github.com/panjf2000/ants/options.gotype Options struct { ExpiryDuration time.Duration PreAlloc bool MaxBlockingTasks int Nonblocking bool PanicHandler func(interface{}) Logger Logger}
各个选项含意如下:
ExpiryDuration
:过期工夫。示意 goroutine 闲暇多长时间之后会被ants
池回收PreAlloc
:预调配。调用NewPool()/NewPoolWithFunc()
之后预调配worker
(治理一个工作 goroutine 的构造体)切片。而且应用预调配与否会间接影响池中治理worker
的构造。见上面源码MaxBlockingTasks
:最大阻塞工作数量。即池中 goroutine 数量已到池容量,且所有 goroutine 都解决忙碌状态,这时到来的工作会在阻塞列表期待。这个选项设置的是列表的最大长度。阻塞的工作数量达到这个值后,后续工作提交间接返回失败Nonblocking
:池是否阻塞,默认阻塞。提交工作时,如果ants
池中 goroutine 已到下限且全副忙碌,阻塞的池会将工作增加的阻塞列表期待(当然受限于阻塞列表长度,见上一个选项)。非阻塞的池间接返回失败PanicHandler
:panic 解决。遇到 panic 会调用这里设置的处理函数Logger
:指定日志记录器
NewPool()
局部源码:
if p.options.PreAlloc { if size == -1 { return nil, ErrInvalidPreAllocSize } p.workers = newWorkerArray(loopQueueType, size)} else { p.workers = newWorkerArray(stackType, 0)}
应用预调配时,创立loopQueueType
类型的构造,反之创立stackType
类型。这是ants
定义的两种治理worker
的数据结构。
ants
定义了一些With*
函数来设置这些选项:
func WithOptions(options Options) Option { return func(opts *Options) { *opts = options }}func WithExpiryDuration(expiryDuration time.Duration) Option { return func(opts *Options) { opts.ExpiryDuration = expiryDuration }}func WithPreAlloc(preAlloc bool) Option { return func(opts *Options) { opts.PreAlloc = preAlloc }}func WithMaxBlockingTasks(maxBlockingTasks int) Option { return func(opts *Options) { opts.MaxBlockingTasks = maxBlockingTasks }}func WithNonblocking(nonblocking bool) Option { return func(opts *Options) { opts.Nonblocking = nonblocking }}func WithPanicHandler(panicHandler func(interface{})) Option { return func(opts *Options) { opts.PanicHandler = panicHandler }}func WithLogger(logger Logger) Option { return func(opts *Options) { opts.Logger = logger }}
这里应用了 Go 语言中十分常见的一种模式,我称之为选项模式,十分不便地结构有大量参数,且大部分有默认值或个别不须要显式设置的对象。
咱们来验证几个选项。
最大期待队列长度
ants
池设置容量之后,如果所有的 goroutine 都在解决工作。这时提交的工作默认会进入期待队列,WithMaxBlockingTasks(maxBlockingTasks int)
能够设置期待队列的最大长度。超过这个长度,提交工作间接返回谬误:
func wrapper(i int, wg *sync.WaitGroup) func() { return func() { fmt.Printf("hello from task:%d\n", i) time.Sleep(1 * time.Second) wg.Done() }}func main() { p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2)) defer p.Release() var wg sync.WaitGroup wg.Add(8) for i := 1; i <= 8; i++ { go func(i int) { err := p.Submit(wrapper(i, &wg)) if err != nil { fmt.Printf("task:%d err:%v\n", i, err) wg.Done() } }(i) } wg.Wait()}
下面代码中,咱们设置 goroutine 池的容量为 4,最大阻塞队列长度为 2。而后一个 for 提交 8 个工作,冀望后果是:4 个工作在执行,2 个工作在期待,2 个工作提交失败。运行后果:
hello from task:8hello from task:5hello from task:4hello from task:6task:7 err:too many goroutines blocked on submit or Nonblocking is settask:3 err:too many goroutines blocked on submit or Nonblocking is sethello from task:1hello from task:2
咱们看到提交工作失败,打印too many goroutines blocked ...
。
代码中有 4 点须要留神:
- 提交工作必须并行进行。如果是串行提交,第 5 个工作提交时因为池中没有闲暇的 goroutine 解决该工作,
Submit()
办法会被阻塞,后续工作就都不能提交了。也就达不到验证的目标了 - 因为工作可能提交失败,失败的工作不会理论执行,所以实际上
wg.Done()
次数会小于 8。因此在err != nil
分支中咱们须要调用一次wg.Done()
。否则wg.Wait()
会永远阻塞 - 为了防止工作执行过快,空出了 goroutine,察看不到景象,每个工作中我应用
time.Sleep(1 * time.Second)
休眠 1s - 因为 goroutine 之间的执行程序未显式同步,故每次执行的程序不确定
因为简略起见,后面的例子中Submit()
办法的返回值都被咱们疏忽了。理论开发中肯定不要疏忽。
非阻塞
ants
池默认是阻塞的,咱们能够应用WithNonblocking(nonblocking bool)
设置其为非阻塞。非阻塞的ants
池中,在所有 goroutine 都在解决工作时,提交新工作会间接返回谬误:
func main() { p, _ := ants.NewPool(2, ants.WithNonblocking(true)) defer p.Release() var wg sync.WaitGroup wg.Add(3) for i := 1; i <= 3; i++ { err := p.Submit(wrapper(i, &wg)) if err != nil { fmt.Printf("task:%d err:%v\n", i, err) wg.Done() } } wg.Wait()}
应用上个例子中的wrapper()
函数,ants
池容量设置为 2。间断提交 3 个工作,冀望后果前两个工作失常执行,第 3 个工作提交时返回谬误:
hello from task:2task:3 err:too many goroutines blocked on submit or Nonblocking is sethello from task:1
panic 处理器
一个鲁棒性强的库肯定不会漠视谬误的解决,特地是宕机相干的谬误。在 Go 语言中就是 panic,也被称为运行时恐慌,在程序运行的过程中产生的严重性谬误,例如索引越界,空指针解援用等,都会触发 panic。如果不解决 panic,程序会间接意外退出,可能造成数据失落的严重后果。
ants
中如果 goroutine 在执行工作时产生panic
,会终止当前任务的执行,将产生谬误的堆栈输入到os.Stderr
。留神,该 goroutine 还是会被放回池中,下次能够取出执行新的工作。
func wrapper(i int, wg *sync.WaitGroup) func() { return func() { fmt.Printf("hello from task:%d\n", i) if i%2 == 0 { panic(fmt.Sprintf("panic from task:%d", i)) } wg.Done() }}func main() { p, _ := ants.NewPool(2) defer p.Release() var wg sync.WaitGroup wg.Add(3) for i := 1; i <= 2; i++ { p.Submit(wrapper(i, &wg)) } time.Sleep(1 * time.Second) p.Submit(wrapper(3, &wg)) p.Submit(wrapper(5, &wg)) wg.Wait()}
咱们让偶数个工作触发panic
。提交两个工作,第二个工作肯定会触发panic
。触发panic
之后,咱们还能够持续提交工作 3、5。留神这里没有 4,提交工作 4 还是会触发panic
。
下面的程序须要留神 2 点:
- 工作函数中
wg.Done()
是在panic
办法之后,如果触发了panic
,函数中的其余失常逻辑就不会再继续执行了。所以咱们尽管wg.Add(3)
,然而一共提交了 4 个工作,其中一个工作触发了panic
,wg.Done()
没有正确执行。理论开发中,咱们个别应用defer
语句来确保wg.Done()
肯定会执行 - 在 for 循环之后,我增加了一行代码
time.Sleep(1 * time.Second)
。如果没有这一行,后续的两条Submit()
办法能够间接执行,可能会导致工作很快就实现了,wg.Wait()
间接返回了,这时panic
的堆栈还没有输入。你能够尝试正文掉这行代码运行看看后果
除了ants
提供的默认 panic 处理器,咱们还能够应用WithPanicHandler(paincHandler func(interface{}))
指定咱们本人编写的 panic 处理器。处理器的参数就是传给panic
的值:
func panicHandler(err interface{}) { fmt.Fprintln(os.Stderr, err)}p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))defer p.Release()
其余代码与下面的完全相同,指定了panicHandler
后触发panic
就会执行它。运行:
hello from task:2panic from task:2hello from task:1hello from task:5hello from task:3
看到输入了传给panic
函数的字符串(第二行输入)。
默认池
为了方便使用,很多 Go 库都喜爱提供其外围性能类型的一个默认实现。能够间接通过库提供的接口调用。例如net/http
,例如ants
。ants
库中定义了一个默认的池,默认容量为MaxInt32
。goroutine 池的各个办法都能够间接通过ants
包间接拜访:
// src/github.com/panjf2000/ants/ants.godefaultAntsPool, _ = NewPool(DefaultAntsPoolSize)func Submit(task func()) error { return defaultAntsPool.Submit(task)}func Running() int { return defaultAntsPool.Running()}func Cap() int { return defaultAntsPool.Cap()}func Free() int { return defaultAntsPool.Free()}func Release() { defaultAntsPool.Release()}func Reboot() { defaultAntsPool.Reboot()}
间接应用:
func main() { defer ants.Release() var wg sync.WaitGroup wg.Add(2) for i := 1; i <= 2; i++ { ants.Submit(wrapper(i, &wg)) } wg.Wait()}
默认池也须要Release()
。
总结
本文介绍了 goroutine 池的由来,并借由ants
库介绍了根本的应用办法,和一些细节。ants
源码不多,去掉测试的外围代码只有 1k 行左右,倡议有工夫、感兴趣的童鞋深刻浏览。
大家如果发现好玩、好用的 Go 语言库,欢送到 Go 每日一库 GitHub 上提交 issue
参考
- ants GitHub:github.com/panjf2000/ants
- Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib
我
我的博客:https://darjun.github.io
欢送关注我的微信公众号【GoUpUp】,独特学习,一起提高~