乐趣区

深入理解Gogoroutine的实现及调度器分析

在学习 Go 的过程中,最让人惊叹的莫过于 goroutine 了。但是 goroutine 是什么,我们用 go 关键字就可以创建一个 goroutine,这么多的 goroutine 之间,是如何调度的呢?

1. 结构概览

在看 Go 源码的过程中,遍地可见 g、p、m,我们首先就看一下这些关键字的结构及相互之间的关系

1.1. 数据结构

这里我们仅列出来了结构体里面比较关键的一些成员

1.1.1. G(gouroutine)

goroutine 是运行时的最小执行单元

type g struct {
    // Stack parameters.
    // stack describes the actual stack memory: [stack.lo, stack.hi).
    // stackguard0 is the stack pointer compared in the Go stack growth prologue.
    // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
    // stackguard1 is the stack pointer compared in the C stack growth prologue.
    // It is stack.lo+StackGuard on g0 and gsignal stacks.
    // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
  // 当前 g 使用的栈空间,stack 结构包括 [lo, hi]两个成员
    stack       stack   // offset known to runtime/cgo
  // 用于检测是否需要进行栈扩张,go 代码使用
    stackguard0 uintptr // offset known to liblink
  // 用于检测是否需要进行栈扩展,原生代码使用的
    stackguard1 uintptr // offset known to liblink
  // 当前 g 所绑定的 m
    m              *m      // current m; offset known to arm liblink
  // 当前 g 的调度数据,当 goroutine 切换时,保存当前 g 的上下文,用于恢复
    sched          gobuf
    // g 当前的状态
    atomicstatus   uint32
  // 当前 g 的 id
    goid           int64
  // 下一个 g 的地址,通过 guintptr 结构体的 ptr set 函数可以设置和获取下一个 g,通过这个字段和 sched.gfreeStack sched.gfreeNoStack 可以把 free g 串成一个链表
    schedlink      guintptr
  // 判断 g 是否允许被抢占
    preempt        bool       // preemption signal, duplicates stackguard0 = stackpreempt
    // g 是否要求要回到这个 M 执行, 有的时候 g 中断了恢复会要求使用原来的 M 执行
    lockedm        muintptr
}

1.1.2. P(process)

P 是 M 运行 G 所需的资源

type p struct {
   lock mutex

   id          int32
   // p 的状态,稍后介绍
   status      uint32 // one of pidle/prunning/...
   // 下一个 p 的地址,可参考 g.schedlink
   link        puintptr
   // p 所关联的 m
   m           muintptr   // back-link to associated m (nil if idle)
   // 内存分配的时候用的,p 所属的 m 的 mcache 用的也是这个
   mcache      *mcache
  
   // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
   // 从 sched 中获取并缓存的 id,避免每次分配 goid 都从 sched 分配
     goidcache    uint64
     goidcacheend uint64

   // Queue of runnable goroutines. Accessed without lock.
   // p 本地的 runnbale 的 goroutine 形成的队列
   runqhead uint32
   runqtail uint32
   runq     [256]guintptr
   // runnext, if non-nil, is a runnable G that was ready'd by
   // the current G and should be run next instead of what's in
   // runq if there's time remaining in the running G's time
   // slice. It will inherit the time left in the current time
   // slice. If a set of goroutines is locked in a
   // communicate-and-wait pattern, this schedules that set as a
   // unit and eliminates the (potentially large) scheduling
   // latency that otherwise arises from adding the ready'd
   // goroutines to the end of the run queue.
   // 下一个执行的 g,如果是 nil,则从队列中获取下一个执行的 g
   runnext guintptr

   // Available G's (status == Gdead)
   // 状态为 Gdead 的 g 的列表,可以进行复用
   gfree    *g
   gfreecnt int32
}

1.1.3. M(machine)

type m struct {
   // g0 是用于调度和执行系统调用的特殊 g
   g0      *g     // goroutine with scheduling stack
     // m 当前运行的 g
   curg          *g       // current running goroutine
   // 当前拥有的 p
   p             puintptr // attached p for executing go code (nil if not executing go code)
   // 线程的 local storage
   tls           [6]uintptr   // thread-local storage
   // 唤醒 m 时,m 会拥有这个 p
   nextp         puintptr
   id            int64
   // 如果 !="", 继续运行 curg
   preemptoff    string // if != "", keep curg running on this m
   // 自旋状态,用于判断 m 是否工作已结束,并寻找 g 进行工作
   spinning      bool // m is out of work and is actively looking for work
   // 用于判断 m 是否进行休眠状态
   blocked       bool // m is blocked on a note
     // m 休眠和唤醒通过这个,note 里面有一个成员 key,对这个 key 所指向的地址进行值的修改,进而达到唤醒和休眠的目的
   park          note
   // 所有 m 组成的一个链表
   alllink       *m // on allm
   // 下一个 m,通过这个字段和 sched.midle 可以串成一个 m 的空闲链表
   schedlink     muintptr
   // mcache,m 拥有 p 的时候,会把自己的 mcache 给 p
   mcache        *mcache
   // lockedm 的对应值
   lockedg       guintptr
   // 待释放的 m 的 list,通过 sched.freem 串成一个链表
   freelink      *m      // on sched.freem
}

1.1.4. sched

type schedt struct {
   // 全局的 go id 分配
   goidgen  uint64
   // 记录的最后一次从 i / o 中查询 g 的时间
   lastpoll uint64

   lock mutex

   // When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
   // sure to call checkdead().
     // m 的空闲链表,结合 m.schedlink 就可以组成一个空闲链表了
   midle        muintptr // idle m's waiting for work
   nmidle       int32    // number of idle m's waiting for work
   nmidlelocked int32    // number of locked m's waiting for work
   // 下一个 m 的 id,也用来记录创建的 m 数量
   mnext        int64    // number of m's that have been created and next M ID
   // 最多允许的 m 的数量
   maxmcount    int32    // maximum number of m's allowed (or die)
   nmsys        int32    // number of system m's not counted for deadlock
   // free 掉的 m 的数量,exit 的 m 的数量
   nmfreed      int64    // cumulative number of freed m's

   ngsys uint32 // number of system goroutines; updated atomically

   pidle      puintptr // idle p's
   npidle     uint32
   nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

   // Global runnable queue.
   // 这个就是全局的 g 的队列了,如果 p 的本地队列没有 g 或者太多,会跟全局队列进行平衡
   // 根据 runqhead 可以获取队列头的 g,然后根据 g.schedlink 获取下一个,从而形成了一个链表
   runqhead guintptr
   runqtail guintptr
   runqsize int32

   // freem is the list of m's waiting to be freed when their
   // m.exited is set. Linked through m.freelink.
   // 等待释放的 m 的列表
   freem *m
}

在这里插一下状态的解析

1.1.5. g.status

  • _Gidle: goroutine 刚刚创建还没有初始化
  • _Grunnable: goroutine 处于运行队列中,但是还没有运行,没有自己的栈
  • _Grunning: 这个状态的 g 可能处于运行用户代码的过程中,拥有自己的 m 和 p
  • _Gsyscall: 运行 systemcall 中
  • _Gwaiting: 这个状态的 goroutine 正在阻塞中,类似于等待 channel
  • _Gdead: 这个状态的 g 没有被使用,有可能是刚刚退出,也有可能是正在初始化中
  • _Gcopystack: 表示 g 当前的栈正在被移除,新栈分配中

1.1.6. p.status

  • _Pidle: 空闲状态,此时 p 不绑定 m
  • _Prunning: m 获取到 p 的时候,p 的状态就是这个状态了,然后 m 可以使用这个 p 的资源运行 g
  • _Psyscall: 当 go 调用原生代码,原生代码又反过来调用 go 的时候,使用的 p 就会变成此态
  • _Pdead: 当运行中,需要减少 p 的数量时,被减掉的 p 的状态就是这个了

1.1.7. m.status

m 的 status 没有 p、g 的那么明确,但是在运行流程的分析中,主要有以下几个状态

  • 运行中: 拿到 p,执行 g 的过程中
  • 运行原生代码: 正在执行原声代码或者阻塞的 syscall
  • 休眠中: m 发现无待运行的 g 时,进入休眠,并加入到空闲列表中
  • 自旋中(spining): 当前工作结束,正在寻找下一个待运行的 g

在上面的结构中,存在很多的链表,g m p 结构中还有指向对方地址的成员,那么他们的关系到底是什么样的

我们可以从上图,简单的表述一下 m p g 的关系

2. 流程概览

从下图,可以简单的一窥 go 的整个调度流程的大概

接下来我们就从源码的角度来具体的分析整个调度流程(本人汇编不照,汇编方面的就不分析了????)

3. 源码分析

3.1. 初始化

go 的启动流程分为 4 步

  1. call osinit,这里就是设置了全局变量 ncpu = cpu 核心数量
  2. call schedinit
  3. make & queue new G(runtime.newproc, go func()也是调用这个函数来创建 goroutine)
  4. call runtime·mstart

其中,schedinit 就是调度器的初始化,出去 schedinit 中对内存分配,垃圾回收等操作,针对调度器的初始化大致就是初始化自身,设置最大的 maxmcount,确定 p 的数量并初始化这些操作

3.1.1. schedinit

schedinit 这里对当前 m 进行了初始化,并根据 osinit 获取到的 cpu 核数和设置的GOMAXPROCS 确定 p 的数量,并进行初始化

func schedinit() {
    // 从 TLS 或者专用寄存器获取当前 g 的指针类型
    _g_ := getg()
    // 设置 m 最大的数量
    sched.maxmcount = 10000

    // 初始化栈的复用空间
    stackinit()
    // 初始化当前 m
    mcommoninit(_g_.m)

    // osinit 的时候会设置 ncpu 这个全局变量,这里就是根据 cpu 核心数和参数 GOMAXPROCS 来确定 p 的数量
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {procs = n}
    // 生成设定数量的 p
    if procresize(procs) != nil {throw("unknown runnable goroutine during bootstrap")
    }
}

3.1.2. mcommoninit

func mcommoninit(mp *m) {_g_ := getg()

    lock(&sched.lock)
    // 判断 mnext 的值是否溢出,mnext 需要赋值给 m.id
    if sched.mnext+1 < sched.mnext {throw("runtime: thread ID overflow")
    }
    mp.id = sched.mnext
    sched.mnext++
    // 判断 m 的数量是否比 maxmcount 设定的要多,如果超出直接报异常
    checkmcount()
    // 创建一个新的 g 用于处理 signal,并分配栈
    mpreinit(mp)
    if mp.gsignal != nil {mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard}

    // Add to allm so garbage collector doesn't free g->m
    // when it is just in a register or thread-local storage.
    // 接下来的两行,首先将当前 m 放到 allm 的头,然后原子操作,将当前 m 的地址,赋值给 m,这样就将当前 m 添加到了 allm 链表的头了
    mp.alllink = allm

    // NumCgoCall() iterates over allm w/o schedlock,
    // so we need to publish it safely.
    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
    unlock(&sched.lock)

    // Allocate memory to hold a cgo traceback if the cgo call crashes.
    if iscgo || GOOS == "solaris" || GOOS == "windows" {mp.cgoCallers = new(cgoCallers)
    }
}

在这里就开始涉及到了 m 链表了,这个链表可以如下图表示,其他的 p g 链表可以参考,只是使用的结构体的字段不一样

3.1.3. allm 链表示意图

3.1.4. procresize

更改 p 的数量,多退少补的原则,在初始化过程中,由于最开始是没有 p 的,所以这里的作用就是初始化设定数量的 p 了

procesize 不仅在初始化的时候会调用,当用户手动调用 runtime.GOMAXPROCS 的时候,会重新设定 nprocs,然后执行 startTheWorld()startTheWorld()会是使用新的 nprocs 再次调用procresize 这个方法

func procresize(nprocs int32) *p {
    old := gomaxprocs
    if old < 0 || nprocs <= 0 {throw("procresize: invalid arg")
    }
    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {sched.totaltime += int64(old) * (now - sched.procresizetime)
    }
    sched.procresizetime = now

    // Grow allp if necessary.
    // 如果新给的 p 的数量比原先的 p 的数量多,则新建增长的 p
    if nprocs > int32(len(allp)) {
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        // 判断 allp 的 cap 是否满足增长后的长度,满足就直接使用,不满足,则需要扩张这个 slice
        if nprocs <= int32(cap(allp)) {allp = allp[:nprocs]
        } else {nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        unlock(&allpLock)
    }

    // initialize new P's
    // 初始化新增的 p
    for i := int32(0); i < nprocs; i++ {pp := allp[i]
        if pp == nil {pp = new(p)
            pp.id = i
            pp.status = _Pgcstop
            pp.sudogcache = pp.sudogbuf[:0]
            for i := range pp.deferpool {pp.deferpool[i] = pp.deferpoolbuf[i][:0]
            }
            pp.wbBuf.reset()
            // allp 是一个 slice,直接将新增的 p 放到对应的索引下面就 ok 了
            atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
        }
        if pp.mcache == nil {
            // 初始化时,old=0,第一个新建的 p 给当前的 m 使用
            if old == 0 && i == 0 {if getg().m.mcache == nil {throw("missing mcache?")
                }
                pp.mcache = getg().m.mcache // bootstrap} else {
                // 为 p 分配内存
                pp.mcache = allocmcache()}
        }
    }

    // free unused P's
    // 释放掉多余的 p,当新设置的 p 的数量,比原先设定的 p 的数量少的时候,会走到这个流程
    // 通过 runtime.GOMAXPROCS 就可以动态的修改 nprocs
    for i := nprocs; i < old; i++ {p := allp[i]
        // move all runnable goroutines to the global queue
        // 把当前 p 的运行队列里的 g 转移到全局的 g 的队列
        for p.runqhead != p.runqtail {
            // pop from tail of local queue
            p.runqtail--
            gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
            // push onto head of global queue
            globrunqputhead(gp)
        }
        // 把 runnext 里的 g 也转移到全局队列
        if p.runnext != 0 {globrunqputhead(p.runnext.ptr())
            p.runnext = 0
        }
        // if there's a background worker, make it runnable and put
        // it on the global queue so it can clean itself up
        // 如果有 gc worker 的话,修改 g 的状态,然后再把它放到全局队列中
        if gp := p.gcBgMarkWorker.ptr(); gp != nil {casgstatus(gp, _Gwaiting, _Grunnable)
            globrunqput(gp)
            // This assignment doesn't race because the
            // world is stopped.
            p.gcBgMarkWorker.set(nil)
        }
        // sudoig 的 buf 和 cache,以及 deferpool 全部清空
        for i := range p.sudogbuf {p.sudogbuf[i] = nil
        }
        p.sudogcache = p.sudogbuf[:0]
        for i := range p.deferpool {for j := range p.deferpoolbuf[i] {p.deferpoolbuf[i][j] = nil
            }
            p.deferpool[i] = p.deferpoolbuf[i][:0]
        }
        // 释放掉当前 p 的 mcache
        freemcache(p.mcache)
        p.mcache = nil
        // 把当前 p 的 gfree 转移到全局
        gfpurge(p)
        // 修改 p 的状态,让他自生自灭去了
        p.status = _Pdead
        // can't free P itself because it can be referenced by an M in syscall
    }

    // Trim allp.
    if int32(len(allp)) != nprocs {lock(&allpLock)
        allp = allp[:nprocs]
        unlock(&allpLock)
    }
    // 判断当前 g 是否有 p,有的话更改当前使用的 p 的状态,继续使用
    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning} else {// release the current P and acquire allp[0]
        // 如果当前 g 有 p,但是拥有的是已经释放的 p,则不再使用这个 p,重新分配
        if _g_.m.p != 0 {_g_.m.p.ptr().m = 0
        }
        // 分配 allp[0]给当前 g 使用
        _g_.m.p = 0
        _g_.m.mcache = nil
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        // 将 p m g 绑定,并把 m.mcache 指向 p.mcache,并修改 p 的状态为_Prunning
        acquirep(p)
    }
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {p := allp[i]
        if _g_.m.p.ptr() == p {continue}
        p.status = _Pidle
        // 根据 runqempty 来判断当前 p 的 g 运行队列是否为空
        if runqempty(p) {
            // g 运行队列为空的 p,放到 sched 的 pidle 队列里面
            pidleput(p)
        } else {
            // g 运行队列不为空的 p,组成一个可运行队列,并最后返回
            p.m.set(mget())
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    return runnablePs
}
  • runqempty: 这个函数比较简单,就不深究了,就是根据 p.runqtail == p.runqhead 和 p.runnext 来判断有没有待运行的 g
  • pidleput: 将当前的 p 设置为 sched.pidle,然后根据 p.link 将空闲 p 串联起来,可参考上图 allm 的链表示意图

3.2. 任务

创建一个 goroutine,只需要使用 go func 就可以了,编译器会将go func 翻译成 newproc 进行调用,那么新建的任务是如何调用的呢,我们从创建开始进行跟踪

3.2.1. newproc

newproc 函数获取了参数和当前 g 的 pc 信息,并通过 g0 调用 newproc1 去真正的执行创建或获取可用的 g

func newproc(siz int32, fn *funcval) {
    // 获取第一参数地址
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    // 获取当前执行的 g
    gp := getg()
    // 获取当前 g 的 pc
    pc := getcallerpc()
    systemstack(func() {
        // 使用 g0 去执行 newproc1 函数
        newproc1(fn, (*uint8)(argp), siz, gp, pc)
    })
}

3.2.2. newproc1

newporc1 的作用就是创建或者获取一个空间的 g,初始化这个 g,并尝试寻找一个 p 和 m 去执行 g

func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {_g_ := getg()

    if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    // 加锁禁止被抢占
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    siz := narg
    siz = (siz + 7) &^ 7

    // We could allocate a larger initial stack if necessary.
    // Not worth it: this is almost always an error.
    // 4*sizeof(uintreg): extra space added below
    // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
    // 如果参数过多,则直接抛出异常,栈大小是 2k
    if siz >= _StackMin-4*sys.RegSize-sys.RegSize {throw("newproc: function arguments too large for new goroutine")
    }

    _p_ := _g_.m.p.ptr()
    // 尝试获取一个空闲的 g,如果获取不到,则新建一个,并添加到 allg 里面
    // gfget 首先会尝试从 p 本地获取空闲的 g,如果本地没有的话,则从全局获取一堆平衡到本地 p
    newg := gfget(_p_)
    if newg == nil {newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        // 新建的 g,添加到全局的 allg 里面,allg 是一个 slice,append 进去即可
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }
    // 判断获取的 g 的栈是否正常
    if newg.stack.hi == 0 {throw("newproc1: newg missing stack")
    }
    // 判断 g 的状态是否正常
    if readgstatus(newg) != _Gdead {throw("newproc1: new g is not Gdead")
    }
    // 预留一点空间,防止读取超出一点点
    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
    // 空间大小进行对齐
    totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
    sp := newg.stack.hi - totalSize
    spArg := sp
    // usesLr 为 0,这里不执行
    if usesLR {
        // caller's LR
        *(*uintptr)(unsafe.Pointer(sp)) = 0
        prepGoExitFrame(sp)
        spArg += sys.MinFrameSize
    }
    if narg > 0 {
        // 将参数拷贝入栈
        memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
        // ... 省略 ...
    }
    // 初始化用于保存现场的区域及初始化基本状态
    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    // 这里保存了 goexit 的地址,在用户函数执行完成后,会根据 pc 来执行 goexit
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    // 这里调整 sched 信息,pc = goexit 的地址
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.ancestors = saveAncestors(callergp)
    newg.startpc = fn.fn
    if _g_.m.curg != nil {newg.labels = _g_.m.curg.labels}
    if isSystemGoroutine(newg) {atomic.Xadd(&sched.ngsys, +1)
    }
    newg.gcscanvalid = false
    casgstatus(newg, _Gdead, _Grunnable)
    // 如果 p 缓存的 goid 已经用完,本地再从 sched 批量获取一点
    if _p_.goidcache == _p_.goidcacheend {
        // Sched.goidgen is the last allocated id,
        // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
        // At startup sched.goidgen=0, so main goroutine receives goid=1.
        _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
        _p_.goidcache -= _GoidCacheBatch - 1
        _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
    }
    // 分配 goid
    newg.goid = int64(_p_.goidcache)
    _p_.goidcache++
    // 把新的 g 放到 p 的可运行 g 队列中
    runqput(_p_, newg, true)
    // 判断是否有空闲 p,且是否需要唤醒一个 m 来执行 g
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {wakep()
    }
    _g_.m.locks--
    if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt
    }
}

3.2.2.1. gfget

这个函数的逻辑比较简单,就是看一下 p 有没有空闲的 g,没有则去全局的 freeg 队列查找,这里就涉及了 p 本地和全局平衡的一个交互了

func gfget(_p_ *p) *g {
retry:
    gp := _p_.gfree
    // 本地的 g 队列为空,且全局队列不为空,则从全局队列一次获取至多 32 个下来,如果全局队列不够就算了
    if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) {lock(&sched.gflock)
        for _p_.gfreecnt < 32 {
            if sched.gfreeStack != nil {
                // Prefer Gs with stacks.
                gp = sched.gfreeStack
                sched.gfreeStack = gp.schedlink.ptr()} else if sched.gfreeNoStack != nil {
                gp = sched.gfreeNoStack
                sched.gfreeNoStack = gp.schedlink.ptr()} else {break}
            _p_.gfreecnt++
            sched.ngfree--
            gp.schedlink.set(_p_.gfree)
            _p_.gfree = gp
        }
        // 已经从全局拿了 g 了,再去从头开始判断
        unlock(&sched.gflock)
        goto retry
    }
    // 如果拿到了 g,则判断 g 是否有栈,没有栈就分配
    // 栈的分配跟内存分配差不多,首先创建几个固定大小的栈的数组,然后到指定大小的数组里面去分配就 ok 了,过大则直接全局分配
    if gp != nil {_p_.gfree = gp.schedlink.ptr()
        _p_.gfreecnt--
        if gp.stack.lo == 0 {
            // Stack was deallocated in gfput. Allocate a new one.
            systemstack(func() {gp.stack = stackalloc(_FixedStack)
            })
            gp.stackguard0 = gp.stack.lo + _StackGuard
        } else {// ... 省略 ...}
    }
    // 注意:如果全局没有 g,p 也没有 g,则返回的 gp 还是 nil
    return gp
}

3.2.2.2. runqput

runqput 会把 g 放到 p 的本地队列或者 p.runnext,如果 p 的本地队列过长,则把 g 到全局队列,同时平衡 p 本地队列的一半到全局

func runqput(_p_ *p, gp *g, next bool) {if randomizeScheduler && next && fastrand()%2 == 0 {next = false}
    // 如果 next 为 true,则放入到 p.runnext 里面,并把原先 runnext 的 g 交换出来
    if next {
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {goto retryNext}
        if oldnext == 0 {return}
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()}

retry:
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    // 判断 p 的队列的长度是否超了,runq 是一个长度为 256 的数组,超出的话就会放到全局队列了
    if t-h < uint32(len(_p_.runq)) {_p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // 把 g 放到全局队列
    if runqputslow(_p_, gp, h, t) {return}
    // the queue is not full, now the put above must succeed
    goto retry
}

3.2.2.3. runqputslow

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {var batch [len(_p_.runq)/2 + 1]*g

    // First, grab a batch from local queue.
    n := t - h
    n = n / 2
    if n != uint32(len(_p_.runq)/2) {throw("runqputslow: queue is not full")
    }
    // 获取 p 后面的一半
    for i := uint32(0); i < n; i++ {batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()}
    if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
        return false
    }
    batch[n] = gp

    // Link the goroutines.
    for i := uint32(0); i < n; i++ {batch[i].schedlink.set(batch[i+1])
    }

    // Now put the batch on global queue.
    // 放到全局队列队尾
    lock(&sched.lock)
    globrunqputbatch(batch[0], batch[n], int32(n+1))
    unlock(&sched.lock)
    return true
}

新建任务至此基本结束,创建完成任务后,等待调度执行就好了,从上面可以看出,任务的优先级是 p.runnext > p.runq > sched.runq

g 从创建到执行结束并放入 free 队列中的状态转换大致如下图所示

3.2.3 wakep

当 newproc1 创建完任务后,会尝试唤醒 m 来执行任务

func wakep() {
    // be conservative about spinning threads
    // 一次应该只有一个 m 在 spining,否则就退出
    if !atomic.Cas(&sched.nmspinning, 0, 1) {return}
    // 调用 startm 来执行
    startm(nil, true)
}

3.2.4 startm

调度 m 或者创建 m 来运行 p,如果 p ==nil,就会尝试获取一个空闲 p,p 的队列中有 g,拿到 p 后才能拿到 g

func startm(_p_ *p, spinning bool) {lock(&sched.lock)
    if _p_ == nil {
        // 如果没有指定 p, 则从 sched.pidle 获取空闲的 p
        _p_ = pidleget()
        if _p_ == nil {unlock(&sched.lock)
            // 如果没有获取到 p,重置 nmspinning
            if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {throw("startm: negative nmspinning")
                }
            }
            return
        }
    }
    // 首先尝试从 sched.midle 获取一个空闲的 m
    mp := mget()
    unlock(&sched.lock)
    if mp == nil {
        // 如果获取不到空闲的 m,则创建一个 mspining = true 的 m,并将 p 绑定到 m 上,直接返回
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        newm(fn, _p_)
        return
    }
    // 判断获取到的空闲 m 是否是 spining 状态
    if mp.spinning {throw("startm: m is spinning")
    }
    // 判断获取到的 m 是否有 p
    if mp.nextp != 0 {throw("startm: m has p")
    }
    if spinning && !runqempty(_p_) {throw("startm: p has runnable gs")
    }
    // The caller incremented nmspinning, so set m.spinning in the new M.
    // 调用函数的父函数已经增加了 nmspinning,这里只需要设置 m.spining 就 ok 了,同时把 p 绑上来
    mp.spinning = spinning
    mp.nextp.set(_p_)
    // 唤醒 m
    notewakeup(&mp.park)
}

3.2.4.1. newm

newm 通过 allocm 函数来创建新 m

func newm(fn func(), _p_ *p) {
    // 新建一个 m
    mp := allocm(_p_, fn)
    // 为这个新建的 m 绑定指定的 p
    mp.nextp.set(_p_)
    // ... 省略 ...
    // 创建系统线程
    newm1(mp)
}

3.2.4.2. new1m

func newm1(mp *m) {
    // runtime cgo 包会把 iscgo 设置为 true,这里不分析
    if iscgo {
        var ts cgothreadstart
        if _cgo_thread_start == nil {throw("_cgo_thread_start missing")
        }
        ts.g.set(mp.g0)
        ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
        ts.fn = unsafe.Pointer(funcPC(mstart))
        if msanenabled {msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        execLock.rlock() // Prevent process clone.
        asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
        execLock.runlock()
        return
    }
    execLock.rlock() // Prevent process clone.
    newosproc(mp)
    execLock.runlock()}

3.2.4.3. newosproc

newosproc 创建一个新的系统线程,并执行 mstart_stub 函数,之后调用 mstart 函数进入调度,后面在执行流程会分析

func newosproc(mp *m) {stk := unsafe.Pointer(mp.g0.stack.hi)
    // Initialize an attribute object.
    var attr pthreadattr
    var err int32
    err = pthread_attr_init(&attr)

    // Finally, create the thread. It starts at mstart_stub, which does some low-level
    // setup and then calls mstart.
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    // 创建线程,并传入启动启动函数 mstart_stub,mstart_stub 之后调用 mstart
    err = pthread_create(&attr, funcPC(mstart_stub), unsafe.Pointer(mp))
    sigprocmask(_SIG_SETMASK, &oset, nil)
    if err != 0 {write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate)))
        exit(1)
    }
}

3.2.4.4. allocm

allocm 这里首先会释放 sched 的 freem,然后再去创建 m,并初始化 m

func allocm(_p_ *p, fn func()) *m {_g_ := getg()
    _g_.m.locks++ // disable GC because it can be called from sysmon
    if _g_.m.p == 0 {acquirep(_p_) // temporarily borrow p for mallocs in this function
    }

    // Release the free M list. We need to do this somewhere and
    // this may free up a stack we can use.
    // 首先释放掉 freem 列表
    if sched.freem != nil {lock(&sched.lock)
        var newList *m
        for freem := sched.freem; freem != nil; {
            if freem.freeWait != 0 {
                next := freem.freelink
                freem.freelink = newList
                newList = freem
                freem = next
                continue
            }
            stackfree(freem.g0.stack)
            freem = freem.freelink
        }
        sched.freem = newList
        unlock(&sched.lock)
    }

    mp := new(m)
    // 启动函数,根据 startm 调用来看,这个 fn 就是 mspinning,会将 m.mspinning 设置为 true
    mp.mstartfn = fn
    // 初始化 m,上面已经分析了
    mcommoninit(mp)
    // In case of cgo or Solaris or Darwin, pthread_create will make us a stack.
    // Windows and Plan 9 will layout sched stack on OS stack.
    // 为新的 m 创建 g0
    if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {mp.g0 = malg(-1)
    } else {mp.g0 = malg(8192 * sys.StackGuardMultiplier)
    }
    // 为 mp 的 g0 绑定自己
    mp.g0.m = mp
    // 如果当前的 m 所绑定的是参数传递过来的 p,解除绑定,因为参数传递过来的 p 稍后要绑定新建的 m
    if _p_ == _g_.m.p.ptr() {releasep()
    }

    _g_.m.locks--
    if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt
    }

    return mp
}

3.2.4.5. notewakeup

func notewakeup(n *note) {
    var v uintptr
    // 设置 m 为 locked
    for {v = atomic.Loaduintptr(&n.key)
        if atomic.Casuintptr(&n.key, v, locked) {break}
    }

    // Successfully set waitm to locked.
    // What was it before?
    // 根据 m 的原先的状态,来判断后面的执行流程,0 则直接返回,locked 则冲突,否则认为是 wating,唤醒
    switch {
    case v == 0:
        // Nothing was waiting. Done.
    case v == locked:
        // Two notewakeups! Not allowed.
        throw("notewakeup - double wakeup")
    default:
        // Must be the waiting m. Wake it up.
        // 唤醒系统线程
        semawakeup((*m)(unsafe.Pointer(v)))
    }
}

至此的话,创建完任务 g 后,将 g 放入了 p 的 local 队列或者是全局队列,然后开始获取了一个空闲的 m 或者新建一个 m 来执行 g,m, p, g 都已经准备完成了,下面就是开始调度,来运行任务 g 了

3.3. 执行

在 startm 函数分析的过程中会,可以看到,有两种获取 m 的方式

  • 新建:这时候执行 newm1 下的 newosproc,同时最终调用 mstart 来执行调度
  • 唤醒空闲 m:从休眠的地方继续执行

m 执行 g 有两个起点,一个是线程启动函数 mstart,另一个则是休眠被唤醒后的调度 schedule 了,我们从头开始,也就是mstartmstart 走到最后也是 schedule 调度

3.3.1. mstart

func mstart() {_g_ := getg()

    osStack := _g_.stack.lo == 0
    if osStack {
        // Initialize stack bounds from system stack.
        // Cgo may have left stack size in stack.hi.
        // minit may update the stack bounds.
        // 从系统堆栈上直接划出所需的范围
        size := _g_.stack.hi
        if size == 0 {size = 8192 * sys.StackGuardMultiplier}
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
        _g_.stack.lo = _g_.stack.hi - size + 1024
    }
    // Initialize stack guards so that we can start calling
    // both Go and C functions with stack growth prologues.
    _g_.stackguard0 = _g_.stack.lo + _StackGuard
    _g_.stackguard1 = _g_.stackguard0
    // 调用 mstart1 来处理
    mstart1()

    // Exit this thread.
    if GOOS == "windows" || GOOS == "solaris" || GOOS == "plan9" || GOOS == "darwin" {
        // Window, Solaris, Darwin and Plan 9 always system-allocate
        // the stack, but put it in _g_.stack before mstart,
        // so the logic above hasn't set osStack yet.
        osStack = true
    }
    // 退出 m,正常情况下 mstart1 调用 schedule() 时,是不再返回的,所以,不用担心系统线程的频繁创建退出
    mexit(osStack)
}

3.3.2. mstart1

func mstart1() {_g_ := getg()

    if _g_ != _g_.m.g0 {throw("bad runtime·mstart")
    }

    // Record the caller for use as the top of stack in mcall and
    // for terminating the thread.
    // We're never coming back to mstart1 after we call schedule,
    // so other calls can reuse the current frame.
    // 保存调用者的 pc sp 等信息
    save(getcallerpc(), getcallersp())
    asminit()
    // 初始化 m 的 sigal 的栈和 mask
    minit()

    // Install signal handlers; after minit so that minit can
    // prepare the thread to be able to handle the signals.
    // 安装 sigal 处理器
    if _g_.m == &m0 {mstartm0()
    }
    // 如果设置了 mstartfn,就先执行这个
    if fn := _g_.m.mstartfn; fn != nil {fn()
    }

    if _g_.m.helpgc != 0 {
        _g_.m.helpgc = 0
        stopm()} else if _g_.m != &m0 {
        // 获取 nextp
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    schedule()}

3.3.2.1. acquirep

acquirep 函数主要是改变 p 的状态,绑定 m p,通过吧 p 的 mcache 与 m 共享

func acquirep(_p_ *p) {
    // Do the part that isn't allowed to have write barriers.
    acquirep1(_p_)

    // have p; write barriers now allowed
    _g_ := getg()
    // 把 p 的 mcache 与 m 共享
    _g_.m.mcache = _p_.mcache
}

3.3.2.2. acquirep1

func acquirep1(_p_ *p) {_g_ := getg()

    // 让 m p 互相绑定
    _g_.m.p.set(_p_)
    _p_.m.set(_g_.m)
    _p_.status = _Prunning
}

3.3.2.3. schedule

开始进入到调度函数了,这是一个由 schedule、execute、goroutine fn、goexit 构成的逻辑循环,就算 m 是唤醒后,也是从设置的断点开始执行

func schedule() {_g_ := getg()

    if _g_.m.locks != 0 {throw("schedule: holding locks")
    }
    // 如果有 lockg,停止执行当前的 m
    if _g_.m.lockedg != 0 {
        // 解除 lockedm 的锁定,并执行当前 g
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if _g_.m.incgo {throw("schedule: in cgo")
    }

top:
    // gc 等待
    if sched.gcwaiting != 0 {gcstopm()
        goto top
    }

    var gp *g
    var inheritTime bool

    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        // 为了保证公平,每隔 61 次,从全局队列上获取 g
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        // 全局队列上获取不到待运行的 g,则从 p local 队列中获取
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {throw("schedule: spinning with local work")
        }
    }
    if gp == nil {
        // 如果 p local 获取不到待运行 g,则开始查找,这个函数会从 全局 io poll,p locl 和其他 p local 获取待运行的 g,后面详细分析
        gp, inheritTime = findrunnable() // blocks until work is available}

    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
    if _g_.m.spinning {
        // 如果 m 是自旋状态,取消自旋
        resetspinning()}

    if gp.lockedm != 0 {
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        // 如果 g 有 lockedm,则休眠上交 p,休眠 m,等待新的 m,唤醒后从这里开始执行,跳转到 top
        startlockedm(gp)
        goto top
    }
    // 开始执行这个 g
    execute(gp, inheritTime)
}
3.3.2.3.1. stoplockedm

因为当前的 m 绑定了 lockedg,而当前 g 不是指定的 lockedg,所以这个 m 不能执行,上交当前 m 绑定的 p,并且休眠 m 直到调度 lockedg

func stoplockedm() {_g_ := getg()

    if _g_.m.lockedg == 0 || _g_.m.lockedg.ptr().lockedm.ptr() != _g_.m {throw("stoplockedm: inconsistent locking")
    }
    if _g_.m.p != 0 {
        // Schedule another M to run this p.
        // 释放当前 p
        _p_ := releasep()
        handoffp(_p_)
    }
    incidlelocked(1)
    // Wait until another thread schedules lockedg again.
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park)
    status := readgstatus(_g_.m.lockedg.ptr())
    if status&^_Gscan != _Grunnable {print("runtime:stoplockedm: g is not Grunnable or Gscanrunnable\n")
        dumpgstatus(_g_)
        throw("stoplockedm: not runnable")
    }
    // 上交了当前的 p,将 nextp 设置为可执行的 p
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}
3.3.2.3.2. startlockedm

调度 lockedm 去运行 lockedg

func startlockedm(gp *g) {_g_ := getg()

    mp := gp.lockedm.ptr()
    if mp == _g_.m {throw("startlockedm: locked to me")
    }
    if mp.nextp != 0 {throw("startlockedm: m has p")
    }
    // directly handoff current P to the locked m
    incidlelocked(-1)
    // 移交当前 p 给 lockedm,并设置为 lockedm.nextp,以便于 lockedm 唤醒后,可以获取
    _p_ := releasep()
    mp.nextp.set(_p_)
    // m 被唤醒后,从 m 休眠的地方开始执行,也就是 schedule()函数中
    notewakeup(&mp.park)
    stopm()}
3.3.2.3.3. handoffp
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    if !runqempty(_p_) || sched.runqsize != 0 {
        // 调用 startm 开始调度
        startm(_p_, false)
        return
    }

    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    // 判断有没有正在寻找 p 的 m 以及有没有空闲的 p
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock)

    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {notewakeup(&sched.safePointNote)
        }
    }
    // 如果 全局待运行 g 队列不为空,尝试使用 startm 进行调度
    if sched.runqsize != 0 {unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // 把 p 放入到全局的空闲队列,放回队列就不多说了,参考 allm 的放回
    pidleput(_p_)
    unlock(&sched.lock)
}
3.3.2.3.4. execute

开始执行 g 的代码了

func execute(gp *g, inheritTime bool) {_g_ := getg()
    // 更改 g 的状态,并不允许抢占
    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    if !inheritTime {
        // 调度计数
        _g_.m.p.ptr().schedtick++}
    _g_.m.curg = gp
    gp.m = _g_.m
    // 开始执行 g 的代码了
    gogo(&gp.sched)
}
3.3.2.3.5. gogo

gogo 函数承载的作用就是切换到 g 的栈,开始执行 g 的代码,汇编内容就不分析了,但是有一个疑问就是,gogo 执行完函数后,怎么再次进入调度呢?

我们回到 newproc1 函数的 L63 newg.sched.pc = funcPC(goexit) + sys.PCQuantum,这里保存了 pc 的质地为 goexit 的地址,所以当执行完用户代码后,就会进入 goexit 函数

3.3.2.3.6. goexit0

goexit 在汇编层面就是调用 runtime.goexit1,而 goexit1 通过 mcall 调用了goexit0 所以这里直接分析了goexit0

goexit0 重置 g 的状态,并重新进行调度,这样就调度就又回到了schedule() 了,开始循环往复的调度

func goexit0(gp *g) {_g_ := getg()
    // 转换 g 的状态为 dead,以放回空闲列表
    casgstatus(gp, _Grunning, _Gdead)
    if isSystemGoroutine(gp) {atomic.Xadd(&sched.ngsys, -1)
    }
    // 清空 g 的状态
    gp.m = nil
    locked := gp.lockedm != 0
    gp.lockedm = 0
    _g_.m.lockedg = 0
    gp.paniconfault = false
    gp._defer = nil // should be true already but just in case.
    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
    gp.writebuf = nil
    gp.waitreason = 0
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    // Note that gp's stack scan is now"valid" because it has no
    // stack.
    gp.gcscanvalid = true
    dropg()

    // 把 g 放回空闲列表,以备复用
    gfput(_g_.m.p.ptr(), gp)
    // 再次进入调度循环
    schedule()}

至此,单次调度结束,再次进入调度,循环往复

3.3.2.3.7. findrunnable
func findrunnable() (gp *g, inheritTime bool) {_g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

top:
    _p_ := _g_.m.p.ptr()

    // local runq
    // 从 p local 去获取 g
    if gp, inheritTime := runqget(_p_); gp != nil {return gp, inheritTime}

    // global runq
    // 从全局的待运行 d 队列获取
    if sched.runqsize != 0 {lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {return gp, false}
    }

    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    // 看看 netpoll 中有没有已经准备好的 g
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {if gp := netpoll(false); gp != nil { // non-blocking
            // netpoll returns list of goroutines linked by schedlink.
            injectglist(gp.schedlink.ptr())
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }

    // Steal work from other P's.
    // 如果 sched.pidle == procs - 1,说明所有的 p 都是空闲的,无需遍历其他 p 了
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
        // New work can appear from returning syscall/cgocall, network or timers.
        // Neither of that submits to local run queues, so no point in stealing.
        goto stop
    }
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
    // 如果寻找 p 的 m 的数量,大于有 g 的 p 的数量的一般,就不再去寻找了
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {goto stop}
    // 设置当前 m 的自旋状态
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    // 开始窃取其他 p 的待运行 g 了
    for i := 0; i < 4; i++ {for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {goto top}
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            // 从其他的 p 偷取一般的任务数量,还会随机偷取 p 的 runnext(过分了),偷取部分就不分析了,就是 slice 的操作而已
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {return gp, false}
        }
    }

stop:
    // 对 all 做个镜像备份
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)

    if sched.runqsize != 0 {gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    if releasep() != _p_ {throw("findrunnable: wrong p")
    }
    pidleput(_p_)
    unlock(&sched.lock)

    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        // 设置非自旋状态,因为找 p 的工作已经结束了
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
    for _, _p_ := range allpSnapshot {if !runqempty(_p_) {lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }
    // poll network
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        if _g_.m.p != 0 {throw("findrunnable: netpoll with p")
        }
        if _g_.m.spinning {throw("findrunnable: netpoll with spinning")
        }
        gp := netpoll(true) // block until new work is available
        atomic.Store64(&sched.lastpoll, uint64(nanotime()))
        if gp != nil {lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {acquirep(_p_)
                injectglist(gp.schedlink.ptr())
                casgstatus(gp, _Gwaiting, _Grunnable)
                if trace.enabled {traceGoUnpark(gp, 0)
                }
                return gp, false
            }
            injectglist(gp)
        }
    }
    stopm()
    goto top
}

这里真的是无奈啊,为了寻找一个可运行的 g,也是煞费苦心,及时进入了 stop 的 label,还是不死心,又来了一边寻找。大致寻找过程可以总结为一下几个:

  • 从 p 自己的 local 队列中获取可运行的 g
  • 从全局队列中获取可运行的 g
  • 从 netpoll 中获取一个已经准备好的 g
  • 从其他 p 的 local 队列中获取可运行的 g,随机偷取 p 的 runnext,有点任性
  • 无论如何都获取不到的话,就 stopm 了
3.3.2.3.7. stopm

stop 会把当前 m 放到空闲列表里面,同时绑定 m.nextp 与 m

func stopm() {_g_ := getg()
retry:
    lock(&sched.lock)
    // 把当前 m 放到 sched.midle 的空闲列表里
    mput(_g_.m)
    unlock(&sched.lock)
    // 休眠,等待被唤醒
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park)
    // 绑定 p
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}

3.4. 监控

3.4.1. sysmon

go 的监控是依靠函数 sysmon 来完成的,监控主要做一下几件事

  • 释放闲置超过 5 分钟的 span 物理内存
  • 如果超过两分钟没有执行垃圾回收,则强制执行
  • 将长时间未处理的 netpoll 结果添加到任务队列
  • 向长时间运行的 g 进行抢占
  • 收回因为 syscall 而长时间阻塞的 p

监控线程并不是时刻在运行的,监控线程首次休眠 20us,每次执行完后,增加一倍的休眠时间,但是最多休眠 10ms

func sysmon() {lock(&sched.lock)
    sched.nmsys++
    checkdead()
    unlock(&sched.lock)

    // If a heap span goes unused for 5 minutes after a garbage collection,
    // we hand it back to the operating system.
    scavengelimit := int64(5 * 60 * 1e9)

    if debug.scavenge > 0 {
        // Scavenge-a-lot for testing.
        forcegcperiod = 10 * 1e6
        scavengelimit = 20 * 1e6
    }

    lastscavenge := nanotime()
    nscavenge := 0

    lasttrace := int64(0)
    idle := 0 // how many cycles in succession we had not wokeup somebody
    delay := uint32(0)
    for {
        // 判断当前循环,应该休眠的时间
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay)
        // STW 时休眠 sysmon
        if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {lock(&sched.lock)
            if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {atomic.Store(&sched.sysmonwait, 1)
                unlock(&sched.lock)
                // Make wake-up period small enough
                // for the sampling to be correct.
                maxsleep := forcegcperiod / 2
                if scavengelimit < forcegcperiod {maxsleep = scavengelimit / 2}
                shouldRelax := true
                if osRelaxMinNS > 0 {next := timeSleepUntil()
                    now := nanotime()
                    if next-now < osRelaxMinNS {shouldRelax = false}
                }
                if shouldRelax {osRelax(true)
                }
                // 进行休眠
                notetsleep(&sched.sysmonnote, maxsleep)
                if shouldRelax {osRelax(false)
                }
                lock(&sched.lock)
                // 唤醒后,清除休眠状态,继续执行
                atomic.Store(&sched.sysmonwait, 0)
                noteclear(&sched.sysmonnote)
                idle = 0
                delay = 20
            }
            unlock(&sched.lock)
        }
        // trigger libc interceptors if needed
        if *cgo_yield != nil {asmcgocall(*cgo_yield, nil)
        }
        // poll network if not polled for more than 10ms
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        now := nanotime()
        // 如果 netpoll 不为空,每隔 10ms 检查一下是否有 ok 的
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            // 返回了已经获取到结果的 goroutine 的列表
            gp := netpoll(false) // non-blocking - returns list of goroutines
            if gp != nil {incidlelocked(-1)
                // 把获取到的 g 的列表加入到全局待运行队列中
                injectglist(gp)
                incidlelocked(1)
            }
        }
        // retake P's blocked in syscalls
        // and preempt long running G's
        // 抢夺 syscall 长时间阻塞的 p 和长时间运行的 g
        if retake(now) != 0 {idle = 0} else {idle++}
        // check if we need to force a GC
        // 通过 gcTrigger.test() 函数判断是否超过设定的强制触发 gc 的时间间隔,if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {lock(&forcegc.lock)
            forcegc.idle = 0
            forcegc.g.schedlink = 0
            // 把 gc 的 g 加入待运行队列,等待调度运行
            injectglist(forcegc.g)
            unlock(&forcegc.lock)
        }
        // scavenge heap once in a while
        // 判断是否有 5 分钟未使用的 span,有的话,归还给系统
        if lastscavenge+scavengelimit/2 < now {mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
            lastscavenge = now
            nscavenge++
        }
        if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
            lasttrace = now
            schedtrace(debug.scheddetail > 0)
        }
    }
}

扫描 netpoll,并把 g 存放到去全局队列比较好理解,跟前面添加 p 和 m 的逻辑差不多,但是抢占这里就不是很理解了,你说抢占就抢占,被抢占的 g 岂不是很没面子,而且怎么抢占呢?

3.4.2. retake

const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
    n := 0
    // Prevent allp slice changes. This lock will be completely
    // uncontended unless we're already stopping the world.
    lock(&allpLock)
    // We can't use a range loop over allp because we may
    // temporarily drop the allpLock. Hence, we need to re-fetch
    // allp each time around the loop.
    for i := 0; i < len(allp); i++ {_p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall {// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            // pd.syscalltick 即 _p_.sysmontick.syscalltick 只有在 sysmon 的时候会更新,而 _p_.syscalltick 则会每次都更新,所以,当 syscall 之后,第一个 sysmon 检测到的时候并不会抢占,而是第二次开始才会抢占,中间间隔至少有 20us,最多会有 10ms
            t := int64(_p_.syscalltick)
            if int64(pd.syscalltick) != t {pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
            // 是否有空 p,有寻找 p 的 m,以及当前的 p 在 syscall 之后,有没有超过 10ms
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {continue}
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            // 抢占 p,把 p 的状态转为 idle 状态
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                // 把当前 p 移交出去,上面已经分析过了
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        } else if s == _Prunning {
            // Preempt G if it's running for too long.
            // 如果 p 是 running 状态,如果 p 下面的 g 执行太久了,则抢占
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {pd.schedtick = uint32(t)
                pd.schedwhen = now
                continue
            }
            // 判断是否超出 10ms, 不超过不抢占
            if pd.schedwhen+forcePreemptNS > now {continue}
            // 开始抢占
            preemptone(_p_)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

3.4.3. preemptone

这个函数的注释,作者就表明这种抢占并不是很靠谱????,我们先看一下实现吧

func preemptone(_p_ *p) bool {mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {return false}
    gp := mp.curg
    if gp == nil || gp == mp.g0 {return false}
    // 标识抢占字段
    gp.preempt = true

    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    // 更新 stackguard0,保证能检测到栈溢
    gp.stackguard0 = stackPreempt
    return true
}

在这里,作者会更新 gp.stackguard0 = stackPreempt,然后让 g 误以为栈不够用了,那就只有乖乖的去进行栈扩张,站扩张的话就用调用newstack 分配一个新栈,然后把原先的栈的内容拷贝过去,而在 newstack 里面有一段如下

if preempt {if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
        // Let the goroutine keep running for now.
        // gp->preempt is set, so it will be preempted next time.
        gp.stackguard0 = gp.stack.lo + _StackGuard
        gogo(&gp.sched) // never return
    }
}

然后这里就发现 g 被抢占了,那你栈不够用就有可能是假的,但是管你呢,你再去调度去吧,也不给你扩栈了,虽然作者和雨痕大神都吐槽了一下这个,但是这种抢占方式自动 1.5(也可能更早)就一直存在,且稳定运行,就说明还是很牛逼的了

4. 总结

在调度器的设置上,最明显的就是复用:g 的 free 链表,m 的 free 列表,p 的 free 列表,这样就避免了重复创建销毁锁浪费的资源

其次就是多级缓存:这一块跟内存上的设计思想也是一直的,p 一直有一个 g 的待运行队列,自己没有货过多的时候,才会平衡到全局队列,全局队列操作需要锁,则本地操作则不需要,大大减少了锁的创建销毁所消耗的资源

至此,g m p 的关系及状态转换大致都讲解完成了,由于对汇编这块比较薄弱,所以基本略过了,右面有机会还是需要多了解一点

5. 参考文档

  • 《go 语言学习笔记》
  • Golang 源码探索(二) 协程的实现原理
  • 【Go 源码分析】Go scheduler 源码分析
退出移动版