转载自:go channel原理及应用场景

源码解析

type hchan struct {    qcount   uint           // Channel 中的元素个数    dataqsiz uint           // Channel 中的循环队列的长度    buf      unsafe.Pointer // Channel 的缓冲区数据指针    elemsize uint16 // 以后 Channel 可能收发的元素大小    closed   uint32    elemtype *_type // 以后 Channel 可能收发的元素类型    sendx    uint   // Channel 的发送操作解决到的地位    recvx    uint   // Channel 的接管操作解决到的地位  recvq    waitq  // 以后 Channel 因为缓冲区空间有余而阻塞的 Goroutine 列表,双向链表(sugog)    sendq    waitq  // 以后 Channel 因为缓冲区空间有余而阻塞的 Goroutine 列表,双向链表(sugog)    // lock protects all fields in hchan, as well as several    // fields in sudogs blocked on this channel.    //    // Do not change another G's status while holding this lock    // (in particular, do not ready a G), as this can deadlock    // with stack shrinking.    lock mutex}

创立channel

channel的初始化有2种,一种是没有缓冲区的channel,一种是有缓冲区的channel。对应的初始化之后hchan也是有区别的。

无缓冲区的channel,初始化的时候只为channel分配内存,缓冲区dataqsiz的长度为0

有缓冲的channel,初始化时会为channel和缓冲区分配内存,dataqsiz长度大于0

同时channel的元素大小和缓冲区的长度都是有大小限度的

func makechan(t *chantype, size int) *hchan {    elem := t.elem    // compiler checks this but be safe.    if elem.size >= 1<<16 {        throw("makechan: invalid channel element type")    }    if hchanSize%maxAlign != 0 || elem.align > maxAlign {        throw("makechan: bad alignment")    }  // 如果内存超了,或者调配的内存大于channel最大分配内存,或者调配的size小于0,间接Panic    mem, overflow := math.MulUintptr(elem.size, uintptr(size))    if overflow || mem > maxAlloc-hchanSize || size < 0 {        panic(plainError("makechan: size out of range"))    }    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.    // buf points into the same allocation, elemtype is persistent.    // SudoG's are referenced from their owning thread so they can't be collected.    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.    var c *hchan    switch {    case mem == 0:        // 如果没有缓冲区,调配一段内存        c = (*hchan)(mallocgc(hchanSize, nil, true))        // Race detector uses this location for synchronization.        c.buf = c.raceaddr()    case elem.ptrdata == 0:        // 有缓冲时,如果元素不蕴含指针类型,会为以后的 Channel 和底层的数组调配一块间断的内存空间        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))        c.buf = add(unsafe.Pointer(c), hchanSize)    default:        // 有缓冲区,且元素蕴含指针类型,channel和buf数组各自分配内存        c = new(hchan)        c.buf = mallocgc(mem, elem, true)    }  // 元素大小,元素类型,循环数组长度,更新到channel    c.elemsize = uint16(elem.size)    c.elemtype = elem    c.dataqsiz = uint(size)    lockInit(&c.lock, lockRankHchan)    if debugChan {        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")    }    return c}

发送数据(ch <- i)

  • 发送数据前会加锁,避免多个线程并发批改数据。如果channel曾经敞开,间接Panic

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    lock(&c.lock)    if c.closed != 0 {        unlock(&c.lock)        panic(plainError("send on closed channel"))    }
  • 当存在期待的接收者时,通过 runtime.send 间接将数据发送给阻塞的接收者

    当channel的recvq队列不为空,而且channel是没有数据数据写入的。这个时候如果有数据写入,会间接把数据拷贝到接收者变量所在的内存地址上。即便这是一个有缓冲的channel,当有期待的接收者时,也是间接给接收者,不会先保留到循环队列

    // 如果指标 Channel 没有被敞开并且曾经有处于读期待的 Goroutine,那么 runtime.chansend 会从接管队列 recvq 中取出最先陷入期待的 Goroutine 并间接向它发送数据if sg := c.recvq.dequeue(); sg != nil {        send(c, sg, ep, func() { unlock(&c.lock) }, 3)        return true    }// func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    if sg.elem != nil {    // 调用 runtime.sendDirect 将发送的数据间接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上        sendDirect(c.elemtype, sg, ep)        sg.elem = nil    }    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)  // 调用 runtime.goready 将期待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上期待执行,该处理器在下一次调度时会立即唤醒数据的接管方;  // 须要留神的是,发送数据的过程只是将接管方的 Goroutine 放到了处理器的 runnext 中,程序没有立即执行该 Goroutine    goready(gp, skip+1)}
  • 当缓冲区存在空余空间时,将发送的数据写入 Channel 的缓冲区

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    ...  // 如果以后元素数小于循环队列的长度    if c.qcount < c.dataqsiz {    // 应用 runtime.chanbuf 计算出下一个能够存储数据的地位        qp := chanbuf(c, c.sendx)    // 将发送的数据拷贝到缓冲区中        typedmemmove(c.elemtype, qp, ep)    // 发送的地位索引+1        c.sendx++    // 如果循环队列满了就从0开始    // 因为这里的 buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会从新回到数组开始的地位        if c.sendx == c.dataqsiz {            c.sendx = 0        }    // 减少以后元素数        c.qcount++        unlock(&c.lock)        return true    }    ...}
  • 当不存在缓冲区或者缓冲区已满时,期待其余 Goroutine 从 Channel 接收数据

    当因为不存在缓冲区或者缓冲区已满无奈写入时,会结构sudog期待执行的gorutine构造,放到hchan的期待队列中,直到被唤醒,把数据放到缓冲区或者间接拷贝给接收者

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    ...  // 应用 select 关键字能够向 Channel 非阻塞地发送音讯    if !block {        unlock(&c.lock)        return false    }  // 获取发送数据应用的 Goroutine    gp := getg()  //  获取 runtime.sudog 构造    mysg := acquireSudog()  // 设置待发送数据的内存地址    mysg.elem = ep  // 设置发送数据的goroutine    mysg.g = gp  mysg.isSelect = false  // 设置发送的channel    mysg.c = c  // 设置到goroutine的waiting上    gp.waiting = mysg  // 退出到发送期待队列    c.sendq.enqueue(mysg)  // 阻塞期待唤醒    atomic.Store8(&gp.parkingOnChan, 1)    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)    KeepAlive(ep)    // someone woke us up.    if mysg != gp.waiting {        throw("G waiting list is corrupted")    }    gp.waiting = nil    gp.activeStackChans = false    closed := !mysg.success    gp.param = nil    if mysg.releasetime > 0 {        blockevent(mysg.releasetime-t0, 2)    }    mysg.c = nil    releaseSudog(mysg)    if closed {        if c.closed == 0 {            throw("chansend: spurious wakeup")        }        panic(plainError("send on closed channel"))    }    return true}

接收数据(<- ch)

  • 从一个空 Channel 接收数据

    goroutine会让出使用权,并阻塞期待

        if c == nil {        if !block {            return        }    // 让出使用权        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)        throw("unreachable")    }    // 不获取锁的状况下,查看失败的非阻塞操作    if !block && empty(c) {        // 显示未敞开,持续返回false,因为channel不会从新关上        if atomic.Load(&c.closed) == 0 {            return        }        if empty(c) {            // The channel is irreversibly closed and empty.            if raceenabled {                raceacquire(c.raceaddr())            }      // Channel 曾经被敞开并且缓冲区中不存在任何数据,那么会革除 ep 指针中的数据并立即返回            if ep != nil {                typedmemclr(c.elemtype, ep)            }            return true, false        }    }    var t0 int64    if blockprofilerate > 0 {        t0 = cputicks()    }    lock(&c.lock)    if c.closed != 0 && c.qcount == 0 {        if raceenabled {            raceacquire(c.raceaddr())        }        unlock(&c.lock)    // Channel 曾经被敞开并且缓冲区中不存在任何数据,那么会革除 ep 指针中的数据并立即返回        if ep != nil {            typedmemclr(c.elemtype, ep)        }        return true, false    }
  • 当存在期待的发送者时,通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据

    如果是无缓冲的channel,当有接收者进来时,会间接从阻塞的发送者拷贝数据

    如果是有缓冲的channel,当有接收者进来时,会先从缓冲区拿数据,接着期待的发送者会把数据拷贝到缓冲区

    留神这个时候并没有间接去唤醒发送者,而是放到下次p的执行队列中中,下次调度时会唤醒发送者,发送者会做一些开释资源的操作

    if sg := c.sendq.dequeue(); sg != nil {        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)        return true, true    }func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    if c.dataqsiz == 0 {        if raceenabled {            racesync(c, sg)        }        if ep != nil {            // 如果无缓存,间接从发送者拷贝数据            recvDirect(c.elemtype, sg, ep)        }    } else {        // 因为队列已满,接收数据的索引和发送数据的索引统一    qp := chanbuf(c, c.recvx)        if raceenabled {            racenotify(c, c.recvx, nil)            racenotify(c, c.recvx, sg)        }        // 数据从队列拷贝到指标内存地址        if ep != nil {            typedmemmove(c.elemtype, ep, qp)        }        // 数据从发送者拷贝到缓冲区        typedmemmove(c.elemtype, qp, sg.elem)        c.recvx++        if c.recvx == c.dataqsiz {            c.recvx = 0        }        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz    }    sg.elem = nil    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)    sg.success = true    if sg.releasetime != 0 {        sg.releasetime = cputicks()    }  // 无论产生哪种状况,运行时都会调用 runtime.goready 将以后处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。    goready(gp, skip+1)}
  • 当缓冲区存在数据时,从 Channel 的缓冲区中接收数据

    if c.qcount > 0 {        // 间接从队列取数据        qp := chanbuf(c, c.recvx)        if raceenabled {            racenotify(c, c.recvx, nil)        }      // 放到指标内存        if ep != nil {            typedmemmove(c.elemtype, ep, qp)        }      // 清空队列中对应的元素        typedmemclr(c.elemtype, qp)      // 接管索引+1        c.recvx++        if c.recvx == c.dataqsiz {            c.recvx = 0        }      // 队列元素-1        c.qcount--        unlock(&c.lock)        return true, true    }
  • 当缓冲区中不存在数据时,期待其余 Goroutine 向 Channel 发送数据

    if !block {        unlock(&c.lock)        return false, false    }    // no sender available: block on this channel.    gp := getg()    mysg := acquireSudog()    mysg.releasetime = 0    if t0 != 0 {        mysg.releasetime = -1    }    // No stack splits between assigning elem and enqueuing mysg    // on gp.waiting where copystack can find it.    mysg.elem = ep    mysg.waitlink = nil    gp.waiting = mysg    mysg.g = gp    mysg.isSelect = false    mysg.c = c    gp.param = nil    c.recvq.enqueue(mysg)    // 阻塞期待,让出使用权    atomic.Store8(&gp.parkingOnChan, 1)    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)    // 唤醒之后清空sudog    if mysg != gp.waiting {        throw("G waiting list is corrupted")    }    gp.waiting = nil    gp.activeStackChans = false    if mysg.releasetime > 0 {        blockevent(mysg.releasetime-t0, 2)    }    success := mysg.success    gp.param = nil    mysg.c = nil    releaseSudog(mysg)    return true, success

敞开channel

  • 当 Channel 是一个空指针或者曾经被敞开时,Go 语言运行时都会间接解体并抛出异样

    func closechan(c *hchan) {    if c == nil {        panic(plainError("close of nil channel"))    }    lock(&c.lock)    if c.closed != 0 {        unlock(&c.lock)        panic(plainError("close of closed channel"))    }
  • recvqsendq 两个队列中的数据退出到 Goroutine 列表 gList 中,与此同时该函数会革除所有 runtime.sudog 上未被解决的元素

    c.closed = 1    var glist gList    // release all readers    for {        sg := c.recvq.dequeue()        if sg == nil {            break        }        if sg.elem != nil {            typedmemclr(c.elemtype, sg.elem)            sg.elem = nil        }        if sg.releasetime != 0 {            sg.releasetime = cputicks()        }        gp := sg.g        gp.param = unsafe.Pointer(sg)        sg.success = false        if raceenabled {            raceacquireg(gp, c.raceaddr())        }        glist.push(gp)    }    // release all writers (they will panic)    for {        sg := c.sendq.dequeue()        if sg == nil {            break        }        sg.elem = nil        if sg.releasetime != 0 {            sg.releasetime = cputicks()        }        gp := sg.g        gp.param = unsafe.Pointer(sg)        sg.success = false        if raceenabled {            raceacquireg(gp, c.raceaddr())        }        glist.push(gp)    }    unlock(&c.lock)    // 为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。    for !glist.empty() {        gp := glist.pop()        gp.schedlink = 0        goready(gp, 3)    }

应用场景

报错情景

  • 往一个敞开的channel发送数据会报错:panic: send on closed channel
  • 敞开一个nil的chan会报错:panic: close of nil channel
  • 敞开一个曾经敞开的channel报错:panic: close of closed channel

1、一个经典的算法题

有4个goroutine,编号为1、2、3、4。每秒钟会有一个goroutine打印出本人的编号,要求写一个程序,让输入的编号总是依照1、2、3、4、1、2、3、4...的程序打印进去

package mainimport (    "fmt"    "time")func main() {  // 4个channel    chs := make([]chan int, 4)    for i, _ := range chs {        chs[i] = make(chan int)    // 开4个协程        go func(i int) {            for {        // 获取以后channel值并打印                v := <-chs[i]                fmt.Println(v + 1)                time.Sleep(time.Second)        // 把下一个值写入下一个channel,期待下一次生产                chs[(i+1)%4] <- (v + 1) % 4            }        }(i)    }  // 往第一个塞入0    chs[0] <- 0    select {}}

2、限流器

package mainimport (    "fmt"    "time")func main() {    // 每次解决3个申请    chLimit := make(chan struct{}, 3)    for i := 0; i < 20; i++ {        chLimit <- struct{}{}        go func(i int) {            fmt.Println("上游服务解决逻辑...", i)            time.Sleep(time.Second * 3)            <-chLimit        }(i)    }    time.Sleep(30 * time.Second)}

如果感觉sleep太丑太暴力,能够用waitGroup管制完结机会

package mainimport (    "fmt"    "sync"    "time")var wg sync.WaitGroupfunc main() {    // 每次解决3个申请    chLimit := make(chan struct{}, 3)    for i := 0; i < 20; i++ {        chLimit <- struct{}{}        wg.Add(1)        go func(i int) {            fmt.Println("上游服务解决逻辑...", i)            time.Sleep(time.Second * 3)            <-chLimit            wg.Done()        }(i)    }    wg.Wait()}

3、优雅退出

package mainimport (    "fmt"    "log"    "os"    "os/signal"    "syscall"    "time")func main() {    var closing = make(chan struct{})    var closed = make(chan struct{})    go func() {        for {            select {            case <-closing:                return            default:                fmt.Println("业务逻辑...")                time.Sleep(1 * time.Second)            }        }    }()    termChan := make(chan os.Signal)  // 监听退出信号    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)    <-termChan  // 退出中    close(closing)    // 退出之前清理一下    go doCleanup(closed)    select {    case <-closed:    case <-time.After(time.Second):        log.Println("清理超时不等了")    }    log.Println("优雅退出")}func doCleanup(closed chan struct{}) {    time.Sleep(time.Minute)  // 清理完后退出    close(closed)}

4、实现互斥锁

初始化一个缓冲区为1的channel,放入元素代表一把锁,谁获取到这个元素就代表获取了这把锁,开释锁的时候再把这个元素放回channel

package mainimport (    "log"    "time")type Mutex struct {    ch chan struct{}}// 初始化锁func NewMutex() *Mutex {    mu := &Mutex{make(chan struct{}, 1)}    mu.ch <- struct{}{}    return mu}// 加锁,阻塞获取func (m *Mutex) Lock()  {    <- m.ch}// 开释锁func (m *Mutex) Unlock()  {    select {    // 胜利写入channel代表开释胜利    case m.ch <- struct{}{}:    default:        panic("unlock of unlocked mutex")    }}// 尝试获取锁func (m *Mutex) TryLock() bool {    select {    case <-m.ch:        return true    default:    }    return false}func (m *Mutex) LockTimeout(timeout time.Duration) bool {    timer := time.NewTimer(timeout)    select {    case <-m.ch:    // 胜利获取锁敞开定时器        timer.Stop()        return true    case <-timer.C:    }  // 获取锁超时    return false}// 是否上锁func (m *Mutex) IsLocked() bool {    return len(m.ch) == 0}func main()  {    m := NewMutex()    ok := m.TryLock()    log.Printf("locked v %v\n", ok)    ok = m.TryLock()    log.Printf("locked v %v\n", ok)    go func() {        time.Sleep(5*time.Second)        m.Unlock()    }()    ok = m.LockTimeout(10*time.Second)    log.Printf("LockTimeout v %v\n", ok)}

参考:

极刻工夫《go 并发编程实战》