前言

channel 作为 Go 外围的数据结构和 Goroutine 之间的通信形式,Channel 是撑持 Go 语言高性能并发编程模型的重要构造本节会介绍管道 Channel 的设计原理、数据结构和常见操作,例如 Channel 的创立、发送、接管和敞开。

在进入主题内容之前,读者须要先把握下表中的不同状态下的channel执行Read、Write、close操作所会产生的后果。

图来自 曹大

Go 语言中最常见的、也是常常被人提及的设计模式就是:不要通过共享内存的形式进行通信,而是应该通过通信的形式共享内存。尽管咱们在 Go 语言中也能应用共享内存加互斥锁进行通信,然而 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 别离对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。 本文将会介绍基于channel实现的多种通信模型 。

MPSC:多生产者单消费者模型
对于MPSC的利用场景中,多个生产者负责生产数据,只有一个消费者来生产数据, 而这个模型又可分为两种实现形式:

  1. 多个生产者是专用一个channel 来和消费者通信
  2. 应用本人独有的channel来和消费者通信

变种一:生产者共用channel

如图所示,右边有多个生产goroutine往公共的channel中写入数据,左边只有一个生产goroutine从这个channel中读取数据进行解决。

根底版
咱们首先定义传递的音讯构造体定义:

`type Msg struct {

in int

}`

而后实现生产者如下,其中参数 sendChan就是生产者和消费者进行通信的channel

// 生产者func producer(sendChan chan Msg) {    for i := 0; i < 10; i++ {        sendChan <- Msg{in: i}    }}

消费者以及音讯处理函数定义如下,其中参数 sendChan就是生产者和消费者进行通信的channel,以后音讯处理函数process目前只是把音讯内容打印进去。

// 消费者func consumer(sendChan chan Msg) {    for v := range sendChan {        process(v)    }}// 音讯处理函数func process(msg Msg){    fmt.Println(msg)}

mpsc的模型代码如下,首先创立通信用的channel,而后开启三个生产者goroutine,一个消费者goroutine。

func mpsc() {    sendChan := make(chan Msg, 10)    for p := 0; p < 3; p++ {        go producer(sendChan)    }    go consumer(sendChan)}

main函数如下, 外面 select{} 是为了放弃main函数所在的goroutine始终阻塞,不然main函数立即退出后,生产者和消费者goroutine说不定还没有执行或只执行了一部分就退出了。

`func main() {

mpsc()select{}

}`

残缺代码 在线演示 https://www.online-ide.com/IB... 后果如下

{0}{1}{2}{3}{4}{5}{6}{7}{8}{9}{0}{1}{2}{3}{4}{5}{6}{7}{8}{9}{0}{1}{2}{3}{4}{5}{6}{7}{8}{9}fatal error: all goroutines are asleep - deadlock!goroutine 1 [select (no cases)]:main.main()    /home/kingeasternsun/1ba7ba19-5e66-4f4c-beb3-cb5e3e6d881e/main.go:9 +0x25goroutine 9 [chan receive]:main.consumer(0xc000072000)    /home/kingeasternsun/1ba7ba19-5e66-4f4c-beb3-cb5e3e6d881e/main.go:25 +0xa9created by main.mpsc    /home/kingeasternsun/1ba7ba19-5e66-4f4c-beb3-cb5e3e6d881e/main.go:43 +0x9bexit status 2** Process exited - Return Code: 1 **

能够看到打印的数字是由穿插的,阐明多个生产者并发的执行了写入。 不过最初生产者发送完后程序有两个报错:

  1. 第一条 goroutine 1 [select (no cases)]: 是指 select{} 始终阻塞
  2. 第二条 goroutine 9 [chan receive]: 是指在于生产者发送完数据库后没有新的数据写入channel,而消费者生产完channel中的数据库,再从空的channel中读数据就会统一阻塞,所以下面报fatal error: all goroutines are asleep - deadlock! 这个谬误

因为咱们是demo,为了放弃main活着应用了 select{} ,理论我的项目中mspc往往会在一个继续运行的程序中调用,所以就没有下面的问题了。

当然咱们也能够间接来修复这个谬误,让生产者都发送完后给消费者发消息让其退出即可。

修复deadlock问题
要等生产者都发送完后才给消费者发消息,那么咱们就须要应用sync.WaitGroup来进行同步。 生产者如何给消费者发消息,同时要保障消费者把之前的音讯解决完后才退出。咱们能够又两种计划:

  1. 发送一个非凡标记的Msg,标记这个音讯是终止音讯
  2. close channel

第1种计划会造成通信额定的内存占用耗费,举荐第2种计划。

首先批改生产者代码如下,入参多了一个 wg *sync.WaitGroup , 生产者发送完数据后调用wg.Done()

// 生产者func producer(sendChan chan Msg, wg *sync.WaitGroup) {    for i := 0; i < 10; i++ {        sendChan <- Msg{in: i}    }    wg.Done()}

而后mpsc模型改写为如下:

func mpsc() {    // 生产者个数    pNum := 3    sendChan := make(chan Msg, 10)    wg := sync.WaitGroup{}    wg.Add(pNum)    for p := 0; p < pNum; p++ {        go producer(sendChan, &wg)    }    // 期待生产者都实现后敞开 sendChan 告诉到 消费者    go func() {        wg.Wait()        close(sendChan)    }()    consumer(sendChan)}

能够看到在mpsc中有以下几个变动

  1. 新起了一个goroutine,利用wg.Wait()期待生产者都实现,而后去敞开channel;
  2. consumer(sendChan) 不再新起一个goroutine来执行,这样mspc就变成阻塞的,期待消费者失常完结。

因为mpsc自身是阻塞的了,所以咱们在main中只须要调用mpsc即可

`func main() {

mpsc()

}`

残缺代码

生产者消费者双向通信
以后模型中,生产者把音讯发送给消费者后,并不知道音讯解决的后果,如果生产者想要晓得音讯的处理结果,该如何改变呢? 其中一个比拟常见的办法就是每个生产者保护一个本人公有的channel,而后在发送音讯的时候,把本人公有的channel连同音讯一起发送给消费者,消费者解决音讯后,再将处理结果通过音讯中的channel发送回生产者。

首先音讯类型定义中新增一个channel成员,用于存储生产者的公有channel

type Msg struct {    in int    ch chan int}

生产者开始的时候会新建一个独有的channel,而后在发送音讯的时候把这个通道放入到Msg中,同时生产者会新启一个goroutine从这个独有的channel中承受消费者返回的效应。

// 生产者func producer(sendChan chan Msg, wg *sync.WaitGroup) {    recvCh := make(chan int)    go func() {        for v := range recvCh {            fmt.Println("recv ", v)        }    }()    for i := 0; i < 10; i++ {        sendChan <- Msg{in: i, ch: recvCh}    }    wg.Done()}

最初批改音讯处理函数如下,将承受的音讯中的value加倍后通过音讯中的channel传递回去。

// 音讯处理函数func process(msg Msg) {    msg.ch <- 2 * msg.in}

残缺代码


目前为止生产者给消费者发送音讯是专用一个channel,还有一种计划时生产者应用本人独有的channel给消费者发送音讯。

变种二:生产者应用独有的channel和消费者通信

在这种计划中,每一个生产者保护一个独有的channel和消费者通信,消费者监听这些channel来获取音讯进行解决。

根底版

对于生产者,会创立一个独有的channel,返回进来给消费者读取,同时外部新起一个goroutine来往这个channel中发送数据,代码如下:

func producer(in []int) chan Msg {    ch := make(chan Msg)    go func() {        for _, v := range in {            ch <- Msg{in: v}        }        close(ch)    }()    return ch}

消费者会同时监听多个生产者的channel读取音讯,对应的消费者如下:

func consumer(ch1, ch2 chan Msg) {    for {        select {        case v1 := <-ch1:            fmt.Println(v1)        case v2 := <-ch2:            fmt.Println(v2)        }    }}

对应的mpsc模型如下

func mpsc() {    ch1 := producer([]int{1, 2, 3})    ch2 := producer([]int{4, 5, 6})    consumer(ch1, ch2)}

残缺代码

理论执行的时候会发现问题,当所有生产者发送完数据close本人的channel后,消费者还在不停的从channel外面承受数据,不过接管的数据值都是0. 其中起因在于close的通道依然时可读的,读取的时候理论会返回两个值,第一个值是个零值,第二个值是一个bool值,标识以后通道是否close了,所以咱们在代码中要读取这个bool值来判断以后的channel是否敞开。 另外select中的多个channel可能其中一个敞开了,然而其它的channel并没有close,依然有数据可读,那如何让消费者跳过曾经close的channel呢?咱们能够把曾经敞开的channel设置为nil,读写nil的channel都是阻塞的,所以在select中就会跳过这些channel了。

修复版

依据上文所说,咱们要做以下几点的批改

  1. 读取channel的敞开标记,来判断以后channel是否close了
  2. 如果以后channel曾经close了,那么就将以后channel设置为nil
  3. 当所有的channel都敞开了,消费者就退出

代码如下:

func consumer(ch1, ch2 chan Msg) {    var v1 Msg    var v2 Msg    ok1 := true    ok2 := true    for ok1 || ok2 {        select {        case v1, ok1 = <-ch1:            fmt.Println(v1)            if !ok1 { //通道敞开了                ch1 = nil            }        case v2, ok2 = <-ch2:            fmt.Println(v2)            if !ok2 { //通道敞开了                ch2 = nil            }        }    }}

残缺代码

SPMC:单生产者多消费者模型

如图,单个生产者和多个消费者之间通过共有的一个channel进行通信,生产者往channel外面写音讯,多个消费者争抢着从channel中读取音讯进行解决,这个模型十分像一个音讯队列中的FanOut模型。

生产者负责往channel写音讯,写完后敞开channel

func producer(ch chan Msg) {    in := []int{1, 2, 3, 4, 5, 6}    for _, v := range in {        ch <- Msg{in: v}    }    close(ch)}

消费者负责从channel中读取音讯

func consumer(ch chan Msg) {    for v := range ch {        fmt.Println(v)    }}

spmc模型代码如下:

func spmc() {    ch := make(chan Msg)    go producer(ch)    go consumer(ch)    go consumer(ch)    go consumer(ch)}

spmc模型解决敞开特地不便,因为只有一个生产者,所以生产者发送完音讯后就能够close这个channel,而后上游的消费者都能够在channel中数据被读完后主动退出。for range 操作channel的时候,如果读取到channel返回的bool值是false就会退出循环。

因为spmc外面生产者和消费者都是异步的,所以main只有放弃始终阻塞能力让逻辑失常执行。

`func main() {

spmc()select {}

}`

残缺代码

另外一种常见的写法如下,producer在外部创立channel而后返回进来

func producer() chan Msg {    in := []int{1, 2, 3, 4, 5, 6}    ch := make(chan Msg)    go func() {        for _, v := range in {            ch <- Msg{in: v}        }        close(ch)    }()    return ch}

而后消费者从producer返回的channel中读取音讯。 spmc模型如下

func spmc() {    ch := producer()    go consumer(ch)    go consumer(ch)    go consumer(ch)}

残缺代码