channel
channel的申明与应用
// 无缓冲区的channel// 无缓冲区的channel必须有协程在期待它才能够向channel发送数据ch := make(chan string)// 向channel发送数据ch <- "hllo"// 从channel承受数据并赋给xx = <-ch// 从channel接收数据并抛弃<-ch// 有缓冲区的cahnnelc := make(chan string,10)
channel的数据结构
type hchan struct { // 环形缓冲区 // 队列中的总数据 qcount uint // total data in the queue // 循环队列的大小 dataqsiz uint // size of the circular queue // 指向 dataqsiz 的数组的元素 buf unsafe.Pointer // points to an array of dataqsiz elements // 元素的大小 elemsize uint16 // 是否closed closed uint32 // 元素类型 elemtype *_type // element type // 发送索引 sendx uint // send index // 接管索引 recvx uint // receive index // 接管队列,由链表实现 recvq waitq // list of recv waiters // 发送队列,由链表实现 sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. // 爱护hchan所有字段 // channel并不是无锁的 // 只在产生数据和接收数据时加锁,其余期间不加锁 lock mutex}type waitq struct { first *sudog last *sudog}// sudog represents a g in a wait list, such as for sending/receiving// on a channel.//// sudog is necessary because the g ↔ synchronization object relation// is many-to-many. A g can be on many wait lists, so there may be// many sudogs for one g; and many gs may be waiting on the same// synchronization object, so there may be many sudogs for one object.//// sudogs are allocated from a special pool. Use acquireSudog and// releaseSudog to allocate and free them.type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops. g *g next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 // isSelect indicates g is participating in a select, so // g.selectDone must be CAS'd to win the wake-up race. isSelect bool // success indicates whether communication over channel c // succeeded. It is true if the goroutine was awoken because a // value was delivered over channel c, and false if awoken // because c was closed. success bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel}
创立channel
ch := make(chan int,10)
0x0018 00024 (E:\project\deom\main.go:6) LEAQ type.chan int(SB), AX0x001f 00031 (E:\project\deom\main.go:6) MOVL $10, BX0x0024 00036 (E:\project\deom\main.go:6) PCDATA $1, $00x0024 00036 (E:\project\deom\main.go:6) CALL runtime.makechan(SB)
ch := make(chan int)
0x0018 00024 (E:\project\deom\main.go:6) LEAQ type.chan int(SB), AX0x001f 00031 (E:\project\deom\main.go:6) XORL BX, BX0x0021 00033 (E:\project\deom\main.go:6) PCDATA $1, $00x0021 00033 (E:\project\deom\main.go:6) CALL runtime.makechan(SB)
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { // 队列或元素大小为零。 case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. // 元素不蕴含指针。 // 一次调用调配 hchan 和 buf。 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. // 元素蕴含指针。 c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c}
func makechan64(t *chantype, size int64) *hchan { if int64(int(size)) != size { panic(plainError("makechan: size out of range")) } return makechan(t, int(size))}
channel发送数据的原理
ch <- "data"// ch <- 是一个语法糖
// entry point for c <- x from compiled code//go:nosplit// 编译阶段,会把ch <- 转化为runtime.chansend1()// chansend1()会调用chansen()func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc())}
/* * generic single channel send/recv * If block is not nil, * then the protocol will not * sleep but return if it could * not complete. * * sleep can wake up with g.param == nil * when a channel involved in the sleep has * been closed. it is easiest to loop and re-run * the operation; we'll see that it's now closed. */func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } if debugChan { print("chansend: chan=", c, "\n") } if raceenabled { racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend)) } // Fast path: check for failed non-blocking operation without acquiring the lock. // // After observing that the channel is not closed, we observe that the channel is // not ready for sending. Each of these observations is a single word-sized read // (first c.closed and second full()). // Because a closed channel cannot transition from 'ready for sending' to // 'not ready for sending', even if the channel is closed between the two observations, // they imply a moment between the two when the channel was both not yet closed // and not ready for sending. We behave as if we observed the channel at that moment, // and report that the send cannot proceed. // // It is okay if the reads are reordered here: if we observe that the channel is not // ready for sending and then observe that it is not closed, that implies that the // channel wasn't closed during the first observation. However, nothing here // guarantees forward progress. We rely on the side effects of lock release in // chanrecv() and closechan() to update this thread's view of c.closed and full(). if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 加锁 // 缓冲区足够大,加锁工夫很短 lock(&c.lock) // 判断channel是否敞开 if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 期待队列 // 接管队列出队。如果不为空(即有人在期待),进入send // 间接发送数据, if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). // 找到一个正在期待的接收器。咱们将要发送的值间接传递给接收者,绕过通道缓冲区 //(如果有的话)。 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 判断缓冲区是否还有空间 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. //channel缓冲区中有可用空间。将要发送的元素入队。 // chanbuf(c, i) 是指向缓冲区中第 i 个槽的指针 // 找出缓冲区中要存入的地址 qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } // func typedmemmove(typ *_type, dst unsafe.Pointer, src unsafe.Pointer) // typedmemmove 将 t 类型的值从 src\ 复制到 dst。必须是 nosplit // 存入缓冲区 typedmemmove(c.elemtype, qp, ep) // 保护索引 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } // 保护索引 c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // Block on the channel. Some receiver will complete our operation for us. // 休眠期待 // 拿到本人的协程构造体 gp := getg() // 把本人包装为Sudog mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. // 记录要发送的数据 mysg.elem = ep mysg.waitlink = nil // 记录协程的指针 mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 入队 c.sendq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 休眠 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true}// send processes a send operation on an empty channel c.// The value ep sent by the sender is copied to the receiver sg.// The receiver is then woken up to go on its merry way.// Channel c must be empty and locked. send unlocks c with unlockf.// sg must already be dequeued from c.// ep must be non-nil and point to the heap or the caller's stack.func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { // Pretend we go through the buffer, even though // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } // 将数据间接拷贝到接管变量中 if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒协程 goready(gp, skip+1)}// Sends and receives on unbuffered or empty-buffered channels are the// only operations where one running goroutine writes to the stack of// another running goroutine. The GC assumes that stack writes only// happen when the goroutine is running and are only done by that// goroutine. Using a write barrier is sufficient to make up for// violating that assumption, but the write barrier has to work.// typedmemmove will call bulkBarrierPreWrite, but the target bytes// are not in the heap, so that will not help. We arrange to call// memmove and typeBitsBulkBarrier instead.func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // src is on our stack, dst is a slot on another stack. // Once we read sg.elem out of sg, it will no longer // be updated if the destination's stack gets copied (shrunk). // So make sure that no preemption points can happen between read & use. // 目的地的指针 dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) // No need for cgo write barrier checks because dst is always // Go memory. // 间接拷贝 memmove(dst, src, t.size)}//go:nosplitfunc acquireSudog() *sudog { // Delicate dance: the semaphore implementation calls // acquireSudog, acquireSudog calls new(sudog), // new calls malloc, malloc can call the garbage collector, // and the garbage collector calls the semaphore implementation // in stopTheWorld. // Break the cycle by doing acquirem/releasem around new(sudog). // The acquirem/releasem increments m.locks during new(sudog), // which keeps the garbage collector from being invoked. mp := acquirem() pp := mp.p.ptr() if len(pp.sudogcache) == 0 { lock(&sched.sudoglock) // First, try to grab a batch from central cache. for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil { s := sched.sudogcache sched.sudogcache = s.next s.next = nil pp.sudogcache = append(pp.sudogcache, s) } unlock(&sched.sudoglock) // If the central cache is empty, allocate a new one. if len(pp.sudogcache) == 0 { pp.sudogcache = append(pp.sudogcache, new(sudog)) } } n := len(pp.sudogcache) s := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if s.elem != nil { throw("acquireSudog: found s.elem != nil in cache") } releasem(mp) return s}
间接发送
- 发送数据前,曾经有G在休眠期待接管
- 此时缓冲区为空,不必思考缓冲区
- 将数据间接拷贝给G的接管变量,唤醒G
实现
- 从接管期待队列中取出一个期待接管的G
- 将数据间接拷贝到接管变量中
放入缓冲区
- 没有G休眠期待,然而有缓冲区空间
- 存入数据
实现
- 获取可存入的缓冲区地址
- 存入数据
- 保护索引
休眠期待
- 没有G在休眠期待,而且没有缓冲或缓冲区满了
- 进入发送期待队列,休眠期待
实现
- 把本人包装成sudog
- 把sudog放入sendq队列
- 休眠并解锁
- 被唤醒后,数据曾经被取走,保护其余数据
channel接收数据
// <-ch 是一个语法糖<- ch// 编译阶段,转化为runtime.chanrecv1()data <- ch// 编译阶段,转化为runtime.chanrecv2()value,ok <- ch// 最初调用chanrecv()
// entry points for <- c from compiled code//go:nosplitfunc chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true)}
//go:nosplitfunc chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return}
有期待发送的G,从期待发送G接管
- 接收数据前,曾经有G在休眠期待发送
- 而且这个channel没有缓冲区
- 将数据间接从G拷贝过去,唤醒G
实现
- 判断有G在产生期待队列,进入`recv()`
- 判断此channel无缓冲区
- 间接从期待的G中取走数据,唤醒G
有期待发送的G,从缓冲区接管
- 接收数据前,曾经有G在休眠期待发送(缓冲区中的数据比发送队列的数据更晚,因为数据先发送到缓冲区,缓冲区满了当前再发送到发送队列)
- 而且channel有缓冲区
- 从缓冲区取走一个数据
- 将休眠G的数据放入缓冲区,唤醒G
实现
- 判断有G在发送队列期待,进入recv()
- 判断此channel有缓冲区i
- 从缓冲区取走一个数据
- 将G的数据放入缓冲区,唤醒G
接收缓冲区
- 没有G在休眠期待,然而缓冲区有内容
- 间接从缓冲区取走数据
实现
- 判断没有G在发送队列期待
- 判断channel有无缓冲区
- 从缓冲区取走一个数据
阻塞接管
- 没有发送G在休眠期待而且没有缓冲区或者缓冲区为空
- 进入接管队列,休眠期待
实现
- 判断没有G在发送期待
- 判断channel有无缓冲区
- 把本人包装为sudog
- sudog放入接管期待队列,休眠
- 唤醒时,发送的G曾经把数据拷贝到位
代码
// chanrecv receives on channel c and writes the received data to ep.// ep may be nil, in which case received data is ignored.// If block == false and no elements are available, returns (false, false).// Otherwise, if c is closed, zeros *ep and returns (true, false).// Otherwise, fills in *ep with an element and returns (true, true).// A non-nil ep must point to the heap or the caller's stack.func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // raceenabled: don't need to check ep, as it is always on the stack // or is new memory allocated by reflect. if debugChan { print("chanrecv: chan=", c, "\n") } if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // Fast path: check for failed non-blocking operation without acquiring the lock. if !block && empty(c) { // After observing that the channel is not ready for receiving, we observe whether the // channel is closed. // // Reordering of these checks could lead to incorrect behavior when racing with a close. // For example, if the channel was open and not empty, was closed, and then drained, // reordered reads could incorrectly indicate "open and empty". To prevent reordering, // we use atomic loads for both checks, and rely on emptying and closing to happen in // separate critical sections under the same lock. This assumption fails when closing // an unbuffered channel with a blocked send, but that is an error condition anyway. if atomic.Load(&c.closed) == 0 { // Because a channel cannot be reopened, the later observation of the channel // being not closed implies that it was also not closed at the moment of the // first observation. We behave as if we observed the channel at that moment // and report that the receive cannot proceed. return } // The channel is irreversibly closed. Re-check whether the channel has any pending data // to receive, which could have arrived between the empty and closed checks above. // Sequential consistency is also required here, when racing with such a send. if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } // 加锁 lock(&c.lock) // channel是否敞开 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // 发送队列出队 // 发送队列不为空(即有G在休眠期待),进入recv() if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } if c.qcount > 0 { // Receive directly from queue //取出数据 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 间接拷贝 typedmemclr(c.elemtype, qp) // 保护索引 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // no sender available: block on this channel. // 阻塞 gp := getg() // 包装sudog mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil // 入队,接管队列 c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 休眠 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } // 唤醒当前保护本人的状态;间接返回 gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success}// recv processes a receive operation on a full channel c.// There are 2 parts:// 1) The value sent by the sender sg is put into the channel// and the sender is woken up to go on its merry way.// 2) The value received by the receiver (the current G) is// written to ep.// For synchronous channels, both values are the same.// For asynchronous channels, the receiver gets its data from// the channel buffer and the sender's data is put in the// channel buffer.// Channel c must be full and locked. recv unlocks c with unlockf.// sg must already be dequeued from c.// A non-nil ep must point to the heap or the caller's stack.func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 缓冲区为空 if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender // 间接拷贝 recvDirect(c.elemtype, sg, ep) } } else { // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. //队列已满。获取队列头部的我的项目。使发送者将其我的项目排在队列的尾部。因为队列已满,因而它们都是同一个插槽。 // 有缓冲区 // 取出数据 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // copy data from queue to receiver if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 间接拷贝 // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) // 保护索引 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } // 保护索引 c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil // 保护协程的状态 gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒发送协程 // 协程唤醒当前,数据曾经被取走,保护本人的状态,本人返回 goready(gp, skip+1)}func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { // dst is on our stack or the heap, src is on another stack. // The channel is locked, so src will not move during this // operation. src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size)}
敞开channel
func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } // 加锁 lock(&c.lock) //close of closed channel // 敞开已敞开的channel会产生panic // 如果channel曾经敞开,解锁,产生一个panic if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan)) racerelease(c.raceaddr()) } // closed置1 c.closed = 1 var glist gList // release all readers // 开释所有的读者 for { // 获取承受协程 // 从承受队列出队 sg := c.recvq.dequeue() // 承受队列为空,跳出循环 if sg == nil { break } // 协程数据不为空 if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } // 保护协程状态 gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }}
channel的cap和len
// len = 循环队列中的总数据len = qcount// cap = 循环队列的大小cap = dataqsiz