乐趣区

关于golang:golang调度学习调度流程

golang 1.16.2 amd64
以下将具体介绍 golang 的调度流程,为方便阅读,将会省略部分无关代码。

rt0_go

咱们从 rt0_go 开始讲

// rt0_go is the bootstrap routine shown in this excerpt: it sets up the
// stack bounds of g0, links g0 and m0 to each other, runs check/args/osinit/
// schedinit, queues the first goroutine through newproc, and finally starts
// the current M with mstart.
//
// Defined as ABIInternal since it does not use the stack-based Go ABI (and
// in addition there are no calls to this entry point from Go code).
TEXT runtime·rt0_go<ABIInternal>(SB),NOSPLIT,$0
    // ... omitted: initialization of args

    // create istack out of the given (operating system) stack.
    // _cgo_init may update stackguard.
    MOVQ    $runtime·g0(SB), DI                                 // DI = &runtime.g0
    LEAQ    (-64*1024+104)(SP), BX                              // BX = SP - 64*1024 + 104
    MOVQ    BX, g_stackguard0(DI)                               // runtime.g0.stackguard0 = BX
    MOVQ    BX, g_stackguard1(DI)                               // runtime.g0.stackguard1 = BX
    MOVQ    BX, (g_stack+stack_lo)(DI)                          // runtime.g0.stack.lo = BX
    MOVQ    SP, (g_stack+stack_hi)(DI)                          // runtime.g0.stack.hi = SP

    // ... omitted: CPU information
ok:
    // set the per-goroutine and per-mach "registers"
    get_tls(BX)                                         // BX = TLS base; g(BX) below is the current-g slot (macros defined outside this excerpt)
    LEAQ    runtime·g0(SB), CX                          // CX = &runtime.g0
    MOVQ    CX, g(BX)                                   // current g = &runtime.g0
    LEAQ    runtime·m0(SB), AX                          // AX = &runtime.m0

    // save m->g0 = g0
    MOVQ    CX, m_g0(AX)                                // m0.g0 = &g0
    // save m0 to g0->m 
    MOVQ    AX, g_m(CX)                                 // g0.m = &m0

    CLD                                                    // convention is that the direction flag D is always left cleared
    CALL    runtime·check(SB)                           // sanity checks; per the article, unrelated to scheduling

    MOVL    16(SP), AX                                    // copy argc
    MOVL    AX, 0(SP)
    MOVQ    24(SP), AX                                    // copy argv
    MOVQ    AX, 8(SP)
    CALL    runtime·args(SB)                            // initialize args
    CALL    runtime·osinit(SB)                          // per the article: initializes ncpu and physPageSize (body not shown here)
    CALL    runtime·schedinit(SB)                       // initialize the scheduler

    // create a new goroutine to start program
    MOVQ    $runtime·mainPC(SB), AX                        // entry: AX = &runtime.mainPC, the function value for runtime.main
    PUSHQ    AX                                          // pushed first, so it is the second argument of newproc (the entry)
    PUSHQ    $0                                            // pushed last, so it is the first argument of newproc (arg size = 0)
    CALL    runtime·newproc(SB)                         // i.e. runtime·newproc($0, $runtime·mainPC(SB))
    POPQ    AX                                          // pop the two pushed arguments again
    POPQ    AX

    // start this M
    CALL    runtime·mstart(SB)

    CALL    runtime·abort(SB)    // mstart should never return
    RET

    // Prevent dead-code elimination of debugCallV1, which is
    // intended to be called by debuggers.
    MOVQ    $runtime·debugCallV1<ABIInternal>(SB), AX
    RET
    
// mainPC is a function value for runtime.main, to be passed to newproc.
// The reference to runtime.main is made via ABIInternal, since the
// actual function (not the ABI0 wrapper) is needed by newproc.
// It is emitted as an 8-byte symbol in read-only data.
DATA runtime·mainPC+0(SB)/8,$runtime·main<ABIInternal>(SB)
GLOBL runtime·mainPC(SB),RODATA,$8

schedinit

调度器的初始化从 schedinit() 函数开始,将会设置 m 最大个数(maxmcount)及 p 最大个数(GOMAXPROCS)等

// The bootstrap sequence is:
//
//    call osinit
//    call schedinit
//    make & queue new G
//    call runtime·mstart
//
// The new G calls runtime·main.
//
// schedinit performs the one-time scheduler setup: it caps the number of Ms,
// initializes m0 through mcommoninit, and sizes the P set through procresize
// (GOMAXPROCS if set to a positive value, otherwise ncpu).
func schedinit() {
    // ... omitted: lock rank and race detector setup

    _g_ := getg()
    sched.maxmcount = 10000 // at most 10000 Ms

    worldStopped()         // The world starts stopped (lock-rank bookkeeping; skipped here).
    moduledataverify()     // module data verification; skipped here
    stackinit()            // stack initialization; covered in the memory chapter
    mallocinit()           // heap initialization; covered in the memory chapter
    fastrandinit()         // random number initialization; skipped here
    mcommoninit(_g_.m, -1) // initialize m0; id -1 makes mcommoninit reserve a new ID
    cpuinit()              // CPU information; skipped here
    alginit()              // hash algorithm setup; unrelated to scheduling
    modulesinit()          // activeModules setup, mainly for the GC; unrelated to scheduling
    typelinksinit()        // typemap of activeModules; unrelated to scheduling
    itabsinit()            // interface tables; unrelated to scheduling

    sigsave(&_g_.m.sigmask)     // save the signal mask; unrelated to scheduling
    initSigmask = _g_.m.sigmask // remember it as the initial signal mask

    goargs()         // command-line args; unrelated to scheduling
    goenvs()         // environment; unrelated to scheduling
    parsedebugvars() // parse debug values; unrelated to scheduling
    gcinit()         // GC parameters; covered in the GC chapter

    lock(&sched.lock)
    sched.lastpoll = uint64(nanotime()) // time of last network poll
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }
    // Resize the P set; every P is allocated inside procresize.
    // procresize returns the Ps that already have local work, and during
    // bootstrap there must be none.
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
    unlock(&sched.lock)

    // World is effectively started now, as P's can run.
    worldStarted()

    // For cgocheck > 1, we turn on the write barrier at all times
    // and check all pointer writes. We can't do this until after
    // procresize because the write barrier needs a P.
    if debug.cgocheck > 1 {
        writeBarrier.cgo = true
        writeBarrier.enabled = true
        for _, p := range allp {
            p.wbBuf.reset()
        }
    }

    if buildVersion == "" {
        // Condition should never trigger. This code just serves
        // to ensure runtime·buildVersion is kept in the resulting binary.
        buildVersion = "unknown"
    }
    if len(modinfo) == 1 {
        // Condition should never trigger. This code just serves
        // to ensure runtime·modinfo is kept in the resulting binary.
        modinfo = ""
    }
}

schedinit 方法主要完成以下工作:

  1. 设置 m 的最大数量是 10000
  2. 调用 mcommoninit 初始化 m0
  3. 调用 procresize, 调整 p 的数量, 并且绑定 m0 和 p
    到现在已经有了 m0、g0 和 p,并且三者已互相绑定。

mcommoninit

// mcommoninit performs the initialization common to every M: it records the
// creating call stack, assigns an ID, seeds the per-M fast random state,
// runs mpreinit, and publishes mp on the allm list.
//
// mp is the M being initialized. A non-negative id is used as the M's ID;
// a negative id causes a new one to be reserved through mReserveID.
func mcommoninit(mp *m, id int64) {
    _g_ := getg()

    // g0 stack won't make sense for user (and is not necessary unwindable).
    if _g_ != _g_.m.g0 {
        // Record the call stack that is creating this M in mp.createstack.
        callers(1, mp.createstack[:])
    }

    lock(&sched.lock)

    if id >= 0 {
        mp.id = id
    } else {
        // schedinit passes id = -1, so m0 gets a freshly reserved ID here.
        mp.id = mReserveID()
    }

    // Seed the per-M fast random state; the two words must not both be zero.
    mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed))
    mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed))
    if mp.fastrand[0]|mp.fastrand[1] == 0 {
        mp.fastrand[1] = 1
    }

    // Per the article, mpreinit creates gsignal with a 32 KB stack
    // (mpreinit itself is not shown here).
    mpreinit(mp)
    if mp.gsignal != nil {
        mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
    }

    // Add to allm so garbage collector doesn't free g->m
    // when it is just in a register or thread-local storage.
    // mp becomes the new head of the allm linked list.
    mp.alllink = allm

    // NumCgoCall() iterates over allm w/o schedlock,
    // so we need to publish it safely.
    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
    unlock(&sched.lock)

    // Allocate memory to hold a cgo traceback if the cgo call crashes.
    if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" {
        mp.cgoCallers = new(cgoCallers)
    }
}

mcommoninit 方法主要完成以下工作:

  1. 当前 g 不是 g0 时,调用 callers 记录创建该 m 的调用栈(mp.createstack)
  2. 初始化随机数种子
  3. 创建 gsignal 并且分配 32k 的栈
  4. 把 m 加入 allm

procresize

// Change number of processors.
//
// sched.lock must be held, and the world must be stopped.
//
// gcworkbufs must not be being modified by either the GC or the write barrier
// code, so the GC must not be running if the number of Ps actually changes.
//
// Returns list of Ps with local work, they need to be scheduled by the caller.
func procresize(nprocs int32) *p {
    // ... 略,运行工夫,各种检测

    old := gomaxprocs
    maskWords := (nprocs + 31) / 32
    // Grow allp if necessary.
    if nprocs > int32(len(allp)) {     // 以前的 allp 不够用,须要扩大
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
       // 扩大 allp
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {allp = allp[:nprocs]
        } else {nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        // 扩大 idlepMask 和 timerpMask,用于标记 p 是 idle 状态和领有 time 状态
        if maskWords <= int32(cap(idlepMask)) {idlepMask = idlepMask[:maskWords]
            timerpMask = timerpMask[:maskWords]
        } else {nidlepMask := make([]uint32, maskWords)
            // No need to copy beyond len, old Ps are irrelevant.
            copy(nidlepMask, idlepMask)
            idlepMask = nidlepMask

            ntimerpMask := make([]uint32, maskWords)
            copy(ntimerpMask, timerpMask)
            timerpMask = ntimerpMask
        }
        unlock(&allpLock)
    }

    // initialize new P's
    for i := old; i < nprocs; i++ {pp := allp[i]
        if pp == nil {pp = new(p)
        }
        pp.init(i)
        atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
    }

    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {          // 以后 g 的 p 依然可用, 
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning
        _g_.m.p.ptr().mcache.prepareForSweep()
    } else {                                                // schedinit 调用的时候还没有 p, 所以走这个分支
        // release the current P and acquire allp[0].
        //
        // We must do this before destroying our current P
        // because p.destroy itself has write barriers, so we
        // need to do that from a valid P.
        if _g_.m.p != 0 {
            if trace.enabled {
                // Pretend that we were descheduled
                // and then scheduled again to keep
                // the trace sane.
                traceGoSched()
                traceProcStop(_g_.m.p.ptr())
            }
            _g_.m.p.ptr().m = 0                             // P 存在解绑 P->M}
        _g_.m.p = 0                                         // 解绑 M -> P
        p := allp[0]                                        // 
        p.m = 0                                             // 解绑 allp[0] -> m
        p.status = _Pidle                                   // 调整 status, 这些都是因为 acquirep 外面会断定条件
        acquirep(p)                                         // 绑定 p 和以后 m
        if trace.enabled {traceGoStart()
        }
    }
    // g.m.p is now set, so we no longer need mcache0 for bootstrapping.
    // 以前有些 g 没有 p 的时候长期用这个,当初大家都有 p 了就用不到了,而且也被复用了,不能能够无锁拜访
    mcache0 = nil                                           

    // release resources from unused P's
    for i := nprocs; i < old; i++ {p := allp[i]
        p.destroy()                                         // destroy 开释 p 的所有资源,并且把 p 的状态改为_Pdead
        // can't free P itself because it can be referenced by an M in syscall
    }

    // 删掉多余的 allp.
    if int32(len(allp)) != nprocs {lock(&allpLock)
        allp = allp[:nprocs]
        idlepMask = idlepMask[:maskWords]
        timerpMask = timerpMask[:maskWords]
        unlock(&allpLock)
    }
    
    // runnablePs 收集所有非 idle 非以后运行的 p 
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {p := allp[i]
        if _g_.m.p.ptr() == p {continue}
        p.status = _Pidle
        if runqempty(p) {pidleput(p)
        } else {p.m.set(mget())                             // 从新绑定一个 m
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    return runnablePs
}

procresize 方法主要完成以下工作:

  1. 初始化 allp,如果原来的不够就新加一些,如果多了就释放一些(没有真的释放 p,只是释放里面的数据)
  2. runnablePs 收集所有本地运行队列非空且不是当前 m 所绑定的 p,并给它们各绑定一个 m

acquirep

// acquirep binds _p_ to the current m.
//
// Write barriers are permitted in this function even when the caller
// forbids them, because _p_ is acquired immediately on entry.
//
//go:yeswritebarrierrec
func acquirep(_p_ *p) {
    // This first step is the part that must run without write barriers.
    // Per the article, wirep links _p_ and g.m to each other and moves
    // _p_.status from _Pidle to _Prunning.
    wirep(_p_)

    // A P is held from here on, so write barriers are allowed.

    // Run the deferred mcache flush before this P gets a chance to
    // allocate from a potentially stale mcache.
    _p_.mcache.prepareForSweep()

    if trace.enabled {
        traceProcStart()
    }
}

引用文章

[1] Go 语言内幕(6):启动和内存分配初始化 https://studygolang.com/artic…

退出移动版