golang channel 详解

前言

CSP:不要通过共享内存来通信,而要通过通信来实现内存共享,它是Go 的并发哲学,基于 channel 实现。
Channel是Go中的一个外围类型,你能够把它看成一个管道,通过它并发外围单元就能够发送或者接收数据进行通信(communication)。

数据结构

runtime/chan.go

type hchan struct {    qcount   uint           // 队列中残余元素    dataqsiz uint           // 队列长度,eg make(chan int64, 5), dataqsiz为5    buf      unsafe.Pointer // 数据存储环形数组    elemsize uint16         // 每个元素的大小    closed   uint32         // 是否敞开 0 未敞开    elemtype *_type         // 元素类型    sendx    uint           // 发送者写入地位    recvx    uint           // 接受者读数据地位    recvq    waitq          // 接收者队列,保留正在读取channel的goroutian    sendq    waitq          // 发送者队列,保留正在发送channel的goroutian    lock     mutex          // 锁}

waitq是双向链表,sudog为goroutian的封装

type waitq struct {    first *sudog    last  *sudog}

make(chan int, 6)

上图为一个长度为6,类型为int, 两个接收者,三个发送者的channel,以后接收者筹备读数据的地位为0,发送者发送数据地位为4

留神,个别状况下recvq和sendq至多有一个为空。只有一个例外,那就是同一个goroutine应用select语句向channel一边写数据,一边读数据。

channel创立

创立channel的过程实际上是初始化hchan构造。其中类型信息和缓冲区长度由make语句传入,buf的大小则与元素大小和缓冲区长度独特决定。
runtime/chan.go line:71

func makechan(t *chantype, size int) *hchan {    elem := t.elem    //...    var c *hchan    //创立hchan构造并分配内存    switch {    // 无缓冲区    case mem == 0:        c = (*hchan)(mallocgc(hchanSize, nil, true))        c.buf = c.raceaddr()    // 元素不含指针    case elem.ptrdata == 0:        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))        c.buf = add(unsafe.Pointer(c), hchanSize)    default:     // 默认场景,构造体和buffer独自分配内存        c = new(hchan)        c.buf = mallocgc(mem, elem, true)    }    //元素大小    c.elemsize = uint16(elem.size)    //元素类型    c.elemtype = elem    //队列长度    c.dataqsiz = uint(size)    //...    return c}

写数据

  1. 如果期待接管队列recvq不为空,阐明缓冲区中没有数据或者没有缓冲区,此时间接从recvq取出G,并把数据写入,最初把该G唤醒,完结发送过程;
  2. 如果缓冲区中有空余地位,将数据写入缓冲区,完结发送过程;
  3. 如果缓冲区中没有空余地位,将待发送数据写入G,将以后G退出sendq,进入睡眠,期待被读goroutine唤醒;

写数据代码解析

写数据分为阻塞写和非阻塞写 代码示例:

c := make(chan int64)    c <- 1 //阻塞写    //非阻塞写    select {    case c <- 1:        //do something        break    default:        //do something    }

对应源码里两个函数:

//非阻塞func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {    return chansend(c, elem, false, getcallerpc())}//阻塞func chansend1(c *hchan, elem unsafe.Pointer) {   chansend(c, elem, true, getcallerpc())}

留神:非阻塞写必须带上default

chansend源码

// 位于 src/runtime/chan.gofunc chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    // 如果 channel 是 nil    if c == nil {        // 不能阻塞,间接返回 false,示意未发送胜利        if !block {            return false        }        // 以后 goroutine 被挂起        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)        throw("unreachable")    }    // 省略 debug 相干……    // 对于不阻塞的 send,疾速检测失败场景    //    // 如果 channel 未敞开且 channel 没有多余的缓冲空间。这可能是:    // 1. channel 是非缓冲型的,且期待接管队列里没有 goroutine    // 2. channel 是缓冲型的,但循环数组曾经装满了元素    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {        return false    }    var t0 int64    if blockprofilerate > 0 {        t0 = cputicks()    }    // 锁住 channel,并发平安    lock(&c.lock)    // 如果 channel 敞开了    if c.closed != 0 {        // 解锁        unlock(&c.lock)        // 间接 panic        panic(plainError("send on closed channel"))    }    // 如果接管队列里有 goroutine,间接将要发送的数据拷贝到接管 goroutine    if sg := c.recvq.dequeue(); sg != nil {        send(c, sg, ep, func() { unlock(&c.lock) }, 3)        return true    }    // 对于缓冲型的 channel,如果还有缓冲空间    if c.qcount < c.dataqsiz {        // qp 指向 buf 的 sendx 地位        qp := chanbuf(c, c.sendx)        // ……        // 将数据从 ep 处拷贝到 qp        typedmemmove(c.elemtype, qp, ep)        // 发送游标值加 1        c.sendx++        // 如果发送游标值等于容量值,游标值归 0        if c.sendx == c.dataqsiz {            c.sendx = 0        }        // 缓冲区的元素数量加一        c.qcount++        // 解锁        unlock(&c.lock)        return true    }    // 如果不须要阻塞,则间接返回谬误    if !block {        unlock(&c.lock)        return false    }    // channel 满了,发送方会被阻塞。接下来会结构一个 sudog    // 获取以后 goroutine 的指针    gp := getg()    mysg := acquireSudog()    mysg.releasetime = 0    if t0 != 0 {        mysg.releasetime = -1    }    mysg.elem = ep    mysg.waitlink = nil    mysg.g = gp    mysg.selectdone = nil    mysg.c = c    gp.waiting = mysg    gp.param = nil    // 以后 goroutine 进入发送期待队列    c.sendq.enqueue(mysg)    // 以后 goroutine 被挂起    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)    // 从这里开始被唤醒了(channel 有机会能够发送了)    if mysg != gp.waiting {        throw("G waiting list is corrupted")    }    gp.waiting = nil    if gp.param == nil {        if c.closed == 0 {            throw("chansend: spurious wakeup")        }        // 被唤醒后,channel 敞开了。坑爹啊,panic        panic(plainError("send on closed channel"))    }    gp.param = nil    if mysg.releasetime > 0 {        blockevent(mysg.releasetime-t0, 2)    }    // 去掉 mysg 上绑定的 channel    mysg.c = nil    releaseSudog(mysg)    return true}

读数据

  1. 如果期待发送队列sendq不为空,且没有缓冲区,间接从sendq中取出G,把G中数据读出,最初把G唤醒,完结读取过程;
  2. 如果期待发送队列sendq不为空,此时阐明缓冲区已满,从缓冲区中首部读出数据,把G中数据写入缓冲区尾部,把G唤醒,完结读取过程;
  3. 如果缓冲区中有数据,则从缓冲区取出数据,完结读取过程;
  4. 将以后goroutine退出recvq,进入睡眠,期待被写goroutine唤醒;

读数据代码解析

读数据分为阻塞读和非阻塞读 代码示例:

c := make(chan int, 10)<-c //阻塞读//select读带default为非阻塞读select{case <-c:    //...    breakdefault:    //...}

留神:非阻塞读必须带上default

接管操作有两种写法,一种带 “ok”,反馈 channel 是否敞开;一种不带 “ok”,这种写法,当接管到相应类型的零值时无奈晓得是实在的发送者发送过去的值,还是 channel 被敞开后,返回给接收者的默认类型的零值,代码示例:

c := make(chan int64, 5)    c <- 0    v, ok := <-c    fmt.Println(v, ok) // 0, true    close(c)    v, ok = <-c     fmt.Println(v, ok) // 0, false

最初对应源码里的这四个函数:

//非阻塞读不带ok返回func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {    selected, _ = chanrecv(c, elem, false)    return}//非阻塞读带Ok返回func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {   // TODO(khr): just return 2 values from this function, now that it is in Go.  selected, *received = chanrecv(c, elem, false)   return}//阻塞读不带ok返回func chanrecv1(c *hchan, elem unsafe.Pointer) {   chanrecv(c, elem, true)}//阻塞读带ok返回func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {   _, received = chanrecv(c, elem, true)   return}

能够看进去,最终都指向了chanrecv函数,如果有接管值,val := <-c,会把接管值放到elem的地址中,如果疏忽接管值间接写<-c,这时elem为nil
上面来看看chanrecv的代码

// 位于 src/runtime/chan.go// chanrecv 函数接管 channel c 的元素并将其写入 ep 所指向的内存地址。// 如果 ep 是 nil,阐明疏忽了接管值。// 如果 block == false,即非阻塞型接管,在没有数据可接管的状况下,返回 (false, false)// 否则,如果 c 处于敞开状态,将 ep 指向的地址清零,返回 (true, false)// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)// 如果 ep 非空,则应该指向堆或者函数调用者的栈func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {    // 省略 debug 内容 …………    // 如果是一个 nil 的 channel    if c == nil {        // 如果不阻塞,间接返回 (false, false)        if !block {            return        }        // 否则,接管一个 nil 的 channel,goroutine 挂起        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)        // 不会执行到这里        throw("unreachable")    }    // 在非阻塞模式下,疾速检测到失败,不必获取锁,疾速返回    // 当咱们察看到 channel 没筹备好接管:    // 1. 非缓冲型,期待发送列队 sendq 里没有 goroutine 在期待    // 2. 缓冲型,但 buf 里没有元素    // 之后,又察看到 closed == 0,即 channel 未敞开。    // 因为 channel 不可能被反复关上,所以前一个观测的时候 channel 也是未敞开的,    // 因而在这种状况下能够间接发表接管失败,返回 (false, false)    // 非阻塞 && ((非缓冲型 && 发送队列为空) || (缓冲性 && 没有数据)) && 没有敞开    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&        atomic.Load(&c.closed) == 0 {        return    }    var t0 int64    if blockprofilerate > 0 {        t0 = cputicks()    }    // 加锁    lock(&c.lock)    // channel 已敞开,并且循环数组 buf 里没有元素    // 这里能够解决非缓冲型敞开 和 缓冲型敞开但 buf 无元素的状况    // 也就是说即便是敞开状态,但在缓冲型的 channel,    // buf 里有元素的状况下还能接管到元素    if c.closed != 0 && c.qcount == 0 {        if raceenabled {            raceacquire(unsafe.Pointer(c))        }        // 解锁        unlock(&c.lock)        if ep != nil {            // 从一个已敞开的 channel 执行接管操作,且未疏忽返回值            // 那么接管的值将是一个该类型的零值            // typedmemclr 依据类型清理相应地址的内存            typedmemclr(c.elemtype, ep)        }        // 从一个已敞开的 channel 接管,selected 会返回true        return true, false    }    // 期待发送队列里有 goroutine 存在,阐明 buf 是满的    // 这有可能是:    // 1. 非缓冲型的 channel    // 2. 缓冲型的 channel,但 buf 满了    // 针对 1,间接进行内存拷贝(从 sender goroutine -> receiver goroutine)    // 针对 2,接管到循环数组头部的元素,并将发送者的元素放到循环数组尾部    if sg := c.sendq.dequeue(); sg != nil {        // Found a waiting sender. If buffer is size 0, receive value        // directly from sender. Otherwise, receive from head of queue        // and add sender's value to the tail of the queue (both map to        // the same buffer slot because the queue is full).        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)        return true, true    }    // 缓冲型,buf 里有元素,能够失常接管    if c.qcount > 0 {        // 间接从循环数组里找到要接管的元素        qp := chanbuf(c, c.recvx)        // …………        // 代码里,没有疏忽要接管的值,不是 "<- ch",而是 "val <- ch",ep 指向 val        if ep != nil {            typedmemmove(c.elemtype, ep, qp)        }        // 清理掉循环数组里相应地位的值        typedmemclr(c.elemtype, qp)        // 接管游标向前挪动        c.recvx++        // 接管游标归零        if c.recvx == c.dataqsiz {            c.recvx = 0        }        // buf 数组里的元素个数减 1        c.qcount--        // 解锁        unlock(&c.lock)        return true, true    }    if !block {        // 非阻塞接管,解锁。selected 返回 false,因为没有接管到值        unlock(&c.lock)        return false, false    }    // 接下来就是要被阻塞的状况了    // 结构一个 sudog    gp := getg()    mysg := acquireSudog()    mysg.releasetime = 0    if t0 != 0 {        mysg.releasetime = -1    }    // 待接收数据的地址保留下来    mysg.elem = ep    mysg.waitlink = nil    gp.waiting = mysg    mysg.g = gp    mysg.selectdone = nil    mysg.c = c    gp.param = nil    // 进入channel 的期待接管队列    c.recvq.enqueue(mysg)    // 将以后 goroutine 挂起    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)    // 被唤醒了,接着从这里继续执行一些开头工作    if mysg != gp.waiting {        throw("G waiting list is corrupted")    }    gp.waiting = nil    if mysg.releasetime > 0 {        blockevent(mysg.releasetime-t0, 2)    }    closed := gp.param == nil    gp.param = nil    mysg.c = nil    releaseSudog(mysg)    return true, !closed}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    // 如果是非缓冲型的 channel    if c.dataqsiz == 0 {        if raceenabled {            racesync(c, sg)        }        // 未疏忽接管的数据        if ep != nil {            // 间接拷贝数据,从 sender goroutine -> receiver goroutine            recvDirect(c.elemtype, sg, ep)        }    } else {        // 缓冲型的 channel,但 buf 已满。        // 将循环数组 buf 队首的元素拷贝到接收数据的地址        // 将发送者的数据入队。实际上这时 revx 和 sendx 值相等        // 找到接管游标        qp := chanbuf(c, c.recvx)        // …………        // 将接管游标处的数据拷贝给接收者        if ep != nil {            typedmemmove(c.elemtype, ep, qp)        }        // 将发送者数据拷贝到 buf        typedmemmove(c.elemtype, qp, sg.elem)        // 更新游标值        c.recvx++        if c.recvx == c.dataqsiz {            c.recvx = 0        }        c.sendx = c.recvx    }    sg.elem = nil    gp := sg.g    // 解锁    unlockf()    gp.param = unsafe.Pointer(sg)    if sg.releasetime != 0 {        sg.releasetime = cputicks()    }    // 唤醒发送的 goroutine。须要等到调度器的光顾    goready(gp, skip+1)}

敞开channel

close 逻辑比较简单,对于一个 channel,recvq 和 sendq 中别离保留了阻塞的发送者和接收者。敞开 channel 后,对于期待接收者而言,会收到一个相应类型的零值。对于期待发送者,会间接 panic。所以,在不理解 channel 还有没有接收者的状况下,不能贸然敞开 channel。

close 函数先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最初,再将所有的 sudog 全都唤醒。

留神:敞开曾经敞开的channel或者往一个敞开的channel中发送数据会产生panic

敞开准则:
个别原则上应用通道是不容许接管方敞开通道和 不能敞开一个有多个并发发送者的通道。 换而言之, 你只能在发送方的 goroutine 中敞开只有该发送方的通道。

敞开代码解析

func closechan(c *hchan) {    // 敞开一个 nil channel,panic    if c == nil {        panic(plainError("close of nil channel"))    }    // 上锁    lock(&c.lock)    // 如果 channel 曾经敞开    if c.closed != 0 {        unlock(&c.lock)        // panic        panic(plainError("close of closed channel"))    }    // …………    // 批改敞开状态    c.closed = 1    var glist *g    // 将 channel 所有期待接管队列的里 sudog 开释    for {        // 从接管队列里出队一个 sudog        sg := c.recvq.dequeue()        // 出队结束,跳出循环        if sg == nil {            break        }        // 如果 elem 不为空,阐明此 receiver 未疏忽接收数据        // 给它赋一个相应类型的零值        if sg.elem != nil {            typedmemclr(c.elemtype, sg.elem)            sg.elem = nil        }        if sg.releasetime != 0 {            sg.releasetime = cputicks()        }        // 取出 goroutine        gp := sg.g        gp.param = nil        if raceenabled {            raceacquireg(gp, unsafe.Pointer(c))        }        // 相连,造成链表        gp.schedlink.set(glist)        glist = gp    }    // 将 channel 期待发送队列里的 sudog 开释    // 如果存在,这些 goroutine 将会 panic    for {        // 从发送队列里出队一个 sudog        sg := c.sendq.dequeue()        if sg == nil {            break        }        // 发送者会 panic        sg.elem = nil        if sg.releasetime != 0 {            sg.releasetime = cputicks()        }        gp := sg.g        gp.param = nil        if raceenabled {            raceacquireg(gp, unsafe.Pointer(c))        }        // 造成链表        gp.schedlink.set(glist)        glist = gp    }    // 解锁    unlock(&c.lock)    // Ready all Gs now that we've dropped the channel lock.    // 遍历链表    for glist != nil {        // 取最初一个        gp := glist        // 向前走一步,下一个唤醒的 g        glist = glist.schedlink.ptr()        gp.schedlink = 0        // 唤醒相应 goroutine        goready(gp, 3)    }}