关于golang:golangselect详解

31次阅读

共计 8316 个字符,预计需要花费 21 分钟才能阅读完成。

golang select 详解

前言

select 是 golang 用来做 channel 多路复用的一种技术,和 switch 的语法很像,不过每个 case 只能够有一个 channel,send 操作和 receive 操作都应用“<-”操作符,在 send 语句中,channel 和值别离在操作符左右两边,在 receive 语句中,操作符放在 channel 操作数的后面。
示例:

    c0 := make(chan struct{})
    c1 := make(chan int, 100)
    for {
        select {
        case <-c0:
            return
        case <-c1:
            return
        }
    }

select 与 channel

之前 channel 详解文章中讲到过 channel 的阻塞写、阻塞读、非阻塞写、非阻塞读,这里不再赘述,须要阐明的是,select 不止用来做 channel 的非阻塞操作,次要是用来作为多路复用操作 channel 的,机制和 linux 的 select 很像
不同的写法会触发不同的机制,上面咱们看看示例

// 阻塞读,对应 channel 的 chanrecv1 函数
select {
case <-c0:
    return
}

// 非阻塞读,对应 channel 的 selectnbrecv 函数
select {
case <-c0:
    return
default:
    return
}

// 多路复用
select {
case <-c0:
    return
case <-c1:
    return
default:
    return
}

从下面的代码中能够看出 select 的三种机制
1:只有一个 case,并且没有 default,相当于 <- c0 的写法,阻塞读写数据
2:一个 case,一个 default,就会间接对应 channel 的非阻塞读写数据
3:有多个 case,对应了真正的 select 多路复用机制,case 随机执行,源码位于 runtime/select.go
明天咱们次要来讨论一下第三种机制

数据结构

const (
    caseNil = iota
    caseRecv
    caseSend
    caseDefault
)

type scase struct {
    c    *hchan         // channel
    elem unsafe.Pointer // 发送或者承受数据的变量地址
    kind uint16         // case 类型,<-, >-, default 对应上方常量
    //...
}

因为非 default 的 case 中都与 Channel 的发送和接收数据无关,所以在 scase 构造体中也蕴含一个 c 字段用于存储 case 中应用的 Channel,elem 是用于接管或者发送数据的变量地址、kind 示意以后 case 的品种

运行时

代码执行流程:/reflect/value.go/Select -> /runtime/select.go/reflect_rselect -> /runtime/select.go/selectgo

这里次要讲一下 select 下的两个函数
reflect_rselect:

func reflect_rselect(cases []runtimeSelect) (int, bool) {
    // 判断 case 数量
    if len(cases) == 0 {block()
    }

    // 构建 case 数组
    sel := make([]scase, len(cases))
    
    // 二倍的 case 长度 uint16 数组
    order := make([]uint16, 2*len(cases))

    // 组装 case 数组
    for i := range cases {rc := &cases[i]
        switch rc.dir {
        case selectDefault:
            sel[i] = scase{kind: caseDefault}
        case selectSend:
            sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
        case selectRecv:
            sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
        }
        if raceenabled || msanenabled {selectsetpc(&sel[i])
        }
    }

    return selectgo(&sel[0], &order[0], len(cases))
}

从下面代码正文能够看进去,这个函数次要是为了组装 case 数组,每个元素就是一个 scase 构造

上面是本章的重点,selectgo 函数,咱们先理解一下 selectgo 函数里都做了些什么事
1、打乱数组程序(随机获取 case)
2、锁定所有 channel
3、遍历所有 channel,判断是否有可读或者可写的,如果有,解锁 channel, 返回对应数据
4、否则,判断有没有 default,如果有,解锁 channel,返回 default 对应 scase
5、否则,把以后 groutian 增加到所有 channel 的期待队列里,解锁所有 channel,期待被唤醒
6、被唤醒后,再次锁定所有 channel
7、遍历所有 channel,把 g 从 channel 期待队列中移除,并找到可操作的 channel
8、如果对应的 scase 不为空,间接返回对应的值
9、否则循环此过程

代码解析:

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    //...
    // 应用 fastrandn 随机算法,设置 pollorder 数组,前面会依据这个数组进行循环,以达到随机 case
    for i := 1; i < ncases; i++ {j := fastrandn(uint32(i + 1))
        pollorder[i] = pollorder[j]
        pollorder[j] = uint16(i)
    }

    // 这段代码是对 lockorder 做一个堆排序
    // 所有的 goroutian 进来 lockorder 都是雷同排序
    // 避免不同程序的 case 进来时锁定 channel 导致死锁
    for i := 0; i < ncases; i++ {
        j := i
        // Start with the pollorder to permute cases on the same channel.
        c := scases[pollorder[i]].c
        for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {k := (j - 1) / 2
            lockorder[j] = lockorder[k]
            j = k
        }
        lockorder[j] = pollorder[i]
    }
    for i := ncases - 1; i >= 0; i-- {o := lockorder[i]
        c := scases[o].c
        lockorder[i] = lockorder[0]
        j := 0
        for {
            k := j*2 + 1
            if k >= i {break}
            if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {k++}
            if c.sortkey() < scases[lockorder[k]].c.sortkey() {lockorder[j] = lockorder[k]
                j = k
                continue
            }
            break
        }
        lockorder[j] = o
    }

    // 依据 lockorder 的程序
    sellock(scases, lockorder)

    var (
        gp     *g
        sg     *sudog
        c      *hchan
        k      *scase
        sglist *sudog
        sgnext *sudog
        qp     unsafe.Pointer
        nextp  **sudog
    )

loop:
    // pass 1 - look for something already waiting
    var dfli int
    var dfl *scase
    var casi int
    var cas *scase
    var recvOK bool
    // 循环所有 case
    for i := 0; i < ncases; i++ {
        // 依据 pollorder 找到 scases 数组下标
        casi = int(pollorder[i])
        cas = &scases[casi]
        c = cas.c

        switch cas.kind {
        // 如果 kind 为 0,间接 continue
        case caseNil:
            continue
        // 如果 kind 为 1,代表是接管
        case caseRecv:
            // 从 channel 的发送队列中获取 groutian,如果有,跳到 recv 代码块
            sg = c.sendq.dequeue()
            if sg != nil {goto recv}

            // 判断 channel 是否为带缓冲的,并且缓冲区有值,跳到 bufrecv 代码块
            if c.qcount > 0 {goto bufrecv}

            // 如果 channel 曾经敞开,跳到 rclose 代码块
            if c.closed != 0 {goto rclose}

        // 如果 kind 为 2, 代表是发送
        case caseSend:
            //send 时先判断是否敞开
            // 如果 channel 曾经敞开,跳到 sclose 代码块
            if c.closed != 0 {goto sclose}
            // 如果 channel 的读取队列里存在 groutian, 跳到 send 代码块
            sg = c.recvq.dequeue()
            if sg != nil {goto send}
            // 如果 channel 为缓冲型,并且数据没满,跳转到 bufsend 代码块
            if c.qcount < c.dataqsiz {goto bufsend}
        // 如果 kind 为 3,执行 default 逻辑
        case caseDefault:
            dfli = casi
            dfl = cas
        }
    }

    // 代码能走到这里,阐明所有的 channel 都不具备读取的机会,判断是否有 default
    // 如果存在 default,先解锁所有 channel,跳转到 retc 代码块
    if dfl != nil {selunlock(scases, lockorder)
        casi = dfli
        cas = dfl
        goto retc
    }

    // 创立一个 goroutian 构造
    gp = getg()
    if gp.waiting != nil {throw("gp.waiting != nil")
    }

    // 循环 scases,把 groutian 存储到 channel 对应的读写队列中
    // 设置 gp.waiting 为 sudog 队列
    nextp = &gp.waiting
    for _, casei := range lockorder {casi = int(casei)
        cas = &scases[casi]
        if cas.kind == caseNil {continue}
        c = cas.c
        // 构建 sudog
        sg := acquireSudog()
        sg.g = gp
        sg.isSelect = true
        // No stack splits between assigning elem and enqueuing
        // sg on gp.waiting where copystack can find it.
        sg.elem = cas.elem
        sg.releasetime = 0
        if t0 != 0 {sg.releasetime = -1}
        sg.c = c
        //gp.waiting 队列增加数据
        *nextp = sg
        nextp = &sg.waitlink
        // 如果 kind 为 1,保留在 channel 的接管队列中
        switch cas.kind {
        case caseRecv:
            c.recvq.enqueue(sg)
        // 如果 kind 为 2,保留在 channel 的发送队列中
        case caseSend:
            c.sendq.enqueue(sg)
        }
    }

    // wait for someone to wake us up
    // 设置 goroutian 的回调,如果有 channel 唤醒 goroutian,会把对应的 sudog 保留到 param 中
    gp.param = nil

    // 挂起 goroutian,selparkcommit 会给所有 channel 解锁
    gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
    gp.activeStackChans = false

    // 唤醒后先给 channel 加锁
    sellock(scases, lockorder)
    gp.selectDone = 0

    // 唤醒 groutian 对应的 sudog
    sg = (*sudog)(gp.param)
    gp.param = nil

    // pass 3 - dequeue from unsuccessful chans
    // otherwise they stack up on quiet channels
    // record the successful case, if any.
    // We singly-linked up the SudoGs in lock order.
    casi = -1
    cas = nil

    //sglist 为所有 sudog 链表
    sglist = gp.waiting
    // Clear all elem before unlinking from gp.waiting.
    for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
        sg1.isSelect = false
        sg1.elem = nil
        sg1.c = nil
    }
    gp.waiting = nil

    // 循环所有 case
    for _, casei := range lockorder {k = &scases[casei]
        if k.kind == caseNil {continue}
        if sglist.releasetime > 0 {k.releasetime = sglist.releasetime}
        // 找到唤醒 goroutian 的 sudog
        if sg == sglist {
            // sg has already been dequeued by the G that woke us up.
            casi = int(casei)
            cas = k
        } else { // 从对应的读写队列中删除 sudog
            c = k.c
            if k.kind == caseSend {c.sendq.dequeueSudoG(sglist)
            } else {c.recvq.dequeueSudoG(sglist)
            }
        }
        // 从链表中获取下一个 sudog,持续循环、删除读写队列
        sgnext = sglist.waitlink
        sglist.waitlink = nil
        releaseSudog(sglist)
        sglist = sgnext
    }

    if cas == nil {
        // 如果唤醒的 case 为 nil,从 loop 从新开始
        goto loop
    }

    c = cas.c

    if debugSelect {print("wait-return: cas0=", cas0, "c=", c, "cas=", cas, "kind=", cas.kind, "\n")
    }

    // 如果是 case 是接管
    if cas.kind == caseRecv {recvOK = true}
    // 持续锁定 channel
    selunlock(scases, lockorder)

    // 跳转到 retc 代码块
    goto retc

    //channel 的缓冲区有数据时,间接从缓冲区获取数据
bufrecv:
    recvOK = true
    qp = chanbuf(c, c.recvx)
    // 如果有接管值,把数据地址存入 elem 中
    if cas.elem != nil {typedmemmove(c.elemtype, cas.elem, qp)
    }
    typedmemclr(c.elemtype, qp)
    // 接管索引往后挪一位或者初始化为 0
    c.recvx++
    if c.recvx == c.dataqsiz {c.recvx = 0}
    // 缓冲区的数据量缩小一个
    c.qcount--

    // 解锁所有 channel
    selunlock(scases, lockorder)

    // 跳转到 retc 代码块
    goto retc

    //channel 的缓冲区有闲暇地位时,把数据间接写入 buffer 中
bufsend:
    // 设置数据到缓冲区
    typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)

    // 发送下标向后移动或者初始化为 0
    c.sendx++
    if c.sendx == c.dataqsiz {c.sendx = 0}

    // 缓冲区中数据量加 1
    c.qcount++
    // 解锁 channel
    selunlock(scases, lockorder)
    // 跳转到 retc 代码区
    goto retc

    // 如果发送队列中有 groutian
recv:
    // can receive from sleeping sender (sg)
    // 从发送的 sudog 中获取数据
    // 解锁 channel
    // 唤醒 goroutian
    recv(c, sg, cas.elem, func() {selunlock(scases, lockorder) }, 2)
    if debugSelect {print("syncrecv: cas0=", cas0, "c=", c, "\n")
    }
    recvOK = true
    goto retc

    // 接管时 channel 已敞开
rclose:
    // read at end of closed channel
    // 解锁 channel
    selunlock(scases, lockorder)
    recvOK = false

    // 如果有有接管值,eg:case a := <- chan0, 把数据地址赋值给 elem
    if cas.elem != nil {typedmemclr(c.elemtype, cas.elem)
    }
    if raceenabled {raceacquire(c.raceaddr())
    }
    goto retc

    // 发送时 channel 中存在接管 goroutian
send:
    // 把数据发送到接管的 goroutian 中
    // 解锁 channel
    // 唤醒 goroutian
    send(c, sg, cas.elem, func() {selunlock(scases, lockorder) }, 2)
    if debugSelect {print("syncsend: cas0=", cas0, "c=", c, "\n")
    }
    goto retc

    // 返回
retc:
    if cas.releasetime > 0 {blockevent(cas.releasetime-t0, 1)
    }
    // 返回对应 case 的下标,如果是接管,返回 recvOK,channel 敞开时为 false
    return casi, recvOK

    // 发送时 channel 已敞开,解锁 channel,间接 panic
sclose:
    // send on closed channel
    selunlock(scases, lockorder)
    panic(plainError("send on closed channel"))
}

recv 接管办法

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 如果为非缓冲区
    if c.dataqsiz == 0 {
        //...
        if ep != nil {
            // 从 sender 队列中间接复制数据 =
            recvDirect(c.elemtype, sg, ep)
        }
    } else {qp := chanbuf(c, c.recvx)
        //...
        // 如果承受变量不为空,合乎数据到 ep
        if ep != nil {typedmemmove(c.elemtype, ep, qp)
        }
        // 从缓冲区复制数据
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {c.recvx = 0}
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g

    // 解锁 channel
    unlockf()
    // 发送者的 param 设置
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {sg.releasetime = cputicks()
    }
    // 唤醒 goroutian
    goready(gp, skip+1)
}

send 办法:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    //...
    // 发送数据如果不为空
    if sg.elem != nil {
        // 间接把数据写入接收者 goroutian
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    // 解锁 channel
    unlockf()
    // 接管 groutian 的 param 赋值
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {sg.releasetime = cputicks()
    }
    // 唤醒 groutian
    goready(gp, skip+1)
}

以上就是 select 的外围代码解析,能够对着正文和下面的图一起看,如果有一些 channel 的常识不是很明确,能够先看下 channel 详解,置信你肯定会有所播种

正文完
 0