乐趣区

关于golang:一文读懂Channel设计

在 Go 中,要了解 channel,首先须要意识 goroutine。

一、为什么会有 goroutine

古代操作系统中为咱们提供了三种根本的结构并发程序的办法:多过程、I/ O 多路复用和多线程。其中最简略的结构形式当属多过程,然而多过程的并发程序,因为对过程管制和过程间通信开销微小,这样的并发形式往往会很慢。

因而,操作系统提供了更小粒度的运行单元:线程(确切叫法是内核线程)。它是一种运行在过程上下文中的逻辑流,线程之间通过操作系统来调度,其调度模型如下图所示。

多线程的并发形式,相较于多过程而言要快得多。然而因为线程上下文切换总是不可避免的陷入内核态,它的开销仍然较大。那么有没有不用陷入内核态的运行载体呢?有,用户级线程。用户级线程的切换由用户程序本人管制,不须要内核干预,因而少了进出内核态的耗费。

这里的用户级线程就是协程(coroutine),它们的切换由运行时零碎来对立调度治理,内核并不知道它的存在。协程是形象于内核线程之上的对象,一个内核线程能够对应多个协程。但最终的零碎调用依然须要内核线程来实现。留神,线程的调度是操作系统来治理,是一种抢占式调度。而协程不同,协程之间须要单干,会被动交出执行权,是一种合作式调度,这也是为何被称为协程的起因。

Go 天生在语言层面反对了协程,即咱们常说的 goroutine。Go 的 runtime 零碎实现的是一种 M:N 调度模型,通过 GMP 对象来形容,其中 G 代表的就是协程,M 是线程,P 是调度上下文。在 Go 程序中,一个 goroutine 就代表着一个最小用户代码执行流,它们也是并发流的最小单元。

二、channel 的存在定位

从内存的角度而言,并发模型只分两种:基于共享内存和基于音讯通信(内存拷贝)。在 Go 中,两种并发模型的同步原语均有提供:sync.\和 atomic.\代表的就是基于共享内存;channel 代表的就是基于音讯通信。而 Go 提倡后者,它包含三大元素:goroutine(执行体),channel(通信),select(协调)。

Do not communicate by sharing memory; instead, share memory by communicating.

在 Go 中通过 goroutine+channel 的形式,能够简略、高效地解决并发问题,channel 就是 goroutine 之间的数据桥梁。

Concurrency is the key to designing high performance network services. Go’s concurrency primitives (goroutines and channels) provide a simple and efficient means of expressing concurrent execution.

以下是一个简略的 channel 应用示例代码。

func goroutineA(ch <-chan int)  {fmt.Println("[goroutineA] want a data")
    val := <- ch
    fmt.Println("[goroutineA] received the data", val)
}

func goroutineB(ch chan<- int)  {time.Sleep(time.Second*1)
    ch <- 1
    fmt.Println("[goroutineB] send the data 1")
}

func main() {ch := make(chan int, 1)
    go goroutineA(ch)
    go goroutineB(ch)
    time.Sleep(2*time.Second)
}

上述过程趣解图如下

三、channel 源码解析

channel 源码位于 src/go/runtime/chan.go。本章内容分为两局部:channel 内部结构和 channel 操作。

3.1 channel 内部结构

ch := make(chan int,2)

对于以上 channel 的申明语句,咱们能够在程序中退出断点,失去 ch 的信息如下。

很好,看起来十分的清晰。然而,这些信息代表的是什么含意呢?接下来,咱们先看几个重要的构造体。

  • hchan

当咱们通过 make(chan Type, size)生成 channel 时,在 runtime 零碎中,生成的是一个 hchan 构造体对象。源码位于 src/runtime/chan.go

type hchan struct {
    qcount   uint           // 循环队列中数据数
    dataqsiz uint           // 循环队列的大小
    buf      unsafe.Pointer // 指向大小为 dataqsize 的蕴含数据元素的数组指针
    elemsize uint16         // 数据元素的大小
    closed   uint32         // 代表 channel 是否敞开   
    elemtype *_type         // _type 代表 Go 的类型零碎,elemtype 代表 channel 中的元素类型
    sendx    uint           // 发送索引号,初始值为 0
    recvx    uint           // 接管索引号,初始值为 0
  recvq    waitq          // 接管期待队列,存储试图从 channel 接收数据 (<-ch) 的阻塞 goroutines
    sendq    waitq          // 发送期待队列,存储试图发送数据 (ch<-) 到 channel 的阻塞 goroutines

    lock mutex              // 加锁能爱护 hchan 的所有字段,包含 waitq 中 sudoq 对象
}
  • waitq

waitq 用于表白处于阻塞状态的 goroutines 链表信息,first 指向链头 goroutine,last 指向链尾 goroutine

type waitq struct {
    first *sudog           
    last  *sudog
}
  • sudug

sudog 代表的就是一个处于期待列表中的 goroutine 对象,源码位于 src/runtime/runtime2.go

type sudog struct {
    g *g
    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)
    c        *hchan // channel
  ...
}

为了更好了解 hchan 构造体,咱们将通过以下代码来了解 hchan 中的字段含意。

package main

import "time"

func goroutineA(ch chan int) {ch <- 100}

func goroutineB(ch chan int) {ch <- 200}

func goroutineC(ch chan int) {ch <- 300}

func goroutineD(ch chan int) {ch <- 300}

func main() {ch := make(chan int, 4)
    for i := 0; i < 4; i++ {ch <- i * 10}
    go goroutineA(ch)
    go goroutineB(ch)
    go goroutineC(ch)
    go goroutineD(ch)
    // 第一个 sleep 是为了给上足够的工夫让所有 goroutine 都已启动
    time.Sleep(time.Millisecond * 500)
    time.Sleep(time.Second)
}

关上代码调试性能,将程序运行至断点 time.Sleep(time.Second)处,此时失去的 chan 信息如下。

在该 channel 中,通过 make(chan int, 4)定义的 channel 大小为 4,即 dataqsiz 的值为 4。同时因为循环队列中曾经增加了 4 个元素,所以 qcount 值也为 4。此时,有 4 个 goroutine(A-D)想发送数据给 channel,然而因为存放数据的循环队列已满,所以只能进入发送期待列表,即 sendq。同时要留神到,此时的发送和接管索引值均为 0,即下一次接收数据的 goroutine 会从循环队列的第一个元素拿,发送数据的 goroutine 会发送到循环队列的第一个地位。

上述 hchan 构造可视化图解如下

3.2 channel 操作

将 channel 操作分为四局部:创立、发送、接管和敞开。

创立

本文的参考 Go 版本为 1.15.2。其 channel 的创立实现代码位于 src/go/runtime/chan.go 的 makechan 办法。

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

  // 发送元素大小限度
    if elem.size >= 1<<16 {throw("makechan: invalid channel element type")
    }
  // 对齐查看
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")
    }

  // 判断是否会内存溢出
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))
    }

  // 为结构的 hchan 对象分配内存
    var c *hchan
    switch {
  // 无缓冲的 channel 或者元素大小为 0 的状况
    case mem == 0:
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
  // 元素不蕴含指针的状况  
    case elem.ptrdata == 0:
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
  // 元素蕴含指针  
    default:
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

  // 初始化相干参数
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

能够看到,makechan 办法次要就是查看传送元素的合法性,并为 hchan 分配内存,初始化相干参数,包含对锁的初始化。

发送

channel 的发送实现代码位于 src/go/runtime/chan.go 的 chansend 办法。发送过程,存在以下几种状况。

  1. 当发送的 channel 为 nil
if c == nil {
    if !block {return false}
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
}

往一个 nil 的 channel 中发送数据时,调用 gopark 函数将以后执行的 goroutine 从 running 态转入 waiting 态。

  1. 往已敞开的 channel 中发送数据
    if c.closed != 0 {unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

如果向已敞开的 channel 中发送数据,会引发 panic。

  1. 如果曾经有阻塞的接管 goroutines(即 recvq 中指向非空),那么数据将被间接发送给接管 goroutine。
if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    send(c, sg, ep, func() {unlock(&c.lock) }, 3)
    return true
}

该逻辑的实现代码在 send 办法和 sendDirect 中。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ... // 省略了竞态代码
    if sg.elem != nil {sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

其中,memmove 咱们曾经在源码系列中遇到屡次了,它的目标是将内存中 src 的内容拷贝至 dst 中去。另外,留神到 goready(gp, skip+1)这句代码,它会使得之前在接管期待队列中的第一个 goroutine 的状态变为 runnable,这样 go 的调度器就能够从新让该 goroutine 失去执行。

  1. 对于有缓冲的 channel 来说,如果以后缓冲区 hchan.buf 有可用空间,那么会将数据拷贝至缓冲区
if c.qcount < c.dataqsiz {qp := chanbuf(c, c.sendx)
    if raceenabled {raceacquire(qp)
        racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
  // 发送索引号 +1
    c.sendx++
  // 因为存储数据元素的构造是循环队列,所以当以后索引号曾经到队末时,将索引号调整到队头
    if c.sendx == c.dataqsiz {c.sendx = 0}
  // 以后循环队列中存储元素数 +1
    c.qcount++
    unlock(&c.lock)
    return true
}

其中,chanbuf(c, c.sendx)是获取指向对应内存区域的指针。typememmove 会调用 memmove 办法,实现数据的拷贝工作。另外留神到,当对 hchan 进行实际操作时,是须要调用 lock(&c.lock)加锁,因而,在实现数据拷贝后,通过 unlock(&c.lock)将锁开释。

  1. 有缓冲的 channel,当 hchan.buf 已满;或者无缓冲的 channel,以后没有接管的 goroutine
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {mysg.releasetime = -1}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

通过 getg 获取以后执行的 goroutine。acquireSudog 是先取得以后执行 goroutine 的线程 M,再获取 M 对应的 P,最初将 P 的 sudugo 缓存队列中的队头 sudog 取出(详见源码 src/runtime/proc.go)。通过 c.sendq.enqueue 将 sudug 退出到 channel 的发送期待列表中,并调用 gopark 将以后 goroutine 转为 waiting 态。

  • 发送操作会对 hchan 加锁。
  • 当 recvq 中存在期待接管的 goroutine 时,数据元素将会被间接拷贝给接管 goroutine。
  • 当 recvq 期待队列为空时,会判断 hchan.buf 是否可用。如果可用,则会将发送的数据拷贝至 hchan.buf 中。
  • 如果 hchan.buf 已满,那么将以后发送 goroutine 置于 sendq 中排队,并在运行时中挂起。
  • 向曾经敞开的 channel 发送数据,会引发 panic。

对于无缓冲的 channel 来说,它人造就是 hchan.buf 已满的状况,因为它的 hchan.buf 的容量为 0。

package main

import "time"

func main() {ch := make(chan int)
    go func(ch chan int) {ch <- 100}(ch)
    time.Sleep(time.Millisecond * 500)
    time.Sleep(time.Second)
}

在上述示例中,发送 goroutine 向无缓冲的 channel 发送数据,然而没有接管 goroutine。将断点置于 time.Sleep(time.Second),失去此时 ch 构造如下。

能够看到,在无缓冲的 channel 中,其 hchan 的 buf 长度为 0,当没有接管 groutine 时,发送的 goroutine 将被置于 sendq 的发送队列中。

接管

channel 的接管实现分两种,v :=<-ch 对应于 chanrecv1,v, ok := <- ch 对应于 chanrecv2,但它们都依赖于位于 src/go/runtime/chan.go 的 chanrecv 办法。

func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)
    return
}

chanrecv 的具体代码此处就不再展现,和 chansend 逻辑对应,具体解决准则如下。

  • 接管操作会对 hchan 加锁。
  • 当 sendq 中存在期待发送的 goroutine 时,意味着此时的 hchan.buf 已满(无缓存的人造已满),分两种状况(见代码 src/go/runtime/chan.go 的 recv 办法):1. 如果是有缓存的 hchan,那么先将缓冲区的数据拷贝给接管 goroutine,再将 sendq 的队头 sudog 出队,将出队的 sudog 上的元素拷贝至 hchan 的缓存区。2. 如果是无缓存的 hchan,那么间接将出队的 sudog 上的元素拷贝给接管 goroutine。两种状况的最初都会唤醒出队的 sudog 上的发送 goroutine。
  • 当 sendq 发送队列为空时,会判断 hchan.buf 是否可用。如果可用,则会将 hchan.buf 的数据拷贝给接管 goroutine。
  • 如果 hchan.buf 不可用,那么将以后接管 goroutine 置于 recvq 中排队,并在运行时中挂起。
  • 与发送不同的是,当 channel 敞开时,goroutine 还能从 channel 中获取数据。如果 recvq 期待列表中有 goroutines,那么它们都会被唤醒接收数据。如果 hchan.buf 中还有未接管的数据,那么 goroutine 会接收缓冲区中的数据,否则 goroutine 会获取到元素的零值。

以下是 channel 敞开之后,接管 goroutine 的读取示例代码。

func main() {ch := make(chan int, 1)
    ch <- 10
    close(ch)
    a, ok := <-ch
    fmt.Println(a, ok)
    b, ok := <-ch
    fmt.Println(b, ok)
    c := <-ch
    fmt.Println(c)
}

// 输入如下
10 true
0 false
0

留神:在 channel 中进行的所有元素转移都随同着内存的拷贝。

func main() {
    type Instance struct {
        ID   int
        name string
    }

    var ins = Instance{ID: 1, name: "Golang"}

    ch := make(chan Instance, 3)
    ch <- ins

    fmt.Println("ins 的原始值:", ins)

    ins.name = "Python"
    go func(ch chan Instance) {fmt.Println("channel 接管值:", <-ch)
    }(ch)

    time.Sleep(time.Second)
    fmt.Println("ins 的最终值:", ins)
}

// 输入后果
ins 的原始值:{1 Golang}
channel 接管值:{1 Golang}
ins 的最终值:{1 Python}

前半段图解如下

后半段图解如下

留神,如果把 channel 传递类型替换为 Instance 指针时,那么只管 channel 存入到 buf 中的元素曾经是拷贝对象了,从 channel 中取出又被拷贝了一次。然而因为它们的类型是 Instance 指针,拷贝对象与原始对象均会指向同一个内存地址,批改原有元素对象的数据时,会影响到取出数据。

func main() {
    type Instance struct {
        ID   int
        name string
    }

    var ins = &Instance{ID: 1, name: "Golang"}

    ch := make(chan *Instance, 3)
    ch <- ins

    fmt.Println("ins 的原始值:", ins)

    ins.name = "Python"
    go func(ch chan *Instance) {fmt.Println("channel 接管值:", <-ch)
    }(ch)

    time.Sleep(time.Second)
    fmt.Println("ins 的最终值:", ins)
}

// 输入后果
ins 的原始值:&{1 Golang}
channel 接管值:&{1 Python}
ins 的最终值:&{1 Python}

因而,在应用 channel 时,尽量避免传递指针,如果传递指针,则需谨慎。

敞开

channel 的敞开实现代码位于 src/go/runtime/chan.go 的 chansend 办法,具体执行逻辑已通过正文写明。

func closechan(c *hchan) {
  // 如果 hchan 对象为 nil,则会引发 painc
    if c == nil {panic(plainError("close of nil channel"))
    }

  // 对 hchan 加锁
    lock(&c.lock)
  // 不同屡次调用 close(c chan<- Type)办法,否则会引发 painc
    if c.closed != 0 {unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }

  // close 标记
    c.closed = 1

  // gList 代表 Go 的 GMP 调度的 G 汇合
    var glist gList

    // 该 for 循环是为了开释 recvq 上的所有期待接管 sudog
    for {sg := c.recvq.dequeue()
        if sg == nil {break}
        if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // 该 for 循环会开释 sendq 上的所有期待发送 sudog
    for {sg := c.sendq.dequeue()
        if sg == nil {break}
        sg.elem = nil
        if sg.releasetime != 0 {sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
  // 开释 sendq 和 recvq 之后,hchan 开释锁
    unlock(&c.lock)

  // 将上文中 glist 中的退出的 goroutine 取出,让它们均变为 runnable(可执行)状态,期待调度器执行
    // 留神:咱们上文中剖析过,试图向一个已敞开的 channel 发送数据,会引发 painc。// 所以,如果是开释 sendq 中的 goroutine,它们一旦失去执行将会引发 panic。for !glist.empty() {gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

对于敞开操作,有几个点须要留神一下。

  • 如果敞开已敞开的 channel 会引发 painc。
  • 对 channel 敞开后,如果有阻塞的读取或发送 goroutines 将会被唤醒。读取 goroutines 会获取到 hchan 的已接管元素,如果没有,则获取到元素零值;发送 goroutine 的执行则会引发 painc。

对于第二点,咱们能够很好利用这一个性来实现对程序执行流的管制(相似于 sync.WaitGroup 的作用),以下是示例程序代码。

func main() {ch := make(chan struct{})
    //
    go func() {
        // do something work...
        // when work has done, call close()
        close(ch)
    }()
    // waiting work done
    <- ch
    // other work continue...
}

四、总结

channel 是 Go 中十分弱小有用的机制,为了更无效地应用它,咱们必须理解它的实现原理,这也是写作本文的目标。

  • hchan 构造体有锁的保障,对于并发 goroutine 而言是平安的
  • channel 接管、发送数据遵循 FIFO(First In First Out)原语
  • channel 的数据传递依赖于内存拷贝
  • channel 能阻塞(gopark)、唤醒(goready)goroutine
  • 所谓无缓存的 channel,它的工作形式就是间接发送 goroutine 拷贝数据给接管 goroutine,而不通过 hchan.buf

另外,能够看到 Go 在 channel 的设计上衡量了简略与性能。为了简略性,hchan 是有锁的构造,因为有锁的队列会更易了解和实现,然而这样会损失一些性能。思考到整个 channel 操作带锁的老本较高,其实官网也曾思考过应用无锁 channel 的设计,然而因为目前已有提案中(https://github.com/golang/go/…),无锁实现的 channel 可维护性差、且理论性能测试不具备说服力,而且也不合乎 Go 的简略哲学,因而官网目前为止并没有驳回无锁设计。

在性能上,有一点,咱们须要意识到:所谓 channel 中阻塞 goroutine,只是在 runtime 零碎中被 blocked,它是用户层的阻塞。而理论的底层内核线程不受影响,它依然是 unblocked 的。

参考链接

https://speakerdeck.com/kavya…

https://codeburst.io/diving-d…

https://github.com/talkgo/nig…

退出移动版