乐趣区

关于go:从鹅厂实例出发分析Go-Channel底层原理

本文是基于 Go1.18.1 源码的学习笔记。Channel 的底层源码从 Go1.14 到当初的 Go1.19 之间简直没有变动,这也是 Go 最早引入的组件之一,体现了 Go 并发思维:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通过共享内存来通信,⽽应通过通信来共享内存。

论断

还是先给出论断,没工夫看剖析过程的同学至多能够看一眼论断:

1. Channel 实质上是由三个 FIFO(First In FirstOut,先进先出)队列组成的用于协程之间传输数据的协程平安的通道;FIFO 的设计是为了保障偏心,让事件变得简略,准则是让等待时间最长的协程最有资格先从 channel 发送或接收数据;

2. 三个 FIFO 队列顺次是 buf 循环队列,sendq 待发送者队列,recvq 待接收者队列。buf 循环队列是大小固定的用来寄存 channel 接管的数据的队列;sendq 待发送者队列,用来寄存期待发送数据到 channel 的 goroutine 的双向链表,recvq 待接收者队列,用来寄存期待从 channel 读取数据的 goroutine 的双向链表;sendq 和 recvq 能够认为不限大小;

3. 跟函数调用传参实质都是传值一样,channel 传递数据的实质就是值拷贝,援用类型数据的传递也是地址拷贝;有从缓冲区 buf 地址拷贝数据到接收者 receiver 栈内存地址,也有从发送者 sender 栈内存地址拷贝数据到缓冲区 buf;

4. Channel 外面参数的批改不是并发平安的,包含对三个队列及其他参数的拜访,因而须要加锁,实质上,channel 就是一个有锁队列;

5. Channel 的性能跟 sync.Mutex 差不多,没有谁比谁强。Go 官网之所以举荐应用 Channel 进行并发协程的数据交互,是因为 channel 的设计理念能让程序变得简略,在大型程序、高并发简单的运行状况中也是如此。

从一个线上的内存透露问题谈起

去年底,团队有个线上服务产生了一个故障,该服务部署在 K8S 集群的容器里,通过 Prometheus 监控界面看到本服务的 Pod 的内存使用量呈锯齿状增长,达到服务设置的内存下限 16G 后,就会产生容器重启,看景象是产生了内存透露。

线上服务的代码通过简化,根本逻辑如下:

package main

import (
  "errors"
  "fmt"
)

func accessMultiService() (data string, err error) {respAChan := make(chan string)           // 无缓冲 channel
  go func() {serviceAResp, _ := accessServiceA()
    respAChan <- serviceAResp
  }()

  _, serviceBErr := accessServiceB()
  if serviceBErr != nil {return "", errors.New("service B response error")
  }

  _, serviceCErr := accessServiceC()
  if serviceCErr != nil {return "", errors.New("service C response error")
  }

  respA := <- respAChan
  fmt.Printf("service A resp is: %s\n", respA)
  return  "success", nil
}

func accessServiceA() (string, error) {return "service A result", nil}

func accessServiceB() (string, error) {return "service B result", errors.New("service B error")
}

func accessServiceC() (string, error) {return "service C result", nil}

通过排查,是在起的一个 goroutine 拜访 A 服务时,应用了一个无缓冲的 channel  respAChan,在后续的拜访服务 B,C 时,产生了异样导致父协程返回,A 服务的子协程里的无缓冲 channel respAChan 始终没有 goroutine 去读它,导致它始终被阻塞,无奈被开释,随着申请数的增多,它所在的 goroutine 会始终占用内存,直到达到容器内存下限,使容器解体重启。

解决办法能够是:将无缓冲的 channel 改成有缓冲 channel,并且在写入数据后敞开它,这样就不会产生 goroutine 始终阻塞,无奈被开释的问题了。

        respAChan := make(chan string, 1)           // 改为有缓冲 channel
  go func() {serviceAResp, _ := accessServiceA()
    respAChan <- serviceAResp
    close(respAChan)                   // 写入后敞开 channel
  }()

从这个问题能够晓得只管大家都用过 channel,却也容易因使用不当而导致线上故障。

Channel 是什么?怎么用?

首先是 channel 分为两类:

1. 无缓冲 channel,能够看作“同步模式”,发送方和接管方要同步就绪,只有在两者都 ready 的状况下,数据能力在两者间传输(前面会看到,实际上就是内存拷贝)。否则,任意一方后行进行发送或接管操作,都会被挂起,期待另一方的呈现能力被唤醒。

2. 有缓冲 channel 称为“异步模式”,在缓冲槽可用的状况下(有残余容量),发送和接管操作都能够顺利进行。否则,操作的一方(如写入)同样会被挂起,直到呈现相同操作(如接管)才会被唤醒。

channel 的根本用法有:

1. 读取 <- chan

2. 写入 chan <-

3. 敞开 close(chan)

4. 获取 channel 长度 len(chan)

5. 获取 channel 容量 cap(chan)

还有一种 select 非阻塞拜访形式,从所有的 case 中筛选一个不会阻塞的 channel 进行读写操作,或是 default 执行。

Channel 设计思维

Go 语言的并发模型是 CSP(Communicating Sequential Processes,通信顺序进程),提倡通过通信共享内存而不是通过共享内存而实现通信。

如果说 goroutine 是 Go 程序并发的执行体,channel 就是它们之间的连贯。channel 是能够让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。

上面无关并发探讨中的线程能够替换为过程、协程或函数,实质上都是同时对同一份数据的竞争。

先弄清楚并发和并行的区别:多线程程序在一个核的 CPU 上运行,就是并发。多线程程序在多个核的 CPU 上运行,就是并行。

单纯地将线程并发执行是没有意义的。线程与线程间须要替换数据能力体现并发执行线程的意义。

多个线程之间替换数据无非是两种形式:共享内存加互斥锁;先进先出 (FIFO) 将资源分配给等待时间最长的线程。

共享内存加互斥锁是 C ++ 等其余语言采纳的并发线程替换数据的形式,在高并发的场景下有时候难以正确的应用,特地是在超大型、巨型的程序中,容易带来难以觉察的暗藏的问题。Go 语言采纳的是后者,引入 channel 以先进先出 (FIFO) 将资源分配给等待时间最长的 goroutine,尽量打消数据竞争,让程序以尽可能程序统一的形式运行。

对于了解让程序尽量程序统一的含意,能够看看 Go 语言内存模型采纳的一个传统的基于 happens-before 对读写竞争的定义:

1. 批改由多个 goroutines 同时拜访的数据的程序必须串行化这些拜访。

2. 为了实现串行拜访, 须要应用 channel 操作或其余同步原语 (如 sync 和 sync/atomic 包中的原语) 来爱护数据。

3.go 语句创立一个 goroutine,肯定产生在 goroutine 执行之前。

4. 往一个 channel 中发送数据,肯定产生在从这个 channel 读取这个数据实现之前。

5. 一个 channel 的敞开,肯定产生在从这个 channel 读取到零值数据(这里指因为 close 而返回的零值数据)之前。

6. 从一个无缓冲 channel 的读取数据,肯定产生在往这个 channel 发送数据实现之前。

如果违反了这种定义,Go 会让程序间接 panic 或阻塞,无奈往后执行。

有人说,Go 没有采纳共享内存加互斥锁进行协程之间的通信,是因为这种形式性能太差,其实不是,因为 channel 实质也是一个有锁的队列,采纳 channel 进行协程之间的通信,次要是为了缩小数据竞争,在大型程序、高并发的简单场景下,以简略的原理实现的组件,更能让程序尽量按合乎预期的、不易出错的形式执行。

Go 中用于并发协程同步数据的组件次要分为 2 大类,一个是 sync 和 sync/atomic 包外面的,如 sync.Mutex、sync.RWMutex、sync.WaitGroup 等,另一个是 channel。只有 channel 才是 Go 语言举荐的并发同步的形式,是一等公民,用户应用 channel 甚至不须要引入包名。

Channel 构造

channel 的底层数据结构是 hchan,在 src/runtime/chan.go 中。

type hchan struct {
  qcount   uint                 // 队列中所有数据总数
  dataqsiz uint                 // 循环队列大小
  buf      unsafe.Pointer       // 指向循环队列的指针
  elemsize uint16               // 循环队列中元素的大小
  closed   uint32               // chan 是否敞开的标识
  elemtype *_type               // 循环队列中元素的类型
  sendx    uint                 // 已发送元素在循环队列中的地位
  recvx    uint                 // 已接管元素在循环队列中的地位
  recvq    waitq                // 期待接管的 goroutine 的期待队列
  sendq    waitq                // 期待发送的 goroutine 的期待队列
  lock mutex                    // 管制 chan 并发拜访的互斥锁
}

qcount 代表 chan 中曾经接管但还没被读取的元素的个数;

dataqsiz 代表循环队列的大小;

buf 是指向循环队列的指针,循环队列是大小固定的用来寄存 chan 接管的数据的队列;

elemtype 和 elemsiz 示意循环队列中元素的类型和元素的大小;

sendx:待发送的数据在循环队列 buffer 中的地位索引;

recvx:待接管的数据在循环队列 buffer 中的地位索引;

recvq 和 sendq 别离示意期待接收数据的 goroutine 与期待发送数据的 goroutine;

sendq 和 recvq 存储了以后 Channel 因为缓冲区空间有余而阻塞的 Goroutine 列表,这些期待队列应用双向链表 waitq 示意,链表中所有的元素都是 sudog 构造:

type waitq struct {
  first *sudog
  last  *sudog
}

sudog 代表着期待队列中的一个 goroutine,G 与同步对象(指 chan)关系是多对多的。一个 G 能够呈现在许多期待队列上,因而一个 G 可能有多个 sudog。并且多个 G 可能正在期待同一个同步对象,因而一个对象可能有许多 sudog。sudog 是从非凡池中调配进去的。应用 acquireSudog 和 releaseSudog 调配和开释它们。

创立 Chan

Channel 的创立会应用 make 关键字:

ch := make(chan int, 10)

编译器编译上述代码,在查看 ir 节点时,依据节点 op 不同类型,进行不同的查看,源码如下:

func walkExpr1(n ir.Node, init *ir.Nodes) ir.Node {switch n.Op() {
  ......
  case ir.OMAKECHAN:
    n := n.(*ir.MakeExpr)
    return walkMakeChan(n, init)
  ......
}

编译器会将 make(chan int, 10) 表达式转换成 OMAKE 类型的节点,并在类型查看阶段将 OMAKE 类型的节点转换成 OMAKECHAN 类型,该类型节点会调用 walkMakeChan 函数解决:

func walkMakeChan(n *ir.MakeExpr, init *ir.Nodes) ir.Node {
  size := n.Len
  fnname := "makechan64"
  argtype := types.Types[types.TINT64]

  if size.Type().IsKind(types.TIDEAL) || size.Type().Size() <= types.Types[types.TUINT].Size() {
    fnname = "makechan"
    argtype = types.Types[types.TINT]
  }

  return mkcall1(chanfn(fnname, 1, n.Type()), n.Type(), init, reflectdata.TypePtr(n.Type()), typecheck.Conv(size, argtype))
}

上述代码默认调用 makechan64()函数。如果在 make 函数中传入的 channel size 大小在 int 范畴内,举荐应用 makechan()。因为 makechan() 在 32 位的平台上更快,用的内存更少。

makechan64() 办法在 src/runtime/chan.go,只是判断一下传入的入参 size 是否还在 int 范畴之内:

func makechan64(t *chantype, size int64) *hchan {if int64(int(size)) != size {panic(plainError("makechan: size out of range"))
  }

  return makechan(t, int(size))
}

最终创立 channel 调用的还是 runtime.makechan() 函数:

func makechan(t *chantype, size int) *hchan {
  elem := t.elem

  // 查看数据项大小不能超过 64KB
  if elem.size >= 1<<16 {throw("makechan: invalid channel element type")
  }
        // 查看内存对齐是否正确
  if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")
  }
        // 缓冲区大小查看,判断是否溢出
  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))
  }

  var c *hchan
  switch {
  case mem == 0:
    // 队列或者元素大小为 zero 时,毋庸创立 buf 环形队列.
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    // 竞态查看,利用这个地址进行同步操作.
    c.buf = c.raceaddr()
  case elem.ptrdata == 0:
    // 元素不是指针,调配一块间断的内存给 hchan 数据结构和缓冲区 buf
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
                // 示意 hchan 前面在内存里紧跟着就是 buf 环形队列
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // 元素蕴含指针,独自调配环形队列 buf
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

        // 设置元素个数、元素类型给创立的 chan
  c.elemsize = uint16(elem.size)
  c.elemtype = elem
  c.dataqsiz = uint(size)
  lockInit(&c.lock, lockRankHchan)

  if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
  }
  return c
}

下面这段 makechan() 代码次要目标是生成 *hchan 对象。重点关注 switch-case 中的 3 种状况:

1. 当队列或者元素大小为 0 时,调用 mallocgc() 在堆上为 channel 开拓一段大小为 hchanSize 的内存空间;

2. 当元素类型不是指针类型时,调用 mallocgc() 在堆上为 channel 和底层 buf 缓冲区数组开拓一段大小为 hchanSize + mem 间断的内存空间;

3. 默认状况元素类型中有指针类型,调用 mallocgc() 在堆上别离为 channel 和 buf 缓冲区分配内存。

这里须要解释下:当存储在 buf 中的元素不蕴含指针时,hchan 中也不蕴含 GC 关怀的指针。buf 指向一段雷同元素类型的内存,elemtype 固定不变。受到垃圾回收器的限度,指针类型的缓冲 buf 须要独自分配内存。

channel 自身是援用类型,其创立全副调用的是 mallocgc(),在堆上开拓的内存空间,阐明 channel 自身会被 GC 主动回收。

发送数据

向 channel 中发送数据应用 ch <- 1 代码,编译器在编译它时,会把它解析成 OSEND 节点:

func walkExpr1(n ir.Node, init *ir.Nodes) ir.Node {switch n.Op() {
  ......
  case ir.OSEND:
    n := n.(*ir.SendStmt)
    return walkSend(n, init)
  ......
}

对 OSEND 节点会调用 walkSend()函数解决:

func walkSend(n *ir.SendStmt, init *ir.Nodes) ir.Node {
  n1 := n.Value
  n1 = typecheck.AssignConv(n1, n.Chan.Type().Elem(), "chan send")
  n1 = walkExpr(n1, init)
  n1 = typecheck.NodAddr(n1)
  return mkcall1(chanfn("chansend1", 2, n.Chan.Type()), nil, init, n.Chan, n1)
}

运行时的 chansend1()函数理论调用的是 chansend():

func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}

chansend()函数的次要逻辑是:

1. 在 chan 为 nil 未初始化的状况下,对于 select 这种非阻塞的发送,间接返回 false;对于阻塞的发送,将 goroutine 挂起,并且永远不会返回。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        // 如果 chan 为 nil
  if c == nil {
                // 对于 select 这种非阻塞的发送, 间接返回
    if !block {return false}
                // 对于阻塞的通道,将 goroutine 挂起
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
        ......
}

2. 非阻塞发送的状况下,当 channel 不为 nil,并且 channel 没有敞开时,如果没有缓冲区且没有接收者 receiver,或者缓冲区曾经满了,返回 false。

        if !block && c.closed == 0 && full(c) {return false}

full() 办法作用是判断在 channel 上发送是否会阻塞,用来判断的参数是 qcount,c.recvq.first,dataqsiz,前两个变量都是单字长的,所以对它们单个值的读操作是原子性的。dataqsiz 字段,它在创立完 channel 当前是不可变的,因而它能够平安的在任意时刻读取。

func full(c *hchan) bool {
  // 如果循环队列大小为 0
  if c.dataqsiz == 0 {
    // 假如指针读取是近似原子性的,这里用来判断没有接收者
    return c.recvq.first == nil
  }
  // 队列满了
  return c.qcount == c.dataqsiz
}

3. 接下来,对 chan 加锁,判断 chan 不是敞开状态,再从 recvq 队列中取出一个接收者,如果接收者存在,则间接向它发送音讯,绕过循环队列 buf,此时,因为有接收者存在,则循环队列 buf 肯定是空的。

        ......
        // 对 chan 加锁
        lock(&c.lock)

        // 查看 chan 是否敞开
  if c.closed != 0 {unlock(&c.lock)
    panic(plainError("send on closed channel"))
  }

        // 从 recvq 中取出一个接收者
  if sg := c.recvq.dequeue(); sg != nil {
    // 如果接收者存在,间接向该接收者发送数据,绕过循环队列 buf
    send(c, sg, ep, func() {unlock(&c.lock) }, 3)
    return true
  }
        ......

send() 函数次要实现了 2 件事:调用 sendDirect() 函数将数据拷贝到了接收者的内存地址上;调用 goready() 将期待接管的阻塞 goroutine 的状态从 Gwaiting 或者 Gscanwaiting 扭转成 Grunnable。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ......
  if sg.elem != nil {
                // 间接把要发送的数据拷贝到 receiver 的内存地址
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  if sg.releasetime != 0 {sg.releasetime = cputicks()
  }
        // 唤醒期待的接收者 goroutine
  goready(gp, skip+1)
}

4. 回到 chansend()办法,接下来是有缓冲区的异步发送的逻辑。

// 如果缓冲区没有满,间接将要发送的数据复制到缓冲区
        if c.qcount < c.dataqsiz {
    // 找到要发送数据到循环队列 buf 的索引地位
    qp := chanbuf(c, c.sendx)
    ......
                // 数据拷贝到循环队列中
    typedmemmove(c.elemtype, qp, ep)
                // 将待发送数据索引加 1,因为是循环队列,如果到了开端,从 0 开始
    c.sendx++
    if c.sendx == c.dataqsiz {c.sendx = 0}
                // chan 中元素个数加 1,开释锁返回 true
    c.qcount++
    unlock(&c.lock)
    return true
  }

如果缓冲区 buf 还没有满,调用 chanbuf() 获取 sendx 索引的元素指针值。调用 typedmemmove() 办法将发送的值拷贝到缓冲区 buf 中。拷贝实现,减少 sendx 索引下标值和 qcount 个数。

5. 如果执行后面的步骤还没有胜利发送,就示意缓冲区没有空间了,而且也没有任何接收者在期待。前面须要将 goroutine 挂起而后期待新的接收者了。

   // 缓冲区没有空间,对于 select 这种非阻塞调用间接返回 false
        if !block {unlock(&c.lock)
    return false
  }

  // 上面的逻辑是将以后 goroutine 挂起
        // 调用 getg()办法获取以后 goroutine 的指针,用于绑定给一个 sudog
  gp := getg()
        // 调用 acquireSudog()办法获取一个 sudog,可能是新建的 sudog,也有可能是从缓存中获取的。设置好 sudog 要发送的数据和状态。比方发送的 Channel、是否在 select 中和待发送数据的内存地址等等。mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {mysg.releasetime = -1}

  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil
        // 调用 c.sendq.enqueue 办法将配置好的 sudog 退出待发送的期待队列
  c.sendq.enqueue(mysg)
  atomic.Store8(&gp.parkingOnChan, 1)
        // 调用 gopark 办法挂起以后 goroutine,状态为 waitReasonChanSend,阻塞期待 channel 接收者的激活
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
  // 最初,KeepAlive() 确保发送的值放弃活动状态,直到接收者将其复制进去
  KeepAlive(ep)

6.chansend()办法最初的代码是当 goroutine 唤醒当前,解除阻塞的状态。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ......
  if mysg != gp.waiting {throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  closed := !mysg.success
  gp.param = nil
  if mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)
  }
  mysg.c = nil
  releaseSudog(mysg)
  if closed {
    if c.closed == 0 {throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
  return true
}

综上所述:

1. 首先 select 这种非阻塞的发送,判断两种状况;

2. 而后是个别的阻塞调用,先判断 recvq 期待接管队列是否为空,不为空阐明缓冲区中没有内容或者是一个无缓冲 channel;

3. 如果 recvq 有接收者,则缓冲区肯定为空,间接从 recvq 中取出一个 goroutine,而后写入数据,接着唤醒 goroutine,完结发送过程;

4. 如果缓冲区有空余的地位,写入数据到缓冲区,实现发送;

5. 如果缓冲区满了,那么就把发送数据的 goroutine 放到 sendq 中,进入睡眠,期待该 goroutine 被唤醒。

接收数据

从 channel 中接收数据的代码是:

i <- ch
i, ok <- ch

通过编译器的解决,会解析成 ORECV 节点,后者会在类型查看阶段被转换成 OAS2RECV 类型。最终,这两种不同的 channel 接管形式会转换成 runtime.chanrecv1 和 runtime.chanrecv2 两种不同函数的调用,然而最终外围逻辑还是在 runtime.chanrecv 中。

上面间接看 chanrecv()办法的逻辑:

1.chanrecv()办法有两个返回值,selected, received bool,前者示意是否接管到值,后者示意接管的值是否敞开后发送的。有三种状况:如果是非阻塞的状况,没有数据能够接管,则返回 (false,flase);如果 chan 曾经敞开了,将 ep 指向的值置为 0 值,并且返回 (true, false);其它状况返回值为 (true,true),示意胜利从 chan 中获取到了数据,且是 chan 未敞开发送。

// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {......}

2. 首先判断如果 chan 为空,且是 select 这种非阻塞调用,那么间接返回 (false,false),否则阻塞以后的 goroutine。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ......
        // 如果 c 为空
  if c == nil {// 如果 c 为空且是非阻塞调用,那么间接返回 (false,false)
    if !block {return}
                // 阻塞以后的 goroutine
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
        ......
}

3. 如果是非阻塞调用,通过 empty()办法原子判断是无缓冲 chan 或者是 chan 中没有数据且 chan 没有敞开,则返回(false,false),如果 chan 敞开,为了避免查看期间的状态变动,二次调用 empty()进行原子查看,如果是无缓冲 chan 或者是 chan 中没有数据,返回 (true, false),这里的第一个 true 示意 chan 敞开后读取的 0 值。

        // 非阻塞调用,通过 empty()判断是无缓冲 chan 或者是 chan 中没有数据
  if !block && empty(c) {// 如果 chan 没有敞开,则间接返回 (false, false)
    if atomic.Load(&c.closed) == 0 {return}

                // 如果 chan 敞开, 为了避免查看期间的状态变动,二次调用 empty()进行原子查看,如果是无缓冲 chan 或者是 chan 中没有数据,返回 (true, false)
    if empty(c) {
      if raceenabled {raceacquire(c.raceaddr())
      }
      if ep != nil {typedmemclr(c.elemtype, ep)
      }
      return true, false
    }
  }
func empty(c *hchan) bool {
  // c.dataqsiz 是不可变的
  if c.dataqsiz == 0 {return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
  }
  return atomic.Loaduint(&c.qcount) == 0
}

4. 接下来阻塞调用的逻辑,chanrecv 办法对 chan 加锁,判断 chan 如果曾经敞开,并且 chan 中没有数据,返回 (true,false),这里的第一个 true 示意 chan 敞开后读取的 0 值。

        ......
        // 对 chan 加锁
        lock(&c.lock)
        // 如果曾经敞开,并且 chan 中没有数据,返回 (true,false)
  if c.closed != 0 && c.qcount == 0 {
    if raceenabled {raceacquire(c.raceaddr())
    }
    unlock(&c.lock)
    if ep != nil {typedmemclr(c.elemtype, ep)
    }
    return true, false
  }
        ......

5. 接下来,从发送队列中获取一个期待发送的 goroutine,即取出期待队列队头的 goroutine。如果缓冲区的大小为 0,则间接从发送方接管值。否则,对应缓冲区满的状况,从队列的头部接收数据,发送者的值增加到队列的开端(此时队列已满,因而两者都映射到缓冲区中的同一个下标)。这里须要留神,因为有发送者在期待,所以如果有缓冲区,那么缓冲区肯定是满的。

       ......
       // 从发送者队列获取期待发送的 goroutine  
       if sg := c.sendq.dequeue(); sg != nil {
    // 在 channel 的发送队列中找到了期待发送的 goroutine,取出队头期待的 goroutine。如果缓冲区的大小为 0,则间接从发送方接管值。否则,对应缓冲区满的状况,从队列的头部接收数据,发送者的值增加到队列的开端(此时队列已满,因而两者都映射到缓冲区中的同一个下标)recv(c, sg, ep, func() {unlock(&c.lock) }, 3)
    return true, true
  }
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if raceenabled {racesync(c, sg)
    }
    if ep != nil {
      // 从发送者 sender 外面拷贝数据
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    // 队列是满的
    qp := chanbuf(c, c.recvx)
    if raceenabled {racenotify(c, c.recvx, nil)
      racenotify(c, c.recvx, sg)
    }
    // 从缓冲区拷贝数据给接收者 receiver
    if ep != nil {typedmemmove(c.elemtype, ep, qp)
    }
    // 从发送者 sender 拷贝数据到缓冲区
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {c.recvx = 0}
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  if sg.releasetime != 0 {sg.releasetime = cputicks()
  }
        // 唤醒发送者
  goready(gp, skip+1)
}

recv()办法先判断 chan 是否无缓冲,如果是,则间接从发送者 sender 那里拷贝数据,如果有缓存区,因为有发送者,此时缓冲区的循环队列肯定是满的,会先从缓冲区拷贝数据给接收者 receiver,而后将发送者的数据拷贝到缓冲区,满足 FIFO。最初,唤醒发送者的 goroutine。

6. 接下来,是异步接管逻辑,如果缓冲区有数据,间接从缓冲区接收数据,行将缓冲区 recvx 指向的数据复制到 ep 接管地址,并且将 recvx 加 1。

   ......
         // 如果缓冲区有数据
         if c.qcount > 0 {
    // 间接从缓冲区接收数据
    qp := chanbuf(c, c.recvx)
    if raceenabled {racenotify(c, c.recvx, nil)
    }
                // 接收数据地址 ep 不为空,间接从缓冲区复制数据到 ep
    if ep != nil {typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
                // 待接管索引加 1
    c.recvx++
                // 循环队列,如果到了开端,从 0 开始
    if c.recvx == c.dataqsiz {c.recvx = 0}
                // 缓冲区数据减 1
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
        ......

7. 而后,是缓冲区没有数据的状况;如果是 select 这种非阻塞读取的状况,间接返回 (false, false),示意获取不到数据;否则,会获取 sudog 绑定以后接收者 goroutine,调用 gopark() 挂起以后接收者 goroutine,期待 chan 的其余发送者唤醒。

        ......
        // 如果是 select 非阻塞读取的状况,间接返回(false, false)
        if !block {unlock(&c.lock)
    return false, false
  }

  // 没有发送者,挂起以后 goroutine
        // 获取以后 goroutine 的指针,用于绑定给一个 sudog
  gp := getg()
        // 调用 acquireSudog() 办法获取一个 sudog,可能是新建的 sudog,也有可能是从缓存中获取的。设置好 sudog 要发送的数据和状态
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {mysg.releasetime = -1}

  mysg.elem = ep
  mysg.waitlink = nil
  gp.waiting = mysg
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.param = nil
        // 将配置好的 sudog 退出待发送的期待队列
  c.recvq.enqueue(mysg)
  atomic.Store8(&gp.parkingOnChan, 1)
        // 挂起以后 goroutine
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
        ......

8. 最初,以后 goroutine 被唤醒,实现 chan 数据的接管,之后进行参数查看,解除 chan 绑定,并开释 sudog。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ......
        // 以后 goroutine 被唤醒,实现 chan 数据的接管,之后进行参数查看,解除 chan 绑定,并开释 sudog
  if mysg != gp.waiting {throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  if mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)
  }
  success := mysg.success
  gp.param = nil
  mysg.c = nil
  releaseSudog(mysg)
  return true, success
}

综上剖析,从 chan 接收数据的流程如下:

1. 也是先判断 select 这种非阻塞接管的两种状况(block 为 false);而后是加锁进行阻塞调用的逻辑;

2. 同步接管:如果发送者队列 sendq 不为空,且没有缓存区,间接从 sendq 中取出一个 goroutine,读取以后 goroutine 中的音讯,唤醒 goroutine, 完结读取的过程;

3. 同步接管:如果发送者队列 sendq 不为空,阐明缓冲区曾经满了,挪动 recvx 指针的地位,取出一个数据,同时在 sendq 中取出一个 goroutine,拷贝外面的数据到 buf 中,完结以后读取;

4. 异步接管:如果发送者队列 sendq 为空,且缓冲区有数据,间接在缓冲区取出数据,实现本次读取;

5. 阻塞接管:如果发送者队列 sendq 为空,且缓冲区没有数据。将以后 goroutine 退出 recvq,进入睡眠,期待被发送者 goroutine 唤醒。

敞开 Chan

敞开 chan 的代码是 close(ch),编译器会将其转为调用 runtime.closechan() 办法。

func closechan(c *hchan) {
        // 如果 chan 为空,此时敞开它会 panic
  if c == nil {panic(plainError("close of nil channel"))
  }

        // 加锁
  lock(&c.lock)
        // 如果 chan 曾经敞开了,再次敞开它会 panic
  if c.closed != 0 {unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }

  if raceenabled {callerpc := getcallerpc()
    racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
    racerelease(c.raceaddr())
  }
        // 设置 chan 的 closed 状态为敞开
  c.closed = 1
        // 申明一个寄存所有接收者和发送者 goroutine 的 list
  var glist gList

  // 获取 recvq 里的所有接收者
  for {sg := c.recvq.dequeue()
    if sg == nil {break}
    if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = unsafe.Pointer(sg)
    sg.success = false
    if raceenabled {raceacquireg(gp, c.raceaddr())
    }
                // 放入队列 glist 中
    glist.push(gp)
  }

  // 获取所有发送者
  for {sg := c.sendq.dequeue()
    if sg == nil {break}
    sg.elem = nil
    if sg.releasetime != 0 {sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = unsafe.Pointer(sg)
    sg.success = false
    if raceenabled {raceacquireg(gp, c.raceaddr())
    }
                // 放入队列 glist 中
    glist.push(gp)
  }
  unlock(&c.lock)

  // 唤醒所有的 glist 中的 goroutine 
  for !glist.empty() {gp := glist.pop()
    gp.schedlink = 0
    goready(gp, 3)
  }
}

敞开 chan 的步骤是:

1. 先查看异常情况,当 Channel 是一个 nil 空指针或者敞开一个曾经敞开的 channel 时,Go 语言运行时都会间接 panic。

2. 敞开的次要工作是开释所有的接收者和发送者:将所有的接收者 readers 的 sudog 期待队列(recvq)退出到待革除队列 glist 中。留神这里是先回收接收者,因为从一个敞开的 channel 中读数据,不会产生 panic,顶多读到一个默认零值。再回收发送者 senders,将发送者的期待队列 sendq 中的 sudog 放入待革除队列 glist 中。留神这里可能会产生 panic,因为往一个敞开的 channel 中发送数据,会产生 panic。

总结

Channel 是基于有锁队列实现数据在不同协程之间传输的通道,数据传输的形式其实就是值传递,援用类型数据的传递是地址拷贝。

有别于通过共享内存加锁的形式在协程之间传输数据,通过 channel 传递数据,这些数据的所有权也能够在 goroutine 之间传输。当 goroutine 向 channel 发送值时,咱们能够看到 goroutine 开释了一些值的所有权。当一个 goroutine 从一个 channel 接管到一个值时,能够看到 goroutine 取得了一些值的所有权。

channel 常见的读写异常情况如下表所示:

腾讯工程师技术干货中转:

  1. 快珍藏!最全 GO 语言实现设计模式【下】

  2. 如何成为优良工程师之软技能篇

  3. 如何更好地应用 Kafka?

  4. 一文带你深刻理解 HTTP

退出移动版