golang 1.16.2 am64
以下就将会具体介绍golang的调度流程,不便浏览,将会省略局部无关代码。
rt0_go
咱们从rt0_go开始讲
// Defined as ABIInternal since it does not use the stack-based Go ABI (and// in addition there are no calls to this entry point from Go code).TEXT runtime·rt0_go<ABIInternal>(SB),NOSPLIT,$0 // ... 略初始化 args // create istack out of the given (operating system) stack. // _cgo_init may update stackguard. MOVQ $runtime·g0(SB), DI // DI = &runtime.g0 LEAQ (-64*1024+104)(SP), BX // BX = SP - 64*1024 + 104 MOVQ BX, g_stackguard0(DI) // runtime.g0.stackguard0 = BX MOVQ BX, g_stackguard1(DI) // runtime.g0.stackguard1 = BX MOVQ BX, (g_stack+stack_lo)(DI) // runtime.g0.stack.stack.lo = BX MOVQ SP, (g_stack+stack_hi)(DI) // runtime.g0.stack.stack.hi = SP // ... 略 CPU信息 ok: // set the per-goroutine and per-mach "registers" get_tls(BX) // BX = &g LEAQ runtime·g0(SB), CX // CX = &runtime.g0 MOVQ CX, g(BX) // &g = &runtime.g0, 切换以后g LEAQ runtime·m0(SB), AX // AX = &runtime.m0 // save m->g0 = g0 MOVQ CX, m_g0(AX) // m0.g0 = g // save m0 to g0->m MOVQ AX, g_m(CX) // g.m = m0 CLD // convention is flat标记 D is always left cleared CALL runtime·check(SB) // 做一些类型检查和调度无关 MOVL 16(SP), AX // copy argc MOVL AX, 0(SP) MOVQ 24(SP), AX // copy argv MOVQ AX, 8(SP) CALL runtime·args(SB) // 初始化 args CALL runtime·osinit(SB) // 初始化ncpu和physPageSize CALL runtime·schedinit(SB) // 初始化调度信息上面马上介绍 // create a new goroutine to start program MOVQ $runtime·mainPC(SB), AX // entry, 就是 $runtime·main PUSHQ AX // newproc 的第二个参数 PUSHQ $0 // arg size的第一个参数 CALL runtime·newproc(SB) // 调用 runtime·newproc($0, $runtime·mainPC(SB)) POPQ AX POPQ AX // start this M CALL runtime·mstart(SB) CALL runtime·abort(SB) // mstart should never return RET // Prevent dead-code elimination of debugCallV1, which is // intended to be called by debuggers. MOVQ $runtime·debugCallV1<ABIInternal>(SB), AX RET // mainPC is a function value for runtime.main, to be passed to newproc.// The reference to runtime.main is made via ABIInternal, since the// actual function (not the ABI0 wrapper) is needed by newproc.DATA runtime·mainPC+0(SB)/8,$runtime·main<ABIInternal>(SB)GLOBL runtime·mainPC(SB),RODATA,$8
schedinit
调度器的初始化从 schedinit()函数开始,将会设置m最大个数(maxmcount)及p最大个数(GOMAXPROCS)等
// The bootstrap sequence is://// call osinit// call schedinit// make & queue new G// call runtime·mstart//// The new G calls runtime·main.func schedinit() { // ...略lock rank和race _g_ := getg() sched.maxmcount = 10000 // 设置m的最大数量是10000 worldStopped() // The world starts stopped. 用于lock rank, 疏忽 moduledataverify() // module检测, 疏忽 stackinit() // 栈初始化,详见内存章节, 先疏忽 mallocinit() // 堆初始化,详见内存章节, 先疏忽 fastrandinit() // 随机数初始化,先疏忽 mcommoninit(_g_.m, -1) // 初始化m0信息,详见下文 cpuinit() // 初始化CPU信息,先疏忽 alginit() // 次要初始化哈希算法的值,无关疏忽 modulesinit() // activeModules数据初始化,次要是用于gc的数据, 无关疏忽 typelinksinit() // 次要初始化activeModules的typemap, 无关疏忽 itabsinit() // 初始化interface相干, 无关疏忽 sigsave(&_g_.m.sigmask) // sigmask, 无关疏忽 initSigmask = _g_.m.sigmask // sigmask, 无关疏忽 goargs() // args, 无关疏忽 goenvs() // env, 无关疏忽 parsedebugvars() // 解析debug values, 无关疏忽 gcinit() // GC参数初始化, 详见gc章节, 先疏忽 lock(&sched.lock) sched.lastpoll = uint64(nanotime()) // time of last network poll procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } if procresize(procs) != nil { // 调整P的个数, 这个函数很重要,所有的P都是从这里调配的, throw("unknown runnable goroutine during bootstrap") } unlock(&sched.lock) // World is effectively started now, as P's can run. worldStarted() // For cgocheck > 1, we turn on the write barrier at all times // and check all pointer writes. We can't do this until after // procresize because the write barrier needs a P. if debug.cgocheck > 1 { writeBarrier.cgo = true writeBarrier.enabled = true for _, p := range allp { p.wbBuf.reset() } } if buildVersion == "" { // Condition should never trigger. This code just serves // to ensure runtime·buildVersion is kept in the resulting binary. buildVersion = "unknown" } if len(modinfo) == 1 { // Condition should never trigger. This code just serves // to ensure runtime·modinfo is kept in the resulting binary. modinfo = "" }}
schedinit办法次要实现以下工作: :
- 设置m的最大数量是10000
- 调用mcommoninit初始化m0
- 调用procresize,调整p的数量, 并且绑定m0和p
到当初曾经有了m0 g0和p互相绑定,并且有
mcommoninit
func mcommoninit(mp *m, id int64) { _g_ := getg() // g0 stack won't make sense for user (and is not necessary unwindable). if _g_ != _g_.m.g0 { // 调用runtime.tracebackinit 负责初始化 traceback。 // traceback 是一个函数栈。这些函数会在咱们达到以后执行点之前被调用。 // 举个例子,每次产生一个 panic 时咱们都能够看到它们。 // Traceback 是通过调用 runtime.gentraceback 函数产生的。 // 要让这个函数工作, 咱们须要晓得一些内置函数的地址(例如,因为咱们不心愿它们被蕴含到 traceback 中。 // runtime.traceback 就负责初始化这些地址。 callers(1, mp.createstack[:]) } lock(&sched.lock) if id >= 0 { // id是-1, 生成一个新id mp.id = id } else { mp.id = mReserveID() } // 随机数相干 mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed)) mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed)) if mp.fastrand[0]|mp.fastrand[1] == 0 { mp.fastrand[1] = 1 } // mpreinit,创立gsignal并且调配32k的栈 mpreinit(mp) if mp.gsignal != nil { mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard } // Add to allm so garbage collector doesn't free g->m // when it is just in a register or thread-local storage. // mp加到allm链表中 mp.alllink = allm // NumCgoCall() iterates over allm w/o schedlock, // so we need to publish it safely. atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp)) unlock(&sched.lock) // Allocate memory to hold a cgo traceback if the cgo call crashes. if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" { mp.cgoCallers = new(cgoCallers) }}
mcommoninit办法次要实现以下工作: :
- 运行runtime.tracebackinit初始化M的traceback
- 随机数
- 创立gsignal并且调配32k的栈
- 把m退出allm
procresize
// Change number of processors.//// sched.lock must be held, and the world must be stopped.//// gcworkbufs must not be being modified by either the GC or the write barrier// code, so the GC must not be running if the number of Ps actually changes.//// Returns list of Ps with local work, they need to be scheduled by the caller.func procresize(nprocs int32) *p { // ... 略,运行工夫,各种检测 old := gomaxprocs maskWords := (nprocs + 31) / 32 // Grow allp if necessary. if nprocs > int32(len(allp)) { // 以前的allp不够用,须要扩大 // Synchronize with retake, which could be running // concurrently since it doesn't run on a P. // 扩大allp lock(&allpLock) if nprocs <= int32(cap(allp)) { allp = allp[:nprocs] } else { nallp := make([]*p, nprocs) // Copy everything up to allp's cap so we // never lose old allocated Ps. copy(nallp, allp[:cap(allp)]) allp = nallp } // 扩大idlepMask和timerpMask,用于标记p是idle状态和领有time状态 if maskWords <= int32(cap(idlepMask)) { idlepMask = idlepMask[:maskWords] timerpMask = timerpMask[:maskWords] } else { nidlepMask := make([]uint32, maskWords) // No need to copy beyond len, old Ps are irrelevant. copy(nidlepMask, idlepMask) idlepMask = nidlepMask ntimerpMask := make([]uint32, maskWords) copy(ntimerpMask, timerpMask) timerpMask = ntimerpMask } unlock(&allpLock) } // initialize new P's for i := old; i < nprocs; i++ { pp := allp[i] if pp == nil { pp = new(p) } pp.init(i) atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) } _g_ := getg() if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // 以后g的p依然可用, // continue to use the current P _g_.m.p.ptr().status = _Prunning _g_.m.p.ptr().mcache.prepareForSweep() } else { // schedinit调用的时候还没有p, 所以走这个分支 // release the current P and acquire allp[0]. // // We must do this before destroying our current P // because p.destroy itself has write barriers, so we // need to do that from a valid P. if _g_.m.p != 0 { if trace.enabled { // Pretend that we were descheduled // and then scheduled again to keep // the trace sane. traceGoSched() traceProcStop(_g_.m.p.ptr()) } _g_.m.p.ptr().m = 0 // P存在解绑 P->M } _g_.m.p = 0 // 解绑 M -> P p := allp[0] // p.m = 0 // 解绑 allp[0] -> m p.status = _Pidle // 调整status, 这些都是因为acquirep外面会断定条件 acquirep(p) // 绑定 p和以后m if trace.enabled { traceGoStart() } } // g.m.p is now set, so we no longer need mcache0 for bootstrapping. // 以前有些g没有p的时候长期用这个,当初大家都有p了就用不到了, 而且也被复用了,不能能够无锁拜访 mcache0 = nil // release resources from unused P's for i := nprocs; i < old; i++ { p := allp[i] p.destroy() // destroy开释p的所有资源,并且把p的状态改为_Pdead // can't free P itself because it can be referenced by an M in syscall } // 删掉多余的 allp. if int32(len(allp)) != nprocs { lock(&allpLock) allp = allp[:nprocs] idlepMask = idlepMask[:maskWords] timerpMask = timerpMask[:maskWords] unlock(&allpLock) } // runnablePs收集所有非idle非以后运行的p var runnablePs *p for i := nprocs - 1; i >= 0; i-- { p := allp[i] if _g_.m.p.ptr() == p { continue } p.status = _Pidle if runqempty(p) { pidleput(p) } else { p.m.set(mget()) // 从新绑定一个m p.link.set(runnablePs) runnablePs = p } } stealOrder.reset(uint32(nprocs)) var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) return runnablePs}
procresize办法次要实现以下工作: :
- 初始化allp,如果原来的不够就新加一些,如果多了就开释一些,(没有真的开释p, 只是开释外面的数据)
- runnablePs 获取所有非idle非以后运行的p , 并给他们绑定一个新m
acquirep
// Associate p and the current m.//// This function is allowed to have write barriers even if the caller// isn't because it immediately acquires _p_.////go:yeswritebarrierrecfunc acquirep(_p_ *p) { // Do the part that isn't allowed to have write barriers. // 把__p__和g.m互相绑定,并且把_p_.status 从_Pidle转为_Prunning wirep(_p_) // Have p; write barriers now allowed. // Perform deferred mcache flush before this P can allocate // from a potentially stale mcache. _p_.mcache.prepareForSweep() if trace.enabled { traceProcStart() }}
援用文章
[1] Go语言底细(6):启动和内存调配初始化
https://studygolang.com/artic...