chan 介绍
package mainimport “fmt”
func main() {
c := make(chan int)
go func() {
c <- 1 // send to channel
}()
x := <-c // recv from channel
fmt.Println(x)
}
咱们能够这样查看汇编后果:
go tool compile -N -l -S hello.go- N 示意禁用优化 - l 禁用内联 - S 打印后果
通过下面这样的方流量交易式,咱们能够直到 chan 是调用的哪些函数:
源码剖析
构造体与创立
type hchan struct {
qcount uint // 循环列表元素个数
dataqsiz uint // 循环队列的大小
buf unsafe.Pointer // 循环队列的指针
elemsize uint16 // chan 中元素的大小
closed uint32 // 是否已 close
elemtype *_type // chan 中元素类型
sendx uint // send 在 buffer 中的索引
recvx uint // recv 在 buffer 中的索引
recvq waitq // receiver 的期待队列
sendq waitq // sender 的期待队列
// 互拆锁
lock mutex
}
qcount 代表 chan 中曾经接管但还没被取走的元素的个数,函数 len 能够返回这个字段的值;
dataqsiz 和 buf 别离代表队列 buffer 的大小,cap 函数能够返回这个字段的值以及队列 buffer 的指针,是一个定长的环形数组;
elemtype 和 elemsiz 示意 chan 中元素的类型和 元素的大小;
sendx:发送数据的指针在 buffer 中的地位;
recvx:接管申请时的指针在 buffer 中的地位;
recvq 和 sendq 别离示意期待接收数据的 goroutine 与期待发送数据的 goroutine;
sendq 和 recvq 的类型是 waitq 的构造体:
type waitq struct {
first *sudog
last *sudog
}
waitq 外面连贯的是一个 sudog 双向链表,保留的是期待的 goroutine。整个 chan 的图例大略是这样:上面看一下创立 chan,咱们通过汇编后果也能够查看到 make(chan int) 这句代码会调用到 runtime 的 makechan 函数中:
const (
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
)
func makechan(t chantype, size int) hchan {
elem := t.elem
// 略去查看代码
…
// 计算须要调配的 buf 空间
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError(“makechan: size out of range”))
}
var c *hchan
switch {
case mem == 0:
// chan 的 size 或者元素的 size 是 0,不用创立 buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不是指针,调配一块间断的内存给 hchan 数据结构和 buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 示意 hchan 前面在内存里紧跟着就是 buf
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素蕴含指针,那么独自调配 buf
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
首先咱们能够看到计算 hchanSize:
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
maxAlign 是 8,那么 maxAlign- 1 的二进制就是 111,而后和 int(unsafe.Sizeof(hchan{})) 取与就是取它的低三位,hchanSize 就失去的是 8 的整数倍,做对齐应用。
这里 switch 有三种状况,第一种状况是缓冲区所需大小为 0,那么在为 hchan 分配内存时,只须要调配 sizeof(hchan) 大小的内存;
第二种状况是缓冲区所需大小不为 0,而且数据类型不蕴含指针,那么就调配间断的内存。留神的是,咱们在创立 channel 的时候能够指定类型为指针类型:
//chan 里存入的是 int 的指针
c := make(chan *int)//chan 里存入的是 int 的值
c := make(chan int)
第三种状况是缓冲区所需大小不为 0,而且数据类型蕴含指针,那么就不应用 add 的形式让 hchan 和 buf 放在一起了,而是独自的为 buf 申请一块内存。
发送数据
channel 的阻塞非阻塞
在看发送数据的代码之前,咱们先看一下什么是 channel 的阻塞和非阻塞。
个别状况下,传入的参数都是 block=true,即阻塞调用,一个往 channel 中插入数据的 goroutine 会阻塞到插入胜利为止。
非阻塞是只这种状况:
select {case c <- v:
… foodefault:
… bar}
编译器会将其改为:
if selectnbsend(c, v) {
… foo
} else {
… bar
}
selectnbsend 办法传入的 block 就是 false:
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
chansend 办法
向通道发送数据咱们通过汇编后果能够发现是在 runtime 中通过 chansend 实现的,办法比拟长上面咱们分段来进行了解:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// 对于非阻塞的发送, 间接返回
if !block {
return false
}
// 对于阻塞的通道,将 goroutine 挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw(“unreachable”)
}
…
}
这里会对 chan 做一个判断,如果它是空的,那么对于非阻塞的发送,间接返回 false;对于阻塞的通道,将 goroutine 挂起,并且永远不会返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
…
// 非阻塞的状况下,如果通道没有敞开,满足以下一条:
// 1. 没有缓冲区并且以后没有接收者
// 2. 缓冲区不为 0,并且已满
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
…
}
须要留神的是这里是没有加锁的,go 尽管在应用指针读取单个值的时候原子性的,然而读取多个值并不能保障,所以在判断完 closed 尽管是没有敞开的,那么在读取完之后仍然可能在这一瞬间从未敞开状态转变成敞开状态。那么就有两种可能:
通道没有敞开,而且曾经满了,那么须要返回 false,没有问题;
通道敞开,而且曾经满了,然而在非阻塞的发送中返回 false,也没有问题;
无关 go 的一致性原语
下面的这些判断被称为 fast path,因为加锁的操作是一个很重的操作,所以可能在加锁之前返回的判断就在加锁之前做好是最好的。
上面接着看看加锁局部的代码:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
…
// 加锁
lock(&c.lock)
// 是否敞开的判断
if c.closed != 0 {
unlock(&c.lock)
panic(plainError(“send on closed channel”))
}
// 从 recvq 中取出一个接收者
if sg := c.recvq.dequeue(); sg != nil {
// 如果接收者存在,间接向该接收者发送数据,绕过 buffer
send(c, sg, ep, func() {unlock(&c.lock) }, 3)
return true
}
…
}
进入了 lock 区域之后还须要再判断以下 close 的状态,而后从 recvq 中取出一个接收者,如果曾经有接收者,那么就向第一个接收者发送以后 enqueue 的音讯。这里须要留神的是如果有接收者在队列中期待,则阐明此时的缓冲区是空的。
既然是一行行剖析代码,那么咱们再进入到 send 看一下实现:
func send(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int) {
…
if sg.elem != nil {
// 间接把要发送的数据 copy 到 reciever 的栈空间
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒对应的 goroutine
goready(gp, skip+1)
}
在 send 办法里,sg 就是 goroutine 打包好的对象,ep 是对应要发送数据的指针,sendDirect 办法会调用 memmove 进行数据的内存拷贝。而后 goready 函数会唤醒对应的 goroutine 进行调度。
回到 chansend 办法,持续往下看:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
…
// 如果缓冲区没有满,间接将要发送的数据复制到缓冲区
if c.qcount < c.dataqsiz {
// 找到 buf 要填充数据的索引地位
qp := chanbuf(c, c.sendx)
…
// 将数据拷贝到 buffer 中
typedmemmove(c.elemtype, qp, ep)
// 数据索引前移,如果到了开端,又从 0 开始
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 元素个数加 1,开释锁并返回
c.qcount++
unlock(&c.lock)
return true
}
…
}
这里会判断 buf 缓冲区有没有满,如果没有满,那么就找到 buf 要填充数据的索引地位,调用 typedmemmove 办法将数据拷贝到 buf 中,而后从新设值 sendx 偏移量。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
…
// 缓冲区没有空间了,所以对于非阻塞调用间接返回
if !block {
unlock(&c.lock)
return false
}
// 创立 sudog 对象
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将 sudog 对象入队
c.sendq.enqueue(mysg)
// 进入期待状态
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
…
}
这里会做两局部的操作,对于非阻塞的调用会间接返回;对于阻塞的调用会创立 sudog 对象,而后将 sudog 对象入队之后 gopark 将 goroutine 转入 waiting 状态,并解锁。调用 gopark 之后,在使用者看来该向 channel 发送数据的代码语句会进行阻塞。
这里也须要留神一下,如果缓冲区为 0,那么也会进入到这里,会调用到 gopark 立马阻塞,所以在应用的时候须要记得接收数据,避免向 chan 发送数据的那一端永远阻塞,如:
func process(timeout time.Duration) bool {
ch := make(chan bool)
go func() {
// 模仿解决耗时的业务
time.Sleep((timeout + time.Second))
ch <- true // block
fmt.Println(“exit goroutine”)
}()
select {
case result := <-ch:
return result
case <-time.After(timeout):
return false
}
}
如果这里在 select 的时候间接 timeout 返回了,而没有调用 result := <-ch,那么 goroutine 就会永远阻塞。
到这里发送的代码就解说完了,整个流程大抵如下:
比方我要执行:ch<-10
查看 recvq 是否为空,如果不为空,则从 recvq 头部取一个 goroutine,将数据发送过来;
如果 recvq 为空,,并且 buf 没有满,则将数据放入到 buf 中;
如果 buf 已满,则将要发送的数据和以后 goroutine 打包成 sudog,而后入队到 sendq 队列中,并将以后 goroutine 置为 waiting 状态进行阻塞。
接收数据
从 chan 获取数据实现函数为 chanrecv。上面咱们看一下代码实现:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
…
if c == nil {
// 如果 c 为空且是非阻塞调用,那么间接返回 (false,false)
if !block {
return
}
// 阻塞调用间接期待
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw(“unreachable”)
}
// 对于非阻塞的状况,并且没有敞开的状况
// 如果是无缓冲 chan 或者是 chan 中没有数据,那么间接返回 (false,false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
// 上锁
lock(&c.lock)
// 如果曾经敞开,并且 chan 中没有数据,返回 (true,false)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
…
}
chanrecv 办法和 chansend 办法是一样的,首先也是做非空判断,如果 chan 没有初始化,那么如果是非阻塞调用,那么间接返回 (false,false),阻塞调用会间接期待;
上面的两个 if 判断我放在一起来进行解说,因为这里和 chansend 是不一样的,chanrecv 要依据不同条件须要返回不同的后果。
在上锁之前的判断是边界条件的判断:如果是非阻塞调用会判断 chan 没有发送方(dataqsiz 为空且发送队列为空),或 chan 的缓冲为空(dataqsiz>0 并且 qcount==0)并且 chan 是没有 close,那么须要返回 (false,false);而 chan 曾经敞开了,并且 buf 中没有数据,须要返回 (true,false);
为了实现这个需要,所以在 chanrecv 办法外面边界条件的判断都应用 atomic 办法进行了获取。
因为须要正确的失去 chan 已敞开,并且 buf 空会返回(true, false),而不是(false,false),所以在 lock 上锁之前须要应用 atomic 来获取参数避免重排序(Happens Before),因而必须使此处的 qcount 和 closed 的读取操作的程序通过原子操作失去程序保障。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
…
// 从发送者队列获取数据
if sg := c.sendq.dequeue(); sg != nil {
// 发送者队列不为空,间接从发送者那里提取数据
recv(c, sg, ep, func() {unlock(&c.lock) }, 3)
return true, true
}
…
}
func recv(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是无缓冲区 chan
if c.dataqsiz == 0 {
…
if ep != nil {
// 间接从发送者拷贝数据
recvDirect(c.elemtype, sg, ep)
}
// 有缓冲区 chan
} else {
// 获取 buf 的存放数据指针
qp := chanbuf(c, c.recvx)
…
// 间接从缓冲区拷贝数据给接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 从发送者拷贝数据到缓冲区
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将发送者唤醒
goready(gp, skip+1)
}
在这里如果有发送者在队列期待,那么间接从发送者那里提取数据,并且唤醒这个发送者。须要留神的是因为有发送者在期待,所以如果有缓冲区,那么缓冲区肯定是满的。
在唤醒发送者之前须要对缓冲区做判断,如果是无缓冲区,那么间接从发送者那里提取数据;如果有缓冲区首先会获取 recvx 的指针,而后将从缓冲区拷贝数据给接收者,再将发送者数据拷贝到缓冲区。
而后将 recvx 加 1,相当于将新的数据移到了队尾,再将 recvx 的值赋值给 sendx,最初调用 goready 将发送者唤醒,这里有些绕,咱们通过图片来展现:
这里展现的是在 chansend 中将数据拷贝到缓冲区中,当数据满的时候会将 sendx 的指针置为 0,所以当 buf 环形队列是满的时候 sendx 等于 recvx。
而后再来看看 chanrecv 中发送者队列有数据的时候移交缓冲区的数据是怎么做这里会将 recvx 为 0 处的数据间接从缓存区拷贝数据给接收者,而后将发送者拷贝数据到缓冲区 recvx 指针处,而后将 recvx 指针加 1 并将 recvx 赋值给 sendx,因为是满的所以用 recvx 加 1 的成果实现了将新退出的数据入库到队尾的操作。
接着往下看:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
…
// 如果缓冲区中有数据
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
…
// 从缓冲区复制数据到 ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
// 接收数据的指针前移
c.recvx++
// 环形队列,如果到了开端,再从 0 开始
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 缓冲区中现存数据减一
c.qcount–
unlock(&c.lock)
return true, true
}
…
}
到了这里,阐明缓冲区中有数据,然而发送者队列没有数据,那么将数据拷贝到接收数据的协程,而后将接收数据的指针前移,如果曾经到了队尾,那么就从 0 开始,最初将缓冲区中现存数据减一并解锁。
上面就是缓冲区中没有数据的状况:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
…
// 非阻塞,间接返回
if !block {
unlock(&c.lock)
return false, false
}
// 创立 sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 将 sudog 增加到接管队列中
c.recvq.enqueue(mysg)
// 阻塞住 goroutine,期待被唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
…
}
如果是非阻塞调用,间接返回;阻塞调用会将以后 goroutine 封装成 sudog,而后将 sudog 增加到接管队列中,调用 gopark 阻塞住 goroutine,期待被唤醒。