共计 11125 个字符,预计需要花费 28 分钟才能阅读完成。
在 Go 中,要了解 channel,首先须要意识 goroutine。
一、为什么会有 goroutine
古代操作系统中为咱们提供了三种根本的结构并发程序的办法:多过程、I/ O 多路复用和多线程。其中最简略的结构形式当属多过程,然而多过程的并发程序,因为对过程管制和过程间通信开销微小,这样的并发形式往往会很慢。
因而,操作系统提供了更小粒度的运行单元:线程(确切叫法是内核线程)。它是一种运行在过程上下文中的逻辑流,线程之间通过操作系统来调度,其调度模型如下图所示。
多线程的并发形式,相较于多过程而言要快得多。然而因为线程上下文切换总是不可避免的陷入内核态,它的开销仍然较大。那么有没有不用陷入内核态的运行载体呢?有,用户级线程。用户级线程的切换由用户程序本人管制,不须要内核干预,因而少了进出内核态的耗费。
这里的用户级线程就是协程(coroutine),它们的切换由运行时零碎来对立调度治理,内核并不知道它的存在。协程是形象于内核线程之上的对象,一个内核线程能够对应多个协程。但最终的零碎调用依然须要内核线程来实现。留神,线程的调度是操作系统来治理,是一种抢占式调度。而协程不同,协程之间须要单干,会被动交出执行权,是一种合作式调度,这也是为何被称为协程的起因。
Go 天生在语言层面反对了协程,即咱们常说的 goroutine。Go 的 runtime 零碎实现的是一种 M:N 调度模型,通过 GMP 对象来形容,其中 G 代表的就是协程,M 是线程,P 是调度上下文。在 Go 程序中,一个 goroutine 就代表着一个最小用户代码执行流,它们也是并发流的最小单元。
二、channel 的存在定位
从内存的角度而言,并发模型只分两种:基于共享内存和基于音讯通信(内存拷贝)。在 Go 中,两种并发模型的同步原语均有提供:sync.\和 atomic.\代表的就是基于共享内存;channel 代表的就是基于音讯通信。而 Go 提倡后者,它包含三大元素:goroutine(执行体),channel(通信),select(协调)。
Do not communicate by sharing memory; instead, share memory by communicating.
在 Go 中通过 goroutine+channel 的形式,能够简略、高效地解决并发问题,channel 就是 goroutine 之间的数据桥梁。
Concurrency is the key to designing high performance network services. Go’s concurrency primitives (goroutines and channels) provide a simple and efficient means of expressing concurrent execution.
以下是一个简略的 channel 应用示例代码。
func goroutineA(ch <-chan int) {fmt.Println("[goroutineA] want a data")
val := <- ch
fmt.Println("[goroutineA] received the data", val)
}
func goroutineB(ch chan<- int) {time.Sleep(time.Second*1)
ch <- 1
fmt.Println("[goroutineB] send the data 1")
}
func main() {ch := make(chan int, 1)
go goroutineA(ch)
go goroutineB(ch)
time.Sleep(2*time.Second)
}
上述过程趣解图如下
三、channel 源码解析
channel 源码位于 src/go/runtime/chan.go。本章内容分为两局部:channel 内部结构和 channel 操作。
3.1 channel 内部结构
ch := make(chan int,2)
对于以上 channel 的申明语句,咱们能够在程序中退出断点,失去 ch 的信息如下。
很好,看起来十分的清晰。然而,这些信息代表的是什么含意呢?接下来,咱们先看几个重要的构造体。
- hchan
当咱们通过 make(chan Type, size)生成 channel 时,在 runtime 零碎中,生成的是一个 hchan 构造体对象。源码位于 src/runtime/chan.go
type hchan struct {
qcount uint // 循环队列中数据数
dataqsiz uint // 循环队列的大小
buf unsafe.Pointer // 指向大小为 dataqsize 的蕴含数据元素的数组指针
elemsize uint16 // 数据元素的大小
closed uint32 // 代表 channel 是否敞开
elemtype *_type // _type 代表 Go 的类型零碎,elemtype 代表 channel 中的元素类型
sendx uint // 发送索引号,初始值为 0
recvx uint // 接管索引号,初始值为 0
recvq waitq // 接管期待队列,存储试图从 channel 接收数据 (<-ch) 的阻塞 goroutines
sendq waitq // 发送期待队列,存储试图发送数据 (ch<-) 到 channel 的阻塞 goroutines
lock mutex // 加锁能爱护 hchan 的所有字段,包含 waitq 中 sudoq 对象
}
- waitq
waitq 用于表白处于阻塞状态的 goroutines 链表信息,first 指向链头 goroutine,last 指向链尾 goroutine
type waitq struct {
first *sudog
last *sudog
}
- sudug
sudog 代表的就是一个处于期待列表中的 goroutine 对象,源码位于 src/runtime/runtime2.go
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
c *hchan // channel
...
}
为了更好了解 hchan 构造体,咱们将通过以下代码来了解 hchan 中的字段含意。
package main
import "time"
func goroutineA(ch chan int) {ch <- 100}
func goroutineB(ch chan int) {ch <- 200}
func goroutineC(ch chan int) {ch <- 300}
func goroutineD(ch chan int) {ch <- 300}
func main() {ch := make(chan int, 4)
for i := 0; i < 4; i++ {ch <- i * 10}
go goroutineA(ch)
go goroutineB(ch)
go goroutineC(ch)
go goroutineD(ch)
// 第一个 sleep 是为了给上足够的工夫让所有 goroutine 都已启动
time.Sleep(time.Millisecond * 500)
time.Sleep(time.Second)
}
关上代码调试性能,将程序运行至断点 time.Sleep(time.Second)处,此时失去的 chan 信息如下。
在该 channel 中,通过 make(chan int, 4)定义的 channel 大小为 4,即 dataqsiz 的值为 4。同时因为循环队列中曾经增加了 4 个元素,所以 qcount 值也为 4。此时,有 4 个 goroutine(A-D)想发送数据给 channel,然而因为存放数据的循环队列已满,所以只能进入发送期待列表,即 sendq。同时要留神到,此时的发送和接管索引值均为 0,即下一次接收数据的 goroutine 会从循环队列的第一个元素拿,发送数据的 goroutine 会发送到循环队列的第一个地位。
上述 hchan 构造可视化图解如下
3.2 channel 操作
将 channel 操作分为四局部:创立、发送、接管和敞开。
创立
本文的参考 Go 版本为 1.15.2。其 channel 的创立实现代码位于 src/go/runtime/chan.go 的 makechan 办法。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 发送元素大小限度
if elem.size >= 1<<16 {throw("makechan: invalid channel element type")
}
// 对齐查看
if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")
}
// 判断是否会内存溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))
}
// 为结构的 hchan 对象分配内存
var c *hchan
switch {
// 无缓冲的 channel 或者元素大小为 0 的状况
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
// 元素不蕴含指针的状况
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
// 元素蕴含指针
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 初始化相干参数
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
能够看到,makechan 办法次要就是查看传送元素的合法性,并为 hchan 分配内存,初始化相干参数,包含对锁的初始化。
发送
channel 的发送实现代码位于 src/go/runtime/chan.go 的 chansend 办法。发送过程,存在以下几种状况。
- 当发送的 channel 为 nil
if c == nil {
if !block {return false}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
往一个 nil 的 channel 中发送数据时,调用 gopark 函数将以后执行的 goroutine 从 running 态转入 waiting 态。
- 往已敞开的 channel 中发送数据
if c.closed != 0 {unlock(&c.lock)
panic(plainError("send on closed channel"))
}
如果向已敞开的 channel 中发送数据,会引发 panic。
- 如果曾经有阻塞的接管 goroutines(即 recvq 中指向非空),那么数据将被间接发送给接管 goroutine。
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() {unlock(&c.lock) }, 3)
return true
}
该逻辑的实现代码在 send 办法和 sendDirect 中。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
... // 省略了竞态代码
if sg.elem != nil {sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
其中,memmove 咱们曾经在源码系列中遇到屡次了,它的目标是将内存中 src 的内容拷贝至 dst 中去。另外,留神到 goready(gp, skip+1)这句代码,它会使得之前在接管期待队列中的第一个 goroutine 的状态变为 runnable,这样 go 的调度器就能够从新让该 goroutine 失去执行。
- 对于有缓冲的 channel 来说,如果以后缓冲区 hchan.buf 有可用空间,那么会将数据拷贝至缓冲区
if c.qcount < c.dataqsiz {qp := chanbuf(c, c.sendx)
if raceenabled {raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
// 发送索引号 +1
c.sendx++
// 因为存储数据元素的构造是循环队列,所以当以后索引号曾经到队末时,将索引号调整到队头
if c.sendx == c.dataqsiz {c.sendx = 0}
// 以后循环队列中存储元素数 +1
c.qcount++
unlock(&c.lock)
return true
}
其中,chanbuf(c, c.sendx)是获取指向对应内存区域的指针。typememmove 会调用 memmove 办法,实现数据的拷贝工作。另外留神到,当对 hchan 进行实际操作时,是须要调用 lock(&c.lock)加锁,因而,在实现数据拷贝后,通过 unlock(&c.lock)将锁开释。
- 有缓冲的 channel,当 hchan.buf 已满;或者无缓冲的 channel,以后没有接管的 goroutine
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {mysg.releasetime = -1}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
通过 getg 获取以后执行的 goroutine。acquireSudog 是先取得以后执行 goroutine 的线程 M,再获取 M 对应的 P,最初将 P 的 sudugo 缓存队列中的队头 sudog 取出(详见源码 src/runtime/proc.go)。通过 c.sendq.enqueue 将 sudug 退出到 channel 的发送期待列表中,并调用 gopark 将以后 goroutine 转为 waiting 态。
- 发送操作会对 hchan 加锁。
- 当 recvq 中存在期待接管的 goroutine 时,数据元素将会被间接拷贝给接管 goroutine。
- 当 recvq 期待队列为空时,会判断 hchan.buf 是否可用。如果可用,则会将发送的数据拷贝至 hchan.buf 中。
- 如果 hchan.buf 已满,那么将以后发送 goroutine 置于 sendq 中排队,并在运行时中挂起。
- 向曾经敞开的 channel 发送数据,会引发 panic。
对于无缓冲的 channel 来说,它人造就是 hchan.buf 已满的状况,因为它的 hchan.buf 的容量为 0。
package main
import "time"
func main() {ch := make(chan int)
go func(ch chan int) {ch <- 100}(ch)
time.Sleep(time.Millisecond * 500)
time.Sleep(time.Second)
}
在上述示例中,发送 goroutine 向无缓冲的 channel 发送数据,然而没有接管 goroutine。将断点置于 time.Sleep(time.Second),失去此时 ch 构造如下。
能够看到,在无缓冲的 channel 中,其 hchan 的 buf 长度为 0,当没有接管 groutine 时,发送的 goroutine 将被置于 sendq 的发送队列中。
接管
channel 的接管实现分两种,v :=<-ch 对应于 chanrecv1,v, ok := <- ch 对应于 chanrecv2,但它们都依赖于位于 src/go/runtime/chan.go 的 chanrecv 办法。
func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)
return
}
chanrecv 的具体代码此处就不再展现,和 chansend 逻辑对应,具体解决准则如下。
- 接管操作会对 hchan 加锁。
- 当 sendq 中存在期待发送的 goroutine 时,意味着此时的 hchan.buf 已满(无缓存的人造已满),分两种状况(见代码 src/go/runtime/chan.go 的 recv 办法):1. 如果是有缓存的 hchan,那么先将缓冲区的数据拷贝给接管 goroutine,再将 sendq 的队头 sudog 出队,将出队的 sudog 上的元素拷贝至 hchan 的缓存区。2. 如果是无缓存的 hchan,那么间接将出队的 sudog 上的元素拷贝给接管 goroutine。两种状况的最初都会唤醒出队的 sudog 上的发送 goroutine。
- 当 sendq 发送队列为空时,会判断 hchan.buf 是否可用。如果可用,则会将 hchan.buf 的数据拷贝给接管 goroutine。
- 如果 hchan.buf 不可用,那么将以后接管 goroutine 置于 recvq 中排队,并在运行时中挂起。
- 与发送不同的是,当 channel 敞开时,goroutine 还能从 channel 中获取数据。如果 recvq 期待列表中有 goroutines,那么它们都会被唤醒接收数据。如果 hchan.buf 中还有未接管的数据,那么 goroutine 会接收缓冲区中的数据,否则 goroutine 会获取到元素的零值。
以下是 channel 敞开之后,接管 goroutine 的读取示例代码。
func main() {ch := make(chan int, 1)
ch <- 10
close(ch)
a, ok := <-ch
fmt.Println(a, ok)
b, ok := <-ch
fmt.Println(b, ok)
c := <-ch
fmt.Println(c)
}
// 输入如下
10 true
0 false
0
留神:在 channel 中进行的所有元素转移都随同着内存的拷贝。
func main() {
type Instance struct {
ID int
name string
}
var ins = Instance{ID: 1, name: "Golang"}
ch := make(chan Instance, 3)
ch <- ins
fmt.Println("ins 的原始值:", ins)
ins.name = "Python"
go func(ch chan Instance) {fmt.Println("channel 接管值:", <-ch)
}(ch)
time.Sleep(time.Second)
fmt.Println("ins 的最终值:", ins)
}
// 输入后果
ins 的原始值:{1 Golang}
channel 接管值:{1 Golang}
ins 的最终值:{1 Python}
前半段图解如下
后半段图解如下
留神,如果把 channel 传递类型替换为 Instance 指针时,那么只管 channel 存入到 buf 中的元素曾经是拷贝对象了,从 channel 中取出又被拷贝了一次。然而因为它们的类型是 Instance 指针,拷贝对象与原始对象均会指向同一个内存地址,批改原有元素对象的数据时,会影响到取出数据。
func main() {
type Instance struct {
ID int
name string
}
var ins = &Instance{ID: 1, name: "Golang"}
ch := make(chan *Instance, 3)
ch <- ins
fmt.Println("ins 的原始值:", ins)
ins.name = "Python"
go func(ch chan *Instance) {fmt.Println("channel 接管值:", <-ch)
}(ch)
time.Sleep(time.Second)
fmt.Println("ins 的最终值:", ins)
}
// 输入后果
ins 的原始值:&{1 Golang}
channel 接管值:&{1 Python}
ins 的最终值:&{1 Python}
因而,在应用 channel 时,尽量避免传递指针,如果传递指针,则需谨慎。
敞开
channel 的敞开实现代码位于 src/go/runtime/chan.go 的 chansend 办法,具体执行逻辑已通过正文写明。
func closechan(c *hchan) {
// 如果 hchan 对象为 nil,则会引发 painc
if c == nil {panic(plainError("close of nil channel"))
}
// 对 hchan 加锁
lock(&c.lock)
// 不同屡次调用 close(c chan<- Type)办法,否则会引发 painc
if c.closed != 0 {unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// close 标记
c.closed = 1
// gList 代表 Go 的 GMP 调度的 G 汇合
var glist gList
// 该 for 循环是为了开释 recvq 上的所有期待接管 sudog
for {sg := c.recvq.dequeue()
if sg == nil {break}
if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 该 for 循环会开释 sendq 上的所有期待发送 sudog
for {sg := c.sendq.dequeue()
if sg == nil {break}
sg.elem = nil
if sg.releasetime != 0 {sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 开释 sendq 和 recvq 之后,hchan 开释锁
unlock(&c.lock)
// 将上文中 glist 中的退出的 goroutine 取出,让它们均变为 runnable(可执行)状态,期待调度器执行
// 留神:咱们上文中剖析过,试图向一个已敞开的 channel 发送数据,会引发 painc。// 所以,如果是开释 sendq 中的 goroutine,它们一旦失去执行将会引发 panic。for !glist.empty() {gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
对于敞开操作,有几个点须要留神一下。
- 如果敞开已敞开的 channel 会引发 painc。
- 对 channel 敞开后,如果有阻塞的读取或发送 goroutines 将会被唤醒。读取 goroutines 会获取到 hchan 的已接管元素,如果没有,则获取到元素零值;发送 goroutine 的执行则会引发 painc。
对于第二点,咱们能够很好利用这一个性来实现对程序执行流的管制(相似于 sync.WaitGroup 的作用),以下是示例程序代码。
func main() {ch := make(chan struct{})
//
go func() {
// do something work...
// when work has done, call close()
close(ch)
}()
// waiting work done
<- ch
// other work continue...
}
四、总结
channel 是 Go 中十分弱小有用的机制,为了更无效地应用它,咱们必须理解它的实现原理,这也是写作本文的目标。
- hchan 构造体有锁的保障,对于并发 goroutine 而言是平安的
- channel 接管、发送数据遵循 FIFO(First In First Out)原语
- channel 的数据传递依赖于内存拷贝
- channel 能阻塞(gopark)、唤醒(goready)goroutine
- 所谓无缓存的 channel,它的工作形式就是间接发送 goroutine 拷贝数据给接管 goroutine,而不通过 hchan.buf
另外,能够看到 Go 在 channel 的设计上衡量了简略与性能。为了简略性,hchan 是有锁的构造,因为有锁的队列会更易了解和实现,然而这样会损失一些性能。思考到整个 channel 操作带锁的老本较高,其实官网也曾思考过应用无锁 channel 的设计,然而因为目前已有提案中(https://github.com/golang/go/…),无锁实现的 channel 可维护性差、且理论性能测试不具备说服力,而且也不合乎 Go 的简略哲学,因而官网目前为止并没有驳回无锁设计。
在性能上,有一点,咱们须要意识到:所谓 channel 中阻塞 goroutine,只是在 runtime 零碎中被 blocked,它是用户层的阻塞。而理论的底层内核线程不受影响,它依然是 unblocked 的。
参考链接
https://speakerdeck.com/kavya…
https://codeburst.io/diving-d…
https://github.com/talkgo/nig…