共计 9512 个字符,预计需要花费 24 分钟才能阅读完成。
简介
解决大量并发是 Go 语言的一大劣势。语言内置了不便的并发语法,能够十分不便的创立很多个轻量级的 goroutine 并发解决工作。相比于创立多个线程,goroutine 更轻量、资源占用更少、切换速度更快、无线程上下文切换开销更少。然而受限于资源总量,零碎中可能创立的 goroutine 数量也是受限的。默认每个 goroutine 占用 8KB 内存,一台 8GB 内存的机器满打满算也只能创立 8GB/8KB = 1000000 个 goroutine,更何况零碎还须要保留一部分内存运行日常治理工作,go 运行时须要内存运行 gc、解决 goroutine 切换等。应用的内存超过机器内存容量,零碎会应用替换区(swap),导致性能急速降落。咱们能够简略验证一下创立过多 goroutine 会产生什么:
func main() {
var wg sync.WaitGroup
wg.Add(10000000)
for i := 0; i < 10000000; i++ {go func() {time.Sleep(1 * time.Minute)
}()}
wg.Wait()}
在我的机器上(8G 内存)运行下面的程序会报 errno 1455
,即Out of Memory
谬误,这很好了解。审慎运行。
另一方面,goroutine 的治理也是一个问题。goroutine 只能本人运行完结,内部没有任何伎俩能够强制 j 完结一个 goroutine。如果一个 goroutine 因为某种原因没有自行完结,就会呈现 goroutine 泄露。此外,频繁创立 goroutine 也是一个开销。
鉴于上述起因,天然呈现了与线程池一样的需要,即 goroutine 池。个别的 goroutine 池主动治理 goroutine 的生命周期,能够按需创立,动静缩容。向 goroutine 池提交一个工作,goroutine 池会主动安顿某个 goroutine 来解决。
ants
就是其中一个实现 goroutine 池的库。
疾速应用
本文代码应用 Go Modules。
创立目录并初始化:
$ mkdir ants && cd ants
$ go mod init github.com/darjun/go-daily-lib/ants
装置 ants
库,应用 v2
版本:
$ go get -u github.com/panjf2000/ants/v2
咱们接下来要实现一个计算大量整数和的程序。首先创立根底的工作构造,并实现其执行工作办法:
type Task struct {
index int
nums []int
sum int
wg *sync.WaitGroup
}
func (t *Task) Do() {
for _, num := range t.nums {t.sum += num}
t.wg.Done()}
很简略,就是将一个切片中的所有整数相加。
而后咱们创立 goroutine 池,留神池应用完后须要手动敞开,这里应用 defer
敞开:
p, _ := ants.NewPoolWithFunc(10, taskFunc)
defer p.Release()
func taskFunc(data interface{}) {task := data.(*Task)
task.Do()
fmt.Printf("task:%d sum:%d\n", task.index, task.sum)
}
下面调用了 ants.NewPoolWithFunc()
创立了一个 goroutine 池。第一个参数是池容量,即池中最多有 10 个 goroutine。第二个参数为每次执行工作的函数。当咱们调用 p.Invoke(data)
的时候,ants
池会在其治理的 goroutine 中找出一个闲暇的,让它执行函数 taskFunc
,并将data
作为参数。
接着,咱们模仿数据,做数据切分,生成工作,交给 ants
解决:
const (
DataSize = 10000
DataPerTask = 100
)
nums := make([]int, DataSize, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
tasks := make([]*Task, 0, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
task := &Task{
index: i + 1,
nums: nums[i*DataPerTask : (i+1)*DataPerTask],
wg: &wg,
}
tasks = append(tasks, task)
p.Invoke(task)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
随机生成 10000 个整数,将这些整数分为 100 份,每份 100 个,生成 Task
构造,调用 p.Invoke(task)
解决。wg.Wait()
期待解决实现,而后输入 ants
正在运行的 goroutine 数量,这时应该是 0。
最初咱们将后果汇总,并验证一下后果,与间接相加失去的后果做一个比拟:
var sum int
for _, task := range tasks {sum += task.sum}
var expect int
for _, num := range nums {expect += num}
fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)
运行:
$ go run main.go
...
task:96 sum:53275
task:88 sum:50090
task:62 sum:57114
task:45 sum:48041
task:82 sum:45269
running goroutines: 0
finish all tasks, result is 5010172 expect:5010172
的确,工作实现之后,正在运行的 goroutine 数量变为 0。而且咱们验证了,后果没有偏差。另外须要留神,goroutine 池中工作的执行程序是随机的,与提交工作的先后没有关系。由下面运行打印的工作标识咱们也能发现这一点。
函数作为工作
ants
反对将一个不承受任何参数的函数作为工作提交给 goroutine 运行。因为不承受参数,咱们提交的函数要么不须要内部数据,只须要解决本身逻辑,否则就必须用某种形式将须要的数据传递进去,例如闭包。
提交函数作为工作的 goroutine 池应用 ants.NewPool()
创立,它只承受一个参数示意池子的容量。调用池子对象的 Submit()
办法来提交工作,将一个不承受任何参数的函数传入。
最开始的例子能够改写一下。减少一个工作包装函数,将工作须要的参数作为包装函数的参数。包装函数返回理论的工作函数,该工作函数就能够通过闭包拜访它须要的数据了:
type taskFunc func()
func taskFuncWrapper(nums []int, i int, sum *int, wg *sync.WaitGroup) taskFunc {return func() {for _, num := range nums[i*DataPerTask : (i+1)*DataPerTask] {*sum += num}
fmt.Printf("task:%d sum:%d\n", i+1, *sum)
wg.Done()}
}
调用 ants.NewPool(10)
创立 goroutine 池,同样池子用完须要开释,这里应用defer
:
p, _ := ants.NewPool(10)
defer p.Release()
生成模仿数据,切分工作。提交工作给 ants
池执行,这里应用 taskFuncWrapper()
包装函数生成具体的工作,而后调用 p.Submit()
提交:
nums := make([]int, DataSize, DataSize)
for i := range nums {nums[i] = rand.Intn(1000)
}
var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partSums := make([]int, DataSize/DataPerTask, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {p.Submit(taskFuncWrapper(nums, i, &partSums[i], &wg))
}
wg.Wait()
汇总后果,验证:
var sum int
for _, partSum := range partSums {sum += partSum}
var expect int
for _, num := range nums {expect += num}
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks, result is %d expect is %d\n", sum, expect)
这个程序的性能与最开始的完全相同。
执行流程
GitHub 仓库中有个执行流程图,我从新绘制了一下:
执行流程如下:
- 初始化 goroutine 池;
-
提交工作给 goroutine 池,查看是否有闲暇的 goroutine:
- 有,获取闲暇 goroutine
-
无,查看池中的 goroutine 数量是否已到池容量下限:
-
已到下限,查看 goroutine 池是否是非阻塞的:
- 非阻塞,间接返回
nil
示意执行失败 - 阻塞,期待 goroutine 闲暇
- 非阻塞,间接返回
- 未到下限,创立一个新的 goroutine 解决工作
-
- 工作解决实现,将 goroutine 交还给池,以待解决下一个工作
选项
ants
提供了一些选项能够定制 goroutine 池的行为。选项应用 Options
构造定义:
// src/github.com/panjf2000/ants/options.go
type Options struct {
ExpiryDuration time.Duration
PreAlloc bool
MaxBlockingTasks int
Nonblocking bool
PanicHandler func(interface{})
Logger Logger
}
各个选项含意如下:
ExpiryDuration
:过期工夫。示意 goroutine 闲暇多长时间之后会被ants
池回收PreAlloc
:预调配。调用NewPool()/NewPoolWithFunc()
之后预调配worker
(治理一个工作 goroutine 的构造体)切片。而且应用预调配与否会间接影响池中治理worker
的构造。见上面源码MaxBlockingTasks
:最大阻塞工作数量。即池中 goroutine 数量已到池容量,且所有 goroutine 都解决忙碌状态,这时到来的工作会在阻塞列表期待。这个选项设置的是列表的最大长度。阻塞的工作数量达到这个值后,后续工作提交间接返回失败Nonblocking
:池是否阻塞,默认阻塞。提交工作时,如果ants
池中 goroutine 已到下限且全副忙碌,阻塞的池会将工作增加的阻塞列表期待(当然受限于阻塞列表长度,见上一个选项)。非阻塞的池间接返回失败PanicHandler
:panic 解决。遇到 panic 会调用这里设置的处理函数Logger
:指定日志记录器
NewPool()
局部源码:
if p.options.PreAlloc {
if size == -1 {return nil, ErrInvalidPreAllocSize}
p.workers = newWorkerArray(loopQueueType, size)
} else {p.workers = newWorkerArray(stackType, 0)
}
应用预调配时,创立 loopQueueType
类型的构造,反之创立 stackType
类型。这是 ants
定义的两种治理 worker
的数据结构。
ants
定义了一些 With*
函数来设置这些选项:
func WithOptions(options Options) Option {return func(opts *Options) {*opts = options}
}
func WithExpiryDuration(expiryDuration time.Duration) Option {return func(opts *Options) {opts.ExpiryDuration = expiryDuration}
}
func WithPreAlloc(preAlloc bool) Option {return func(opts *Options) {opts.PreAlloc = preAlloc}
}
func WithMaxBlockingTasks(maxBlockingTasks int) Option {return func(opts *Options) {opts.MaxBlockingTasks = maxBlockingTasks}
}
func WithNonblocking(nonblocking bool) Option {return func(opts *Options) {opts.Nonblocking = nonblocking}
}
func WithPanicHandler(panicHandler func(interface{})) Option {return func(opts *Options) {opts.PanicHandler = panicHandler}
}
func WithLogger(logger Logger) Option {return func(opts *Options) {opts.Logger = logger}
}
这里应用了 Go 语言中十分常见的一种模式,我称之为选项模式,十分不便地结构有大量参数,且大部分有默认值或个别不须要显式设置的对象。
咱们来验证几个选项。
最大期待队列长度
ants
池设置容量之后,如果所有的 goroutine 都在解决工作。这时提交的工作默认会进入期待队列,WithMaxBlockingTasks(maxBlockingTasks int)
能够设置期待队列的最大长度。超过这个长度,提交工作间接返回谬误:
func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)
time.Sleep(1 * time.Second)
wg.Done()}
}
func main() {p, _ := ants.NewPool(4, ants.WithMaxBlockingTasks(2))
defer p.Release()
var wg sync.WaitGroup
wg.Add(8)
for i := 1; i <= 8; i++ {go func(i int) {err := p.Submit(wrapper(i, &wg))
if err != nil {fmt.Printf("task:%d err:%v\n", i, err)
wg.Done()}
}(i)
}
wg.Wait()}
下面代码中,咱们设置 goroutine 池的容量为 4,最大阻塞队列长度为 2。而后一个 for 提交 8 个工作,冀望后果是:4 个工作在执行,2 个工作在期待,2 个工作提交失败。运行后果:
hello from task:8
hello from task:5
hello from task:4
hello from task:6
task:7 err:too many goroutines blocked on submit or Nonblocking is set
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
hello from task:2
咱们看到提交工作失败,打印too many goroutines blocked ...
。
代码中有 4 点须要留神:
- 提交工作必须并行进行。如果是串行提交,第 5 个工作提交时因为池中没有闲暇的 goroutine 解决该工作,
Submit()
办法会被阻塞,后续工作就都不能提交了。也就达不到验证的目标了 - 因为工作可能提交失败,失败的工作不会理论执行,所以实际上
wg.Done()
次数会小于 8。因此在err != nil
分支中咱们须要调用一次wg.Done()
。否则wg.Wait()
会永远阻塞 - 为了防止工作执行过快,空出了 goroutine,察看不到景象,每个工作中我应用
time.Sleep(1 * time.Second)
休眠 1s - 因为 goroutine 之间的执行程序未显式同步,故每次执行的程序不确定
因为简略起见,后面的例子中 Submit()
办法的返回值都被咱们疏忽了。理论开发中肯定不要疏忽。
非阻塞
ants
池默认是阻塞的,咱们能够应用 WithNonblocking(nonblocking bool)
设置其为非阻塞。非阻塞的 ants
池中,在所有 goroutine 都在解决工作时,提交新工作会间接返回谬误:
func main() {p, _ := ants.NewPool(2, ants.WithNonblocking(true))
defer p.Release()
var wg sync.WaitGroup
wg.Add(3)
for i := 1; i <= 3; i++ {err := p.Submit(wrapper(i, &wg))
if err != nil {fmt.Printf("task:%d err:%v\n", i, err)
wg.Done()}
}
wg.Wait()}
应用上个例子中的 wrapper()
函数,ants
池容量设置为 2。间断提交 3 个工作,冀望后果前两个工作失常执行,第 3 个工作提交时返回谬误:
hello from task:2
task:3 err:too many goroutines blocked on submit or Nonblocking is set
hello from task:1
panic 处理器
一个鲁棒性强的库肯定不会漠视谬误的解决,特地是宕机相干的谬误。在 Go 语言中就是 panic,也被称为运行时恐慌,在程序运行的过程中产生的严重性谬误,例如索引越界,空指针解援用等,都会触发 panic。如果不解决 panic,程序会间接意外退出,可能造成数据失落的严重后果。
ants
中如果 goroutine 在执行工作时产生 panic
,会终止当前任务的执行,将产生谬误的堆栈输入到os.Stderr
。 留神,该 goroutine 还是会被放回池中,下次能够取出执行新的工作。
func wrapper(i int, wg *sync.WaitGroup) func() {return func() {fmt.Printf("hello from task:%d\n", i)
if i%2 == 0 {panic(fmt.Sprintf("panic from task:%d", i))
}
wg.Done()}
}
func main() {p, _ := ants.NewPool(2)
defer p.Release()
var wg sync.WaitGroup
wg.Add(3)
for i := 1; i <= 2; i++ {p.Submit(wrapper(i, &wg))
}
time.Sleep(1 * time.Second)
p.Submit(wrapper(3, &wg))
p.Submit(wrapper(5, &wg))
wg.Wait()}
咱们让偶数个工作触发 panic
。提交两个工作,第二个工作肯定会触发panic
。触发panic
之后,咱们还能够持续提交工作 3、5。留神这里没有 4,提交工作 4 还是会触发panic
。
下面的程序须要留神 2 点:
- 工作函数中
wg.Done()
是在panic
办法之后,如果触发了panic
,函数中的其余失常逻辑就不会再继续执行了。所以咱们尽管wg.Add(3)
,然而一共提交了 4 个工作,其中一个工作触发了panic
,wg.Done()
没有正确执行 。理论开发中,咱们个别应用defer
语句来确保wg.Done()
肯定会执行 - 在 for 循环之后,我增加了一行代码
time.Sleep(1 * time.Second)
。如果没有这一行,后续的两条Submit()
办法能够间接执行,可能会导致工作很快就实现了,wg.Wait()
间接返回了,这时panic
的堆栈还没有输入。你能够尝试正文掉这行代码运行看看后果
除了 ants
提供的默认 panic 处理器,咱们还能够应用 WithPanicHandler(paincHandler func(interface{}))
指定咱们本人编写的 panic 处理器。处理器的参数就是传给 panic
的值:
func panicHandler(err interface{}) {fmt.Fprintln(os.Stderr, err)
}
p, _ := ants.NewPool(2, ants.WithPanicHandler(panicHandler))
defer p.Release()
其余代码与下面的完全相同,指定了 panicHandler
后触发 panic
就会执行它。运行:
hello from task:2
panic from task:2
hello from task:1
hello from task:5
hello from task:3
看到输入了传给 panic
函数的字符串(第二行输入)。
默认池
为了方便使用,很多 Go 库都喜爱提供其外围性能类型的一个默认实现。能够间接通过库提供的接口调用。例如 net/http
,例如ants
。ants
库中定义了一个默认的池,默认容量为 MaxInt32
。goroutine 池的各个办法都能够间接通过ants
包间接拜访:
// src/github.com/panjf2000/ants/ants.go
defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
func Submit(task func()) error {return defaultAntsPool.Submit(task)
}
func Running() int {return defaultAntsPool.Running()
}
func Cap() int {return defaultAntsPool.Cap()
}
func Free() int {return defaultAntsPool.Free()
}
func Release() {defaultAntsPool.Release()
}
func Reboot() {defaultAntsPool.Reboot()
}
间接应用:
func main() {defer ants.Release()
var wg sync.WaitGroup
wg.Add(2)
for i := 1; i <= 2; i++ {ants.Submit(wrapper(i, &wg))
}
wg.Wait()}
默认池也须要Release()
。
总结
本文介绍了 goroutine 池的由来,并借由 ants
库介绍了根本的应用办法,和一些细节。ants
源码不多,去掉测试的外围代码只有 1k 行左右,倡议有工夫、感兴趣的童鞋深刻浏览。
大家如果发现好玩、好用的 Go 语言库,欢送到 Go 每日一库 GitHub 上提交 issue😄
参考
- ants GitHub:github.com/panjf2000/ants
- Go 每日一库 GitHub:https://github.com/darjun/go-daily-lib
我
我的博客:https://darjun.github.io
欢送关注我的微信公众号【GoUpUp】,独特学习,一起提高~