共计 9444 个字符,预计需要花费 24 分钟才能阅读完成。
golang channel 详解
前言
CSP: 不要通过共享内存来通信,而要通过通信来实现内存共享,它是 Go 的并发哲学,基于 channel 实现。
Channel 是 Go 中的一个外围类型,你能够把它看成一个管道,通过它并发外围单元就能够发送或者接收数据进行通信 (communication)。
数据结构
runtime/chan.go
type hchan struct {
qcount uint // 队列中残余元素
dataqsiz uint // 队列长度,eg make(chan int64, 5), dataqsiz 为 5
buf unsafe.Pointer // 数据存储环形数组
elemsize uint16 // 每个元素的大小
closed uint32 // 是否敞开 0 未敞开
elemtype *_type // 元素类型
sendx uint // 发送者写入地位
recvx uint // 接受者读数据地位
recvq waitq // 接收者队列,保留正在读取 channel 的 goroutian
sendq waitq // 发送者队列,保留正在发送 channel 的 goroutian
lock mutex // 锁
}
waitq 是双向链表,sudog 为 goroutian 的封装
type waitq struct {
first *sudog
last *sudog
}
make(chan int, 6)
上图为一个长度为 6,类型为 int, 两个接收者,三个发送者的 channel,以后接收者筹备读数据的地位为 0,发送者发送数据地位为 4
留神,个别状况下 recvq 和 sendq 至多有一个为空。只有一个例外,那就是同一个 goroutine 应用 select 语句向 channel 一边写数据,一边读数据。
channel 创立
创立 channel 的过程实际上是初始化 hchan 构造。其中类型信息和缓冲区长度由 make 语句传入,buf 的大小则与元素大小和缓冲区长度独特决定。
runtime/chan.go line:71
func makechan(t *chantype, size int) *hchan {
elem := t.elem
//...
var c *hchan
// 创立 hchan 构造并分配内存
switch {
// 无缓冲区
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
// 元素不含指针
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 默认场景,构造体和 buffer 独自分配内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 元素大小
c.elemsize = uint16(elem.size)
// 元素类型
c.elemtype = elem
// 队列长度
c.dataqsiz = uint(size)
//...
return c
}
写数据
- 如果期待接管队列 recvq 不为空,阐明缓冲区中没有数据或者没有缓冲区,此时间接从 recvq 取出 G, 并把数据写入,最初把该 G 唤醒,完结发送过程;
- 如果缓冲区中有空余地位,将数据写入缓冲区,完结发送过程;
- 如果缓冲区中没有空余地位,将待发送数据写入 G,将以后 G 退出 sendq,进入睡眠,期待被读 goroutine 唤醒;
写数据代码解析
写数据分为阻塞写和非阻塞写 代码示例:
c := make(chan int64)
c <- 1 // 阻塞写
// 非阻塞写
select {
case c <- 1:
//do something
break
default:
//do something
}
对应源码里两个函数:
// 非阻塞
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {return chansend(c, elem, false, getcallerpc())
}
// 阻塞
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}
留神:非阻塞写必须带上 default
chansend 源码
// 位于 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果 channel 是 nil
if c == nil {
// 不能阻塞,间接返回 false,示意未发送胜利
if !block {return false}
// 以后 goroutine 被挂起
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
// 省略 debug 相干……
// 对于不阻塞的 send,疾速检测失败场景
//
// 如果 channel 未敞开且 channel 没有多余的缓冲空间。这可能是:// 1. channel 是非缓冲型的,且期待接管队列里没有 goroutine
// 2. channel 是缓冲型的,但循环数组曾经装满了元素
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {return false}
var t0 int64
if blockprofilerate > 0 {t0 = cputicks()
}
// 锁住 channel,并发平安
lock(&c.lock)
// 如果 channel 敞开了
if c.closed != 0 {
// 解锁
unlock(&c.lock)
// 间接 panic
panic(plainError("send on closed channel"))
}
// 如果接管队列里有 goroutine,间接将要发送的数据拷贝到接管 goroutine
if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() {unlock(&c.lock) }, 3)
return true
}
// 对于缓冲型的 channel,如果还有缓冲空间
if c.qcount < c.dataqsiz {
// qp 指向 buf 的 sendx 地位
qp := chanbuf(c, c.sendx)
// ……
// 将数据从 ep 处拷贝到 qp
typedmemmove(c.elemtype, qp, ep)
// 发送游标值加 1
c.sendx++
// 如果发送游标值等于容量值,游标值归 0
if c.sendx == c.dataqsiz {c.sendx = 0}
// 缓冲区的元素数量加一
c.qcount++
// 解锁
unlock(&c.lock)
return true
}
// 如果不须要阻塞,则间接返回谬误
if !block {unlock(&c.lock)
return false
}
// channel 满了,发送方会被阻塞。接下来会结构一个 sudog
// 获取以后 goroutine 的指针
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {mysg.releasetime = -1}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 以后 goroutine 进入发送期待队列
c.sendq.enqueue(mysg)
// 以后 goroutine 被挂起
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// 从这里开始被唤醒了(channel 有机会能够发送了)if mysg != gp.waiting {throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {throw("chansend: spurious wakeup")
}
// 被唤醒后,channel 敞开了。坑爹啊,panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)
}
// 去掉 mysg 上绑定的 channel
mysg.c = nil
releaseSudog(mysg)
return true
}
读数据
- 如果期待发送队列 sendq 不为空,且没有缓冲区,间接从 sendq 中取出 G,把 G 中数据读出,最初把 G 唤醒,完结读取过程;
- 如果期待发送队列 sendq 不为空,此时阐明缓冲区已满,从缓冲区中首部读出数据,把 G 中数据写入缓冲区尾部,把 G 唤醒,完结读取过程;
- 如果缓冲区中有数据,则从缓冲区取出数据,完结读取过程;
- 将以后 goroutine 退出 recvq,进入睡眠,期待被写 goroutine 唤醒;
读数据代码解析
读数据分为阻塞读和非阻塞读 代码示例:
c := make(chan int, 10)
<-c // 阻塞读
//select 读带 default 为非阻塞读
select{
case <-c:
//...
break
default:
//...
}
留神:非阻塞读必须带上 default
接管操作有两种写法,一种带“ok”,反馈 channel 是否敞开;一种不带“ok”,这种写法,当接管到相应类型的零值时无奈晓得是实在的发送者发送过去的值,还是 channel 被敞开后,返回给接收者的默认类型的零值,代码示例:
c := make(chan int64, 5)
c <- 0
v, ok := <-c
fmt.Println(v, ok) // 0, true
close(c)
v, ok = <-c
fmt.Println(v, ok) // 0, false
最初对应源码里的这四个函数:
// 非阻塞读不带 ok 返回
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {selected, _ = chanrecv(c, elem, false)
return
}
// 非阻塞读带 Ok 返回
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {// TODO(khr): just return 2 values from this function, now that it is in Go.
selected, *received = chanrecv(c, elem, false)
return
}
// 阻塞读不带 ok 返回
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}
// 阻塞读带 ok 返回
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)
return
}
能够看进去,最终都指向了 chanrecv 函数,如果有接管值,val := <-c,会把接管值放到 elem 的地址中,如果疏忽接管值间接写 <-c,这时 elem 为 nil
上面来看看 chanrecv 的代码
// 位于 src/runtime/chan.go
// chanrecv 函数接管 channel c 的元素并将其写入 ep 所指向的内存地址。// 如果 ep 是 nil,阐明疏忽了接管值。// 如果 block == false,即非阻塞型接管,在没有数据可接管的状况下,返回 (false, false)
// 否则,如果 c 处于敞开状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 省略 debug 内容 …………
// 如果是一个 nil 的 channel
if c == nil {// 如果不阻塞,间接返回 (false, false)
if !block {return}
// 否则,接管一个 nil 的 channel,goroutine 挂起
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
// 不会执行到这里
throw("unreachable")
}
// 在非阻塞模式下,疾速检测到失败,不必获取锁,疾速返回
// 当咱们察看到 channel 没筹备好接管:// 1. 非缓冲型,期待发送列队 sendq 里没有 goroutine 在期待
// 2. 缓冲型,但 buf 里没有元素
// 之后,又察看到 closed == 0,即 channel 未敞开。// 因为 channel 不可能被反复关上,所以前一个观测的时候 channel 也是未敞开的,// 因而在这种状况下能够间接发表接管失败,返回 (false, false)
// 非阻塞 && ((非缓冲型 && 发送队列为空) || (缓冲性 && 没有数据)) && 没有敞开
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {return}
var t0 int64
if blockprofilerate > 0 {t0 = cputicks()
}
// 加锁
lock(&c.lock)
// channel 已敞开,并且循环数组 buf 里没有元素
// 这里能够解决非缓冲型敞开 和 缓冲型敞开但 buf 无元素的状况
// 也就是说即便是敞开状态,但在缓冲型的 channel,// buf 里有元素的状况下还能接管到元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {raceacquire(unsafe.Pointer(c))
}
// 解锁
unlock(&c.lock)
if ep != nil {
// 从一个已敞开的 channel 执行接管操作,且未疏忽返回值
// 那么接管的值将是一个该类型的零值
// typedmemclr 依据类型清理相应地址的内存
typedmemclr(c.elemtype, ep)
}
// 从一个已敞开的 channel 接管,selected 会返回 true
return true, false
}
// 期待发送队列里有 goroutine 存在,阐明 buf 是满的
// 这有可能是:// 1. 非缓冲型的 channel
// 2. 缓冲型的 channel,但 buf 满了
// 针对 1,间接进行内存拷贝(从 sender goroutine -> receiver goroutine)// 针对 2,接管到循环数组头部的元素,并将发送者的元素放到循环数组尾部
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() {unlock(&c.lock) }, 3)
return true, true
}
// 缓冲型,buf 里有元素,能够失常接管
if c.qcount > 0 {
// 间接从循环数组里找到要接管的元素
qp := chanbuf(c, c.recvx)
// …………
// 代码里,没有疏忽要接管的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
if ep != nil {typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循环数组里相应地位的值
typedmemclr(c.elemtype, qp)
// 接管游标向前挪动
c.recvx++
// 接管游标归零
if c.recvx == c.dataqsiz {c.recvx = 0}
// buf 数组里的元素个数减 1
c.qcount--
// 解锁
unlock(&c.lock)
return true, true
}
if !block {
// 非阻塞接管,解锁。selected 返回 false,因为没有接管到值
unlock(&c.lock)
return false, false
}
// 接下来就是要被阻塞的状况了
// 结构一个 sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {mysg.releasetime = -1}
// 待接收数据的地址保留下来
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
// 进入 channel 的期待接管队列
c.recvq.enqueue(mysg)
// 将以后 goroutine 挂起
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// 被唤醒了,接着从这里继续执行一些开头工作
if mysg != gp.waiting {throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是非缓冲型的 channel
if c.dataqsiz == 0 {
if raceenabled {racesync(c, sg)
}
// 未疏忽接管的数据
if ep != nil {
// 间接拷贝数据,从 sender goroutine -> receiver goroutine
recvDirect(c.elemtype, sg, ep)
}
} else {
// 缓冲型的 channel,但 buf 已满。// 将循环数组 buf 队首的元素拷贝到接收数据的地址
// 将发送者的数据入队。实际上这时 revx 和 sendx 值相等
// 找到接管游标
qp := chanbuf(c, c.recvx)
// …………
// 将接管游标处的数据拷贝给接收者
if ep != nil {typedmemmove(c.elemtype, ep, qp)
}
// 将发送者数据拷贝到 buf
typedmemmove(c.elemtype, qp, sg.elem)
// 更新游标值
c.recvx++
if c.recvx == c.dataqsiz {c.recvx = 0}
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
// 解锁
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {sg.releasetime = cputicks()
}
// 唤醒发送的 goroutine。须要等到调度器的光顾
goready(gp, skip+1)
}
敞开 channel
close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中别离保留了阻塞的发送者和接收者。敞开 channel 后,对于期待接收者而言,会收到一个相应类型的零值。对于期待发送者,会间接 panic。所以,在不理解 channel 还有没有接收者的状况下,不能贸然敞开 channel。
close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最初,再将所有的 sudog 全都唤醒。
留神:敞开曾经敞开的 channel 或者往一个敞开的 channel 中发送数据会产生 panic
敞开准则:
个别原则上应用通道是不容许接管方敞开通道和 不能敞开一个有多个并发发送者的通道 。换而言之,你只能在发送方的 goroutine 中敞开只有该发送方的通道。
敞开代码解析
func closechan(c *hchan) {
// 敞开一个 nil channel,panic
if c == nil {panic(plainError("close of nil channel"))
}
// 上锁
lock(&c.lock)
// 如果 channel 曾经敞开
if c.closed != 0 {unlock(&c.lock)
// panic
panic(plainError("close of closed channel"))
}
// …………
// 批改敞开状态
c.closed = 1
var glist *g
// 将 channel 所有期待接管队列的里 sudog 开释
for {
// 从接管队列里出队一个 sudog
sg := c.recvq.dequeue()
// 出队结束,跳出循环
if sg == nil {break}
// 如果 elem 不为空,阐明此 receiver 未疏忽接收数据
// 给它赋一个相应类型的零值
if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {sg.releasetime = cputicks()
}
// 取出 goroutine
gp := sg.g
gp.param = nil
if raceenabled {raceacquireg(gp, unsafe.Pointer(c))
}
// 相连,造成链表
gp.schedlink.set(glist)
glist = gp
}
// 将 channel 期待发送队列里的 sudog 开释
// 如果存在,这些 goroutine 将会 panic
for {
// 从发送队列里出队一个 sudog
sg := c.sendq.dequeue()
if sg == nil {break}
// 发送者会 panic
sg.elem = nil
if sg.releasetime != 0 {sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {raceacquireg(gp, unsafe.Pointer(c))
}
// 造成链表
gp.schedlink.set(glist)
glist = gp
}
// 解锁
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 遍历链表
for glist != nil {
// 取最初一个
gp := glist
// 向前走一步,下一个唤醒的 g
glist = glist.schedlink.ptr()
gp.schedlink = 0
// 唤醒相应 goroutine
goready(gp, 3)
}
}