上篇已经记录到发送数据到 chanel 的三种情况的代码逻辑,接下来是从 chanel 接收数据的逻辑。chanrecv 方法和 chansend 方法十分类似如果 hchan 为空 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw(“unreachable”) }如果 chenel 已经关闭lock(&c.lock) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }接收数据的三种情况如果 hchan 的 sendq 队列中有阻塞的 goroutine,buf 已满如果 hchan.buf 还有数据未取出如果 hchan.buf 为空下面分别截取三种情况的代码段。如果 hchan 的 sendq 队列中有阻塞的 goroutine,buf 已满 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender’s value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }如果 hchan.buf 还有数据未取出if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount– unlock(&c.lock) return true, true }如果 hchan.buf 为空 ··· // 上面条件都不满足,则只剩一种情况:hchan.buf 为空 // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)阻塞与唤醒假设有一个 goroutine sender,和一个 goroutine reciever,如果 reciever 执行 chanrecv 方法的时候,buf 已经为空了,从上面的代码最后一行知道,goparkunlock 方法使 reciever 阻塞,那么 sender 写数据进 chanel,reciever 又如何被 sender 唤醒呢?reciever 休眠后, sender 来了,sender 执行到以下代码处。sender 从 recvq 队列 弹出 reciever,然后执行 send 方法。 if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }send 方法最后一行执行 goready 方法将 reciever 唤醒。// send processes a send operation on an empty channel c.// The value ep sent by the sender is copied to the receiver sg.// The receiver is then woken up to go on its merry way.// Channel c must be empty and locked. send unlocks c with unlockf.// sg must already be dequeued from c.// ep must be non-nil and point to the heap or the caller’s stack.func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { // Pretend we go through the buffer, even though // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. qp := chanbuf(c, c.recvx) raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1)}reciever 唤醒后,继续执行 chanrecv 方法剩下的语句: // someone woke us up if mysg != gp.waiting { throw(“G waiting list is corrupted”) } gp.waiting = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closedclosechan 方法close(ch) 对应的执行方法即为 closechan 方法。主要有以下步骤:关闭 nil chanel ,返回 panic:“close of nil channel"关闭 closed chanel,返回 panic:“close of closed channel"将 hchan.closed 的值设为 1释放所有 reader sudog 对象,释放的同时将 sudog 中的 g 插入 glist 链表释放所有 writer sudog 对象,释放的同时将 sudog 中的 g 插入 glist 链表遍历 glist 链表,唤醒 glist 中的所有 goroutinefunc closechan(c *hchan) { if c == nil { panic(plainError(“close of nil channel”)) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError(“close of closed channel”)) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } c.closed = 1 var glist *g // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } gp.schedlink.set(glist) // <—— a 语句 glist = gp // <—— b 语句 } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } gp.schedlink.set(glist) // <—— c 语句 glist = gp // <—— d 语句 } unlock(&c.lock) // Ready all Gs now that we’ve dropped the channel lock. for glist != nil { gp := glist glist = glist.schedlink.ptr() // <—— e 语句 gp.schedlink = 0 goready(gp, 3) }}//go:nosplitfunc (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) } // 获得 gp 的地址 //go:nosplitfunc (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) } // 设置 gp 的地址 上面代码有个注意的点: schedlink 是 G 的一个属性,用于指向下一个 G 。参考文章http://legendtkl.com/2017/08/…