乐趣区

关于后端:控制goroutine退出和数量的几种方法

原文链接:警觉请勿滥用 goroutine

前言

哈喽,大家好,我是 asongGo 语言中,goroutine的创立老本很低,调度效率很高,人称能够开几百几千万个 goroutine,然而真正开几百几千万个goroutine 就不会有任何影响吗?本文咱们就一起来看一看 goroutine 是否有数量限度并介绍几种正确应用 goroutine 的姿态~。

现状

Go 语言中,goroutine的创立老本很低,调度效率高,Go语言在设计时就是按以数万个 goroutine 为标准进行设计的,数十万个并不意外,然而 goroutine 在内存占用方面的确具备无限的老本,你不能发明有限数量的它们,比方这个例子:

ch := generate() 
go func() {for range ch {} 
}()

这段代码通过 generate() 办法取得一个 channel,而后启动一个goroutine 始终去解决这个 channel 的数据,这个 goroutine 什么时候会退出?答案是不确定,ch是由函数 generate() 来决定的,所以有可能这个 goroutine 永远都不会退出,这就有可能会引发内存透露。

goroutine就是 G-P-M 调度模型中的 G,咱们能够把goroutine 看成是一种协程,创立 goroutine 也是有开销的,然而开销很小,初始只须要 2-4k 的栈空间,当 goroutine 数量越来越大时,同时存在的 goroutine 也越来越多时,程序就暗藏内存透露的问题。看一个例子:

func main()  {
    for i := 0; i < math.MaxInt64; i++ {go func(i int) {time.Sleep(5 * time.Second)
        }(i)
    }
}

大家能够在本人的电脑上运行一下这个程序,察看一下 CPU 和内存占用状况,我说下我运行后的景象:

  • CPU 使用率疯狂上涨
  • 内存占用率也一直上涨
  • 运行一段时间后主过程解体了。。。

因而每次在编写 GO 程序时,都应该认真思考一个问题:

您将要启动的 goroutine 将如何以及在什么条件下完结?

接下来咱们就来介绍几种形式能够管制 goroutinegoroutine的数量。

管制 goroutine 的办法

Context

Go 语言中的每一个申请的都是通过一个独自的 goroutine 进行解决的,HTTP/RPC 申请的处理器往往都会启动新的 Goroutine 拜访数据库和 RPC 服务,咱们可能会创立多个 goroutine 来解决一次申请,而 Context 的次要作用就是在不同的 goroutine 之间同步申请特定的数据、勾销信号以及解决申请的截止日期。

Context包次要衍生了四个函数:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

应用这四个函数咱们对 goroutine 进行管制,具体开展就不再本文说了,咱们以 WithCancel 办法写一个例子:

func main()  {ctx,cancel := context.WithCancel(context.Background())
    go Speak(ctx)
    time.Sleep(10*time.Second)
    cancel()
    time.Sleep(2 * time.Second)
    fmt.Println("bye bye!")
}

func Speak(ctx context.Context)  {for range time.Tick(time.Second){
        select {case <- ctx.Done():
            fmt.Println("asong 哥,我收到信号了,要走了,拜拜!")
            return
        default:
            fmt.Println("asong 哥,你好帅呀~balabalabalabala")
        }
    }
}

运行后果:

asong 哥,你好帅呀~balabalabalabala
# ....... 省略局部
asong 哥,我收到信号了,要走了,拜拜!bye bye!

这里咱们应用 withCancel 创立了一个基于 Backgroundctx,而后启动了一个 goroutine 每隔 1s 夸我一句,10s后在主 goroutine 中发送勾销新信号,那么启动的 goroutine 在检测到信号后就会勾销退出。

channel

咱们晓得 channel 是用于 goroutine 的数据通信,在 Go 中通过 goroutine+channel 的形式,能够简略、高效地解决并发问题。下面咱们介绍了应用 context 来达到对 goroutine 的管制,实际上 context 的外部实现也是应用的 channel,所以有时候为了实现不便,咱们能够间接通过channel+select 或者 channel+close 的形式来管制 goroutine 的退出,咱们别离来一写一个例子:

  • channel+select
func fibonacci(ch chan int, done chan struct{}) {
    x, y := 0, 1
    for {
        select {
        case ch <- x:
            x, y = y, x+y
        case <-done:
            fmt.Println("over")
            return
        }
    }
}
func main() {ch := make(chan int)
    done := make(chan struct{})
    go func() {
        for i := 0; i < 10; i++ {fmt.Println(<-ch)
        }
        done <- struct{}{}
    }()
    fibonacci(ch, done)
}

下面的例子是计算斐波那契数列的后果,咱们应用两个 channel,一个channel 用来传输数据,另外一个 channel 用来做完结信号,这里咱们应用的是 select 的阻塞式的收发操作,直到有一个 channel 产生状态扭转,咱们也能够在 select 中应用 default 语句,那么 select 语句在执行时会遇到这两种状况:

  • 当存在能够收发的 Channel 时,间接解决该Channel 对应的 case
  • 当不存在能够收发的Channel 时,执行 default 中的语句;

倡议大家应用带 default 的形式,因为在一个 nil channel 上的操作会始终被阻塞,如果没有 default case, 只有nil channelselect会始终被阻塞。

  • channel+close

channel能够单个出队,也能够循环出队,因为咱们能够应用 for-range 循环解决 channelrange ch 会始终迭代到 channel 被敞开,依据这个个性,咱们也可做到对 goroutine 的管制:

func main()  {ch := make(chan int, 10)
    go func() {
        for i:=0; i<10;i++{ch <- i}
        close(ch)
    }()
    go func() {
        for val := range ch{fmt.Println(val)
        }
        fmt.Println("receive data over")
    }()
    time.Sleep(5* time.Second)
    fmt.Println("program over")
}

如果对 channel 不相熟的敌人能够看一下我之前的文章:学习 channel 设计:从入门到放弃

管制 goroutine 的数量

咱们能够通过以下形式达到管制 goroutine 数量的目标,不过自身 Gogoroutine就曾经很轻量了,所以管制 goroutine 的数量还是要依据具体场景剖析,并不是所有场景都须要管制 goroutine 的数量的,个别在并发场景咱们会思考管制 goroutine 的数量,接下来咱们来看一看如下几种形式达到管制 goroutine 数量的目标。

协程池

go 并发程序的时候如果程序会启动大量的 goroutine,势必会耗费大量的系统资源(内存,CPU),所以能够思考应用goroutine 池达到复用 goroutine,节俭资源,晋升性能。也有一些开源的协程池库,例如:antsgo-playground/pooljeffail/tunny 等,这里咱们看 ants 的一个官网例子:

var sum int32

func myFunc(i interface{}) {n := i.(int32)
    atomic.AddInt32(&sum, n)
    fmt.Printf("run with %d\n", n)
}

func demoFunc() {time.Sleep(10 * time.Millisecond)
    fmt.Println("Hello World!")
}

func main() {defer ants.Release()

    runTimes := 1000

    // Use the common pool.
    var wg sync.WaitGroup
    syncCalculateSum := func() {demoFunc()
        wg.Done()}
    for i := 0; i < runTimes; i++ {wg.Add(1)
        _ = ants.Submit(syncCalculateSum)
    }
    wg.Wait()
    fmt.Printf("running goroutines: %d\n", ants.Running())
    fmt.Printf("finish all tasks.\n")

    // Use the pool with a function,
    // set 10 to the capacity of goroutine pool and 1 second for expired duration.
    p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {myFunc(i)
        wg.Done()})
    defer p.Release()
    // Submit tasks one by one.
    for i := 0; i < runTimes; i++ {wg.Add(1)
        _ = p.Invoke(int32(i))
    }
    wg.Wait()
    fmt.Printf("running goroutines: %d\n", p.Running())
    fmt.Printf("finish all tasks, result is %d\n", sum)
}

这个例子其实就是计算大量整数和的程序,这里通过 ants.NewPoolWithFunc() 创立了一个 goroutine 池。第一个参数是池容量,即池中最多有 10 goroutine。第二个参数为每次执行工作的函数。当咱们调用p.Invoke(data) 的时候,ants池会在其治理的 goroutine 中找出一个闲暇的,让它执行函数 taskFunc,并将data 作为参数。

具体这个库的设计就不具体开展了,前面会专门写一篇文章来介绍如何设计一个协程池。

信号量Semaphore

Go语言的官网扩大包为咱们提供了一个基于权重的信号量 Semaphore,我能够依据信号量来管制肯定数量的 goroutine 并发工作,官网也给提供了一个例子:workerPool,代码有点长就不在这里贴了,咱们来本人写一个略微简略点的例子:

const (
    Limit = 3  // 同时运行的 goroutine 下限
    Weight = 1 // 信号量的权重
)
func main() {names := []string{
        "asong1",
        "asong2",
        "asong3",
        "asong4",
        "asong5",
        "asong6",
        "asong7",
    }

    sem := semaphore.NewWeighted(Limit)
    var w sync.WaitGroup
    for _, name := range names {w.Add(1)
        go func(name string) {sem.Acquire(context.Background(), Weight)
            fmt.Println(name)
            time.Sleep(2 * time.Second) // 延时能更好的体现进去管制
            sem.Release(Weight)
            w.Done()}(name)
    }
    w.Wait()

    fmt.Println("over--------")
}

下面的例子咱们应用 NewWeighted() 函数创立一个并发拜访的最大资源数,也就是同时运行的 goroutine 下限为 3,应用Acquire 函数来获取指定个数的资源,如果以后没有闲暇资源可用,则以后 goroutine 将陷入休眠状态,最初应用 release 函数开释已应用资源数量(计数器)进行更新缩小,并告诉其它 waiters

channel+waitgroup实现

这个办法我是在煎鱼大佬的一篇文章学到的:来,管制一下 Goroutine 的并发数量

次要实现原理是利用 waitGroup 做并发管制,利用 channel 能够在 goroutine 之间进行数据通信,通过限度 channel 的队列长度来管制同时运行的 goroutine 数量,例子如下:

func main()  {
    count := 9 // 要运行的 goroutine 数量
    limit := 3 // 同时运行的 goroutine 为 3 个
    ch := make(chan bool, limit)
    wg := sync.WaitGroup{}
    wg.Add(count)
    for i:=0; i < count; i++{go func(num int) {defer wg.Done()
            ch <- true // 发送信号
            fmt.Printf("%d 我在干活 at time %d\n",num,time.Now().Unix())
            time.Sleep(2 * time.Second)
            <- ch // 接收数据代表退出了
        }(i)
    }
    wg.Wait()}

这种实现形式真的妙,与信号量的实现形式根本类似,某些场景大家也能够思考应用这种形式来达到管制 goroutine 的目标,不过最好封装一下,要不有点俊俏,感兴趣的能够看一下煎鱼大佬是怎么封装的:https://github.com/eddycjy/gs…

总结

本文次要目标是介绍管制 goroutine 的几种形式、管制 goroutine 数量的几种形式,goroutine的创立成本低、效率高带来了很大劣势,同时也会有一些弊病,这就须要咱们在理论开发中依据具体场景抉择正确的形式应用goroutine,本文介绍的技术计划也可能是全面的,如果你有更好的形式能够在评论区中分享进去,咱们大家一起学习学习~。

文中代码曾经上传 github,欢送 star:https://github.com/asong2020/…

素质三连(分享、点赞、在看)都是笔者继续创作更多优质内容的能源!我是asong,咱们下期见。

创立了一个 Golang 学习交换群,欢送各位大佬们踊跃入群,咱们一起学习交换。入群形式:关注公众号获取。更多学习材料请到公众号支付。

欢送关注公众号:【Golang 梦工厂】

举荐往期文章:

  • 学习 channel 设计:从入门到放弃
  • 详解内存对齐
  • Go 语言中 new 和 make 你应用哪个来分配内存?
  • 源码分析 panic 与 recover,看不懂你打我好了!
  • 面试官:小松子来聊一聊内存逃逸
  • [面试官:你能聊聊 string 和[]byte 的转换吗?](https://mp.weixin.qq.com/s/jz…)
  • 面试官:两个 nil 比拟后果是什么?
  • 并发编程包之 errgroup
退出移动版