有了上篇的基本了解,可以翻阅源码了涉及的数据结构// Go/src/runtime/chan.gotype hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G’s status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex}type waitq struct { first *sudog last *sudog}// sudog represents a g in a wait list, such as for sending/receiving// on a channel.//// sudog is necessary because the g ↔ synchronization object relation// is many-to-many. A g can be on many wait lists, so there may be// many sudogs for one g; and many gs may be waiting on the same// synchronization object, so there may be many sudogs for one object.//// sudogs are allocated from a special pool. Use acquireSudog and// releaseSudog to allocate and free them.type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops. g *g // isSelect indicates g is participating in a select, so // g.selectDone must be CAS’d to win the wake-up race. isSelect bool next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel}makechan()当 make(chan int,3)带有设置缓存大小的参数,则会分配一段连续空间,buf 指向这段内存空间c.buf = add(unsafe.Pointer(c), hchanSize) 分配 hchanSize 大小的空间,其中常量 hchanSize hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. ··· var c *hchan // 返回的是指针 switch { case size == 0 || elem.size == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.kind&kindNoPointers != 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) // <============ default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(uintptr(size)*elem.size, elem, true) // <============== } ··· return c}chansend()我们截取不同情况的代码段。如果 chanel 为 nil如果 chanel 为空,即没有用 make 分配内存,那么会调用 gopark 方法if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw(“unreachable”) }而 gopark 方法会将当前的 goroutine 休眠,然后回调通过参数传来的 unlockf 方法。注意看回上面的代码,调用 gopark 方法时传递的 unlockf 参数为 nil,所以会一直休眠。// Puts the current goroutine into a waiting state and calls unlockf.// If unlockf returns false, the goroutine is resumed.// unlockf must not access this G’s stack, as it may be moved between// the call to gopark and the call to unlockf.// Reason explains why the goroutine has been parked.// It is displayed in stack traces and heap dumps.// Reasons should be unique and descriptive.// Do not re-use reasons, add new ones.func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { if reason != waitReasonSleep { checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy } mp := acquirem() gp := mp.curg status := readgstatus(gp) if status != _Grunning && status != _Gscanrunning { throw(“gopark: bad g status”) } mp.waitlock = lock mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf)) gp.waitreason = reason mp.waittraceev = traceEv mp.waittraceskip = traceskip releasem(mp) // can’t do anything that might move the G between Ms here. mcall(park_m)}这时, Go 语言启动的时候会有一个 goroutine sysmon 一直检测系统的运行情况,其中有一个方法 checkdead(),当检测到所有 goroutine 都处于休眠,即死锁,便抛出错误。// /Go/src/runtime/proc.gofunc checkdead() { … throw(“all goroutines are asleep - deadlock!”) }如果 chanel 已经被关闭了直接引发 panic: lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError(“send on closed channel”)) }如果能发送数据发送数据还分三种情况:1 当前 hchan 的 recvq 接收队列上已经有 goroutine 阻塞 lock(&c.lock) ··· if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }send 方法判断到接收方 sudog 的 elem 字段存有对应的内存空间地址值的话,调用 sendDirect 方法。这里啰嗦一下,sudog 的 elem 是在 func chanrecv 方法中赋值的,将接收方 goroutine 用来接收数据的栈空间地址赋值给 elem。// send processes a send operation on an empty channel c.// The value ep sent by the sender is copied to the receiver sg.// The receiver is then woken up to go on its merry way.// Channel c must be empty and locked. send unlocks c with unlockf.// sg must already be dequeued from c.// ep must be non-nil and point to the heap or the caller’s stack.func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ··· if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1)}sendDirect 方法中 memmove 方法直接拷贝 t.size 个字节到目的内存空间。func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // src is on our stack, dst is a slot on another stack. // Once we read sg.elem out of sg, it will no longer // be updated if the destination’s stack gets copied (shrunk). // So make sure that no preemption points can happen between read & use. dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) // No need for cgo write barrier checks because dst is always // Go memory. memmove(dst, src, t.size)}2 当前 hchan.buf 还有可用空间:将数据放到 buffer 里面。 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }3 当前 hchan.buf 已满:阻塞当前 goroutine // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)goparkunlock 方法将当前 goroutine 休眠,并且释放锁资源// Puts the current goroutine into a waiting state and unlocks the lock.// The goroutine can be made runnable again by calling goready(gp).func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) { gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)}参考文章http://legendtkl.com/2017/08/…