golang select 详解
前言
select 是golang用来做channel多路复用的一种技术,和switch的语法很像,不过每个case只能够有一个channel,send 操作和 receive 操作都应用 “<-” 操作符,在 send 语句中,channel 和值别离在操作符左右两边,在 receive 语句中,操作符放在 channel 操作数的后面。
示例:
c0 := make(chan struct{}) c1 := make(chan int, 100) for { select { case <-c0: return case <-c1: return } }
select与channel
之前channel详解文章中讲到过channel的阻塞写、阻塞读、非阻塞写、非阻塞读,这里不再赘述,须要阐明的是,select不止用来做channel的非阻塞操作,次要是用来作为多路复用操作channel的,机制和linux的select很像
不同的写法会触发不同的机制,上面咱们看看示例
// 阻塞读,对应channel的 chanrecv1函数select {case <-c0: return}// 非阻塞读,对应channel的 selectnbrecv 函数select {case <-c0: returndefault: return}// 多路复用select {case <-c0: returncase <-c1: returndefault: return}
从下面的代码中能够看出select的三种机制
1:只有一个case,并且没有default,相当于 <- c0的写法,阻塞读写数据
2:一个case,一个default,就会间接对应channel的非阻塞读写数据
3:有多个case,对应了真正的select多路复用机制,case随机执行,源码位于runtime/select.go
明天咱们次要来讨论一下第三种机制
数据结构
const ( caseNil = iota caseRecv caseSend caseDefault)type scase struct { c *hchan // channel elem unsafe.Pointer // 发送或者承受数据的变量地址 kind uint16 // case类型,<-, >-, default 对应上方常量 //...}
因为非 default
的 case
中都与 Channel 的发送和接收数据无关,所以在 scase
构造体中也蕴含一个 c
字段用于存储 case
中应用的 Channel,elem
是用于接管或者发送数据的变量地址、kind
示意以后 case
的品种
运行时
代码执行流程:/reflect/value.go/Select ->
/runtime/select.go/reflect_rselect ->
/runtime/select.go/selectgo
这里次要讲一下select下的两个函数
reflect_rselect:
func reflect_rselect(cases []runtimeSelect) (int, bool) { //判断case数量 if len(cases) == 0 { block() } //构建case数组 sel := make([]scase, len(cases)) //二倍的case长度 uint16数组 order := make([]uint16, 2*len(cases)) //组装case数组 for i := range cases { rc := &cases[i] switch rc.dir { case selectDefault: sel[i] = scase{kind: caseDefault} case selectSend: sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val} case selectRecv: sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val} } if raceenabled || msanenabled { selectsetpc(&sel[i]) } } return selectgo(&sel[0], &order[0], len(cases))}
从下面代码正文能够看进去,这个函数次要是为了组装case数组,每个元素就是一个scase构造
上面是本章的重点,selectgo函数,咱们先理解一下selectgo函数里都做了些什么事
1、打乱数组程序(随机获取case)
2、锁定所有channel
3、遍历所有channel,判断是否有可读或者可写的,如果有,解锁channel,返回对应数据
4、否则,判断有没有default,如果有,解锁channel,返回default对应scase
5、否则,把以后groutian增加到所有channel的期待队列里,解锁所有channel,期待被唤醒
6、被唤醒后,再次锁定所有channel
7、遍历所有channel,把g从channel期待队列中移除,并找到可操作的channel
8、如果对应的scase不为空,间接返回对应的值
9、否则循环此过程
代码解析:
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) { //... // 应用fastrandn随机算法,设置pollorder数组,前面会依据这个数组进行循环,以达到随机case for i := 1; i < ncases; i++ { j := fastrandn(uint32(i + 1)) pollorder[i] = pollorder[j] pollorder[j] = uint16(i) } // 这段代码是对lockorder做一个堆排序 // 所有的goroutian进来lockorder都是雷同排序 // 避免不同程序的case进来时锁定channel导致死锁 for i := 0; i < ncases; i++ { j := i // Start with the pollorder to permute cases on the same channel. c := scases[pollorder[i]].c for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() { k := (j - 1) / 2 lockorder[j] = lockorder[k] j = k } lockorder[j] = pollorder[i] } for i := ncases - 1; i >= 0; i-- { o := lockorder[i] c := scases[o].c lockorder[i] = lockorder[0] j := 0 for { k := j*2 + 1 if k >= i { break } if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() { k++ } if c.sortkey() < scases[lockorder[k]].c.sortkey() { lockorder[j] = lockorder[k] j = k continue } break } lockorder[j] = o } //依据lockorder的程序 sellock(scases, lockorder) var ( gp *g sg *sudog c *hchan k *scase sglist *sudog sgnext *sudog qp unsafe.Pointer nextp **sudog )loop: // pass 1 - look for something already waiting var dfli int var dfl *scase var casi int var cas *scase var recvOK bool //循环所有case for i := 0; i < ncases; i++ { //依据pollorder找到scases数组下标 casi = int(pollorder[i]) cas = &scases[casi] c = cas.c switch cas.kind { //如果kind为0,间接continue case caseNil: continue //如果kind为1,代表是接管 case caseRecv: //从channel的发送队列中获取groutian,如果有,跳到recv代码块 sg = c.sendq.dequeue() if sg != nil { goto recv } //判断channel是否为带缓冲的,并且缓冲区有值,跳到bufrecv代码块 if c.qcount > 0 { goto bufrecv } //如果channel曾经敞开,跳到rclose代码块 if c.closed != 0 { goto rclose } //如果kind为2,代表是发送 case caseSend: //send时先判断是否敞开 //如果channel曾经敞开,跳到sclose代码块 if c.closed != 0 { goto sclose } //如果channel的读取队列里存在groutian,跳到send代码块 sg = c.recvq.dequeue() if sg != nil { goto send } //如果channel为缓冲型,并且数据没满,跳转到bufsend代码块 if c.qcount < c.dataqsiz { goto bufsend } //如果kind为3,执行default逻辑 case caseDefault: dfli = casi dfl = cas } } //代码能走到这里,阐明所有的channel都不具备读取的机会,判断是否有default //如果存在default,先解锁所有channel,跳转到retc代码块 if dfl != nil { selunlock(scases, lockorder) casi = dfli cas = dfl goto retc } // 创立一个goroutian构造 gp = getg() if gp.waiting != nil { throw("gp.waiting != nil") } //循环scases,把groutian存储到channel对应的读写队列中 //设置gp.waiting为sudog队列 nextp = &gp.waiting for _, casei := range lockorder { casi = int(casei) cas = &scases[casi] if cas.kind == caseNil { continue } c = cas.c //构建sudog sg := acquireSudog() sg.g = gp sg.isSelect = true // No stack splits between assigning elem and enqueuing // sg on gp.waiting where copystack can find it. sg.elem = cas.elem sg.releasetime = 0 if t0 != 0 { sg.releasetime = -1 } sg.c = c //gp.waiting队列增加数据 *nextp = sg nextp = &sg.waitlink //如果kind为1,保留在channel的接管队列中 switch cas.kind { case caseRecv: c.recvq.enqueue(sg) //如果kind为2,保留在channel的发送队列中 case caseSend: c.sendq.enqueue(sg) } } // wait for someone to wake us up //设置goroutian的回调,如果有channel唤醒goroutian,会把对应的sudog保留到param中 gp.param = nil //挂起goroutian,selparkcommit会给所有channel解锁 gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1) gp.activeStackChans = false //唤醒后先给channel加锁 sellock(scases, lockorder) gp.selectDone = 0 //唤醒groutian对应的sudog sg = (*sudog)(gp.param) gp.param = nil // pass 3 - dequeue from unsuccessful chans // otherwise they stack up on quiet channels // record the successful case, if any. // We singly-linked up the SudoGs in lock order. casi = -1 cas = nil //sglist为所有sudog链表 sglist = gp.waiting // Clear all elem before unlinking from gp.waiting. for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink { sg1.isSelect = false sg1.elem = nil sg1.c = nil } gp.waiting = nil //循环所有case for _, casei := range lockorder { k = &scases[casei] if k.kind == caseNil { continue } if sglist.releasetime > 0 { k.releasetime = sglist.releasetime } //找到唤醒goroutian的sudog if sg == sglist { // sg has already been dequeued by the G that woke us up. casi = int(casei) cas = k } else { //从对应的读写队列中删除sudog c = k.c if k.kind == caseSend { c.sendq.dequeueSudoG(sglist) } else { c.recvq.dequeueSudoG(sglist) } } //从链表中获取下一个sudog,持续循环、删除读写队列 sgnext = sglist.waitlink sglist.waitlink = nil releaseSudog(sglist) sglist = sgnext } if cas == nil { //如果唤醒的case为nil,从loop从新开始 goto loop } c = cas.c if debugSelect { print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " kind=", cas.kind, "\n") } //如果是case是接管 if cas.kind == caseRecv { recvOK = true } //持续锁定channel selunlock(scases, lockorder) //跳转到retc代码块 goto retc //channel的缓冲区有数据时,间接从缓冲区获取数据bufrecv: recvOK = true qp = chanbuf(c, c.recvx) //如果有接管值,把数据地址存入elem中 if cas.elem != nil { typedmemmove(c.elemtype, cas.elem, qp) } typedmemclr(c.elemtype, qp) //接管索引往后挪一位或者初始化为0 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } //缓冲区的数据量缩小一个 c.qcount-- //解锁所有channel selunlock(scases, lockorder) //跳转到retc代码块 goto retc //channel的缓冲区有闲暇地位时,把数据间接写入buffer中bufsend: //设置数据到缓冲区 typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem) //发送下标向后移动或者初始化为0 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } //缓冲区中数据量加1 c.qcount++ //解锁channel selunlock(scases, lockorder) //跳转到retc代码区 goto retc //如果发送队列中有groutianrecv: // can receive from sleeping sender (sg) // 从发送的sudog中获取数据 // 解锁channel // 唤醒goroutian recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) if debugSelect { print("syncrecv: cas0=", cas0, " c=", c, "\n") } recvOK = true goto retc //接管时channel已敞开rclose: // read at end of closed channel // 解锁channel selunlock(scases, lockorder) recvOK = false // 如果有有接管值, eg: case a := <- chan0,把数据地址赋值给elem if cas.elem != nil { typedmemclr(c.elemtype, cas.elem) } if raceenabled { raceacquire(c.raceaddr()) } goto retc //发送时channel中存在接管goroutiansend: //把数据发送到接管的goroutian中 //解锁channel //唤醒goroutian send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2) if debugSelect { print("syncsend: cas0=", cas0, " c=", c, "\n") } goto retc //返回retc: if cas.releasetime > 0 { blockevent(cas.releasetime-t0, 1) } //返回对应case的下标,如果是接管,返回recvOK,channel敞开时为false return casi, recvOK //发送时channel已敞开,解锁channel,间接panicsclose: // send on closed channel selunlock(scases, lockorder) panic(plainError("send on closed channel"))}
recv接管办法
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { //如果为非缓冲区 if c.dataqsiz == 0 { //... if ep != nil { // 从sender队列中间接复制数据= recvDirect(c.elemtype, sg, ep) } } else { qp := chanbuf(c, c.recvx) //... //如果承受变量不为空,合乎数据到ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } //从缓冲区复制数据 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g //解锁channel unlockf() //发送者的param设置 gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } //唤醒goroutian goready(gp, skip+1)}
send办法:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { //... //发送数据如果不为空 if sg.elem != nil { //间接把数据写入接收者goroutian sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g //解锁channel unlockf() //接管groutian的param赋值 gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } //唤醒groutian goready(gp, skip+1)}
以上就是select的外围代码解析,能够对着正文和下面的图一起看,如果有一些channel的常识不是很明确,能够先看下channel详解,置信你肯定会有所播种