原文链接:警觉请勿滥用goroutine
前言
哈喽,大家好,我是asong
。Go
语言中,goroutine
的创立老本很低,调度效率很高,人称能够开几百几千万个goroutine
,然而真正开几百几千万个goroutine
就不会有任何影响吗?本文咱们就一起来看一看goroutine
是否有数量限度并介绍几种正确应用goroutine
的姿态~。
现状
在Go
语言中,goroutine
的创立老本很低,调度效率高,Go
语言在设计时就是按以数万个goroutine
为标准进行设计的,数十万个并不意外,然而goroutine
在内存占用方面的确具备无限的老本,你不能发明有限数量的它们,比方这个例子:
ch := generate() go func() { for range ch { } }()
这段代码通过generate()
办法取得一个channel
,而后启动一个goroutine
始终去解决这个channel
的数据,这个goroutine
什么时候会退出?答案是不确定,ch
是由函数generate()
来决定的,所以有可能这个goroutine
永远都不会退出,这就有可能会引发内存透露。
goroutine
就是G-P-M
调度模型中的G
,咱们能够把goroutine
看成是一种协程,创立goroutine
也是有开销的,然而开销很小,初始只须要2-4k
的栈空间,当goroutine
数量越来越大时,同时存在的goroutine
也越来越多时,程序就暗藏内存透露的问题。看一个例子:
func main() { for i := 0; i < math.MaxInt64; i++ { go func(i int) { time.Sleep(5 * time.Second) }(i) }}
大家能够在本人的电脑上运行一下这个程序,察看一下CPU
和内存占用状况,我说下我运行后的景象:
- CPU使用率疯狂上涨
- 内存占用率也一直上涨
- 运行一段时间后主过程解体了。。。
因而每次在编写GO
程序时,都应该认真思考一个问题:
您将要启动的goroutine
将如何以及在什么条件下完结?
接下来咱们就来介绍几种形式能够管制goroutine
和goroutine
的数量。
管制goroutine
的办法
Context
包
Go 语言中的每一个申请的都是通过一个独自的 goroutine
进行解决的,HTTP/RPC
申请的处理器往往都会启动新的 Goroutine
拜访数据库和 RPC
服务,咱们可能会创立多个 goroutine
来解决一次申请,而 Context
的次要作用就是在不同的 goroutine
之间同步申请特定的数据、勾销信号以及解决申请的截止日期。
Context
包次要衍生了四个函数:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)func WithValue(parent Context, key, val interface{}) Context
应用这四个函数咱们对goroutine
进行管制,具体开展就不再本文说了,咱们以WithCancel
办法写一个例子:
func main() { ctx,cancel := context.WithCancel(context.Background()) go Speak(ctx) time.Sleep(10*time.Second) cancel() time.Sleep(2 * time.Second) fmt.Println("bye bye!")}func Speak(ctx context.Context) { for range time.Tick(time.Second){ select { case <- ctx.Done(): fmt.Println("asong哥,我收到信号了,要走了,拜拜!") return default: fmt.Println("asong哥,你好帅呀~balabalabalabala") } }}
运行后果:
asong哥,你好帅呀~balabalabalabala# ....... 省略局部asong哥,我收到信号了,要走了,拜拜!bye bye!
这里咱们应用withCancel
创立了一个基于Background
的ctx
,而后启动了一个goroutine
每隔1s
夸我一句,10s
后在主goroutine
中发送勾销新信号,那么启动的goroutine
在检测到信号后就会勾销退出。
channel
咱们晓得channel
是用于goroutine
的数据通信,在Go
中通过goroutine+channel
的形式,能够简略、高效地解决并发问题。下面咱们介绍了应用context
来达到对goroutine
的管制,实际上context
的外部实现也是应用的channel
,所以有时候为了实现不便,咱们能够间接通过channel+select
或者channel+close
的形式来管制goroutine
的退出,咱们别离来一写一个例子:
channel+select
func fibonacci(ch chan int, done chan struct{}) { x, y := 0, 1 for { select { case ch <- x: x, y = y, x+y case <-done: fmt.Println("over") return } }}func main() { ch := make(chan int) done := make(chan struct{}) go func() { for i := 0; i < 10; i++ { fmt.Println(<-ch) } done <- struct{}{} }() fibonacci(ch, done)}
下面的例子是计算斐波那契数列的后果,咱们应用两个channel
,一个channel
用来传输数据,另外一个channel
用来做完结信号,这里咱们应用的是select
的阻塞式的收发操作,直到有一个channel
产生状态扭转,咱们也能够在select
中应用default
语句,那么select
语句在执行时会遇到这两种状况:
- 当存在能够收发的
Channel
时,间接解决该Channel
对应的case
; - 当不存在能够收发的
Channel
时,执行default
中的语句;
倡议大家应用带default
的形式,因为在一个nil channel
上的操作会始终被阻塞,如果没有default case
,只有nil channel
的select
会始终被阻塞。
channel+close
channel
能够单个出队,也能够循环出队,因为咱们能够应用for-range
循环解决channel
,range ch
会始终迭代到channel
被敞开,依据这个个性,咱们也可做到对goroutine
的管制:
func main() { ch := make(chan int, 10) go func() { for i:=0; i<10;i++{ ch <- i } close(ch) }() go func() { for val := range ch{ fmt.Println(val) } fmt.Println("receive data over") }() time.Sleep(5* time.Second) fmt.Println("program over")}
如果对channel
不相熟的敌人能够看一下我之前的文章:学习channel设计:从入门到放弃
管制goroutine
的数量
咱们能够通过以下形式达到管制goroutine
数量的目标,不过自身Go
的goroutine
就曾经很轻量了,所以管制goroutine
的数量还是要依据具体场景剖析,并不是所有场景都须要管制goroutine
的数量的,个别在并发场景咱们会思考管制goroutine
的数量,接下来咱们来看一看如下几种形式达到管制goroutine
数量的目标。
协程池
写 go
并发程序的时候如果程序会启动大量的 goroutine
,势必会耗费大量的系统资源(内存,CPU),所以能够思考应用goroutine
池达到复用goroutine
,节俭资源,晋升性能。也有一些开源的协程池库,例如:ants
、go-playground/pool
、jeffail/tunny
等,这里咱们看ants
的一个官网例子:
var sum int32func myFunc(i interface{}) { n := i.(int32) atomic.AddInt32(&sum, n) fmt.Printf("run with %d\n", n)}func demoFunc() { time.Sleep(10 * time.Millisecond) fmt.Println("Hello World!")}func main() { defer ants.Release() runTimes := 1000 // Use the common pool. var wg sync.WaitGroup syncCalculateSum := func() { demoFunc() wg.Done() } for i := 0; i < runTimes; i++ { wg.Add(1) _ = ants.Submit(syncCalculateSum) } wg.Wait() fmt.Printf("running goroutines: %d\n", ants.Running()) fmt.Printf("finish all tasks.\n") // Use the pool with a function, // set 10 to the capacity of goroutine pool and 1 second for expired duration. p, _ := ants.NewPoolWithFunc(10, func(i interface{}) { myFunc(i) wg.Done() }) defer p.Release() // Submit tasks one by one. for i := 0; i < runTimes; i++ { wg.Add(1) _ = p.Invoke(int32(i)) } wg.Wait() fmt.Printf("running goroutines: %d\n", p.Running()) fmt.Printf("finish all tasks, result is %d\n", sum)}
这个例子其实就是计算大量整数和的程序,这里通过ants.NewPoolWithFunc()
创立了一个 goroutine
池。第一个参数是池容量,即池中最多有 10
个 goroutine
。第二个参数为每次执行工作的函数。当咱们调用p.Invoke(data)
的时候,ants
池会在其治理的 goroutine
中找出一个闲暇的,让它执行函数taskFunc
,并将data
作为参数。
具体这个库的设计就不具体开展了,前面会专门写一篇文章来介绍如何设计一个协程池。
信号量Semaphore
Go
语言的官网扩大包为咱们提供了一个基于权重的信号量Semaphore,我能够依据信号量来管制肯定数量的 goroutine
并发工作,官网也给提供了一个例子:workerPool,代码有点长就不在这里贴了,咱们来本人写一个略微简略点的例子:
const ( Limit = 3 // 同时运行的goroutine下限 Weight = 1 // 信号量的权重)func main() { names := []string{ "asong1", "asong2", "asong3", "asong4", "asong5", "asong6", "asong7", } sem := semaphore.NewWeighted(Limit) var w sync.WaitGroup for _, name := range names { w.Add(1) go func(name string) { sem.Acquire(context.Background(), Weight) fmt.Println(name) time.Sleep(2 * time.Second) // 延时能更好的体现进去管制 sem.Release(Weight) w.Done() }(name) } w.Wait() fmt.Println("over--------")}
下面的例子咱们应用 NewWeighted()
函数创立一个并发拜访的最大资源数,也就是同时运行的goroutine
下限为3
,应用Acquire
函数来获取指定个数的资源,如果以后没有闲暇资源可用,则以后goroutine
将陷入休眠状态,最初应用release
函数开释已应用资源数量(计数器)进行更新缩小,并告诉其它 waiters
。
channel+waitgroup
实现
这个办法我是在煎鱼大佬的一篇文章学到的:来,管制一下Goroutine的并发数量
次要实现原理是利用waitGroup
做并发管制,利用channel
能够在goroutine
之间进行数据通信,通过限度channel
的队列长度来管制同时运行的goroutine
数量,例子如下:
func main() { count := 9 // 要运行的goroutine数量 limit := 3 // 同时运行的goroutine为3个 ch := make(chan bool, limit) wg := sync.WaitGroup{} wg.Add(count) for i:=0; i < count; i++{ go func(num int) { defer wg.Done() ch <- true // 发送信号 fmt.Printf("%d 我在干活 at time %d\n",num,time.Now().Unix()) time.Sleep(2 * time.Second) <- ch // 接收数据代表退出了 }(i) } wg.Wait()}
这种实现形式真的妙,与信号量的实现形式根本类似,某些场景大家也能够思考应用这种形式来达到管制goroutine
的目标,不过最好封装一下,要不有点俊俏,感兴趣的能够看一下煎鱼大佬是怎么封装的:https://github.com/eddycjy/gs...
总结
本文次要目标是介绍管制goroutine
的几种形式、管制goroutine
数量的几种形式,goroutine
的创立成本低、效率高带来了很大劣势,同时也会有一些弊病,这就须要咱们在理论开发中依据具体场景抉择正确的形式应用goroutine
,本文介绍的技术计划也可能是全面的,如果你有更好的形式能够在评论区中分享进去,咱们大家一起学习学习~。
文中代码曾经上传github,欢送star:https://github.com/asong2020/...
素质三连(分享、点赞、在看)都是笔者继续创作更多优质内容的能源!我是asong
,咱们下期见。
创立了一个Golang学习交换群,欢送各位大佬们踊跃入群,咱们一起学习交换。入群形式:关注公众号获取。更多学习材料请到公众号支付。
欢送关注公众号:【Golang梦工厂】
举荐往期文章:
- 学习channel设计:从入门到放弃
- 详解内存对齐
- Go语言中new和make你应用哪个来分配内存?
- 源码分析panic与recover,看不懂你打我好了!
- 面试官:小松子来聊一聊内存逃逸
- [面试官:你能聊聊string和[]byte的转换吗?](https://mp.weixin.qq.com/s/jz...)
- 面试官:两个nil比拟后果是什么?
- 并发编程包之 errgroup