共计 6128 个字符,预计需要花费 16 分钟才能阅读完成。
channel 是 Golang 提供的 goroutine 间的通信形式,能够让一个 goroutine 发送特定值到另一个 goroutine。
个性
通道没有缓冲区,或者有缓冲区但缓冲区没有数据时,从通道读取数据会阻塞,直到有协程向通道中写入数据。相似地,通道没有缓冲区,或者缓冲区已满时,向通道写入数据也会阻塞,直到有协程从通道读取数据。对于值为 nil 的通道,无论读写都会阻塞,而且是永恒阻塞。
应用内置函数 close 能够敞开通道,尝试向已敞开的通道发送数据会触发 panic,但此时依然可读。通道读取的表达式最多有两个返回值:
x, ok := <-ch
第一个变量示意读出的数据,第二个变量示意是否胜利读取了数据,它的值只跟通道缓冲区中是否有数据无关,与通道的敞开状态无关。
实现原理
数据结构
源码 src/runtime/chan.go:hchan 定义了 channel 的数据结构:
type hchan struct {
qcount uint // 以后队列中残余元素个数
dataqsiz uint // 环形队列长度,即能够寄存的元素个数
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 标识敞开状态
elemtype *_type // 元素类型
sendx uint // 队列下标,批示元素写入时寄存到队列中的地位
recvx uint // 队列下标,批示元素从队列的该地位读出
recvq waitq // 期待读音讯的 goroutine 队列
sendq waitq // 期待写音讯的 goroutine 队列
lock mutex // 互斥锁,chan 不容许并发读写
}
能够看出 channel 由队列、类型信息、goroutine 期待队列组成。
环形队列
channel 外部实现了一个环形队列作为其缓冲区,队列的长度是创立 channel 时指定的。下图展现了一个可缓存 6 个元素的 channel 示意图:
- dataqsiz 表明了队列长度为 6,即可缓存 6 个元素;
- buf 指向队列的内存地址;
- qcount 示意队列中还有两个元素;
- sendx 示意后续写入的数据存储的地位,取值为 [0, 6);
- recvx 示意读取数据的地位, 取值为 [0, 6)。
类型信息
一个 channel 只能传递一种类型的值:
- elemtype 代表类型,用于数据传递过程中的赋值;
- elemsize 代表类型大小,用于在 buf 中定位元素地位。
期待队列
从 channel 读取数据时,如果没有缓冲区或者缓冲区为空,则以后协程会被阻塞,并被退出 recvq 队列。向 channel 写入数据时,如果没有缓冲区或者缓冲区已满,则以后协程同样会被阻塞,而后退出到 sendq 的队列。处于期待队列中的协程会在其余协程操作 channel 时被唤醒。
下图展现了一个没有缓冲区的 channel,并有几个协程正在阻塞期待读取数据:
相干操作
创立通道
创立 channel 的过程实际上就是初始化 hchan 构造,类型信息和缓冲区长度由 make 语句传入,buf 的大小则由元素大小和缓冲区长度独特决定。
源码 src/runtime/chan.go 中定义了创立 channel 的函数 makechan(),精简版的代码如下所示:
func makechan(t *chantype, size int) *hchan {mem, overflow := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
发送数据
发送数据的操作最终都转化成了 chansend() 函数,次要代码和逻辑如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果通道为 nil,非阻塞式发送的话间接返回 false,否则将以后协程挂起
if c == nil {
if !block {return false}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 对于非阻塞式发送,如果通道未敞开且没有缓冲空间的话,间接返回 false
if !block && c.closed == 0 && full(c) {return false}
// 加锁,并发平安
lock(&c.lock)
// 如果通道敞开了,间接 panic
if c.closed != 0 {unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果接管队列不为空,间接将要发送的数据发送到队首的 goroutine
if sg := c.recvq.dequeue(); sg != nil {send(c, sg, ep, func() {unlock(&c.lock) }, 3)
return true
}
// 对于缓冲区还有闲暇的 channel,拷贝数据到缓冲区,保护相干信息
if c.qcount < c.dataqsiz {qp := chanbuf(c, c.sendx)
if raceenabled {raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {c.sendx = 0}
c.qcount++
unlock(&c.lock)
return true
}
// 没有缓冲空间时,发送方会挂起,并依据以后 goroutine 结构一个 sudog 构造体增加到 sendq 队列中
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {mysg.releasetime = -1}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 省略被唤醒时局部代码
return true
}
读取数据
读取数据的操作最终是转化成了 chanrecv() 函数,次要逻辑如下:
// selected 和 received 返回值别离代表是否可被 select 语句命中以及是否读取到了数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 如果 channel 为 nil,非阻塞式读取间接返回,否则间接挂起
if c == nil {
if !block {return}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 非阻塞模式并且没有音讯可读(没有缓冲区或者缓冲区为空),如果 channel 未敞开间接返回
if !block && empty(c) {if atomic.Load(&c.closed) == 0 {return}
if empty(c) {
if raceenabled {raceacquire(c.raceaddr())
}
if ep != nil {typedmemclr(c.elemtype, ep)
}
return true, false
}
}
// 加锁
lock(&c.lock)
// channel 已敞开并且没有音讯可读(没有缓冲区或者缓冲区为空),会接管到零值,typedmemclr 会依据类型清理相应地址的内存
if c.closed != 0 && c.qcount == 0 {
if raceenabled {raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {typedmemclr(c.elemtype, ep)
}
return true, false
}
// 期待发送队列不为空,如果是非缓冲型 channel,间接拷贝发送者的数据,否则接管队首的数据,并将发送者的数据挪动到环形队列尾部
if sg := c.sendq.dequeue(); sg != nil {recv(c, sg, ep, func() {unlock(&c.lock) }, 3)
return true, true
}
// 缓冲型 channel,buf 里有元素,能够失常接管
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {raceacquire(qp)
racerelease(qp)
}
if ep != nil {typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {c.recvx = 0}
c.qcount--
unlock(&c.lock)
return true, true
}
// 被阻塞的状况,结构一个 sudog 构造体,保留到 channel 的期待接管队列,并将以后 goroutine 挂起
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {mysg.releasetime = -1}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 省略被唤醒时局部代码
return true, !closed
}
敞开通道
敞开某个 channel,最终会执行函数 closechan(),外围代码如下:
func closechan(c *hchan) {
// 如果 channel 为 nil,间接 panic
if c == nil {panic(plainError("close of nil channel"))
}
// 加锁,如果 channel 已敞开,间接 panic
lock(&c.lock)
if c.closed != 0 {unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
var glist gList
// 开释期待接管队列中,向须要返回值的接收者返回相应的零值
for {sg := c.recvq.dequeue()
if sg == nil {break}
if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 开释期待发送队列,相干的 goroutine 会触发 panic
for {sg := c.sendq.dequeue()
if sg == nil {break}
sg.elem = nil
if sg.releasetime != 0 {sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// ...
}
常见利用
定时工作
这种用法须要与 timer 联合,分为两种:超时管制和定时执行。
如果须要执行某项操作,但又不想它消耗太长时间,想给它一个超时限度,能够这么做:
select {case <-time.After(100 * time.Millisecond):
case <-s.stopc:
return false
}
期待 100 ms 后,如果 s.stopc 还没有读出数据或者被敞开,就间接完结。
定时执行某个工作也比较简单,例如每隔 1 秒种,执行一次定时工作:
func worker() {ticker := time.Tick(1 * time.Second)
for {
select {
case <- ticker:
// 执行工作
}
}
}
解耦生产者与消费者
应用一个 channel 保留工作,启动 n 个 goroutine 作为工作协程池,这些协程工作在一个有限循环里,从该 channel 读取工作并执行:
func main() {taskCh := make(chan int, 100)
go worker(taskCh)
for i := 0; i < 10; i++ {taskCh <- i}
select {case <-time.After(time.Hour):
}
}
func worker(taskCh <-chan int) {
const N = 5
for i := 0; i < N; i++ {go func(id int) {
for {
task := <- taskCh
fmt.Printf("finish task: %d by worker %d\n", task, id)
time.Sleep(time.Second)
}
}(i)
}
}
管制并发数
有时须要定时执行几百个工作,然而并发数又不能太高,这时就能够通过 channel 来管制并发数。比方上面的例子:
var limit = make(chan int, 3)
func main() {
// …………
for _, w := range work {go func() {
limit <- 1
w()
<-limit
}()}
// …………
}
构建一个容量为 3 的 channel,遍历工作列表,每个工作启动一个 goroutine,真正执行工作的动作在 w() 中实现。在执行 w() 之前,先要从 limit 中拿“许可证”,拿到许可证之后,能力执行 w(),并且在执行完工作,要将“许可证”偿还。要留神的是,如果 w() 产生 panic,那“许可证”可能就还不回去了,因而须要应用 defer 来保障。