golang select 详解
前言
select 是golang用来做channel多路复用的一种技术,和switch的语法很像,不过每个case只能够有一个channel,send 操作和 receive 操作都应用 “<-” 操作符,在 send 语句中,channel 和值别离在操作符左右两边,在 receive 语句中,操作符放在 channel 操作数的后面。
示例:
c0 := make(chan struct{})
c1 := make(chan int, 100)
for {
select {
case <-c0:
return
case <-c1:
return
}
}
select与channel
之前channel详解文章中讲到过channel的阻塞写、阻塞读、非阻塞写、非阻塞读,这里不再赘述,须要阐明的是,select不止用来做channel的非阻塞操作,次要是用来作为多路复用操作channel的,机制和linux的select很像
不同的写法会触发不同的机制,上面咱们看看示例
// 阻塞读,对应channel的 chanrecv1函数
select {
case <-c0:
return
}
// 非阻塞读,对应channel的 selectnbrecv 函数
select {
case <-c0:
return
default:
return
}
// 多路复用
select {
case <-c0:
return
case <-c1:
return
default:
return
}
从下面的代码中能够看出select的三种机制
1:只有一个case,并且没有default,相当于 <- c0的写法,阻塞读写数据
2:一个case,一个default,就会间接对应channel的非阻塞读写数据
3:有多个case,对应了真正的select多路复用机制,case随机执行,源码位于runtime/select.go
明天咱们次要来讨论一下第三种机制
数据结构
const (
caseNil = iota
caseRecv
caseSend
caseDefault
)
type scase struct {
c *hchan // channel
elem unsafe.Pointer // 发送或者承受数据的变量地址
kind uint16 // case类型,<-, >-, default 对应上方常量
//...
}
因为非 default
的 case
中都与 Channel 的发送和接收数据无关,所以在 scase
构造体中也蕴含一个 c
字段用于存储 case
中应用的 Channel,elem
是用于接管或者发送数据的变量地址、kind
示意以后 case
的品种
运行时
代码执行流程:/reflect/value.go/Select ->
/runtime/select.go/reflect_rselect ->
/runtime/select.go/selectgo
这里次要讲一下select下的两个函数
reflect_rselect:
func reflect_rselect(cases []runtimeSelect) (int, bool) {
//判断case数量
if len(cases) == 0 {
block()
}
//构建case数组
sel := make([]scase, len(cases))
//二倍的case长度 uint16数组
order := make([]uint16, 2*len(cases))
//组装case数组
for i := range cases {
rc := &cases[i]
switch rc.dir {
case selectDefault:
sel[i] = scase{kind: caseDefault}
case selectSend:
sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
case selectRecv:
sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
}
if raceenabled || msanenabled {
selectsetpc(&sel[i])
}
}
return selectgo(&sel[0], &order[0], len(cases))
}
从下面代码正文能够看进去,这个函数次要是为了组装case数组,每个元素就是一个scase构造
上面是本章的重点,selectgo函数,咱们先理解一下selectgo函数里都做了些什么事
1、打乱数组程序(随机获取case)
2、锁定所有channel
3、遍历所有channel,判断是否有可读或者可写的,如果有,解锁channel,返回对应数据
4、否则,判断有没有default,如果有,解锁channel,返回default对应scase
5、否则,把以后groutian增加到所有channel的期待队列里,解锁所有channel,期待被唤醒
6、被唤醒后,再次锁定所有channel
7、遍历所有channel,把g从channel期待队列中移除,并找到可操作的channel
8、如果对应的scase不为空,间接返回对应的值
9、否则循环此过程
代码解析:
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
//...
// 应用fastrandn随机算法,设置pollorder数组,前面会依据这个数组进行循环,以达到随机case
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
// 这段代码是对lockorder做一个堆排序
// 所有的goroutian进来lockorder都是雷同排序
// 避免不同程序的case进来时锁定channel导致死锁
for i := 0; i < ncases; i++ {
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
for i := ncases - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
//依据lockorder的程序
sellock(scases, lockorder)
var (
gp *g
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
loop:
// pass 1 - look for something already waiting
var dfli int
var dfl *scase
var casi int
var cas *scase
var recvOK bool
//循环所有case
for i := 0; i < ncases; i++ {
//依据pollorder找到scases数组下标
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c
switch cas.kind {
//如果kind为0,间接continue
case caseNil:
continue
//如果kind为1,代表是接管
case caseRecv:
//从channel的发送队列中获取groutian,如果有,跳到recv代码块
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
//判断channel是否为带缓冲的,并且缓冲区有值,跳到bufrecv代码块
if c.qcount > 0 {
goto bufrecv
}
//如果channel曾经敞开,跳到rclose代码块
if c.closed != 0 {
goto rclose
}
//如果kind为2,代表是发送
case caseSend:
//send时先判断是否敞开
//如果channel曾经敞开,跳到sclose代码块
if c.closed != 0 {
goto sclose
}
//如果channel的读取队列里存在groutian,跳到send代码块
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
//如果channel为缓冲型,并且数据没满,跳转到bufsend代码块
if c.qcount < c.dataqsiz {
goto bufsend
}
//如果kind为3,执行default逻辑
case caseDefault:
dfli = casi
dfl = cas
}
}
//代码能走到这里,阐明所有的channel都不具备读取的机会,判断是否有default
//如果存在default,先解锁所有channel,跳转到retc代码块
if dfl != nil {
selunlock(scases, lockorder)
casi = dfli
cas = dfl
goto retc
}
// 创立一个goroutian构造
gp = getg()
if gp.waiting != nil {
throw("gp.waiting != nil")
}
//循环scases,把groutian存储到channel对应的读写队列中
//设置gp.waiting为sudog队列
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
if cas.kind == caseNil {
continue
}
c = cas.c
//构建sudog
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
//gp.waiting队列增加数据
*nextp = sg
nextp = &sg.waitlink
//如果kind为1,保留在channel的接管队列中
switch cas.kind {
case caseRecv:
c.recvq.enqueue(sg)
//如果kind为2,保留在channel的发送队列中
case caseSend:
c.sendq.enqueue(sg)
}
}
// wait for someone to wake us up
//设置goroutian的回调,如果有channel唤醒goroutian,会把对应的sudog保留到param中
gp.param = nil
//挂起goroutian,selparkcommit会给所有channel解锁
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
gp.activeStackChans = false
//唤醒后先给channel加锁
sellock(scases, lockorder)
gp.selectDone = 0
//唤醒groutian对应的sudog
sg = (*sudog)(gp.param)
gp.param = nil
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
casi = -1
cas = nil
//sglist为所有sudog链表
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
//循环所有case
for _, casei := range lockorder {
k = &scases[casei]
if k.kind == caseNil {
continue
}
if sglist.releasetime > 0 {
k.releasetime = sglist.releasetime
}
//找到唤醒goroutian的sudog
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
} else { //从对应的读写队列中删除sudog
c = k.c
if k.kind == caseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
//从链表中获取下一个sudog,持续循环、删除读写队列
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
if cas == nil {
//如果唤醒的case为nil,从loop从新开始
goto loop
}
c = cas.c
if debugSelect {
print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
}
//如果是case是接管
if cas.kind == caseRecv {
recvOK = true
}
//持续锁定channel
selunlock(scases, lockorder)
//跳转到retc代码块
goto retc
//channel的缓冲区有数据时,间接从缓冲区获取数据
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
//如果有接管值,把数据地址存入elem中
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
//接管索引往后挪一位或者初始化为0
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
//缓冲区的数据量缩小一个
c.qcount--
//解锁所有channel
selunlock(scases, lockorder)
//跳转到retc代码块
goto retc
//channel的缓冲区有闲暇地位时,把数据间接写入buffer中
bufsend:
//设置数据到缓冲区
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
//发送下标向后移动或者初始化为0
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
//缓冲区中数据量加1
c.qcount++
//解锁channel
selunlock(scases, lockorder)
//跳转到retc代码区
goto retc
//如果发送队列中有groutian
recv:
// can receive from sleeping sender (sg)
// 从发送的sudog中获取数据
// 解锁channel
// 唤醒goroutian
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncrecv: cas0=", cas0, " c=", c, "\n")
}
recvOK = true
goto retc
//接管时channel已敞开
rclose:
// read at end of closed channel
// 解锁channel
selunlock(scases, lockorder)
recvOK = false
// 如果有有接管值, eg: case a := <- chan0,把数据地址赋值给elem
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
if raceenabled {
raceacquire(c.raceaddr())
}
goto retc
//发送时channel中存在接管goroutian
send:
//把数据发送到接管的goroutian中
//解锁channel
//唤醒goroutian
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncsend: cas0=", cas0, " c=", c, "\n")
}
goto retc
//返回
retc:
if cas.releasetime > 0 {
blockevent(cas.releasetime-t0, 1)
}
//返回对应case的下标,如果是接管,返回recvOK,channel敞开时为false
return casi, recvOK
//发送时channel已敞开,解锁channel,间接panic
sclose:
// send on closed channel
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}
recv接管办法
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//如果为非缓冲区
if c.dataqsiz == 0 {
//...
if ep != nil {
// 从sender队列中间接复制数据=
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
//...
//如果承受变量不为空,合乎数据到ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
//从缓冲区复制数据
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
//解锁channel
unlockf()
//发送者的param设置
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//唤醒goroutian
goready(gp, skip+1)
}
send办法:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//...
//发送数据如果不为空
if sg.elem != nil {
//间接把数据写入接收者goroutian
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
//解锁channel
unlockf()
//接管groutian的param赋值
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//唤醒groutian
goready(gp, skip+1)
}
以上就是select的外围代码解析,能够对着正文和下面的图一起看,如果有一些channel的常识不是很明确,能够先看下channel详解,置信你肯定会有所播种
发表回复