共计 5025 个字符,预计需要花费 13 分钟才能阅读完成。
channel 一个类型管道,通过它可以在 goroutine 之间发送和接收消息。它是 Golang 在语言层面提供的 goroutine 间的通信方式。
众所周知,Go 依赖于称为 CSP(Communicating Sequential Processes)的并发模型,通过 Channel 实现这种同步模式。Go 并发的核心哲学是不要通过共享内存进行通信; 相反,通过沟通分享记忆。
下面以简单的示例来演示 Go 如何通过 channel 来实现通信。
package main
import (
"fmt"
"time"
)
func goRoutineA(a <-chan int) {
val := <-a
fmt.Println("goRoutineA received the data", val)
}
func goRoutineB(b chan int) {
val := <-b
fmt.Println("goRoutineB received the data", val)
}
func main() {ch := make(chan int, 3)
go goRoutineA(ch)
go goRoutineB(ch)
ch <- 3
time.Sleep(time.Second * 1)
}
结果为:
goRoutineA received the data 3
上面只是个简单的例子,只输出 goRoutineA,没有执行 goRoutineB,说明 channel 仅允许被一个 goroutine 读写。
接下来我们通过源代码分析程序执行过程,在讲之前,如果不了解 go 并发和调度相关知识。请阅读这篇文章
https://github.com/guyan0319/…
说道 channel 这里不得不提通道的结构 hchan。
hchan
源代码在 src/runtime/chan.go
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
说明:
qcount uint // 当前队列中剩余元素个数
dataqsiz uint // 环形队列长度,即缓冲区的大小,即 make(chan T,N),N.
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 表示当前通道是否处于关闭状态。创建通道后,该字段设置为 0,即通道打开; 通过调用 close 将其设置为 1,通道关闭。
elemtype *_type // 元素类型,用于数据传递过程中的赋值;
sendx uint 和recvx uint 是环形缓冲区的状态字段,它指示缓冲区的当前索引 – 支持数组,它可以从中发送数据和接收数据。
recvq waitq // 等待读消息的 goroutine 队列
sendq waitq // 等待写消息的 goroutine 队列
lock mutex // 互斥锁,为每个读写操作锁定通道,因为发送和接收必须是互斥操作。
这里sudog 代表 goroutine。
创建 channel 有两种,一种是带缓冲的 channel,一种是不带缓冲的 channel
// 带缓冲
ch := make(chan Task, 3)
// 不带缓冲
ch := make(chan int)
这里我们先讨论带缓冲
ch := make(chan int, 3)
创建通道后的缓冲通道结构
hchan struct {
qcount uint : 0
dataqsiz uint : 3
buf unsafe.Pointer : 0xc00007e0e0
elemsize uint16 : 8
closed uint32 : 0
elemtype *runtime._type : &{
size:8
ptrdata:0
hash:4149441018
tflag:7
align:8
fieldalign:8
kind:130
alg:0x55cdf0
gcdata:0x4d61b4
str:1055
ptrToThis:45152
}
sendx uint : 0
recvx uint : 0
recvq runtime.waitq :
{first:<nil> last:<nil>}
sendq runtime.waitq :
{first:<nil> last:<nil>}
lock runtime.mutex :
{key:0}
}
源代码
func makechan(t *chantype, size int) *hchan {
elem := t.elem
...
}
如果我们创建一个带 buffer 的 channel,底层的数据模型如下图:
向 channel 写入数据
ch <- 3
底层 hchan 数据流程如图
发送操作概要
1、锁定整个通道结构。
2、确定写入。尝试 recvq
从等待队列中等待 goroutine,然后将元素直接写入 goroutine。
3、如果 recvq 为 Empty,则确定缓冲区是否可用。如果可用,从当前 goroutine 复制数据到缓冲区。
4、如果缓冲区已满,则要 写入的元素将保存在当前正在执行的 goroutine 的结构中,并且当前 goroutine 将在 sendq 中 排队并从运行时挂起。
5、写入完成释放锁。
这里我们要注意几个属性 buf、sendx、lock 的变化。
流程图
从 channel 读取操作
几乎和写入操作相同
代码
func goRoutineA(a <-chan int) {
val := <-a
fmt.Println("goRoutineA received the data", val)
}
底层 hchan 数据流程如图
这里我们要注意几个属性 buf、sendx、recvx、lock 的变化。
读取操作概要
1、先获取 channel 全局锁
2、尝试 sendq 从等待队列中获取等待的 goroutine,
3、如有等待的 goroutine,没有缓冲区,取出 goroutine 并读取数据,然后唤醒这个 goroutine,结束读取释放锁。
4、如有等待的 goroutine,且有缓冲区(此时缓冲区已满),从缓冲区队首取出数据,再从 sendq 取出一个 goroutine,将 goroutine 中的数据存入 buf 队尾,结束读取释放锁。
5、如没有等待的 goroutine,且缓冲区有数据,直接读取缓冲区数据,结束读取释放锁。
6、如没有等待的 goroutine,且没有缓冲区或缓冲区为空,将当前的 goroutine 加入 sendq 排队,进入睡眠,等待被写 goroutine 唤醒。结束读取释放锁。
流程图
recvq 和 sendq 结构
recvq 和 sendq 基本上是链表,看起来基本如下
select
select 就是用来监听和 channel 有关的 IO 操作,当 IO 操作发生时,触发相应的动作。
一个简单的示例如下
package main
import (
"fmt"
"time"
)
func goRoutineD(ch chan int, i int) {time.Sleep(time.Second * 3)
ch <- i
}
func goRoutineE(chs chan string, i string) {time.Sleep(time.Second * 3)
chs <- i
}
func main() {ch := make(chan int, 5)
chs := make(chan string, 5)
go goRoutineD(ch, 5)
go goRoutineE(chs, "ok")
select {
case msg := <-ch:
fmt.Println("received the data", msg)
case msgs := <-chs:
fmt.Println("received the data", msgs)
default:
fmt.Println("no data received")
time.Sleep(time.Second * 1)
}
}
运行程序,因为当前时间没有到 3s,所以 select 选择 defult
no data received
修改程序,我们注释掉 default,并多执行几次结果为
received the data 5
received the data ok
received the data ok
received the data ok
select 语句会阻塞,直到监测到一个可以执行的 IO 操作为止,而这里 goRoutineD 和 goRoutineE 睡眠时间是相同的,都是 3s,从输出可看出,从 channel 中读出数据的顺序是随机的。
再修改代码,goRoutineD 睡眠时间改成 4s
func goRoutineD(ch chan int, i int) {time.Sleep(time.Second * 4)
ch <- i
}
此时会先执行 goRoutineE,select 选择 case msgs := <-chs。
range
可以持续从 channel 读取数据,一直到 channel 被关闭,当 channel 中没有数据时会阻塞当前 goroutine,与读 channel 时阻塞处理机制一样。
package main
import (
"fmt"
"time"
)
func goRoutineD(ch chan int, i int) {
for i := 1; i <= 5; i++{time.Sleep(time.Second * 1)
ch <- i
}
}
func chanRange(chanName chan int) {
for e := range chanName {fmt.Printf("Get element from chan: %d\n", e)
if len(chanName) <= 0 { // 如果现有数据量为 0,跳出循环
break
}
}
}
func main() {ch := make(chan int, 5)
go goRoutineD(ch, 5)
chanRange(ch)
}
结果:
Get element from chan: 1
Get element from chan: 2
Get element from chan: 3
Get element from chan: 4
Get element from chan: 5
死锁(deadlock)
指两个或两个以上的协程的执行过程中,由于竞争资源或由于彼此通信而造成的一种阻塞的现象。
在非缓冲信道若发生只流入不流出,或只流出不流入,就会发生死锁。
下面是一些死锁的例子
1、
package main
func main() {ch := make(chan int)
ch <- 3
}
上面情况,向非缓冲通道写数据会发生阻塞,导致死锁。解决办法创建缓冲区 ch := make(chan int,3)
2、
package main
import ("fmt")
func main() {ch := make(chan int)
fmt.Println(<-ch)
}
向非缓冲通道读取数据会发生阻塞,导致死锁。解决办法开启缓冲区,先向 channel 写入数据。
3、
package main
func main() {ch := make(chan int, 3)
ch <- 3
ch <- 4
ch <- 5
ch <- 6
}
写入数据超过缓冲区数量也会发生死锁。解决办法将写入数据取走。
4、
package main
func main() {ch := make(chan int, 3)
ch <- 1
close(ch)
ch <- 2
}
向关闭的 channel 写入数据。解决办法别向关闭的 channel 写入数据。
死锁的情况有很多这里不再赘述。
参考:
https://codeburst.io/diving-d…
https://speakerdeck.com/kavya…
https://my.oschina.net/renhc/…
links
- 目录