关于go:channel

5次阅读

共计 18128 个字符,预计需要花费 46 分钟才能阅读完成。

channel

channel 的申明与应用

// 无缓冲区的 channel
// 无缓冲区的 channel 必须有协程在期待它才能够向 channel 发送数据
ch := make(chan string)
// 向 channel 发送数据
ch <- "hllo"
// 从 channel 承受数据并赋给 x
x = <-ch
// 从 channel 接收数据并抛弃
<-ch

// 有缓冲区的 cahnnel
c := make(chan string,10)

channel 的数据结构

type hchan struct {
    // 环形缓冲区
    // 队列中的总数据
    qcount   uint           // total data in the queue
    // 循环队列的大小
    dataqsiz uint           // size of the circular queue
    // 指向 dataqsiz 的数组的元素
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    // 元素的大小
    elemsize uint16
    // 是否 closed
    closed   uint32
    // 元素类型
    elemtype *_type // element type
    // 发送索引
    sendx    uint   // send index
    // 接管索引
    recvx    uint   // receive index
    // 接管队列,由链表实现
    recvq    waitq  // list of recv waiters
    // 发送队列,由链表实现
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    // 爱护 hchan 所有字段
    // channel 并不是无锁的
    // 只在产生数据和接收数据时加锁,其余期间不加锁
    lock mutex
}

type waitq struct {
    first *sudog
    last  *sudog
}

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool

    // success indicates whether communication over channel c
    // succeeded. It is true if the goroutine was awoken because a
    // value was delivered over channel c, and false if awoken
    // because c was closed.
    success bool

    parent   *sudog // semaRoot binary tree
    waitlink *sudog // g.waiting list or semaRoot
    waittail *sudog // semaRoot
    c        *hchan // channel
}

创立 channel

ch := make(chan int,10)
0x0018 00024 (E:\project\deom\main.go:6)        LEAQ    type.chan int(SB), AX
0x001f 00031 (E:\project\deom\main.go:6)        MOVL    $10, BX
0x0024 00036 (E:\project\deom\main.go:6)        PCDATA  $1, $0
0x0024 00036 (E:\project\deom\main.go:6)        CALL    runtime.makechan(SB)
ch := make(chan int)
0x0018 00024 (E:\project\deom\main.go:6)        LEAQ    type.chan int(SB), AX
0x001f 00031 (E:\project\deom\main.go:6)        XORL    BX, BX
0x0021 00033 (E:\project\deom\main.go:6)        PCDATA  $1, $0
0x0021 00033 (E:\project\deom\main.go:6)        CALL    runtime.makechan(SB)
func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    // 队列或元素大小为零。case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
         // 元素不蕴含指针。// 一次调用调配 hchan 和 buf。c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
         // 元素蕴含指针。c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}
func makechan64(t *chantype, size int64) *hchan {if int64(int(size)) != size {panic(plainError("makechan: size out of range"))
    }

    return makechan(t, int(size))
}

channel 发送数据的原理

ch <- "data"
// ch <- 是一个语法糖 
// entry point for c <- x from compiled code
//go:nosplit
// 编译阶段,会把 ch <- 转化为 runtime.chansend1()
// chansend1() 会调用 chansen()
func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}
/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {return false}
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {print("chansend: chan=", c, "\n")
    }

    if raceenabled {racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not closed, we observe that the channel is
    // not ready for sending. Each of these observations is a single word-sized read
    // (first c.closed and second full()).
    // Because a closed channel cannot transition from 'ready for sending' to
    // 'not ready for sending', even if the channel is closed between the two observations,
    // they imply a moment between the two when the channel was both not yet closed
    // and not ready for sending. We behave as if we observed the channel at that moment,
    // and report that the send cannot proceed.
    //
    // It is okay if the reads are reordered here: if we observe that the channel is not
    // ready for sending and then observe that it is not closed, that implies that the
    // channel wasn't closed during the first observation. However, nothing here
    // guarantees forward progress. We rely on the side effects of lock release in
    // chanrecv() and closechan() to update this thread's view of c.closed and full().
    if !block && c.closed == 0 && full(c) {return false}

    var t0 int64
    if blockprofilerate > 0 {t0 = cputicks()
    }
    // 加锁
    // 缓冲区足够大,加锁工夫很短
    lock(&c.lock)
    // 判断 channel 是否敞开
    if c.closed != 0 {unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    // 期待队列
    // 接管队列出队。如果不为空(即有人在期待),进入 send
    // 间接发送数据,if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
         // 找到一个正在期待的接收器。咱们将要发送的值间接传递给接收者,绕过通道缓冲区             //(如果有的话)。send(c, sg, ep, func() {unlock(&c.lock) }, 3)
        return true
    }
    // 判断缓冲区是否还有空间
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
         //channel 缓冲区中有可用空间。将要发送的元素入队。// chanbuf(c, i) 是指向缓冲区中第 i 个槽的指针
         // 找出缓冲区中要存入的地址
        qp := chanbuf(c, c.sendx)
        if raceenabled {racenotify(c, c.sendx, nil)
        }
         // func typedmemmove(typ *_type, dst unsafe.Pointer, src unsafe.Pointer)
         // typedmemmove 将 t 类型的值从 src\ 复制到 dst。必须是 nosplit
         // 存入缓冲区
        typedmemmove(c.elemtype, qp, ep)
         // 保护索引
        c.sendx++
        if c.sendx == c.dataqsiz {c.sendx = 0}
         // 保护索引
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    // 休眠期待
    // 拿到本人的协程构造体
    gp := getg()
    // 把本人包装为 Sudog
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {mysg.releasetime = -1}
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    // 记录要发送的数据
    mysg.elem = ep
    mysg.waitlink = nil
    // 记录协程的指针
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // 入队
    c.sendq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    // 休眠
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
    if mysg != gp.waiting {throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    if closed {
        if c.closed == 0 {throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    return true
}

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {racesync(c, sg)
        } else {
            // Pretend we go through the buffer, even though
            // we copy directly. Note that we need to increment
            // the head/tail locations only when raceenabled.
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
            c.recvx++
            if c.recvx == c.dataqsiz {c.recvx = 0}
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    // 将数据间接拷贝到接管变量中
    if sg.elem != nil {sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {sg.releasetime = cputicks()
    }
    // 唤醒协程
    goready(gp, skip+1)
}

// Sends and receives on unbuffered or empty-buffered channels are the
// only operations where one running goroutine writes to the stack of
// another running goroutine. The GC assumes that stack writes only
// happen when the goroutine is running and are only done by that
// goroutine. Using a write barrier is sufficient to make up for
// violating that assumption, but the write barrier has to work.
// typedmemmove will call bulkBarrierPreWrite, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src is on our stack, dst is a slot on another stack.

    // Once we read sg.elem out of sg, it will no longer
    // be updated if the destination's stack gets copied (shrunk).
    // So make sure that no preemption points can happen between read & use.
    // 目的地的指针 
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    // No need for cgo write barrier checks because dst is always
    // Go memory.
    // 间接拷贝
    memmove(dst, src, t.size)
}

//go:nosplit
func acquireSudog() *sudog {
    // Delicate dance: the semaphore implementation calls
    // acquireSudog, acquireSudog calls new(sudog),
    // new calls malloc, malloc can call the garbage collector,
    // and the garbage collector calls the semaphore implementation
    // in stopTheWorld.
    // Break the cycle by doing acquirem/releasem around new(sudog).
    // The acquirem/releasem increments m.locks during new(sudog),
    // which keeps the garbage collector from being invoked.
    mp := acquirem()
    pp := mp.p.ptr()
    if len(pp.sudogcache) == 0 {lock(&sched.sudoglock)
        // First, try to grab a batch from central cache.
        for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
            s := sched.sudogcache
            sched.sudogcache = s.next
            s.next = nil
            pp.sudogcache = append(pp.sudogcache, s)
        }
        unlock(&sched.sudoglock)
        // If the central cache is empty, allocate a new one.
        if len(pp.sudogcache) == 0 {pp.sudogcache = append(pp.sudogcache, new(sudog))
        }
    }
    n := len(pp.sudogcache)
    s := pp.sudogcache[n-1]
    pp.sudogcache[n-1] = nil
    pp.sudogcache = pp.sudogcache[:n-1]
    if s.elem != nil {throw("acquireSudog: found s.elem != nil in cache")
    }
    releasem(mp)
    return s
}

间接发送

  • 发送数据前,曾经有 G 在休眠期待接管
  • 此时缓冲区为空,不必思考缓冲区
  • 将数据间接拷贝给 G 的接管变量,唤醒 G
实现
  • 从接管期待队列中取出一个期待接管的 G
  • 将数据间接拷贝到接管变量中

放入缓冲区

  • 没有 G 休眠期待,然而有缓冲区空间
  • 存入数据
实现
  • 获取可存入的缓冲区地址
  • 存入数据
  • 保护索引

休眠期待

  • 没有 G 在休眠期待,而且没有缓冲或缓冲区满了
  • 进入发送期待队列,休眠期待
实现
  • 把本人包装成 sudog
  • 把 sudog 放入 sendq 队列
  • 休眠并解锁
  • 被唤醒后,数据曾经被取走,保护其余数据

channel 接收数据

// <-ch 是一个语法糖
<- ch
// 编译阶段,转化为 runtime.chanrecv1()
data <- ch
// 编译阶段,转化为 runtime.chanrecv2()
value,ok <- ch

// 最初调用 chanrecv()
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)
    return
}

有期待发送的 G,从期待发送 G 接管

  • 接收数据前,曾经有 G 在休眠期待发送
  • 而且这个 channel 没有缓冲区
  • 将数据间接从 G 拷贝过去,唤醒 G
实现
  • 判断有 G 在产生期待队列,进入`recv()`
  • 判断此 channel 无缓冲区
  • 间接从期待的 G 中取走数据,唤醒 G

有期待发送的 G,从缓冲区接管

  • 接收数据前,曾经有 G 在休眠期待发送(缓冲区中的数据比发送队列的数据更晚,因为数据先发送到缓冲区,缓冲区满了当前再发送到发送队列)
  • 而且 channel 有缓冲区
  • 从缓冲区取走一个数据
  • 将休眠 G 的数据放入缓冲区,唤醒 G
实现
  • 判断有 G 在发送队列期待,进入 recv()
  • 判断此 channel 有缓冲区 i
  • 从缓冲区取走一个数据
  • 将 G 的数据放入缓冲区,唤醒 G

接收缓冲区

  • 没有 G 在休眠期待,然而缓冲区有内容
  • 间接从缓冲区取走数据
实现
  • 判断没有 G 在发送队列期待
  • 判断 channel 有无缓冲区
  • 从缓冲区取走一个数据

阻塞接管

  • 没有发送 G 在休眠期待而且没有缓冲区或者缓冲区为空
  • 进入接管队列,休眠期待
实现
  • 判断没有 G 在发送期待
  • 判断 channel 有无缓冲区
  • 把本人包装为 sudog
  • sudog 放入接管期待队列,休眠
  • 唤醒时,发送的 G 曾经把数据拷贝到位

代码

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {print("chanrecv: chan=", c, "\n")
    }

    if c == nil {
        if !block {return}
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    if !block && empty(c) {
        // After observing that the channel is not ready for receiving, we observe whether the
        // channel is closed.
        //
        // Reordering of these checks could lead to incorrect behavior when racing with a close.
        // For example, if the channel was open and not empty, was closed, and then drained,
        // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
        // we use atomic loads for both checks, and rely on emptying and closing to happen in
        // separate critical sections under the same lock.  This assumption fails when closing
        // an unbuffered channel with a blocked send, but that is an error condition anyway.
        if atomic.Load(&c.closed) == 0 {
            // Because a channel cannot be reopened, the later observation of the channel
            // being not closed implies that it was also not closed at the moment of the
            // first observation. We behave as if we observed the channel at that moment
            // and report that the receive cannot proceed.
            return
        }
        // The channel is irreversibly closed. Re-check whether the channel has any pending data
        // to receive, which could have arrived between the empty and closed checks above.
        // Sequential consistency is also required here, when racing with such a send.
        if empty(c) {
            // The channel is irreversibly closed and empty.
            if raceenabled {raceacquire(c.raceaddr())
            }
            if ep != nil {typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }

    var t0 int64
    if blockprofilerate > 0 {t0 = cputicks()
    }
    // 加锁
    lock(&c.lock)
    // channel 是否敞开
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
    // 发送队列出队
    // 发送队列不为空(即有G在休眠期待),进入 recv()
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() {unlock(&c.lock) }, 3)
        return true, true
    }

    if c.qcount > 0 {
        // Receive directly from queue
         // 取出数据
        qp := chanbuf(c, c.recvx)
        if raceenabled {racenotify(c, c.recvx, nil)
        }
        if ep != nil {typedmemmove(c.elemtype, ep, qp)
        }
         // 间接拷贝
        typedmemclr(c.elemtype, qp)
        // 保护索引
         c.recvx++
        if c.recvx == c.dataqsiz {c.recvx = 0}
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
    // 阻塞
    gp := getg()
    // 包装 sudog
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {mysg.releasetime = -1}
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 入队, 接管队列
    c.recvq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    // 休眠
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
    if mysg != gp.waiting {throw("G waiting list is corrupted")
    }
    // 唤醒当前保护本人的状态;间接返回
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)
    }
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 缓冲区为空
    if c.dataqsiz == 0 {
        if raceenabled {racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
             // 间接拷贝
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        // 队列已满。获取队列头部的我的项目。使发送者将其我的项目排在队列的尾部。因为队列已满,因而它们都是同一个插槽。// 有缓冲区
        // 取出数据
        qp := chanbuf(c, c.recvx)
        if raceenabled {racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
        }
        // copy data from queue to receiver
        if ep != nil {typedmemmove(c.elemtype, ep, qp)
        }
         // 间接拷贝 
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
         // 保护索引
        c.recvx++
        if c.recvx == c.dataqsiz {c.recvx = 0}
         // 保护索引
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    // 保护协程的状态
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {sg.releasetime = cputicks()
    }
    // 唤醒发送协程
    // 协程唤醒当前,数据曾经被取走,保护本人的状态,本人返回
    goready(gp, skip+1)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // dst is on our stack or the heap, src is on another stack.
    // The channel is locked, so src will not move during this
    // operation.
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

敞开 channel

func closechan(c *hchan) {
    if c == nil {panic(plainError("close of nil channel"))
    }
    // 加锁
    lock(&c.lock)
    
    //close of closed channel
    // 敞开已敞开的 channel 会产生 panic
    // 如果 channel 曾经敞开,解锁,产生一个 panic
    if c.closed != 0 {unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
        racerelease(c.raceaddr())
    }
    
    // closed 置 1
    c.closed = 1

    var glist gList

    // release all readers
    // 开释所有的读者
    for {
         // 获取承受协程
         // 从承受队列出队
        sg := c.recvq.dequeue()
         // 承受队列为空,跳出循环
        if sg == nil {break}
         // 协程数据不为空
        if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {sg.releasetime = cputicks()
        }
         // 保护协程状态
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
    for {sg := c.sendq.dequeue()
        if sg == nil {break}
        sg.elem = nil
        if sg.releasetime != 0 {sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

channel 的 cap 和 len

// len = 循环队列中的总数据
len = qcount
// cap = 循环队列的大小
cap = dataqsiz

channel 的应用

select 非阻塞的应用 channel

timer 定时器

正文完
 0