chan介绍
package mainimport “fmt”
func main() {
c := make(chan int)
go func() {
c <- 1 // send to channel
}()
x := <-c // recv from channel
fmt.Println(x)
}
咱们能够这样查看汇编后果:
go tool compile -N -l -S hello.go-N示意禁用优化-l禁用内联-S打印后果
通过下面这样的方流量交易式,咱们能够直到chan是调用的哪些函数:
源码剖析
构造体与创立
type hchan struct {
qcount uint // 循环列表元素个数
dataqsiz uint // 循环队列的大小
buf unsafe.Pointer // 循环队列的指针
elemsize uint16 // chan中元素的大小
closed uint32 // 是否已close
elemtype *_type // chan中元素类型
sendx uint // send在buffer中的索引
recvx uint // recv在buffer中的索引
recvq waitq // receiver的期待队列
sendq waitq // sender的期待队列
// 互拆锁
lock mutex
}
qcount代表chan 中曾经接管但还没被取走的元素的个数,函数 len 能够返回这个字段的值;
dataqsiz和buf别离代表队列buffer的大小,cap函数能够返回这个字段的值以及队列buffer的指针,是一个定长的环形数组;
elemtype 和 elemsiz示意chan 中元素的类型和 元素的大小;
sendx:发送数据的指针在 buffer中的地位;
recvx:接管申请时的指针在 buffer 中的地位;
recvq和sendq别离示意期待接收数据的 goroutine 与期待发送数据的 goroutine;
sendq和recvq的类型是waitq的构造体:
type waitq struct {
first *sudog
last *sudog
}
waitq外面连贯的是一个sudog双向链表,保留的是期待的goroutine 。整个chan的图例大略是这样:上面看一下创立chan,咱们通过汇编后果也能够查看到make(chan int)这句代码会调用到runtime的makechan函数中:
const (
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
)
func makechan(t chantype, size int) hchan {
elem := t.elem
// 略去查看代码
…
//计算须要调配的buf空间
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError(“makechan: size out of range”))
}
var c *hchan
switch {
case mem == 0:
// chan的size或者元素的size是0,不用创立buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不是指针,调配一块间断的内存给hchan数据结构和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 示意hchan前面在内存里紧跟着就是buf
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素蕴含指针,那么独自调配buf
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
首先咱们能够看到计算hchanSize:
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
maxAlign是8,那么maxAlign-1的二进制就是111,而后和int(unsafe.Sizeof(hchan{}))取与就是取它的低三位,hchanSize就失去的是8的整数倍,做对齐应用。
这里switch有三种状况,第一种状况是缓冲区所需大小为 0,那么在为 hchan 分配内存时,只须要调配 sizeof(hchan) 大小的内存;
第二种状况是缓冲区所需大小不为 0,而且数据类型不蕴含指针,那么就调配间断的内存。留神的是,咱们在创立channel的时候能够指定类型为指针类型:
//chan里存入的是int的指针
c := make(chan *int)//chan里存入的是int的值
c := make(chan int)
第三种状况是缓冲区所需大小不为 0,而且数据类型蕴含指针,那么就不应用add的形式让hchan和buf放在一起了,而是独自的为buf申请一块内存。
发送数据
channel的阻塞非阻塞
在看发送数据的代码之前,咱们先看一下什么是channel的阻塞和非阻塞。
个别状况下,传入的参数都是 block=true,即阻塞调用,一个往 channel 中插入数据的 goroutine 会阻塞到插入胜利为止。
非阻塞是只这种状况:
select {case c <- v:
… foodefault:
… bar}
编译器会将其改为:
if selectnbsend(c, v) {
… foo
} else {
… bar
}
selectnbsend办法传入的block就是false:
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
chansend办法
向通道发送数据咱们通过汇编后果能够发现是在runtime 中通过 chansend 实现的,办法比拟长上面咱们分段来进行了解:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// 对于非阻塞的发送,间接返回
if !block {
return false
}
// 对于阻塞的通道,将 goroutine 挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw(“unreachable”)
}
…
}
这里会对chan做一个判断,如果它是空的,那么对于非阻塞的发送,间接返回 false;对于阻塞的通道,将 goroutine 挂起,并且永远不会返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
…
// 非阻塞的状况下,如果通道没有敞开,满足以下一条:
// 1.没有缓冲区并且以后没有接收者
// 2.缓冲区不为0,并且已满
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
…
}
须要留神的是这里是没有加锁的,go尽管在应用指针读取单个值的时候原子性的,然而读取多个值并不能保障,所以在判断完closed尽管是没有敞开的,那么在读取完之后仍然可能在这一瞬间从未敞开状态转变成敞开状态。那么就有两种可能:
通道没有敞开,而且曾经满了,那么须要返回false,没有问题;
通道敞开,而且曾经满了,然而在非阻塞的发送中返回false,也没有问题;
无关go的一致性原语
下面的这些判断被称为 fast path,因为加锁的操作是一个很重的操作,所以可能在加锁之前返回的判断就在加锁之前做好是最好的。
上面接着看看加锁局部的代码:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
…
//加锁
lock(&c.lock)
// 是否敞开的判断
if c.closed != 0 {
unlock(&c.lock)
panic(plainError(“send on closed channel”))
}
// 从 recvq 中取出一个接收者
if sg := c.recvq.dequeue(); sg != nil {
// 如果接收者存在,间接向该接收者发送数据,绕过buffer
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
…
}
进入了lock区域之后还须要再判断以下close的状态,而后从recvq 中取出一个接收者,如果曾经有接收者,那么就向第一个接收者发送以后enqueue的音讯。这里须要留神的是如果有接收者在队列中期待,则阐明此时的缓冲区是空的。
既然是一行行剖析代码,那么咱们再进入到send看一下实现:
func send(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int) {
…
if sg.elem != nil {
// 间接把要发送的数据copy到reciever的栈空间
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒对应的 goroutine
goready(gp, skip+1)
}
在send办法里,sg就是goroutine打包好的对象,ep是对应要发送数据的指针,sendDirect办法会调用memmove进行数据的内存拷贝。而后goready函数会唤醒对应的 goroutine进行调度。
回到chansend办法,持续往下看:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
…
// 如果缓冲区没有满,间接将要发送的数据复制到缓冲区
if c.qcount < c.dataqsiz {
// 找到buf要填充数据的索引地位
qp := chanbuf(c, c.sendx)
…
// 将数据拷贝到 buffer 中
typedmemmove(c.elemtype, qp, ep)
// 数据索引前移,如果到了开端,又从0开始
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 元素个数加1,开释锁并返回
c.qcount++
unlock(&c.lock)
return true
}
…
}
这里会判断buf缓冲区有没有满,如果没有满,那么就找到buf要填充数据的索引地位,调用typedmemmove办法将数据拷贝到buf中,而后从新设值sendx偏移量。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
…
// 缓冲区没有空间了,所以对于非阻塞调用间接返回
if !block {
unlock(&c.lock)
return false
}
// 创立 sudog 对象
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 将sudog 对象入队
c.sendq.enqueue(mysg)
// 进入期待状态
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
…
}
这里会做两局部的操作,对于非阻塞的调用会间接返回;对于阻塞的调用会创立sudog 对象,而后将sudog对象入队之后gopark将 goroutine 转入 waiting 状态,并解锁。调用gopark之后,在使用者看来该向 channel 发送数据的代码语句会进行阻塞。
这里也须要留神一下,如果缓冲区为0,那么也会进入到这里,会调用到gopark立马阻塞,所以在应用的时候须要记得接收数据,避免向chan发送数据的那一端永远阻塞,如:
func process(timeout time.Duration) bool {
ch := make(chan bool)
go func() {
// 模仿解决耗时的业务
time.Sleep((timeout + time.Second))
ch <- true // block
fmt.Println(“exit goroutine”)
}()
select {
case result := <-ch:
return result
case <-time.After(timeout):
return false
}
}
如果这里在select的时候间接timeout返回了,而没有调用 result := <-ch,那么goroutine 就会永远阻塞。
到这里发送的代码就解说完了,整个流程大抵如下:
比方我要执行:ch<-10
查看 recvq 是否为空,如果不为空,则从 recvq 头部取一个 goroutine,将数据发送过来;
如果 recvq 为空,,并且buf没有满,则将数据放入到 buf中;
如果 buf已满,则将要发送的数据和以后 goroutine 打包成sudog,而后入队到sendq队列中,并将以后 goroutine 置为 waiting 状态进行阻塞。
接收数据
从chan获取数据实现函数为 chanrecv。上面咱们看一下代码实现:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
…
if c == nil {
// 如果 c 为空且是非阻塞调用,那么间接返回 (false,false)
if !block {
return
}
// 阻塞调用间接期待
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw(“unreachable”)
}
// 对于非阻塞的状况,并且没有敞开的状况
// 如果是无缓冲chan或者是chan中没有数据,那么间接返回 (false,false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
// 上锁
lock(&c.lock)
// 如果曾经敞开,并且chan中没有数据,返回 (true,false)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
…
}
chanrecv办法和chansend办法是一样的,首先也是做非空判断,如果chan没有初始化,那么如果是非阻塞调用,那么间接返回 (false,false),阻塞调用会间接期待;
上面的两个if判断我放在一起来进行解说,因为这里和chansend是不一样的,chanrecv要依据不同条件须要返回不同的后果。
在上锁之前的判断是边界条件的判断:如果是非阻塞调用会判断chan没有发送方(dataqsiz为空且发送队列为空),或chan的缓冲为空(dataqsiz>0 并且qcount==0)并且chan是没有close,那么须要返回 (false,false);而chan曾经敞开了,并且buf中没有数据,须要返回 (true,false);
为了实现这个需要,所以在chanrecv办法外面边界条件的判断都应用atomic办法进行了获取。
因为须要正确的失去chan已敞开,并且 buf 空会返回 (true, false),而不是 (false,false),所以在lock上锁之前须要应用atomic来获取参数避免重排序(Happens Before),因而必须使此处的 qcount 和 closed 的读取操作的程序通过原子操作失去程序保障。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
…
// 从发送者队列获取数据
if sg := c.sendq.dequeue(); sg != nil {
// 发送者队列不为空,间接从发送者那里提取数据
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
…
}
func recv(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是无缓冲区chan
if c.dataqsiz == 0 {
…
if ep != nil {
// 间接从发送者拷贝数据
recvDirect(c.elemtype, sg, ep)
}
// 有缓冲区chan
} else {
// 获取buf的存放数据指针
qp := chanbuf(c, c.recvx)
…
// 间接从缓冲区拷贝数据给接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 从发送者拷贝数据到缓冲区
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将发送者唤醒
goready(gp, skip+1)
}
在这里如果有发送者在队列期待,那么间接从发送者那里提取数据,并且唤醒这个发送者。须要留神的是因为有发送者在期待,所以如果有缓冲区,那么缓冲区肯定是满的。
在唤醒发送者之前须要对缓冲区做判断,如果是无缓冲区,那么间接从发送者那里提取数据;如果有缓冲区首先会获取recvx的指针,而后将从缓冲区拷贝数据给接收者,再将发送者数据拷贝到缓冲区。
而后将recvx加1,相当于将新的数据移到了队尾,再将recvx的值赋值给sendx,最初调用goready将发送者唤醒,这里有些绕,咱们通过图片来展现:
这里展现的是在chansend中将数据拷贝到缓冲区中,当数据满的时候会将sendx的指针置为0,所以当buf环形队列是满的时候sendx等于recvx。
而后再来看看chanrecv中发送者队列有数据的时候移交缓冲区的数据是怎么做这里会将recvx为0处的数据间接从缓存区拷贝数据给接收者,而后将发送者拷贝数据到缓冲区recvx指针处,而后将recvx指针加1并将recvx赋值给sendx,因为是满的所以用recvx加1的成果实现了将新退出的数据入库到队尾的操作。
接着往下看:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
…
// 如果缓冲区中有数据
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
…
// 从缓冲区复制数据到 ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
// 接收数据的指针前移
c.recvx++
// 环形队列,如果到了开端,再从0开始
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 缓冲区中现存数据减一
c.qcount–
unlock(&c.lock)
return true, true
}
…
}
到了这里,阐明缓冲区中有数据,然而发送者队列没有数据,那么将数据拷贝到接收数据的协程,而后将接收数据的指针前移,如果曾经到了队尾,那么就从0开始,最初将缓冲区中现存数据减一并解锁。
上面就是缓冲区中没有数据的状况:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
…
// 非阻塞,间接返回
if !block {
unlock(&c.lock)
return false, false
}
// 创立sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 将sudog增加到接管队列中
c.recvq.enqueue(mysg)
// 阻塞住goroutine,期待被唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
…
}
如果是非阻塞调用,间接返回;阻塞调用会将以后goroutine 封装成sudog,而后将sudog增加到接管队列中,调用gopark阻塞住goroutine,期待被唤醒。
发表回复