关于golang:Goroutine的创建

49次阅读

共计 6379 个字符,预计需要花费 16 分钟才能阅读完成。

原文

传送门
Go 代码中,利用关键字go 启动协程。编译器发现 go func(…),将调用 newproc

package main
go func(){...}

/*
能够应用 go tool compile -S ./main.go 失去汇编代码
CALL    runtime.newproc(SB)
*/

func newproc(fn *funcval)

  1. 创立一个 g 来运行 fn
  2. 将 g 放入 g 期待队列中, 期待被调度
  3. 编译器会把 go 语句转化为调用 newproc
func newproc(fn *funcval) {
    //【获取以后调用方正在运行的 G】gp := getg()
    //【获取以后调用方 PC/IP 寄存器值】pc := getcallerpc()
    //【用 g0 栈创立 G 对象】systemstack(func () {newg := newproc1(fn, gp, pc)

        _p_ := getg().m.p.ptr()
        // newg 放入待运行队列
        runqput(_p_, newg, true)

        if mainStarted {
            // wackp 核心思想就是 寻找资源 执行 newg
            wakep()}
    })
}

fn *funcval

其中 newproc 函数有 1 个参数 fn 是一个可变参数类型

type funcval struct {
    fn uintptr
    // variable-size, fn-specific data here
}

如果咱们有 go 程序


func add(x, y int) int {
    z := x + y
    return z
}
func main() {
    x := 0x100
    y := 0x200
    go add(x, y)
}

那么对于 newproc 中参数 fn 构造体,扩大进去是这样的:

type funcval struct {
    fn uintptr
    x int
    y int
}

所以用”fn+ptrsize“跳过第一个函数指针参数,就能够取得参数 x 的地址

getg()、getcallerpc()

getg()返回以后 G 的指针;函数如下:

// getg returns the pointer to the current g.
// The compiler rewrites calls to this function into instructions
// 编译器会把这个 getg 指令翻译成从专用寄存器取
// that fetch the g directly (from TLS or from the dedicated register).
func getg() *g

间接从寄存器中读取就行。参考如下汇编代码:
Go1.17 R14 寄存器存的就是 g 地址

TEXT runtime.acquirem(SB) /usr/local/go/src/runtime/runtime1.go
      0x104a3e0   MOVQ 0x30(R14), CX ;; CX = &g     
      0x104a3e4   INCL 0x108(CX)  ;; g.m.locks++         
      0x104a3ea   MOVQ 0x30(R14), AX  ;; AX= &m    
      0x104a3ee   RET ;;return &m

0x30(R14) 代表的是 g.m , 这里能够察看下 g 构造体

type g struct {
    stack       stack   // offset 0x0
    stackguard0 uintptr // offset 0x10
    stackguard1 uintptr // offset 0x18

    _panic    *_panic // offset 0x20
    _defer    *_defer // offset 0x28
    m         *m      // offset 0x30
    ...

getcallerpc()函数和 getcallersp()函数是一对。

前者返回程序计数寄存器指针;后者返回栈顶指针。
然而要留神:getcallersp 的后果在返回时是正确的,
然而它可能会因为随后对函数的调用导致栈扩容而生效。
个别规定是应该立刻应用 getcallersp 的后果且只能传递给 nosplit 函数。

func newproc1(fn funcval, callergp g, callerpc uintptr) *g

  • 本函数创立一个_Grunnable 的 g
  • g 执行从 fn 开始
  • callerpc 是调用 go func 的语句的中央
  • caller 有任务将新创建的 g 退出运行时调度

源码 1

// Create a new g in state _Grunnable, starting at fn. callerpc is the
// address of the go statement that created this. The caller is responsible
// for adding the new g to the scheduler.
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {_g_ := getg()

    //【1】fn 函数指针不能为空;为空时,就设置此时与 g 相关联 m 的 throwing 变量值;顺便抛出异样。if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    //【2】禁用抢占,因为在接下来的执行中, 会应用到 p, 在此期间, 不容许 p 和 m 拆散
    acquirem() // disable preemption because it can be holding p in a local var

    //【3】获取 p, 而后从 p.gfree 中取一个 g
    _p_ := _g_.m.p.ptr()
    //【4】gfget 获取 p 中的 free g 或者从全局 gfree 里取一个 g
    newg := gfget(_p_)

    if newg == nil {
        //【5】malg 如果没有可用的 g, 就申请一个新 g
        newg = malg(_StackMin)
        // 将 g 的状态改成_Gdead
        casgstatus(newg, _Gidle, _Gdead)
        // 将新 g 退出 allg,_Gdead 状态保障了 gc 不会去关注新 g 的栈空间
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }

gfget

gfget 核心思想其实就是复用 g, 从 gfree 链表里取
如果本地队列为空,就从全局队列里取

首先要明确本地 list 构造体和全局 list 构造体的申明:

// p 本地
// 用法:// p.gFree.glist.pop() | .push()
// p.gFree.n-- | .n++


gFree struct {
    gList
    n int32
}

// 全局
// Global cache of dead G's.
gFree struct {
    lock    mutex
    stack   gList // Gs with stacks
    noStack gList // Gs without stacks
    n       int32
}
// Get from gfree list.
// If local list is empty, grab a batch from global list.
func gfget(_p_ *p) *g {
retry:
    // _p_.gFree.empty() 如果本地队列为空
    // !sched.gFree.stack.empty() 并且全局有栈队列不为空 
    // || !sched.gFree.noStack.empty() 或 全局无栈链表不为空
    // 则进入该分支
    if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
        // 全局 gFree 加锁
        lock(&sched.gFree.lock)
        // Move a batch of free Gs to the P.
        // 将最多 32 个 free g 退出 P 的本地 gfree 链表
        for _p_.gFree.n < 32 {
            // Prefer Gs with stacks.
            // 优先有栈 g
            gp := sched.gFree.stack.pop()
            if gp == nil {
                // 优先有栈 g,切实没有就应用无栈 g
                gp = sched.gFree.noStack.pop()
                if gp == nil {break}
            }
            // 每次将全局 g 退出 p.gfree,就将全局 g 的数量减一
            sched.gFree.n--
            _p_.gFree.push(gp)
            _p_.gFree.n++
        }
        // 全局 gFree 解锁
        unlock(&sched.gFree.lock)
        goto retry
    }

    /*【本地链表非空,就出栈;判断 g 是否为 nil,是 nil 间接返回 外表本地和全局都无 free g】*/
    gp := _p_.gFree.pop()
    if gp == nil {return nil}
    _p_.gFree.n--
    if gp.stack.lo == 0 {
        // 如果是空栈 g 就调配一个栈空间
        // Stack was deallocated in gfput. Allocate a new one.
        systemstack(func() {gp.stack = stackalloc(_FixedStack)
        })
        // 设置 g 决裂的爱护线
        gp.stackguard0 = gp.stack.lo + _StackGuard
    } 
    return gp
}

源码 2


    totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
    totalSize = alignUp(totalSize, sys.StackAlign)
    sp := newg.stack.hi - totalSize
    spArg := sp
    if usesLR {
        // caller's LR
        *(*uintptr)(unsafe.Pointer(sp)) = 0
        prepGoExitFrame(sp)
        spArg += sys.MinFrameSize
    }
    // 下面几行代码就是为了确定 sp 的地位
    // 清空 g.sched 目标是 初始化 gobuf(g 切换用于爱护现场的构造)
    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))

    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)

    // g.gopc 代表返回地址是调用方执行 go func 的中央
    newg.gopc = callerpc
    // saveAncestors 此函数用于保留本人的”先人“;此函数还会设置”轨迹 trace“,咱们能够用”go tool trace“来跟踪 go 程序中的线程。// 参考 https://lessisbetter.site/2019/03/26/golang-scheduler-2-macro-view/
    newg.ancestors = saveAncestors(callergp)
    // g 开始执行中央是 fn.fn
    newg.startpc = fn.fn
    if _g_.m.curg != nil {// 这是一个判断。除了 g0,每个 G 的创立都由其余的 G 调用”go func()“执行;这个调用的 G 就是 curg。// 如果创立这个 G 存在一个这样的 curg,那么他们的标签设置为一样的;此标签也能够用于分析器的跟踪。newg.labels = _g_.m.curg.labels
    }
    if isSystemGoroutine(newg, false) {atomic.Xadd(&sched.ngsys, +1)
    }
    // Track initial transition?
    newg.trackingSeq = uint8(fastrand())
    if newg.trackingSeq%gTrackingPeriod == 0 {newg.tracking = true}
    // 将 newg 状态 设置成 _Grunnable
    casgstatus(newg, _Gdead, _Grunnable)

    if _p_.goidcache == _p_.goidcacheend {
        // 如果本地 p 曾经没有可调配的 goid 了就尝试获取 _GoidCacheBatch=16 个

        // Sched.goidgen is the last allocated id,
        // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
        // At startup sched.goidgen=0, so main goroutine receives goid=1.
        _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
        _p_.goidcache -= _GoidCacheBatch - 1
        _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
    }
    // 给 newg 一个惟一 id
    newg.goid = int64(_p_.goidcache)
    _p_.goidcache++

    releasem(_g_.m)

    return newg

func runqput(_p_ p, gp g, next bool)

runqput 尝试将 g 退出 p.runnext
并将 old runnext 退出 p 的本地队列中
如果本地队列是满了,就把 g 和一半本地队列 退出全局队列 参考 runqputslow

// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool) {if randomizeScheduler && next && fastrandn(2) == 0 {next = false}

    if next {
    // runqput 尝试将 g 退出 p.runnext
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {goto retryNext}
        if oldnext == 0 {return}
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()}

retry:
    // 并将 old runnext 退出 p 的本地队列中
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    if t-h < uint32(len(_p_.runq)) {_p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // 如果本地队列是满了,就把 g 和一半本地队列 退出全局队列 参考 runqputslow
    if runqputslow(_p_, gp, h, t) {return}
    // the queue is not full, now the put above must succeed
    goto retry
}

正文完
 0