置信大家在开发的过程中常常会应用到 go 中并发利器 channelchannelCSP 并发模型中最重要的一个组件,两个独立的并发实体通过共享的通信 channel 进行通信。大多数人只是会用这么个构造很少有人探讨它底层实现,这篇文章讲写写 channel 的底层实现。



type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex


通道像一个 传送带 或者 队列 ,总是遵循FIFO 的规定,保障收发数据的程序,通道是 goroutine 间重要通信的形式,是并发平安的。


hchan构造体中的 buf 指向一个循环队列,用来实现循环队列,sendx是循环队列的队尾指针,recvx是循环队列的队头指针,dataqsize是缓存型通道的大小,qcount是记录通道内元素个数。

在日常开发过程中用的最多就是 ch := make(chan int, 10) 这样的形式创立一个通道,如果这要申明初始化的话,这个通道就是有缓冲区的,也是图上紫色的 bufbuf 是在 make 的时候程序创立的,它有 元素大小 * 元素个数 组成一个循环队列,能够看做成一个环形构造,buf则是一个指针指向这个环。

上图对应的代码那就是 ch = make(chan int,6)buf 指向这个环在 heap 上的地址。

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {throw("makechan: invalid channel element type")
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")

      mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    return c

下面就是对应的代码实现,上来它会查看你一系列参数是否非法,而后在通过 mallocgc 在内存开拓这块空间,而后返回。

sendx & recvx

上面我手动模仿一个 ring 实现的代码:

// Queue cycle buffer
type CycleQueue struct {data                  []interface{} // 寄存元素的数组,精确来说是切片
    frontIndex, rearIndex int           // frontIndex 头指针,rearIndex 尾指针
    size                  int           // circular 的大小

// NewQueue Circular Queue
func NewQueue(size int) (*CycleQueue, error) {
    if size <= 0 || size < 10 {return nil, fmt.Errorf("initialize circular queue size fail,%d not legal,size >= 10", size)
    cq := new(CycleQueue)
    cq.data = make([]interface{}, size)
    cq.size = size
    return cq, nil

// Push  add data to queue
func (q *CycleQueue) Push(value interface{}) error {if (q.rearIndex+1)%cap(q.data) == q.frontIndex {return errors.New("circular queue full")
    q.data[q.rearIndex] = value
    q.rearIndex = (q.rearIndex + 1) % cap(q.data)
    return nil

// Pop return queue a front element
func (q *CycleQueue) Pop() interface{} {
    if q.rearIndex == q.frontIndex {return nil}
    v := q.data[q.frontIndex]
    q.data[q.frontIndex] = nil // 拿除元素 地位就设置为空
    q.frontIndex = (q.frontIndex + 1) % cap(q.data)
    return v

循环队列个别应用空余单元法来解决队空和队满时候都存在 font=rear 带来的二义性问题,但这样会节约一个单元。golangchannel 中是通过减少 qcount 字段记录队列长度来解决二义性,一方面不会节约一个存储单元,另一方面当应用 len 函数查看队列长度时候,能够间接返回 qcount 字段,两全其美。

当咱们须要读取的数据的时候间接从 recvx 指针上的元素取,而写就从 sendx 地位写入元素,如图:

sendq & recvq


如果写阻塞的时候会把以后的协程退出到 sendq 的队列中,直到有一个 recvq 发动了一个读取的操作,那么写的队列就会被程序唤醒进行工作。

当缓冲区满了所有的 g-w 则被退出 sendq 队列期待 g-r 有操作就被唤醒 g-w,持续工作,这种设计和操作系统的外面thread5种状态很靠近了,能够看出 go 的设计者在可能参考过操作系统的 thread 设计。

当然下面只是我简述整个个过程,实际上 go 还做了其余细节优化,sendq不为空的时候,并且没有缓冲区,也就是无缓冲区通道,此时会从 sendq 第一个协程中拿取数据,有趣味的 gopher 能够去本人查看源代码,本文也是最近笔者在看到这块源代码的笔记总结。


