原文链接:警觉请勿滥用goroutine

前言

哈喽,大家好,我是asongGo语言中,goroutine的创立老本很低,调度效率很高,人称能够开几百几千万个goroutine,然而真正开几百几千万个goroutine就不会有任何影响吗?本文咱们就一起来看一看goroutine是否有数量限度并介绍几种正确应用goroutine的姿态~。

现状

Go语言中,goroutine的创立老本很低,调度效率高,Go语言在设计时就是按以数万个goroutine为标准进行设计的,数十万个并不意外,然而goroutine在内存占用方面的确具备无限的老本,你不能发明有限数量的它们,比方这个例子:

ch := generate() go func() {         for range ch { } }()

这段代码通过generate()办法取得一个channel,而后启动一个goroutine始终去解决这个channel的数据,这个goroutine什么时候会退出?答案是不确定,ch是由函数generate()来决定的,所以有可能这个goroutine永远都不会退出,这就有可能会引发内存透露。

goroutine就是G-P-M调度模型中的G,咱们能够把goroutine看成是一种协程,创立goroutine也是有开销的,然而开销很小,初始只须要2-4k的栈空间,当goroutine数量越来越大时,同时存在的goroutine也越来越多时,程序就暗藏内存透露的问题。看一个例子:

func main()  {    for i := 0; i < math.MaxInt64; i++ {        go func(i int) {            time.Sleep(5 * time.Second)        }(i)    }}

大家能够在本人的电脑上运行一下这个程序,察看一下CPU和内存占用状况,我说下我运行后的景象:

  • CPU使用率疯狂上涨
  • 内存占用率也一直上涨
  • 运行一段时间后主过程解体了。。。

因而每次在编写GO程序时,都应该认真思考一个问题:

您将要启动的goroutine将如何以及在什么条件下完结?

接下来咱们就来介绍几种形式能够管制goroutinegoroutine的数量。

管制goroutine的办法

Context

Go 语言中的每一个申请的都是通过一个独自的 goroutine 进行解决的,HTTP/RPC 申请的处理器往往都会启动新的 Goroutine 拜访数据库和 RPC 服务,咱们可能会创立多个 goroutine 来解决一次申请,而 Context 的次要作用就是在不同的 goroutine 之间同步申请特定的数据、勾销信号以及解决申请的截止日期。

Context包次要衍生了四个函数:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)func WithValue(parent Context, key, val interface{}) Context

应用这四个函数咱们对goroutine进行管制,具体开展就不再本文说了,咱们以WithCancel办法写一个例子:

func main()  {    ctx,cancel := context.WithCancel(context.Background())    go Speak(ctx)    time.Sleep(10*time.Second)    cancel()    time.Sleep(2 * time.Second)    fmt.Println("bye bye!")}func Speak(ctx context.Context)  {    for range time.Tick(time.Second){        select {        case <- ctx.Done():            fmt.Println("asong哥,我收到信号了,要走了,拜拜!")            return        default:            fmt.Println("asong哥,你好帅呀~balabalabalabala")        }    }}

运行后果:

asong哥,你好帅呀~balabalabalabala# ....... 省略局部asong哥,我收到信号了,要走了,拜拜!bye bye!

这里咱们应用withCancel创立了一个基于Backgroundctx,而后启动了一个goroutine每隔1s夸我一句,10s后在主goroutine中发送勾销新信号,那么启动的goroutine在检测到信号后就会勾销退出。

channel

咱们晓得channel是用于goroutine的数据通信,在Go中通过goroutine+channel的形式,能够简略、高效地解决并发问题。下面咱们介绍了应用context来达到对goroutine的管制,实际上context的外部实现也是应用的channel,所以有时候为了实现不便,咱们能够间接通过channel+select或者channel+close的形式来管制goroutine的退出,咱们别离来一写一个例子:

  • channel+select
func fibonacci(ch chan int, done chan struct{}) {    x, y := 0, 1    for {        select {        case ch <- x:            x, y = y, x+y        case <-done:            fmt.Println("over")            return        }    }}func main() {    ch := make(chan int)    done := make(chan struct{})    go func() {        for i := 0; i < 10; i++ {            fmt.Println(<-ch)        }        done <- struct{}{}    }()    fibonacci(ch, done)}

下面的例子是计算斐波那契数列的后果,咱们应用两个channel,一个channel用来传输数据,另外一个channel用来做完结信号,这里咱们应用的是select的阻塞式的收发操作,直到有一个channel产生状态扭转,咱们也能够在select中应用default语句,那么select语句在执行时会遇到这两种状况:

  • 当存在能够收发的Channel时,间接解决该Channel 对应的 case
  • 当不存在能够收发的Channel 时,执行 default 中的语句;

倡议大家应用带default的形式,因为在一个nil channel上的操作会始终被阻塞,如果没有default case,只有nil channelselect会始终被阻塞。

  • channel+close

channel能够单个出队,也能够循环出队,因为咱们能够应用for-range循环解决channelrange ch会始终迭代到channel被敞开,依据这个个性,咱们也可做到对goroutine的管制:

func main()  {    ch := make(chan int, 10)    go func() {        for i:=0; i<10;i++{            ch <- i        }        close(ch)    }()    go func() {        for val := range ch{            fmt.Println(val)        }        fmt.Println("receive data over")    }()    time.Sleep(5* time.Second)    fmt.Println("program over")}

如果对channel不相熟的敌人能够看一下我之前的文章:学习channel设计:从入门到放弃

管制goroutine的数量

咱们能够通过以下形式达到管制goroutine数量的目标,不过自身Gogoroutine就曾经很轻量了,所以管制goroutine的数量还是要依据具体场景剖析,并不是所有场景都须要管制goroutine的数量的,个别在并发场景咱们会思考管制goroutine的数量,接下来咱们来看一看如下几种形式达到管制goroutine数量的目标。

协程池

go 并发程序的时候如果程序会启动大量的 goroutine ,势必会耗费大量的系统资源(内存,CPU),所以能够思考应用goroutine池达到复用goroutine,节俭资源,晋升性能。也有一些开源的协程池库,例如:antsgo-playground/pooljeffail/tunny等,这里咱们看ants的一个官网例子:

var sum int32func myFunc(i interface{}) {    n := i.(int32)    atomic.AddInt32(&sum, n)    fmt.Printf("run with %d\n", n)}func demoFunc() {    time.Sleep(10 * time.Millisecond)    fmt.Println("Hello World!")}func main() {    defer ants.Release()    runTimes := 1000    // Use the common pool.    var wg sync.WaitGroup    syncCalculateSum := func() {        demoFunc()        wg.Done()    }    for i := 0; i < runTimes; i++ {        wg.Add(1)        _ = ants.Submit(syncCalculateSum)    }    wg.Wait()    fmt.Printf("running goroutines: %d\n", ants.Running())    fmt.Printf("finish all tasks.\n")    // Use the pool with a function,    // set 10 to the capacity of goroutine pool and 1 second for expired duration.    p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {        myFunc(i)        wg.Done()    })    defer p.Release()    // Submit tasks one by one.    for i := 0; i < runTimes; i++ {        wg.Add(1)        _ = p.Invoke(int32(i))    }    wg.Wait()    fmt.Printf("running goroutines: %d\n", p.Running())    fmt.Printf("finish all tasks, result is %d\n", sum)}

这个例子其实就是计算大量整数和的程序,这里通过ants.NewPoolWithFunc()创立了一个 goroutine 池。第一个参数是池容量,即池中最多有 10 goroutine。第二个参数为每次执行工作的函数。当咱们调用p.Invoke(data)的时候,ants池会在其治理的 goroutine 中找出一个闲暇的,让它执行函数taskFunc,并将data作为参数。

具体这个库的设计就不具体开展了,前面会专门写一篇文章来介绍如何设计一个协程池。

信号量Semaphore

Go语言的官网扩大包为咱们提供了一个基于权重的信号量Semaphore,我能够依据信号量来管制肯定数量的 goroutine 并发工作,官网也给提供了一个例子:workerPool,代码有点长就不在这里贴了,咱们来本人写一个略微简略点的例子:

const (    Limit = 3  // 同时运行的goroutine下限    Weight = 1 // 信号量的权重)func main() {    names := []string{        "asong1",        "asong2",        "asong3",        "asong4",        "asong5",        "asong6",        "asong7",    }    sem := semaphore.NewWeighted(Limit)    var w sync.WaitGroup    for _, name := range names {        w.Add(1)        go func(name string) {            sem.Acquire(context.Background(), Weight)            fmt.Println(name)            time.Sleep(2 * time.Second) // 延时能更好的体现进去管制            sem.Release(Weight)            w.Done()        }(name)    }    w.Wait()    fmt.Println("over--------")}

下面的例子咱们应用 NewWeighted() 函数创立一个并发拜访的最大资源数,也就是同时运行的goroutine下限为3,应用Acquire函数来获取指定个数的资源,如果以后没有闲暇资源可用,则以后goroutine将陷入休眠状态,最初应用release函数开释已应用资源数量(计数器)进行更新缩小,并告诉其它 waiters

channel+waitgroup实现

这个办法我是在煎鱼大佬的一篇文章学到的:来,管制一下Goroutine的并发数量

次要实现原理是利用waitGroup做并发管制,利用channel能够在goroutine之间进行数据通信,通过限度channel的队列长度来管制同时运行的goroutine数量,例子如下:

func main()  {    count := 9 // 要运行的goroutine数量    limit := 3 // 同时运行的goroutine为3个    ch := make(chan bool, limit)    wg := sync.WaitGroup{}    wg.Add(count)    for i:=0; i < count; i++{        go func(num int) {            defer wg.Done()            ch <- true // 发送信号            fmt.Printf("%d 我在干活 at time %d\n",num,time.Now().Unix())            time.Sleep(2 * time.Second)            <- ch // 接收数据代表退出了        }(i)    }    wg.Wait()}

这种实现形式真的妙,与信号量的实现形式根本类似,某些场景大家也能够思考应用这种形式来达到管制goroutine的目标,不过最好封装一下,要不有点俊俏,感兴趣的能够看一下煎鱼大佬是怎么封装的:https://github.com/eddycjy/gs...

总结

本文次要目标是介绍管制goroutine的几种形式、管制goroutine数量的几种形式,goroutine的创立成本低、效率高带来了很大劣势,同时也会有一些弊病,这就须要咱们在理论开发中依据具体场景抉择正确的形式应用goroutine,本文介绍的技术计划也可能是全面的,如果你有更好的形式能够在评论区中分享进去,咱们大家一起学习学习~。

文中代码曾经上传github,欢送star:https://github.com/asong2020/...

素质三连(分享、点赞、在看)都是笔者继续创作更多优质内容的能源!我是asong,咱们下期见。

创立了一个Golang学习交换群,欢送各位大佬们踊跃入群,咱们一起学习交换。入群形式:关注公众号获取。更多学习材料请到公众号支付。

欢送关注公众号:【Golang梦工厂】

举荐往期文章:

  • 学习channel设计:从入门到放弃
  • 详解内存对齐
  • Go语言中new和make你应用哪个来分配内存?
  • 源码分析panic与recover,看不懂你打我好了!
  • 面试官:小松子来聊一聊内存逃逸
  • [面试官:你能聊聊string和[]byte的转换吗?](https://mp.weixin.qq.com/s/jz...)
  • 面试官:两个nil比拟后果是什么?
  • 并发编程包之 errgroup