在Go中,要了解channel,首先须要意识goroutine。

一、为什么会有goroutine

古代操作系统中为咱们提供了三种根本的结构并发程序的办法:多过程、I/O多路复用和多线程。其中最简略的结构形式当属多过程,然而多过程的并发程序,因为对过程管制和过程间通信开销微小,这样的并发形式往往会很慢。

因而,操作系统提供了更小粒度的运行单元:线程(确切叫法是内核线程)。它是一种运行在过程上下文中的逻辑流,线程之间通过操作系统来调度,其调度模型如下图所示。

多线程的并发形式,相较于多过程而言要快得多。然而因为线程上下文切换总是不可避免的陷入内核态,它的开销仍然较大。那么有没有不用陷入内核态的运行载体呢?有,用户级线程。 用户级线程的切换由用户程序本人管制,不须要内核干预,因而少了进出内核态的耗费。

这里的用户级线程就是协程(coroutine),它们的切换由运行时零碎来对立调度治理,内核并不知道它的存在。协程是形象于内核线程之上的对象,一个内核线程能够对应多个协程。但最终的零碎调用依然须要内核线程来实现。留神,线程的调度是操作系统来治理,是一种抢占式调度。而协程不同,协程之间须要单干,会被动交出执行权,是一种合作式调度,这也是为何被称为协程的起因。

Go天生在语言层面反对了协程,即咱们常说的goroutine。Go的runtime零碎实现的是一种M:N调度模型,通过GMP对象来形容,其中G代表的就是协程,M是线程,P是调度上下文。在Go程序中,一个goroutine就代表着一个最小用户代码执行流,它们也是并发流的最小单元。

二、channel的存在定位

从内存的角度而言,并发模型只分两种:基于共享内存和基于音讯通信(内存拷贝)。在Go中,两种并发模型的同步原语均有提供:sync.\和atomic.\代表的就是基于共享内存;channel代表的就是基于音讯通信。而Go提倡后者,它包含三大元素:goroutine(执行体),channel(通信),select(协调)。

Do not communicate by sharing memory; instead, share memory by communicating.

在Go中通过goroutine+channel的形式,能够简略、高效地解决并发问题,channel就是goroutine之间的数据桥梁。

Concurrency is the key to designing high performance network services. Go's concurrency primitives (goroutines and channels) provide a simple and efficient means of expressing concurrent execution.

以下是一个简略的channel应用示例代码。

func goroutineA(ch <-chan int)  {    fmt.Println("[goroutineA] want a data")    val := <- ch    fmt.Println("[goroutineA] received the data", val)}func goroutineB(ch chan<- int)  {    time.Sleep(time.Second*1)    ch <- 1    fmt.Println("[goroutineB] send the data 1")}func main() {    ch := make(chan int, 1)    go goroutineA(ch)    go goroutineB(ch)    time.Sleep(2*time.Second)}

上述过程趣解图如下

三、channel源码解析

channel源码位于src/go/runtime/chan.go。本章内容分为两局部:channel内部结构和channel操作。

3.1 channel内部结构

ch := make(chan int,2)

对于以上channel的申明语句,咱们能够在程序中退出断点,失去ch的信息如下。

很好,看起来十分的清晰。然而,这些信息代表的是什么含意呢?接下来,咱们先看几个重要的构造体。

  • hchan

当咱们通过make(chan Type, size)生成channel时,在runtime零碎中,生成的是一个hchan构造体对象。源码位于src/runtime/chan.go

type hchan struct {    qcount   uint           // 循环队列中数据数    dataqsiz uint           // 循环队列的大小    buf      unsafe.Pointer // 指向大小为dataqsize的蕴含数据元素的数组指针    elemsize uint16         // 数据元素的大小    closed   uint32         // 代表channel是否敞开       elemtype *_type         // _type代表Go的类型零碎,elemtype代表channel中的元素类型    sendx    uint           // 发送索引号,初始值为0    recvx    uint           // 接管索引号,初始值为0  recvq    waitq          // 接管期待队列,存储试图从channel接收数据(<-ch)的阻塞goroutines    sendq    waitq          // 发送期待队列,存储试图发送数据(ch<-)到channel的阻塞goroutines    lock mutex              // 加锁能爱护hchan的所有字段,包含waitq中sudoq对象}
  • waitq

waitq用于表白处于阻塞状态的goroutines链表信息,first指向链头goroutine,last指向链尾goroutine

type waitq struct {    first *sudog               last  *sudog}
  • sudug

sudog代表的就是一个处于期待列表中的goroutine对象,源码位于src/runtime/runtime2.go

type sudog struct {    g *g    next *sudog    prev *sudog    elem unsafe.Pointer // data element (may point to stack)    c        *hchan // channel  ...}

为了更好了解hchan构造体,咱们将通过以下代码来了解hchan中的字段含意。

package mainimport "time"func goroutineA(ch chan int) {    ch <- 100}func goroutineB(ch chan int) {    ch <- 200}func goroutineC(ch chan int) {    ch <- 300}func goroutineD(ch chan int) {    ch <- 300}func main() {    ch := make(chan int, 4)    for i := 0; i < 4; i++ {        ch <- i * 10    }    go goroutineA(ch)    go goroutineB(ch)    go goroutineC(ch)    go goroutineD(ch)    // 第一个sleep是为了给上足够的工夫让所有goroutine都已启动    time.Sleep(time.Millisecond * 500)    time.Sleep(time.Second)}

关上代码调试性能,将程序运行至断点time.Sleep(time.Second)处,此时失去的chan信息如下。

在该channel中,通过make(chan int, 4)定义的channel大小为4,即dataqsiz的值为4。同时因为循环队列中曾经增加了4个元素,所以qcount值也为4。此时,有4个goroutine(A-D)想发送数据给channel,然而因为存放数据的循环队列已满,所以只能进入发送期待列表,即sendq。同时要留神到,此时的发送和接管索引值均为0,即下一次接收数据的goroutine会从循环队列的第一个元素拿,发送数据的goroutine会发送到循环队列的第一个地位。

上述hchan构造可视化图解如下

3.2 channel操作

将channel操作分为四局部:创立、发送、接管和敞开。

创立

本文的参考Go版本为1.15.2。其channel的创立实现代码位于src/go/runtime/chan.go的makechan办法。

func makechan(t *chantype, size int) *hchan {    elem := t.elem  // 发送元素大小限度    if elem.size >= 1<<16 {        throw("makechan: invalid channel element type")    }  // 对齐查看    if hchanSize%maxAlign != 0 || elem.align > maxAlign {        throw("makechan: bad alignment")    }  // 判断是否会内存溢出    mem, overflow := math.MulUintptr(elem.size, uintptr(size))    if overflow || mem > maxAlloc-hchanSize || size < 0 {        panic(plainError("makechan: size out of range"))    }  // 为结构的hchan对象分配内存    var c *hchan    switch {  // 无缓冲的channel或者元素大小为0的状况    case mem == 0:        c = (*hchan)(mallocgc(hchanSize, nil, true))        c.buf = c.raceaddr()  // 元素不蕴含指针的状况      case elem.ptrdata == 0:        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))        c.buf = add(unsafe.Pointer(c), hchanSize)  // 元素蕴含指针      default:        c = new(hchan)        c.buf = mallocgc(mem, elem, true)    }  // 初始化相干参数    c.elemsize = uint16(elem.size)    c.elemtype = elem    c.dataqsiz = uint(size)    lockInit(&c.lock, lockRankHchan)    if debugChan {        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")    }    return c}

能够看到,makechan办法次要就是查看传送元素的合法性,并为hchan分配内存,初始化相干参数,包含对锁的初始化。

发送

channel的发送实现代码位于src/go/runtime/chan.go的chansend办法。发送过程,存在以下几种状况。

  1. 当发送的channel为nil
if c == nil {    if !block {        return false    }    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)    throw("unreachable")}

往一个nil的channel中发送数据时,调用gopark函数将以后执行的goroutine从running态转入waiting态。

  1. 往已敞开的channel中发送数据
    if c.closed != 0 {        unlock(&c.lock)        panic(plainError("send on closed channel"))    }

如果向已敞开的channel中发送数据,会引发panic。

  1. 如果曾经有阻塞的接管goroutines(即recvq中指向非空),那么数据将被间接发送给接管goroutine。
if sg := c.recvq.dequeue(); sg != nil {    // Found a waiting receiver. We pass the value we want to send    // directly to the receiver, bypassing the channel buffer (if any).    send(c, sg, ep, func() { unlock(&c.lock) }, 3)    return true}

该逻辑的实现代码在send办法和sendDirect中。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {  ... // 省略了竞态代码    if sg.elem != nil {        sendDirect(c.elemtype, sg, ep)        sg.elem = nil    }    gp := sg.g    unlockf()    gp.param = unsafe.Pointer(sg)    if sg.releasetime != 0 {        sg.releasetime = cputicks()    }    goready(gp, skip+1)}func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {    dst := sg.elem    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)    memmove(dst, src, t.size)}

其中,memmove咱们曾经在源码系列中遇到屡次了,它的目标是将内存中src的内容拷贝至dst中去。另外,留神到goready(gp, skip+1)这句代码,它会使得之前在接管期待队列中的第一个goroutine的状态变为runnable,这样go的调度器就能够从新让该goroutine失去执行。

  1. 对于有缓冲的channel来说,如果以后缓冲区hchan.buf有可用空间,那么会将数据拷贝至缓冲区
if c.qcount < c.dataqsiz {    qp := chanbuf(c, c.sendx)    if raceenabled {        raceacquire(qp)        racerelease(qp)    }    typedmemmove(c.elemtype, qp, ep)  // 发送索引号+1    c.sendx++  // 因为存储数据元素的构造是循环队列,所以当以后索引号曾经到队末时,将索引号调整到队头    if c.sendx == c.dataqsiz {        c.sendx = 0    }  // 以后循环队列中存储元素数+1    c.qcount++    unlock(&c.lock)    return true}

其中,chanbuf(c, c.sendx)是获取指向对应内存区域的指针。typememmove会调用memmove办法,实现数据的拷贝工作。另外留神到,当对hchan进行实际操作时,是须要调用lock(&c.lock)加锁,因而,在实现数据拷贝后,通过unlock(&c.lock)将锁开释。

  1. 有缓冲的channel,当hchan.buf已满;或者无缓冲的channel,以后没有接管的goroutine
gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {    mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nilc.sendq.enqueue(mysg)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

通过getg获取以后执行的goroutine。acquireSudog是先取得以后执行goroutine的线程M,再获取M对应的P,最初将P的sudugo缓存队列中的队头sudog取出(详见源码src/runtime/proc.go)。通过c.sendq.enqueue将sudug退出到channel的发送期待列表中,并调用gopark将以后goroutine转为waiting态。

  • 发送操作会对hchan加锁。
  • 当recvq中存在期待接管的goroutine时,数据元素将会被间接拷贝给接管goroutine。
  • 当recvq期待队列为空时,会判断hchan.buf是否可用。如果可用,则会将发送的数据拷贝至hchan.buf中。
  • 如果hchan.buf已满,那么将以后发送goroutine置于sendq中排队,并在运行时中挂起。
  • 向曾经敞开的channel发送数据,会引发panic。

对于无缓冲的channel来说,它人造就是hchan.buf已满的状况,因为它的hchan.buf的容量为0。

package mainimport "time"func main() {    ch := make(chan int)    go func(ch chan int) {        ch <- 100    }(ch)    time.Sleep(time.Millisecond * 500)    time.Sleep(time.Second)}

在上述示例中,发送goroutine向无缓冲的channel发送数据,然而没有接管goroutine。将断点置于time.Sleep(time.Second),失去此时ch构造如下。

能够看到,在无缓冲的channel中,其hchan的buf长度为0,当没有接管groutine时,发送的goroutine将被置于sendq的发送队列中。

接管

channel的接管实现分两种,v :=<-ch对应于chanrecv1,v, ok := <- ch对应于chanrecv2,但它们都依赖于位于src/go/runtime/chan.go的chanrecv办法。

func chanrecv1(c *hchan, elem unsafe.Pointer) {    chanrecv(c, elem, true)}func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {    _, received = chanrecv(c, elem, true)    return}

chanrecv的具体代码此处就不再展现,和chansend逻辑对应,具体解决准则如下。

  • 接管操作会对hchan加锁。
  • 当sendq中存在期待发送的goroutine时,意味着此时的hchan.buf已满(无缓存的人造已满),分两种状况(见代码src/go/runtime/chan.go的recv办法):1. 如果是有缓存的hchan,那么先将缓冲区的数据拷贝给接管goroutine,再将sendq的队头sudog出队,将出队的sudog上的元素拷贝至hchan的缓存区。 2. 如果是无缓存的hchan,那么间接将出队的sudog上的元素拷贝给接管goroutine。两种状况的最初都会唤醒出队的sudog上的发送goroutine。
  • 当sendq发送队列为空时,会判断hchan.buf是否可用。如果可用,则会将hchan.buf的数据拷贝给接管goroutine。
  • 如果hchan.buf不可用,那么将以后接管goroutine置于recvq中排队,并在运行时中挂起。
  • 与发送不同的是,当channel敞开时,goroutine还能从channel中获取数据。如果recvq期待列表中有goroutines,那么它们都会被唤醒接收数据。如果hchan.buf中还有未接管的数据,那么goroutine会接收缓冲区中的数据,否则goroutine会获取到元素的零值。

以下是channel敞开之后,接管goroutine的读取示例代码。

func main() {    ch := make(chan int, 1)    ch <- 10    close(ch)    a, ok := <-ch    fmt.Println(a, ok)    b, ok := <-ch    fmt.Println(b, ok)    c := <-ch    fmt.Println(c)}//输入如下10 true0 false0

留神:在channel中进行的所有元素转移都随同着内存的拷贝。

func main() {    type Instance struct {        ID   int        name string    }    var ins = Instance{ID: 1, name: "Golang"}    ch := make(chan Instance, 3)    ch <- ins    fmt.Println("ins的原始值:", ins)    ins.name = "Python"    go func(ch chan Instance) {        fmt.Println("channel接管值:", <-ch)    }(ch)    time.Sleep(time.Second)    fmt.Println("ins的最终值:", ins)}// 输入后果ins的原始值: {1 Golang}channel接管值: {1 Golang}ins的最终值: {1 Python}

前半段图解如下

后半段图解如下

留神,如果把channel传递类型替换为Instance指针时,那么只管channel存入到buf中的元素曾经是拷贝对象了,从channel中取出又被拷贝了一次。然而因为它们的类型是Instance指针,拷贝对象与原始对象均会指向同一个内存地址,批改原有元素对象的数据时,会影响到取出数据。

func main() {    type Instance struct {        ID   int        name string    }    var ins = &Instance{ID: 1, name: "Golang"}    ch := make(chan *Instance, 3)    ch <- ins    fmt.Println("ins的原始值:", ins)    ins.name = "Python"    go func(ch chan *Instance) {        fmt.Println("channel接管值:", <-ch)    }(ch)    time.Sleep(time.Second)    fmt.Println("ins的最终值:", ins)}// 输入后果ins的原始值: &{1 Golang}channel接管值: &{1 Python}ins的最终值: &{1 Python}

因而,在应用channel时,尽量避免传递指针,如果传递指针,则需谨慎。

敞开

channel的敞开实现代码位于src/go/runtime/chan.go的chansend办法,具体执行逻辑已通过正文写明。

func closechan(c *hchan) {  // 如果hchan对象为nil,则会引发painc    if c == nil {        panic(plainError("close of nil channel"))    }  // 对hchan加锁    lock(&c.lock)  // 不同屡次调用close(c chan<- Type)办法,否则会引发painc    if c.closed != 0 {        unlock(&c.lock)        panic(plainError("close of closed channel"))    }    if raceenabled {        callerpc := getcallerpc()        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))        racerelease(c.raceaddr())    }  // close标记    c.closed = 1  // gList代表Go的GMP调度的G汇合    var glist gList    // 该for循环是为了开释recvq上的所有期待接管sudog    for {        sg := c.recvq.dequeue()        if sg == nil {            break        }        if sg.elem != nil {            typedmemclr(c.elemtype, sg.elem)            sg.elem = nil        }        if sg.releasetime != 0 {            sg.releasetime = cputicks()        }        gp := sg.g        gp.param = nil        if raceenabled {            raceacquireg(gp, c.raceaddr())        }        glist.push(gp)    }    // 该for循环会开释sendq上的所有期待发送sudog    for {        sg := c.sendq.dequeue()        if sg == nil {            break        }        sg.elem = nil        if sg.releasetime != 0 {            sg.releasetime = cputicks()        }        gp := sg.g        gp.param = nil        if raceenabled {            raceacquireg(gp, c.raceaddr())        }        glist.push(gp)    }  // 开释sendq和recvq之后,hchan开释锁    unlock(&c.lock)  // 将上文中glist中的退出的goroutine取出,让它们均变为runnable(可执行)状态,期待调度器执行    // 留神:咱们上文中剖析过,试图向一个已敞开的channel发送数据,会引发painc。  // 所以,如果是开释sendq中的goroutine,它们一旦失去执行将会引发panic。    for !glist.empty() {        gp := glist.pop()        gp.schedlink = 0        goready(gp, 3)    }}

对于敞开操作,有几个点须要留神一下。

  • 如果敞开已敞开的channel会引发painc。
  • 对channel敞开后,如果有阻塞的读取或发送goroutines将会被唤醒。读取goroutines会获取到hchan的已接管元素,如果没有,则获取到元素零值;发送goroutine的执行则会引发painc。

对于第二点,咱们能够很好利用这一个性来实现对程序执行流的管制(相似于sync.WaitGroup的作用),以下是示例程序代码。

func main() {    ch := make(chan struct{})    //    go func() {        // do something work...        // when work has done, call close()        close(ch)    }()    // waiting work done    <- ch    // other work continue...}

四、总结

channel是Go中十分弱小有用的机制,为了更无效地应用它,咱们必须理解它的实现原理,这也是写作本文的目标。

  • hchan构造体有锁的保障,对于并发goroutine而言是平安的
  • channel接管、发送数据遵循FIFO(First In First Out)原语
  • channel的数据传递依赖于内存拷贝
  • channel能阻塞(gopark)、唤醒(goready)goroutine
  • 所谓无缓存的channel,它的工作形式就是间接发送goroutine拷贝数据给接管goroutine,而不通过hchan.buf

另外,能够看到Go在channel的设计上衡量了简略与性能。为了简略性,hchan是有锁的构造,因为有锁的队列会更易了解和实现,然而这样会损失一些性能。思考到整个 channel 操作带锁的老本较高,其实官网也曾思考过应用无锁 channel 的设计,然而因为目前已有提案中(https://github.com/golang/go/...),无锁实现的channel可维护性差、且理论性能测试不具备说服力,而且也不合乎Go的简略哲学,因而官网目前为止并没有驳回无锁设计。

在性能上,有一点,咱们须要意识到:所谓channel中阻塞goroutine,只是在runtime零碎中被blocked,它是用户层的阻塞。而理论的底层内核线程不受影响,它依然是unblocked的。

参考链接

https://speakerdeck.com/kavya...

https://codeburst.io/diving-d...

https://github.com/talkgo/nig...