共计 5618 个字符,预计需要花费 15 分钟才能阅读完成。
前言
channel 作为 Go 外围的数据结构和 Goroutine 之间的通信形式,Channel 是撑持 Go 语言高性能并发编程模型的重要构造本节会介绍管道 Channel 的设计原理、数据结构和常见操作,例如 Channel 的创立、发送、接管和敞开。
在进入主题内容之前,读者须要先把握下表中的不同状态下的 channel 执行 Read、Write、close 操作所会产生的后果。
图来自 曹大
Go 语言中最常见的、也是常常被人提及的设计模式就是:不要通过共享内存的形式进行通信,而是应该通过通信的形式共享内存。尽管咱们在 Go 语言中也能应用共享内存加互斥锁进行通信,然而 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 别离对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。本文将会介绍基于 channel 实现的多种通信模型。
MPSC: 多生产者单消费者模型
对于 MPSC 的利用场景中,多个生产者负责生产数据,只有一个消费者来生产数据,而这个模型又可分为两种实现形式:
- 多个生产者是专用一个 channel 来和消费者通信
- 应用本人独有的 channel 来和消费者通信
变种一:生产者共用 channel
如图所示,右边有多个生产 goroutine 往公共的 channel 中写入数据,左边只有一个生产 goroutine 从这个 channel 中读取数据进行解决。
根底版
咱们首先定义传递的音讯构造体定义:
`type Msg struct {
in int
}`
而后实现生产者如下,其中参数 sendChan 就是生产者和消费者进行通信的 channel
// 生产者
func producer(sendChan chan Msg) {
for i := 0; i < 10; i++ {sendChan <- Msg{in: i}
}
}
消费者以及音讯处理函数定义如下,其中参数 sendChan 就是生产者和消费者进行通信的 channel,以后音讯处理函数 process 目前只是把音讯内容打印进去。
// 消费者
func consumer(sendChan chan Msg) {
for v := range sendChan {process(v)
}
}
// 音讯处理函数
func process(msg Msg){fmt.Println(msg)
}
mpsc 的模型代码如下,首先创立通信用的 channel,而后开启三个生产者 goroutine,一个消费者 goroutine。
func mpsc() {sendChan := make(chan Msg, 10)
for p := 0; p < 3; p++ {go producer(sendChan)
}
go consumer(sendChan)
}
main 函数如下, 外面 select{} 是为了放弃 main 函数所在的 goroutine 始终阻塞,不然 main 函数立即退出后,生产者和消费者 goroutine 说不定还没有执行或只执行了一部分就退出了。
`func main() {
mpsc()
select{}
}`
残缺代码 在线演示 https://www.online-ide.com/IB… 后果如下
{0}
{1}
{2}
{3}
{4}
{5}
{6}
{7}
{8}
{9}
{0}
{1}
{2}
{3}
{4}
{5}
{6}
{7}
{8}
{9}
{0}
{1}
{2}
{3}
{4}
{5}
{6}
{7}
{8}
{9}
fatal error:
all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
main.main()
/home/kingeasternsun/1ba7ba19-5e66-4f4c-beb3-cb5e3e6d881e/main.go:9 +0x25
goroutine 9 [chan receive]:
main.consumer(0xc000072000)
/home/kingeasternsun/1ba7ba19-5e66-4f4c-beb3-cb5e3e6d881e/main.go:25 +0xa9
created by main.mpsc
/home/kingeasternsun/1ba7ba19-5e66-4f4c-beb3-cb5e3e6d881e/main.go:43 +0x9b
exit status 2
** Process exited - Return Code: 1 **
能够看到打印的数字是由穿插的,阐明多个生产者并发的执行了写入。不过最初生产者发送完后程序有两个报错:
- 第一条 goroutine 1 [select (no cases)]: 是指 select{} 始终阻塞
- 第二条 goroutine 9 [chan receive]: 是指在于生产者发送完数据库后没有新的数据写入 channel,而消费者生产完 channel 中的数据库,再从空的 channel 中读数据就会统一阻塞,所以下面报 fatal error: all goroutines are asleep – deadlock! 这个谬误
因为咱们是 demo,为了放弃 main 活着应用了 select{},理论我的项目中 mspc 往往会在一个继续运行的程序中调用,所以就没有下面的问题了。
当然咱们也能够间接来修复这个谬误,让生产者都发送完后给消费者发消息让其退出即可。
修复 deadlock 问题
要等生产者都发送完后才给消费者发消息,那么咱们就须要应用 sync.WaitGroup 来进行同步。生产者如何给消费者发消息,同时要保障消费者把之前的音讯解决完后才退出。咱们能够又两种计划:
- 发送一个非凡标记的 Msg,标记这个音讯是终止音讯
- close channel
第 1 种计划会造成通信额定的内存占用耗费,举荐第 2 种计划。
首先批改生产者代码如下,入参多了一个 wg *sync.WaitGroup,生产者发送完数据后调用 wg.Done()
// 生产者
func producer(sendChan chan Msg, wg *sync.WaitGroup) {
for i := 0; i < 10; i++ {sendChan <- Msg{in: i}
}
wg.Done()}
而后 mpsc 模型改写为如下:
func mpsc() {
// 生产者个数
pNum := 3
sendChan := make(chan Msg, 10)
wg := sync.WaitGroup{}
wg.Add(pNum)
for p := 0; p < pNum; p++ {go producer(sendChan, &wg)
}
// 期待生产者都实现后敞开 sendChan 告诉到 消费者
go func() {wg.Wait()
close(sendChan)
}()
consumer(sendChan)
}
能够看到在 mpsc 中有以下几个变动
- 新起了一个 goroutine,利用 wg.Wait()期待生产者都实现,而后去敞开 channel;
- consumer(sendChan) 不再新起一个 goroutine 来执行,这样 mspc 就变成阻塞的,期待消费者失常完结。
因为 mpsc 自身是阻塞的了,所以咱们在 main 中只须要调用 mpsc 即可
`func main() {
mpsc()
}`
残缺代码
生产者消费者双向通信
以后模型中,生产者把音讯发送给消费者后,并不知道音讯解决的后果,如果生产者想要晓得音讯的处理结果,该如何改变呢?其中一个比拟常见的办法就是每个生产者保护一个本人公有的 channel,而后在发送音讯的时候,把本人公有的 channel 连同音讯一起发送给消费者,消费者解决音讯后,再将处理结果通过音讯中的 channel 发送回生产者。
首先音讯类型定义中新增一个 channel 成员,用于存储生产者的公有 channel
type Msg struct {
in int
ch chan int
}
生产者开始的时候会新建一个独有的 channel,而后在发送音讯的时候把这个通道放入到 Msg 中,同时生产者会新启一个 goroutine 从这个独有的 channel 中承受消费者返回的效应。
// 生产者
func producer(sendChan chan Msg, wg *sync.WaitGroup) {recvCh := make(chan int)
go func() {
for v := range recvCh {fmt.Println("recv", v)
}
}()
for i := 0; i < 10; i++ {sendChan <- Msg{in: i, ch: recvCh}
}
wg.Done()}
最初批改音讯处理函数如下,将承受的音讯中的 value 加倍后通过音讯中的 channel 传递回去。
// 音讯处理函数
func process(msg Msg) {msg.ch <- 2 * msg.in}
残缺代码
目前为止生产者给消费者发送音讯是专用一个 channel,还有一种计划时生产者应用本人独有的 channel 给消费者发送音讯。
变种二:生产者应用独有的 channel 和消费者通信
在这种计划中,每一个生产者保护一个独有的 channel 和消费者通信,消费者监听这些 channel 来获取音讯进行解决。
根底版
对于生产者,会创立一个独有的 channel,返回进来给消费者读取,同时外部新起一个 goroutine 来往这个 channel 中发送数据,代码如下:
func producer(in []int) chan Msg {ch := make(chan Msg)
go func() {
for _, v := range in {ch <- Msg{in: v}
}
close(ch)
}()
return ch
}
消费者会同时监听多个生产者的 channel 读取音讯,对应的消费者如下:
func consumer(ch1, ch2 chan Msg) {
for {
select {
case v1 := <-ch1:
fmt.Println(v1)
case v2 := <-ch2:
fmt.Println(v2)
}
}
}
对应的 mpsc 模型如下
func mpsc() {ch1 := producer([]int{1, 2, 3})
ch2 := producer([]int{4, 5, 6})
consumer(ch1, ch2)
}
残缺代码
理论执行的时候会发现问题,当所有生产者发送完数据 close 本人的 channel 后,消费者还在不停的从 channel 外面承受数据,不过接管的数据值都是 0. 其中起因在于 close 的通道依然时可读的,读取的时候理论会返回两个值,第一个值是个零值,第二个值是一个 bool 值,标识以后通道是否 close 了,所以咱们在代码中要读取这个 bool 值来判断以后的 channel 是否敞开。另外 select 中的多个 channel 可能其中一个敞开了,然而其它的 channel 并没有 close,依然有数据可读,那如何让消费者跳过曾经 close 的 channel 呢?咱们能够把曾经敞开的 channel 设置为 nil,读写 nil 的 channel 都是阻塞的,所以在 select 中就会跳过这些 channel 了。
修复版
依据上文所说,咱们要做以下几点的批改
- 读取 channel 的敞开标记,来判断以后 channel 是否 close 了
- 如果以后 channel 曾经 close 了,那么就将以后 channel 设置为 nil
- 当所有的 channel 都敞开了,消费者就退出
代码如下:
func consumer(ch1, ch2 chan Msg) {
var v1 Msg
var v2 Msg
ok1 := true
ok2 := true
for ok1 || ok2 {
select {
case v1, ok1 = <-ch1:
fmt.Println(v1)
if !ok1 { // 通道敞开了
ch1 = nil
}
case v2, ok2 = <-ch2:
fmt.Println(v2)
if !ok2 { // 通道敞开了
ch2 = nil
}
}
}
}
残缺代码
SPMC:单生产者多消费者模型
如图,单个生产者和多个消费者之间通过共有的一个 channel 进行通信,生产者往 channel 外面写音讯,多个消费者争抢着从 channel 中读取音讯进行解决,这个模型十分像一个音讯队列中的 FanOut 模型。
生产者负责往 channel 写音讯,写完后敞开 channel
func producer(ch chan Msg) {in := []int{1, 2, 3, 4, 5, 6}
for _, v := range in {ch <- Msg{in: v}
}
close(ch)
}
消费者负责从 channel 中读取音讯
func consumer(ch chan Msg) {
for v := range ch {fmt.Println(v)
}
}
spmc 模型代码如下:
func spmc() {ch := make(chan Msg)
go producer(ch)
go consumer(ch)
go consumer(ch)
go consumer(ch)
}
spmc 模型解决敞开特地不便,因为只有一个生产者,所以生产者发送完音讯后就能够 close 这个 channel,而后上游的消费者都能够在 channel 中数据被读完后主动退出。for range 操作 channel 的时候,如果读取到 channel 返回的 bool 值是 false 就会退出循环。
因为 spmc 外面生产者和消费者都是异步的,所以 main 只有放弃始终阻塞能力让逻辑失常执行。
`func main() {
spmc()
select {}
}`
残缺代码
另外一种常见的写法如下,producer 在外部创立 channel 而后返回进来
func producer() chan Msg {in := []int{1, 2, 3, 4, 5, 6}
ch := make(chan Msg)
go func() {
for _, v := range in {ch <- Msg{in: v}
}
close(ch)
}()
return ch
}
而后消费者从 producer 返回的 channel 中读取音讯。spmc 模型如下
func spmc() {ch := producer()
go consumer(ch)
go consumer(ch)
go consumer(ch)
}
残缺代码