共计 8316 个字符,预计需要花费 21 分钟才能阅读完成。
golang select 详解
前言
select 是 golang 用来做 channel 多路复用的一种技术,和 switch 的语法很像,不过每个 case 只能够有一个 channel,send 操作和 receive 操作都应用“<-”操作符,在 send 语句中,channel 和值别离在操作符左右两边,在 receive 语句中,操作符放在 channel 操作数的后面。
示例:
c0 := make(chan struct{})
c1 := make(chan int, 100)
for {
select {
case <-c0:
return
case <-c1:
return
}
}
select 与 channel
之前 channel 详解文章中讲到过 channel 的阻塞写、阻塞读、非阻塞写、非阻塞读,这里不再赘述,须要阐明的是,select 不止用来做 channel 的非阻塞操作,次要是用来作为多路复用操作 channel 的,机制和 linux 的 select 很像
不同的写法会触发不同的机制,上面咱们看看示例
// 阻塞读,对应 channel 的 chanrecv1 函数
select {
case <-c0:
return
}
// 非阻塞读,对应 channel 的 selectnbrecv 函数
select {
case <-c0:
return
default:
return
}
// 多路复用
select {
case <-c0:
return
case <-c1:
return
default:
return
}
从下面的代码中能够看出 select 的三种机制
1:只有一个 case,并且没有 default,相当于 <- c0 的写法,阻塞读写数据
2:一个 case,一个 default,就会间接对应 channel 的非阻塞读写数据
3:有多个 case,对应了真正的 select 多路复用机制,case 随机执行,源码位于 runtime/select.go
明天咱们次要来讨论一下第三种机制
数据结构
const (
caseNil = iota
caseRecv
caseSend
caseDefault
)
type scase struct {
c *hchan // channel
elem unsafe.Pointer // 发送或者承受数据的变量地址
kind uint16 // case 类型,<-, >-, default 对应上方常量
//...
}
因为非 default
的 case
中都与 Channel 的发送和接收数据无关,所以在 scase
构造体中也蕴含一个 c
字段用于存储 case
中应用的 Channel,elem
是用于接管或者发送数据的变量地址、kind
示意以后 case
的品种
运行时
代码执行流程:/reflect/value.go/Select ->
/runtime/select.go/reflect_rselect ->
/runtime/select.go/selectgo
这里次要讲一下 select 下的两个函数
reflect_rselect:
func reflect_rselect(cases []runtimeSelect) (int, bool) {
// 判断 case 数量
if len(cases) == 0 {block()
}
// 构建 case 数组
sel := make([]scase, len(cases))
// 二倍的 case 长度 uint16 数组
order := make([]uint16, 2*len(cases))
// 组装 case 数组
for i := range cases {rc := &cases[i]
switch rc.dir {
case selectDefault:
sel[i] = scase{kind: caseDefault}
case selectSend:
sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
case selectRecv:
sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
}
if raceenabled || msanenabled {selectsetpc(&sel[i])
}
}
return selectgo(&sel[0], &order[0], len(cases))
}
从下面代码正文能够看进去,这个函数次要是为了组装 case 数组,每个元素就是一个 scase 构造
上面是本章的重点,selectgo 函数,咱们先理解一下 selectgo 函数里都做了些什么事
1、打乱数组程序(随机获取 case)
2、锁定所有 channel
3、遍历所有 channel,判断是否有可读或者可写的,如果有,解锁 channel, 返回对应数据
4、否则,判断有没有 default,如果有,解锁 channel,返回 default 对应 scase
5、否则,把以后 groutian 增加到所有 channel 的期待队列里,解锁所有 channel,期待被唤醒
6、被唤醒后,再次锁定所有 channel
7、遍历所有 channel,把 g 从 channel 期待队列中移除,并找到可操作的 channel
8、如果对应的 scase 不为空,间接返回对应的值
9、否则循环此过程
代码解析:
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
//...
// 应用 fastrandn 随机算法,设置 pollorder 数组,前面会依据这个数组进行循环,以达到随机 case
for i := 1; i < ncases; i++ {j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
// 这段代码是对 lockorder 做一个堆排序
// 所有的 goroutian 进来 lockorder 都是雷同排序
// 避免不同程序的 case 进来时锁定 channel 导致死锁
for i := 0; i < ncases; i++ {
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
for i := ncases - 1; i >= 0; i-- {o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {break}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {k++}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
// 依据 lockorder 的程序
sellock(scases, lockorder)
var (
gp *g
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
loop:
// pass 1 - look for something already waiting
var dfli int
var dfl *scase
var casi int
var cas *scase
var recvOK bool
// 循环所有 case
for i := 0; i < ncases; i++ {
// 依据 pollorder 找到 scases 数组下标
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c
switch cas.kind {
// 如果 kind 为 0,间接 continue
case caseNil:
continue
// 如果 kind 为 1,代表是接管
case caseRecv:
// 从 channel 的发送队列中获取 groutian,如果有,跳到 recv 代码块
sg = c.sendq.dequeue()
if sg != nil {goto recv}
// 判断 channel 是否为带缓冲的,并且缓冲区有值,跳到 bufrecv 代码块
if c.qcount > 0 {goto bufrecv}
// 如果 channel 曾经敞开,跳到 rclose 代码块
if c.closed != 0 {goto rclose}
// 如果 kind 为 2, 代表是发送
case caseSend:
//send 时先判断是否敞开
// 如果 channel 曾经敞开,跳到 sclose 代码块
if c.closed != 0 {goto sclose}
// 如果 channel 的读取队列里存在 groutian, 跳到 send 代码块
sg = c.recvq.dequeue()
if sg != nil {goto send}
// 如果 channel 为缓冲型,并且数据没满,跳转到 bufsend 代码块
if c.qcount < c.dataqsiz {goto bufsend}
// 如果 kind 为 3,执行 default 逻辑
case caseDefault:
dfli = casi
dfl = cas
}
}
// 代码能走到这里,阐明所有的 channel 都不具备读取的机会,判断是否有 default
// 如果存在 default,先解锁所有 channel,跳转到 retc 代码块
if dfl != nil {selunlock(scases, lockorder)
casi = dfli
cas = dfl
goto retc
}
// 创立一个 goroutian 构造
gp = getg()
if gp.waiting != nil {throw("gp.waiting != nil")
}
// 循环 scases,把 groutian 存储到 channel 对应的读写队列中
// 设置 gp.waiting 为 sudog 队列
nextp = &gp.waiting
for _, casei := range lockorder {casi = int(casei)
cas = &scases[casi]
if cas.kind == caseNil {continue}
c = cas.c
// 构建 sudog
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {sg.releasetime = -1}
sg.c = c
//gp.waiting 队列增加数据
*nextp = sg
nextp = &sg.waitlink
// 如果 kind 为 1,保留在 channel 的接管队列中
switch cas.kind {
case caseRecv:
c.recvq.enqueue(sg)
// 如果 kind 为 2,保留在 channel 的发送队列中
case caseSend:
c.sendq.enqueue(sg)
}
}
// wait for someone to wake us up
// 设置 goroutian 的回调,如果有 channel 唤醒 goroutian,会把对应的 sudog 保留到 param 中
gp.param = nil
// 挂起 goroutian,selparkcommit 会给所有 channel 解锁
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
gp.activeStackChans = false
// 唤醒后先给 channel 加锁
sellock(scases, lockorder)
gp.selectDone = 0
// 唤醒 groutian 对应的 sudog
sg = (*sudog)(gp.param)
gp.param = nil
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
casi = -1
cas = nil
//sglist 为所有 sudog 链表
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
// 循环所有 case
for _, casei := range lockorder {k = &scases[casei]
if k.kind == caseNil {continue}
if sglist.releasetime > 0 {k.releasetime = sglist.releasetime}
// 找到唤醒 goroutian 的 sudog
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
} else { // 从对应的读写队列中删除 sudog
c = k.c
if k.kind == caseSend {c.sendq.dequeueSudoG(sglist)
} else {c.recvq.dequeueSudoG(sglist)
}
}
// 从链表中获取下一个 sudog,持续循环、删除读写队列
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
if cas == nil {
// 如果唤醒的 case 为 nil,从 loop 从新开始
goto loop
}
c = cas.c
if debugSelect {print("wait-return: cas0=", cas0, "c=", c, "cas=", cas, "kind=", cas.kind, "\n")
}
// 如果是 case 是接管
if cas.kind == caseRecv {recvOK = true}
// 持续锁定 channel
selunlock(scases, lockorder)
// 跳转到 retc 代码块
goto retc
//channel 的缓冲区有数据时,间接从缓冲区获取数据
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
// 如果有接管值,把数据地址存入 elem 中
if cas.elem != nil {typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
// 接管索引往后挪一位或者初始化为 0
c.recvx++
if c.recvx == c.dataqsiz {c.recvx = 0}
// 缓冲区的数据量缩小一个
c.qcount--
// 解锁所有 channel
selunlock(scases, lockorder)
// 跳转到 retc 代码块
goto retc
//channel 的缓冲区有闲暇地位时,把数据间接写入 buffer 中
bufsend:
// 设置数据到缓冲区
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
// 发送下标向后移动或者初始化为 0
c.sendx++
if c.sendx == c.dataqsiz {c.sendx = 0}
// 缓冲区中数据量加 1
c.qcount++
// 解锁 channel
selunlock(scases, lockorder)
// 跳转到 retc 代码区
goto retc
// 如果发送队列中有 groutian
recv:
// can receive from sleeping sender (sg)
// 从发送的 sudog 中获取数据
// 解锁 channel
// 唤醒 goroutian
recv(c, sg, cas.elem, func() {selunlock(scases, lockorder) }, 2)
if debugSelect {print("syncrecv: cas0=", cas0, "c=", c, "\n")
}
recvOK = true
goto retc
// 接管时 channel 已敞开
rclose:
// read at end of closed channel
// 解锁 channel
selunlock(scases, lockorder)
recvOK = false
// 如果有有接管值,eg:case a := <- chan0, 把数据地址赋值给 elem
if cas.elem != nil {typedmemclr(c.elemtype, cas.elem)
}
if raceenabled {raceacquire(c.raceaddr())
}
goto retc
// 发送时 channel 中存在接管 goroutian
send:
// 把数据发送到接管的 goroutian 中
// 解锁 channel
// 唤醒 goroutian
send(c, sg, cas.elem, func() {selunlock(scases, lockorder) }, 2)
if debugSelect {print("syncsend: cas0=", cas0, "c=", c, "\n")
}
goto retc
// 返回
retc:
if cas.releasetime > 0 {blockevent(cas.releasetime-t0, 1)
}
// 返回对应 case 的下标,如果是接管,返回 recvOK,channel 敞开时为 false
return casi, recvOK
// 发送时 channel 已敞开,解锁 channel,间接 panic
sclose:
// send on closed channel
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}
recv 接管办法
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果为非缓冲区
if c.dataqsiz == 0 {
//...
if ep != nil {
// 从 sender 队列中间接复制数据 =
recvDirect(c.elemtype, sg, ep)
}
} else {qp := chanbuf(c, c.recvx)
//...
// 如果承受变量不为空,合乎数据到 ep
if ep != nil {typedmemmove(c.elemtype, ep, qp)
}
// 从缓冲区复制数据
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {c.recvx = 0}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
// 解锁 channel
unlockf()
// 发送者的 param 设置
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {sg.releasetime = cputicks()
}
// 唤醒 goroutian
goready(gp, skip+1)
}
send 办法:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//...
// 发送数据如果不为空
if sg.elem != nil {
// 间接把数据写入接收者 goroutian
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
// 解锁 channel
unlockf()
// 接管 groutian 的 param 赋值
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {sg.releasetime = cputicks()
}
// 唤醒 groutian
goready(gp, skip+1)
}
以上就是 select 的外围代码解析,能够对着正文和下面的图一起看,如果有一些 channel 的常识不是很明确,能够先看下 channel 详解,置信你肯定会有所播种