原文链接:警觉请勿滥用 goroutine
前言
哈喽,大家好,我是
asong
。Go
语言中,goroutine
的创立老本很低,调度效率很高,人称能够开几百几千万个goroutine
,然而真正开几百几千万个goroutine
就不会有任何影响吗?本文咱们就一起来看一看goroutine
是否有数量限度并介绍几种正确应用goroutine
的姿态~。
现状
在 Go
语言中,goroutine
的创立老本很低,调度效率高,Go
语言在设计时就是按以数万个 goroutine
为标准进行设计的,数十万个并不意外,然而 goroutine
在内存占用方面的确具备无限的老本,你不能发明有限数量的它们,比方这个例子:
ch := generate()
go func() {for range ch {}
}()
这段代码通过 generate()
办法取得一个 channel
,而后启动一个goroutine
始终去解决这个 channel
的数据,这个 goroutine
什么时候会退出?答案是不确定,ch
是由函数 generate()
来决定的,所以有可能这个 goroutine
永远都不会退出,这就有可能会引发内存透露。
goroutine
就是 G-P-M
调度模型中的 G
,咱们能够把goroutine
看成是一种协程,创立 goroutine
也是有开销的,然而开销很小,初始只须要 2-4k
的栈空间,当 goroutine
数量越来越大时,同时存在的 goroutine
也越来越多时,程序就暗藏内存透露的问题。看一个例子:
func main() {
for i := 0; i < math.MaxInt64; i++ {go func(i int) {time.Sleep(5 * time.Second)
}(i)
}
}
大家能够在本人的电脑上运行一下这个程序,察看一下 CPU
和内存占用状况,我说下我运行后的景象:
- CPU 使用率疯狂上涨
- 内存占用率也一直上涨
- 运行一段时间后主过程解体了。。。
因而每次在编写 GO
程序时,都应该认真思考一个问题:
您将要启动的
goroutine
将如何以及在什么条件下完结?
接下来咱们就来介绍几种形式能够管制 goroutine
和goroutine
的数量。
管制 goroutine
的办法
Context
包
Go 语言中的每一个申请的都是通过一个独自的 goroutine
进行解决的,HTTP/RPC
申请的处理器往往都会启动新的 Goroutine
拜访数据库和 RPC
服务,咱们可能会创立多个 goroutine
来解决一次申请,而 Context
的次要作用就是在不同的 goroutine
之间同步申请特定的数据、勾销信号以及解决申请的截止日期。
Context
包次要衍生了四个函数:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
应用这四个函数咱们对 goroutine
进行管制,具体开展就不再本文说了,咱们以 WithCancel
办法写一个例子:
func main() {ctx,cancel := context.WithCancel(context.Background())
go Speak(ctx)
time.Sleep(10*time.Second)
cancel()
time.Sleep(2 * time.Second)
fmt.Println("bye bye!")
}
func Speak(ctx context.Context) {for range time.Tick(time.Second){
select {case <- ctx.Done():
fmt.Println("asong 哥,我收到信号了,要走了,拜拜!")
return
default:
fmt.Println("asong 哥,你好帅呀~balabalabalabala")
}
}
}
运行后果:
asong 哥,你好帅呀~balabalabalabala
# ....... 省略局部
asong 哥,我收到信号了,要走了,拜拜!bye bye!
这里咱们应用 withCancel
创立了一个基于 Background
的ctx
,而后启动了一个 goroutine
每隔 1s
夸我一句,10s
后在主 goroutine
中发送勾销新信号,那么启动的 goroutine
在检测到信号后就会勾销退出。
channel
咱们晓得 channel
是用于 goroutine
的数据通信,在 Go
中通过 goroutine+channel
的形式,能够简略、高效地解决并发问题。下面咱们介绍了应用 context
来达到对 goroutine
的管制,实际上 context
的外部实现也是应用的 channel
,所以有时候为了实现不便,咱们能够间接通过channel+select
或者 channel+close
的形式来管制 goroutine
的退出,咱们别离来一写一个例子:
channel+select
func fibonacci(ch chan int, done chan struct{}) {
x, y := 0, 1
for {
select {
case ch <- x:
x, y = y, x+y
case <-done:
fmt.Println("over")
return
}
}
}
func main() {ch := make(chan int)
done := make(chan struct{})
go func() {
for i := 0; i < 10; i++ {fmt.Println(<-ch)
}
done <- struct{}{}
}()
fibonacci(ch, done)
}
下面的例子是计算斐波那契数列的后果,咱们应用两个 channel
,一个channel
用来传输数据,另外一个 channel
用来做完结信号,这里咱们应用的是 select
的阻塞式的收发操作,直到有一个 channel
产生状态扭转,咱们也能够在 select
中应用 default
语句,那么 select
语句在执行时会遇到这两种状况:
- 当存在能够收发的
Channel
时,间接解决该Channel
对应的case
; - 当不存在能够收发的
Channel
时,执行default
中的语句;
倡议大家应用带 default
的形式,因为在一个 nil channel
上的操作会始终被阻塞,如果没有 default case
, 只有nil channel
的select
会始终被阻塞。
channel+close
channel
能够单个出队,也能够循环出队,因为咱们能够应用 for-range
循环解决 channel
,range ch
会始终迭代到 channel
被敞开,依据这个个性,咱们也可做到对 goroutine
的管制:
func main() {ch := make(chan int, 10)
go func() {
for i:=0; i<10;i++{ch <- i}
close(ch)
}()
go func() {
for val := range ch{fmt.Println(val)
}
fmt.Println("receive data over")
}()
time.Sleep(5* time.Second)
fmt.Println("program over")
}
如果对 channel
不相熟的敌人能够看一下我之前的文章:学习 channel 设计:从入门到放弃
管制 goroutine
的数量
咱们能够通过以下形式达到管制 goroutine
数量的目标,不过自身 Go
的goroutine
就曾经很轻量了,所以管制 goroutine
的数量还是要依据具体场景剖析,并不是所有场景都须要管制 goroutine
的数量的,个别在并发场景咱们会思考管制 goroutine
的数量,接下来咱们来看一看如下几种形式达到管制 goroutine
数量的目标。
协程池
写 go
并发程序的时候如果程序会启动大量的 goroutine
,势必会耗费大量的系统资源(内存,CPU),所以能够思考应用goroutine
池达到复用 goroutine
,节俭资源,晋升性能。也有一些开源的协程池库,例如:ants
、go-playground/pool
、jeffail/tunny
等,这里咱们看 ants
的一个官网例子:
var sum int32
func myFunc(i interface{}) {n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
}
func demoFunc() {time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}
func main() {defer ants.Release()
runTimes := 1000
// Use the common pool.
var wg sync.WaitGroup
syncCalculateSum := func() {demoFunc()
wg.Done()}
for i := 0; i < runTimes; i++ {wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {myFunc(i)
wg.Done()})
defer p.Release()
// Submit tasks one by one.
for i := 0; i < runTimes; i++ {wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
}
这个例子其实就是计算大量整数和的程序,这里通过 ants.NewPoolWithFunc()
创立了一个 goroutine
池。第一个参数是池容量,即池中最多有 10
个 goroutine
。第二个参数为每次执行工作的函数。当咱们调用p.Invoke(data)
的时候,ants
池会在其治理的 goroutine
中找出一个闲暇的,让它执行函数 taskFunc
,并将data
作为参数。
具体这个库的设计就不具体开展了,前面会专门写一篇文章来介绍如何设计一个协程池。
信号量Semaphore
Go
语言的官网扩大包为咱们提供了一个基于权重的信号量 Semaphore,我能够依据信号量来管制肯定数量的 goroutine
并发工作,官网也给提供了一个例子:workerPool,代码有点长就不在这里贴了,咱们来本人写一个略微简略点的例子:
const (
Limit = 3 // 同时运行的 goroutine 下限
Weight = 1 // 信号量的权重
)
func main() {names := []string{
"asong1",
"asong2",
"asong3",
"asong4",
"asong5",
"asong6",
"asong7",
}
sem := semaphore.NewWeighted(Limit)
var w sync.WaitGroup
for _, name := range names {w.Add(1)
go func(name string) {sem.Acquire(context.Background(), Weight)
fmt.Println(name)
time.Sleep(2 * time.Second) // 延时能更好的体现进去管制
sem.Release(Weight)
w.Done()}(name)
}
w.Wait()
fmt.Println("over--------")
}
下面的例子咱们应用 NewWeighted()
函数创立一个并发拜访的最大资源数,也就是同时运行的 goroutine
下限为 3
,应用Acquire
函数来获取指定个数的资源,如果以后没有闲暇资源可用,则以后 goroutine
将陷入休眠状态,最初应用 release
函数开释已应用资源数量(计数器)进行更新缩小,并告诉其它 waiters
。
channel+waitgroup
实现
这个办法我是在煎鱼大佬的一篇文章学到的:来,管制一下 Goroutine 的并发数量
次要实现原理是利用 waitGroup
做并发管制,利用 channel
能够在 goroutine
之间进行数据通信,通过限度 channel
的队列长度来管制同时运行的 goroutine
数量,例子如下:
func main() {
count := 9 // 要运行的 goroutine 数量
limit := 3 // 同时运行的 goroutine 为 3 个
ch := make(chan bool, limit)
wg := sync.WaitGroup{}
wg.Add(count)
for i:=0; i < count; i++{go func(num int) {defer wg.Done()
ch <- true // 发送信号
fmt.Printf("%d 我在干活 at time %d\n",num,time.Now().Unix())
time.Sleep(2 * time.Second)
<- ch // 接收数据代表退出了
}(i)
}
wg.Wait()}
这种实现形式真的妙,与信号量的实现形式根本类似,某些场景大家也能够思考应用这种形式来达到管制 goroutine
的目标,不过最好封装一下,要不有点俊俏,感兴趣的能够看一下煎鱼大佬是怎么封装的:https://github.com/eddycjy/gs…
总结
本文次要目标是介绍管制 goroutine
的几种形式、管制 goroutine
数量的几种形式,goroutine
的创立成本低、效率高带来了很大劣势,同时也会有一些弊病,这就须要咱们在理论开发中依据具体场景抉择正确的形式应用goroutine
,本文介绍的技术计划也可能是全面的,如果你有更好的形式能够在评论区中分享进去,咱们大家一起学习学习~。
文中代码曾经上传 github,欢送 star:https://github.com/asong2020/…
素质三连(分享、点赞、在看)都是笔者继续创作更多优质内容的能源!我是asong
,咱们下期见。
创立了一个 Golang 学习交换群,欢送各位大佬们踊跃入群,咱们一起学习交换。入群形式:关注公众号获取。更多学习材料请到公众号支付。
欢送关注公众号:【Golang 梦工厂】
举荐往期文章:
- 学习 channel 设计:从入门到放弃
- 详解内存对齐
- Go 语言中 new 和 make 你应用哪个来分配内存?
- 源码分析 panic 与 recover,看不懂你打我好了!
- 面试官:小松子来聊一聊内存逃逸
- [面试官:你能聊聊 string 和[]byte 的转换吗?](https://mp.weixin.qq.com/s/jz…)
- 面试官:两个 nil 比拟后果是什么?
- 并发编程包之 errgroup