channel

channel的声明与使用

// 无缓冲区的channel
// 无缓冲区的channel必须有协程在等待它,才能够向channel发送数据
ch := make(chan string)
// 向channel发送数据
ch <- "hello"
// 从channel接收数据并赋给x
x = <-ch
// 从channel接收数据并丢弃
<-ch
// 有缓冲区的channel
c := make(chan string, 10)

channel的数据结构

// hchan is the runtime representation of a channel: a circular buffer
// plus two queues of waiting goroutines, all guarded by one mutex.
type hchan struct {
	// Circular buffer state.
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	// elemsize is the size of one element.
	elemsize uint16
	// closed is nonzero once the channel has been closed.
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	// recvq and sendq are linked lists of blocked goroutines (see waitq).
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	//
	// Channels are therefore not lock-free: the send, receive and
	// close paths all acquire this mutex.
	lock mutex
}

// waitq is a doubly linked list of sudogs, tracked by its head and tail.
type waitq struct {
	first *sudog
	last  *sudog
}

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g *g

	next *sudog
	prev *sudog
	elem unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool

	// success indicates whether communication over channel c
	// succeeded. It is true if the goroutine was awoken because a
	// value was delivered over channel c, and false if awoken
	// because c was closed.
	success bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

创建channel

ch := make(chan int,10)
0x0018 00024 (E:\project\deom\main.go:6)        LEAQ    type.chan int(SB), AX
0x001f 00031 (E:\project\deom\main.go:6)        MOVL    $10, BX
0x0024 00036 (E:\project\deom\main.go:6)        PCDATA  $1, $0
0x0024 00036 (E:\project\deom\main.go:6)        CALL    runtime.makechan(SB)
ch := make(chan int)
0x0018 00024 (E:\project\deom\main.go:6)        LEAQ    type.chan int(SB), AX
0x001f 00031 (E:\project\deom\main.go:6)        XORL    BX, BX
0x0021 00033 (E:\project\deom\main.go:6)        PCDATA  $1, $0
0x0021 00033 (E:\project\deom\main.go:6)        CALL    runtime.makechan(SB)
func makechan(t *chantype, size int) *hchan {    elem := t.elem    // compiler checks this but be safe.    if elem.size >= 1<<16 {        throw("makechan: invalid channel element type")    }    if hchanSize%maxAlign != 0 || elem.align > maxAlign {        throw("makechan: bad alignment")    }    mem, overflow := math.MulUintptr(elem.size, uintptr(size))    if overflow || mem > maxAlloc-hchanSize || size < 0 {        panic(plainError("makechan: size out of range"))    }    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.    // buf points into the same allocation, elemtype is persistent.    // SudoG's are referenced from their owning thread so they can't be collected.    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.    var c *hchan    switch {    // 队列或元素大小为零。    case mem == 0:        // Queue or element size is zero.        c = (*hchan)(mallocgc(hchanSize, nil, true))        // Race detector uses this location for synchronization.        c.buf = c.raceaddr()    case elem.ptrdata == 0:        // Elements do not contain pointers.        // Allocate hchan and buf in one call.         // 元素不蕴含指针。        // 一次调用调配 hchan 和 buf。        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))        c.buf = add(unsafe.Pointer(c), hchanSize)    default:        // Elements contain pointers.         // 元素蕴含指针。        c = new(hchan)        c.buf = mallocgc(mem, elem, true)    }    c.elemsize = uint16(elem.size)    c.elemtype = elem    c.dataqsiz = uint(size)    lockInit(&c.lock, lockRankHchan)    if debugChan {        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")    }    return c}
func makechan64(t *chantype, size int64) *hchan {    if int64(int(size)) != size {        panic(plainError("makechan: size out of range"))    }    return makechan(t, int(size))}

channel发送数据的原理

ch <- "data"
// ch <- 是一个语法糖
// entry point for c <- x from compiled code//go:nosplit// 编译阶段,会把ch <- 转化为runtime.chansend1()// chansend1()会调用chansen()func chansend1(c *hchan, elem unsafe.Pointer) {    chansend(c, elem, true, getcallerpc())}
// chansend is the generic single-channel send: it sends the value
// pointed to by ep on channel c.
//
// If block is false the call never sleeps; it returns false when the
// send cannot complete immediately. It returns true once the value has
// been handed to a waiting receiver or stored in the buffer. Sending on
// a closed channel panics with "send on closed channel". A blocking
// send on a nil channel parks the goroutine and is not expected to
// return.
//
// callerpc is used only for race-detector reporting.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// Take the channel lock; every access to the queues and the buffer
	// below happens while holding it.
	lock(&c.lock)

	// Sending on a closed channel is an error.
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// Case 1: direct send. If a receiver is already waiting in recvq,
	// dequeue it and hand the value over.
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// Case 2: no receiver is waiting, but the buffer has room.
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		// chanbuf(c, i) yields the pointer to slot i of the buffer;
		// sendx names the slot to fill next.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		// Copy the value of type c.elemtype from ep into the slot.
		typedmemmove(c.elemtype, qp, ep)
		// Advance the send index, wrapping around the circular buffer.
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		// One more element is now queued.
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Case 3: block on the channel. Some receiver will complete our operation for us.
	// Fetch the current goroutine and wrap it in a sudog.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	// Record the address of the value to send.
	mysg.elem = ep
	mysg.waitlink = nil
	// Record which goroutine is waiting.
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	// Join the queue of blocked senders.
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	// Go to sleep. c.lock is still held here and is passed to gopark
	// together with chanparkcommit.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	// success is false when the wake-up came from the channel being
	// closed rather than from a receiver taking the value.
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	// Copy the value straight into the receiver's destination, if it
	// supplied one.
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// Wake the receiving goroutine.
	goready(gp, skip+1)
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

// sendDirect copies t.size bytes from src into the destination recorded
// in sg.elem.
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.size)
}

// acquireSudog returns a sudog taken from the current P's cache,
// refilling that cache from the central list (or allocating a new
// sudog) when it is empty.
//
//go:nosplit
func acquireSudog() *sudog {
	// Delicate dance: the semaphore implementation calls
	// acquireSudog, acquireSudog calls new(sudog),
	// new calls malloc, malloc can call the garbage collector,
	// and the garbage collector calls the semaphore implementation
	// in stopTheWorld.
	// Break the cycle by doing acquirem/releasem around new(sudog).
	// The acquirem/releasem increments m.locks during new(sudog),
	// which keeps the garbage collector from being invoked.
	mp := acquirem()
	pp := mp.p.ptr()
	if len(pp.sudogcache) == 0 {
		lock(&sched.sudoglock)
		// First, try to grab a batch from central cache.
		for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
			s := sched.sudogcache
			sched.sudogcache = s.next
			s.next = nil
			pp.sudogcache = append(pp.sudogcache, s)
		}
		unlock(&sched.sudoglock)
		// If the central cache is empty, allocate a new one.
		if len(pp.sudogcache) == 0 {
			pp.sudogcache = append(pp.sudogcache, new(sudog))
		}
	}
	// Pop the last cached sudog and clear its slot in the cache.
	n := len(pp.sudogcache)
	s := pp.sudogcache[n-1]
	pp.sudogcache[n-1] = nil
	pp.sudogcache = pp.sudogcache[:n-1]
	if s.elem != nil {
		throw("acquireSudog: found s.elem != nil in cache")
	}
	releasem(mp)
	return s
}

直接发送

  • 发送数据前,已经有G在休眠等待接收
  • 此时缓冲区为空,不必考虑缓冲区
  • 将数据直接拷贝给G的接收变量,唤醒G
实现
  • 从接收等待队列中取出一个等待接收的G
  • 将数据直接拷贝到接收变量中

放入缓冲区

  • 没有G在休眠等待接收,但是缓冲区还有空间
  • 将数据存入缓冲区
实现
  • 获取可存入的缓冲区地址
  • 存入数据
  • 维护索引

休眠等待

  • 没有G在休眠等待接收,而且没有缓冲区或缓冲区满了
  • 进入发送等待队列,休眠等待
实现
  • 把自己包装成sudog
  • 把sudog放入sendq队列
  • 休眠并解锁
  • 被唤醒后,数据已经被取走,维护其他数据

channel接收数据

// <-ch 是一个语法糖
<-ch
// 编译阶段,转化为runtime.chanrecv1()
data := <-ch
// 编译阶段,转化为runtime.chanrecv2()
value, ok := <-ch
// 最后都会调用chanrecv()
// entry points for <- c from compiled code//go:nosplitfunc chanrecv1(c *hchan, elem unsafe.Pointer) {    chanrecv(c, elem, true)}
//go:nosplitfunc chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {    _, received = chanrecv(c, elem, true)    return}

有等待发送的G,从等待发送的G接收

  • 接收数据前,已经有G在休眠等待发送
  • 而且这个channel没有缓冲区
  • 将数据直接从G拷贝过来,唤醒G
实现
  • 判断有G在发送等待队列,进入`recv()`
  • 判断此channel无缓冲区
  • 直接从等待的G中取走数据,唤醒G

有等待发送的G,从缓冲区接收

  • 接收数据前,已经有G在休眠等待发送(缓冲区中的数据比发送队列的数据更早,因为数据先发送到缓冲区,缓冲区满了之后发送者才进入发送队列等待)
  • 而且channel有缓冲区
  • 从缓冲区取走一个数据
  • 将休眠G的数据放入缓冲区,唤醒G
实现
  • 判断有G在发送队列等待,进入recv()
  • 判断此channel有缓冲区
  • 从缓冲区取走一个数据
  • 将G的数据放入缓冲区,唤醒G

从缓冲区接收

  • 没有G在休眠等待发送,但是缓冲区有内容
  • 直接从缓冲区取走数据
实现
  • 判断没有G在发送队列等待
  • 判断缓冲区中有数据(qcount > 0)
  • 从缓冲区取走一个数据

 

阻塞接收

  • 没有发送G在休眠等待,而且没有缓冲区或者缓冲区为空
  • 进入接收队列,休眠等待
实现
  • 判断没有G在发送队列等待
  • 判断缓冲区中没有数据(没有缓冲区或缓冲区为空)
  • 把自己包装为sudog
  • sudog放入接收等待队列,休眠
  • 唤醒时,发送的G已经把数据拷贝到位

代码

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// Take the channel lock.
	lock(&c.lock)

	// Closed and fully drained: zero *ep and report received == false.
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	// A sender is blocked in sendq: dequeue it and let recv finish the
	// exchange.
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	if c.qcount > 0 {
		// Receive directly from queue
		// recvx names the slot holding the next element to hand out.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// Clear the slot that was just consumed.
		typedmemclr(c.elemtype, qp)
		// Advance the receive index, wrapping around the circular buffer.
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	// Wrap the current goroutine in a sudog.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	// Join the queue of blocked receivers.
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	// Go to sleep. c.lock is still held here and is passed to gopark
	// together with chanparkcommit.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	// After wake-up only this goroutine's own bookkeeping is reset
	// before returning.
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// Unbuffered channel (buffer size zero).
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		// Advance the receive index, wrapping around the circular buffer.
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// The queue is still full, so the send index equals the new
		// receive index.
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// Wake the sending goroutine; its value has already been taken.
	goready(gp, skip+1)
}

// recvDirect copies t.size bytes from the source recorded in sg.elem
// into dst.
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

关闭channel

func closechan(c *hchan) {    if c == nil {        panic(plainError("close of nil channel"))    }    // 加锁    lock(&c.lock)        //close of closed channel    // 敞开已敞开的channel会产生panic    // 如果channel曾经敞开,解锁,产生一个panic    if c.closed != 0 {        unlock(&c.lock)        panic(plainError("close of closed channel"))    }    if raceenabled {        callerpc := getcallerpc()        racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))        racerelease(c.raceaddr())    }        // closed置1    c.closed = 1    var glist gList    // release all readers    // 开释所有的读者    for {         // 获取承受协程         // 从承受队列出队        sg := c.recvq.dequeue()         // 承受队列为空,跳出循环        if sg == nil {            break        }         // 协程数据不为空        if sg.elem != nil {            typedmemclr(c.elemtype, sg.elem)            sg.elem = nil        }        if sg.releasetime != 0 {            sg.releasetime = cputicks()        }         // 保护协程状态        gp := sg.g        gp.param = unsafe.Pointer(sg)        sg.success = false        if raceenabled {            raceacquireg(gp, c.raceaddr())        }        glist.push(gp)    }    // release all writers (they will panic)    for {        sg := c.sendq.dequeue()        if sg == nil {            break        }        sg.elem = nil        if sg.releasetime != 0 {            sg.releasetime = cputicks()        }        gp := sg.g        gp.param = unsafe.Pointer(sg)        sg.success = false        if raceenabled {            raceacquireg(gp, c.raceaddr())        }        glist.push(gp)    }    unlock(&c.lock)    // Ready all Gs now that we've dropped the channel lock.    for !glist.empty() {        gp := glist.pop()        gp.schedlink = 0        goready(gp, 3)    }}

channel的cap和len

// len = 循环队列中的元素总数
len = qcount
// cap = 循环队列的大小
cap = dataqsiz

channel的使用

select非阻塞地使用channel

timer定时器