深入理解Gogoroutine的实现及调度器分析

39次阅读

共计 35508 个字符,预计需要花费 89 分钟才能阅读完成。

在学习 Go 的过程中,最让人惊叹的莫过于 goroutine 了。但是 goroutine 是什么,我们用 go 关键字就可以创建一个 goroutine,这么多的 goroutine 之间,是如何调度的呢?

1. 结构概览

在看 Go 源码的过程中,遍地可见 g、p、m,我们首先就看一下这些关键字的结构及相互之间的关系

1.1. 数据结构

这里我们仅列出来了结构体里面比较关键的一些成员

1.1.1. G(gouroutine)

goroutine 是运行时的最小执行单元

type g struct {
    // Stack parameters.
    // stack describes the actual stack memory: [stack.lo, stack.hi).
    // stackguard0 is the stack pointer compared in the Go stack growth prologue.
    // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
    // stackguard1 is the stack pointer compared in the C stack growth prologue.
    // It is stack.lo+StackGuard on g0 and gsignal stacks.
    // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
  // 当前 g 使用的栈空间,stack 结构包括 [lo, hi]两个成员
    stack       stack   // offset known to runtime/cgo
  // 用于检测是否需要进行栈扩张,go 代码使用
    stackguard0 uintptr // offset known to liblink
  // 用于检测是否需要进行栈扩展,原生代码使用的
    stackguard1 uintptr // offset known to liblink
  // 当前 g 所绑定的 m
    m              *m      // current m; offset known to arm liblink
  // 当前 g 的调度数据,当 goroutine 切换时,保存当前 g 的上下文,用于恢复
    sched          gobuf
    // g 当前的状态
    atomicstatus   uint32
  // 当前 g 的 id
    goid           int64
  // 下一个 g 的地址,通过 guintptr 结构体的 ptr set 函数可以设置和获取下一个 g,通过这个字段和 sched.gfreeStack sched.gfreeNoStack 可以把 free g 串成一个链表
    schedlink      guintptr
  // 判断 g 是否允许被抢占
    preempt        bool       // preemption signal, duplicates stackguard0 = stackpreempt
    // g 是否要求要回到这个 M 执行, 有的时候 g 中断了恢复会要求使用原来的 M 执行
    lockedm        muintptr
}

1.1.2. P(process)

P 是 M 运行 G 所需的资源

type p struct {
   lock mutex

   id          int32
   // p 的状态,稍后介绍
   status      uint32 // one of pidle/prunning/...
   // 下一个 p 的地址,可参考 g.schedlink
   link        puintptr
   // p 所关联的 m
   m           muintptr   // back-link to associated m (nil if idle)
   // 内存分配的时候用的,p 所属的 m 的 mcache 用的也是这个
   mcache      *mcache
  
   // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
   // 从 sched 中获取并缓存的 id,避免每次分配 goid 都从 sched 分配
     goidcache    uint64
     goidcacheend uint64

   // Queue of runnable goroutines. Accessed without lock.
   // p 本地的 runnbale 的 goroutine 形成的队列
   runqhead uint32
   runqtail uint32
   runq     [256]guintptr
   // runnext, if non-nil, is a runnable G that was ready'd by
   // the current G and should be run next instead of what's in
   // runq if there's time remaining in the running G's time
   // slice. It will inherit the time left in the current time
   // slice. If a set of goroutines is locked in a
   // communicate-and-wait pattern, this schedules that set as a
   // unit and eliminates the (potentially large) scheduling
   // latency that otherwise arises from adding the ready'd
   // goroutines to the end of the run queue.
   // 下一个执行的 g,如果是 nil,则从队列中获取下一个执行的 g
   runnext guintptr

   // Available G's (status == Gdead)
   // 状态为 Gdead 的 g 的列表,可以进行复用
   gfree    *g
   gfreecnt int32
}

1.1.3. M(machine)

type m struct {
   // g0 是用于调度和执行系统调用的特殊 g
   g0      *g     // goroutine with scheduling stack
     // m 当前运行的 g
   curg          *g       // current running goroutine
   // 当前拥有的 p
   p             puintptr // attached p for executing go code (nil if not executing go code)
   // 线程的 local storage
   tls           [6]uintptr   // thread-local storage
   // 唤醒 m 时,m 会拥有这个 p
   nextp         puintptr
   id            int64
   // 如果 !="", 继续运行 curg
   preemptoff    string // if != "", keep curg running on this m
   // 自旋状态,用于判断 m 是否工作已结束,并寻找 g 进行工作
   spinning      bool // m is out of work and is actively looking for work
   // 用于判断 m 是否进行休眠状态
   blocked       bool // m is blocked on a note
     // m 休眠和唤醒通过这个,note 里面有一个成员 key,对这个 key 所指向的地址进行值的修改,进而达到唤醒和休眠的目的
   park          note
   // 所有 m 组成的一个链表
   alllink       *m // on allm
   // 下一个 m,通过这个字段和 sched.midle 可以串成一个 m 的空闲链表
   schedlink     muintptr
   // mcache,m 拥有 p 的时候,会把自己的 mcache 给 p
   mcache        *mcache
   // lockedm 的对应值
   lockedg       guintptr
   // 待释放的 m 的 list,通过 sched.freem 串成一个链表
   freelink      *m      // on sched.freem
}

1.1.4. sched

type schedt struct {
   // 全局的 go id 分配
   goidgen  uint64
   // 记录的最后一次从 i / o 中查询 g 的时间
   lastpoll uint64

   lock mutex

   // When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
   // sure to call checkdead().
     // m 的空闲链表,结合 m.schedlink 就可以组成一个空闲链表了
   midle        muintptr // idle m's waiting for work
   nmidle       int32    // number of idle m's waiting for work
   nmidlelocked int32    // number of locked m's waiting for work
   // 下一个 m 的 id,也用来记录创建的 m 数量
   mnext        int64    // number of m's that have been created and next M ID
   // 最多允许的 m 的数量
   maxmcount    int32    // maximum number of m's allowed (or die)
   nmsys        int32    // number of system m's not counted for deadlock
   // free 掉的 m 的数量,exit 的 m 的数量
   nmfreed      int64    // cumulative number of freed m's

   ngsys uint32 // number of system goroutines; updated atomically

   pidle      puintptr // idle p's
   npidle     uint32
   nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

   // Global runnable queue.
   // 这个就是全局的 g 的队列了,如果 p 的本地队列没有 g 或者太多,会跟全局队列进行平衡
   // 根据 runqhead 可以获取队列头的 g,然后根据 g.schedlink 获取下一个,从而形成了一个链表
   runqhead guintptr
   runqtail guintptr
   runqsize int32

   // freem is the list of m's waiting to be freed when their
   // m.exited is set. Linked through m.freelink.
   // 等待释放的 m 的列表
   freem *m
}

在这里插一下状态的解析

1.1.5. g.status

  • _Gidle: goroutine 刚刚创建还没有初始化
  • _Grunnable: goroutine 处于运行队列中,但是还没有运行,没有自己的栈
  • _Grunning: 这个状态的 g 可能处于运行用户代码的过程中,拥有自己的 m 和 p
  • _Gsyscall: 运行 systemcall 中
  • _Gwaiting: 这个状态的 goroutine 正在阻塞中,类似于等待 channel
  • _Gdead: 这个状态的 g 没有被使用,有可能是刚刚退出,也有可能是正在初始化中
  • _Gcopystack: 表示 g 当前的栈正在被移除,新栈分配中

1.1.6. p.status

  • _Pidle: 空闲状态,此时 p 不绑定 m
  • _Prunning: m 获取到 p 的时候,p 的状态就是这个状态了,然后 m 可以使用这个 p 的资源运行 g
  • _Psyscall: 当 go 调用原生代码,原生代码又反过来调用 go 的时候,使用的 p 就会变成此态
  • _Pdead: 当运行中,需要减少 p 的数量时,被减掉的 p 的状态就是这个了

1.1.7. m.status

m 的 status 没有 p、g 的那么明确,但是在运行流程的分析中,主要有以下几个状态

  • 运行中: 拿到 p,执行 g 的过程中
  • 运行原生代码: 正在执行原声代码或者阻塞的 syscall
  • 休眠中: m 发现无待运行的 g 时,进入休眠,并加入到空闲列表中
  • 自旋中(spining): 当前工作结束,正在寻找下一个待运行的 g

在上面的结构中,存在很多的链表,g m p 结构中还有指向对方地址的成员,那么他们的关系到底是什么样的

我们可以从上图,简单的表述一下 m p g 的关系

2. 流程概览

从下图,可以简单的一窥 go 的整个调度流程的大概

接下来我们就从源码的角度来具体的分析整个调度流程(本人汇编不照,汇编方面的就不分析了????)

3. 源码分析

3.1. 初始化

go 的启动流程分为 4 步

  1. call osinit,这里就是设置了全局变量 ncpu = cpu 核心数量
  2. call schedinit
  3. make & queue new G(runtime.newproc, go func()也是调用这个函数来创建 goroutine)
  4. call runtime·mstart

其中,schedinit 就是调度器的初始化,出去 schedinit 中对内存分配,垃圾回收等操作,针对调度器的初始化大致就是初始化自身,设置最大的 maxmcount,确定 p 的数量并初始化这些操作

3.1.1. schedinit

schedinit 这里对当前 m 进行了初始化,并根据 osinit 获取到的 cpu 核数和设置的GOMAXPROCS 确定 p 的数量,并进行初始化

func schedinit() {
    // 从 TLS 或者专用寄存器获取当前 g 的指针类型
    _g_ := getg()
    // 设置 m 最大的数量
    sched.maxmcount = 10000

    // 初始化栈的复用空间
    stackinit()
    // 初始化当前 m
    mcommoninit(_g_.m)

    // osinit 的时候会设置 ncpu 这个全局变量,这里就是根据 cpu 核心数和参数 GOMAXPROCS 来确定 p 的数量
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {procs = n}
    // 生成设定数量的 p
    if procresize(procs) != nil {throw("unknown runnable goroutine during bootstrap")
    }
}

3.1.2. mcommoninit

func mcommoninit(mp *m) {_g_ := getg()

    lock(&sched.lock)
    // 判断 mnext 的值是否溢出,mnext 需要赋值给 m.id
    if sched.mnext+1 < sched.mnext {throw("runtime: thread ID overflow")
    }
    mp.id = sched.mnext
    sched.mnext++
    // 判断 m 的数量是否比 maxmcount 设定的要多,如果超出直接报异常
    checkmcount()
    // 创建一个新的 g 用于处理 signal,并分配栈
    mpreinit(mp)
    if mp.gsignal != nil {mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard}

    // Add to allm so garbage collector doesn't free g->m
    // when it is just in a register or thread-local storage.
    // 接下来的两行,首先将当前 m 放到 allm 的头,然后原子操作,将当前 m 的地址,赋值给 m,这样就将当前 m 添加到了 allm 链表的头了
    mp.alllink = allm

    // NumCgoCall() iterates over allm w/o schedlock,
    // so we need to publish it safely.
    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
    unlock(&sched.lock)

    // Allocate memory to hold a cgo traceback if the cgo call crashes.
    if iscgo || GOOS == "solaris" || GOOS == "windows" {mp.cgoCallers = new(cgoCallers)
    }
}

在这里就开始涉及到了 m 链表了,这个链表可以如下图表示,其他的 p g 链表可以参考,只是使用的结构体的字段不一样

3.1.3. allm 链表示意图

3.1.4. procresize

更改 p 的数量,多退少补的原则,在初始化过程中,由于最开始是没有 p 的,所以这里的作用就是初始化设定数量的 p 了

procesize 不仅在初始化的时候会调用,当用户手动调用 runtime.GOMAXPROCS 的时候,会重新设定 nprocs,然后执行 startTheWorld()startTheWorld()会是使用新的 nprocs 再次调用procresize 这个方法

func procresize(nprocs int32) *p {
    old := gomaxprocs
    if old < 0 || nprocs <= 0 {throw("procresize: invalid arg")
    }
    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {sched.totaltime += int64(old) * (now - sched.procresizetime)
    }
    sched.procresizetime = now

    // Grow allp if necessary.
    // 如果新给的 p 的数量比原先的 p 的数量多,则新建增长的 p
    if nprocs > int32(len(allp)) {
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        // 判断 allp 的 cap 是否满足增长后的长度,满足就直接使用,不满足,则需要扩张这个 slice
        if nprocs <= int32(cap(allp)) {allp = allp[:nprocs]
        } else {nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        unlock(&allpLock)
    }

    // initialize new P's
    // 初始化新增的 p
    for i := int32(0); i < nprocs; i++ {pp := allp[i]
        if pp == nil {pp = new(p)
            pp.id = i
            pp.status = _Pgcstop
            pp.sudogcache = pp.sudogbuf[:0]
            for i := range pp.deferpool {pp.deferpool[i] = pp.deferpoolbuf[i][:0]
            }
            pp.wbBuf.reset()
            // allp 是一个 slice,直接将新增的 p 放到对应的索引下面就 ok 了
            atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
        }
        if pp.mcache == nil {
            // 初始化时,old=0,第一个新建的 p 给当前的 m 使用
            if old == 0 && i == 0 {if getg().m.mcache == nil {throw("missing mcache?")
                }
                pp.mcache = getg().m.mcache // bootstrap} else {
                // 为 p 分配内存
                pp.mcache = allocmcache()}
        }
    }

    // free unused P's
    // 释放掉多余的 p,当新设置的 p 的数量,比原先设定的 p 的数量少的时候,会走到这个流程
    // 通过 runtime.GOMAXPROCS 就可以动态的修改 nprocs
    for i := nprocs; i < old; i++ {p := allp[i]
        // move all runnable goroutines to the global queue
        // 把当前 p 的运行队列里的 g 转移到全局的 g 的队列
        for p.runqhead != p.runqtail {
            // pop from tail of local queue
            p.runqtail--
            gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
            // push onto head of global queue
            globrunqputhead(gp)
        }
        // 把 runnext 里的 g 也转移到全局队列
        if p.runnext != 0 {globrunqputhead(p.runnext.ptr())
            p.runnext = 0
        }
        // if there's a background worker, make it runnable and put
        // it on the global queue so it can clean itself up
        // 如果有 gc worker 的话,修改 g 的状态,然后再把它放到全局队列中
        if gp := p.gcBgMarkWorker.ptr(); gp != nil {casgstatus(gp, _Gwaiting, _Grunnable)
            globrunqput(gp)
            // This assignment doesn't race because the
            // world is stopped.
            p.gcBgMarkWorker.set(nil)
        }
        // sudoig 的 buf 和 cache,以及 deferpool 全部清空
        for i := range p.sudogbuf {p.sudogbuf[i] = nil
        }
        p.sudogcache = p.sudogbuf[:0]
        for i := range p.deferpool {for j := range p.deferpoolbuf[i] {p.deferpoolbuf[i][j] = nil
            }
            p.deferpool[i] = p.deferpoolbuf[i][:0]
        }
        // 释放掉当前 p 的 mcache
        freemcache(p.mcache)
        p.mcache = nil
        // 把当前 p 的 gfree 转移到全局
        gfpurge(p)
        // 修改 p 的状态,让他自生自灭去了
        p.status = _Pdead
        // can't free P itself because it can be referenced by an M in syscall
    }

    // Trim allp.
    if int32(len(allp)) != nprocs {lock(&allpLock)
        allp = allp[:nprocs]
        unlock(&allpLock)
    }
    // 判断当前 g 是否有 p,有的话更改当前使用的 p 的状态,继续使用
    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning} else {// release the current P and acquire allp[0]
        // 如果当前 g 有 p,但是拥有的是已经释放的 p,则不再使用这个 p,重新分配
        if _g_.m.p != 0 {_g_.m.p.ptr().m = 0
        }
        // 分配 allp[0]给当前 g 使用
        _g_.m.p = 0
        _g_.m.mcache = nil
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        // 将 p m g 绑定,并把 m.mcache 指向 p.mcache,并修改 p 的状态为_Prunning
        acquirep(p)
    }
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {p := allp[i]
        if _g_.m.p.ptr() == p {continue}
        p.status = _Pidle
        // 根据 runqempty 来判断当前 p 的 g 运行队列是否为空
        if runqempty(p) {
            // g 运行队列为空的 p,放到 sched 的 pidle 队列里面
            pidleput(p)
        } else {
            // g 运行队列不为空的 p,组成一个可运行队列,并最后返回
            p.m.set(mget())
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    return runnablePs
}
  • runqempty: 这个函数比较简单,就不深究了,就是根据 p.runqtail == p.runqhead 和 p.runnext 来判断有没有待运行的 g
  • pidleput: 将当前的 p 设置为 sched.pidle,然后根据 p.link 将空闲 p 串联起来,可参考上图 allm 的链表示意图

3.2. 任务

创建一个 goroutine,只需要使用 go func 就可以了,编译器会将go func 翻译成 newproc 进行调用,那么新建的任务是如何调用的呢,我们从创建开始进行跟踪

3.2.1. newproc

newproc 函数获取了参数和当前 g 的 pc 信息,并通过 g0 调用 newproc1 去真正的执行创建或获取可用的 g

func newproc(siz int32, fn *funcval) {
    // 获取第一参数地址
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    // 获取当前执行的 g
    gp := getg()
    // 获取当前 g 的 pc
    pc := getcallerpc()
    systemstack(func() {
        // 使用 g0 去执行 newproc1 函数
        newproc1(fn, (*uint8)(argp), siz, gp, pc)
    })
}

3.2.2. newproc1

newporc1 的作用就是创建或者获取一个空间的 g,初始化这个 g,并尝试寻找一个 p 和 m 去执行 g

func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {_g_ := getg()

    if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    // 加锁禁止被抢占
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    siz := narg
    siz = (siz + 7) &^ 7

    // We could allocate a larger initial stack if necessary.
    // Not worth it: this is almost always an error.
    // 4*sizeof(uintreg): extra space added below
    // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
    // 如果参数过多,则直接抛出异常,栈大小是 2k
    if siz >= _StackMin-4*sys.RegSize-sys.RegSize {throw("newproc: function arguments too large for new goroutine")
    }

    _p_ := _g_.m.p.ptr()
    // 尝试获取一个空闲的 g,如果获取不到,则新建一个,并添加到 allg 里面
    // gfget 首先会尝试从 p 本地获取空闲的 g,如果本地没有的话,则从全局获取一堆平衡到本地 p
    newg := gfget(_p_)
    if newg == nil {newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        // 新建的 g,添加到全局的 allg 里面,allg 是一个 slice,append 进去即可
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }
    // 判断获取的 g 的栈是否正常
    if newg.stack.hi == 0 {throw("newproc1: newg missing stack")
    }
    // 判断 g 的状态是否正常
    if readgstatus(newg) != _Gdead {throw("newproc1: new g is not Gdead")
    }
    // 预留一点空间,防止读取超出一点点
    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
    // 空间大小进行对齐
    totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
    sp := newg.stack.hi - totalSize
    spArg := sp
    // usesLr 为 0,这里不执行
    if usesLR {
        // caller's LR
        *(*uintptr)(unsafe.Pointer(sp)) = 0
        prepGoExitFrame(sp)
        spArg += sys.MinFrameSize
    }
    if narg > 0 {
        // 将参数拷贝入栈
        memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
        // ... 省略 ...
    }
    // 初始化用于保存现场的区域及初始化基本状态
    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    // 这里保存了 goexit 的地址,在用户函数执行完成后,会根据 pc 来执行 goexit
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    // 这里调整 sched 信息,pc = goexit 的地址
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.ancestors = saveAncestors(callergp)
    newg.startpc = fn.fn
    if _g_.m.curg != nil {newg.labels = _g_.m.curg.labels}
    if isSystemGoroutine(newg) {atomic.Xadd(&sched.ngsys, +1)
    }
    newg.gcscanvalid = false
    casgstatus(newg, _Gdead, _Grunnable)
    // 如果 p 缓存的 goid 已经用完,本地再从 sched 批量获取一点
    if _p_.goidcache == _p_.goidcacheend {
        // Sched.goidgen is the last allocated id,
        // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
        // At startup sched.goidgen=0, so main goroutine receives goid=1.
        _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
        _p_.goidcache -= _GoidCacheBatch - 1
        _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
    }
    // 分配 goid
    newg.goid = int64(_p_.goidcache)
    _p_.goidcache++
    // 把新的 g 放到 p 的可运行 g 队列中
    runqput(_p_, newg, true)
    // 判断是否有空闲 p,且是否需要唤醒一个 m 来执行 g
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {wakep()
    }
    _g_.m.locks--
    if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt
    }
}

3.2.2.1. gfget

这个函数的逻辑比较简单,就是看一下 p 有没有空闲的 g,没有则去全局的 freeg 队列查找,这里就涉及了 p 本地和全局平衡的一个交互了

func gfget(_p_ *p) *g {
retry:
    gp := _p_.gfree
    // 本地的 g 队列为空,且全局队列不为空,则从全局队列一次获取至多 32 个下来,如果全局队列不够就算了
    if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) {lock(&sched.gflock)
        for _p_.gfreecnt < 32 {
            if sched.gfreeStack != nil {
                // Prefer Gs with stacks.
                gp = sched.gfreeStack
                sched.gfreeStack = gp.schedlink.ptr()} else if sched.gfreeNoStack != nil {
                gp = sched.gfreeNoStack
                sched.gfreeNoStack = gp.schedlink.ptr()} else {break}
            _p_.gfreecnt++
            sched.ngfree--
            gp.schedlink.set(_p_.gfree)
            _p_.gfree = gp
        }
        // 已经从全局拿了 g 了,再去从头开始判断
        unlock(&sched.gflock)
        goto retry
    }
    // 如果拿到了 g,则判断 g 是否有栈,没有栈就分配
    // 栈的分配跟内存分配差不多,首先创建几个固定大小的栈的数组,然后到指定大小的数组里面去分配就 ok 了,过大则直接全局分配
    if gp != nil {_p_.gfree = gp.schedlink.ptr()
        _p_.gfreecnt--
        if gp.stack.lo == 0 {
            // Stack was deallocated in gfput. Allocate a new one.
            systemstack(func() {gp.stack = stackalloc(_FixedStack)
            })
            gp.stackguard0 = gp.stack.lo + _StackGuard
        } else {// ... 省略 ...}
    }
    // 注意:如果全局没有 g,p 也没有 g,则返回的 gp 还是 nil
    return gp
}

3.2.2.2. runqput

runqput 会把 g 放到 p 的本地队列或者 p.runnext,如果 p 的本地队列过长,则把 g 到全局队列,同时平衡 p 本地队列的一半到全局

func runqput(_p_ *p, gp *g, next bool) {if randomizeScheduler && next && fastrand()%2 == 0 {next = false}
    // 如果 next 为 true,则放入到 p.runnext 里面,并把原先 runnext 的 g 交换出来
    if next {
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {goto retryNext}
        if oldnext == 0 {return}
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()}

retry:
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    // 判断 p 的队列的长度是否超了,runq 是一个长度为 256 的数组,超出的话就会放到全局队列了
    if t-h < uint32(len(_p_.runq)) {_p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // 把 g 放到全局队列
    if runqputslow(_p_, gp, h, t) {return}
    // the queue is not full, now the put above must succeed
    goto retry
}

3.2.2.3. runqputslow

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {var batch [len(_p_.runq)/2 + 1]*g

    // First, grab a batch from local queue.
    n := t - h
    n = n / 2
    if n != uint32(len(_p_.runq)/2) {throw("runqputslow: queue is not full")
    }
    // 获取 p 后面的一半
    for i := uint32(0); i < n; i++ {batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()}
    if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
        return false
    }
    batch[n] = gp

    // Link the goroutines.
    for i := uint32(0); i < n; i++ {batch[i].schedlink.set(batch[i+1])
    }

    // Now put the batch on global queue.
    // 放到全局队列队尾
    lock(&sched.lock)
    globrunqputbatch(batch[0], batch[n], int32(n+1))
    unlock(&sched.lock)
    return true
}

新建任务至此基本结束,创建完成任务后,等待调度执行就好了,从上面可以看出,任务的优先级是 p.runnext > p.runq > sched.runq

g 从创建到执行结束并放入 free 队列中的状态转换大致如下图所示

3.2.3 wakep

当 newproc1 创建完任务后,会尝试唤醒 m 来执行任务

func wakep() {
    // be conservative about spinning threads
    // 一次应该只有一个 m 在 spining,否则就退出
    if !atomic.Cas(&sched.nmspinning, 0, 1) {return}
    // 调用 startm 来执行
    startm(nil, true)
}

3.2.4 startm

调度 m 或者创建 m 来运行 p,如果 p ==nil,就会尝试获取一个空闲 p,p 的队列中有 g,拿到 p 后才能拿到 g

func startm(_p_ *p, spinning bool) {lock(&sched.lock)
    if _p_ == nil {
        // 如果没有指定 p, 则从 sched.pidle 获取空闲的 p
        _p_ = pidleget()
        if _p_ == nil {unlock(&sched.lock)
            // 如果没有获取到 p,重置 nmspinning
            if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {throw("startm: negative nmspinning")
                }
            }
            return
        }
    }
    // 首先尝试从 sched.midle 获取一个空闲的 m
    mp := mget()
    unlock(&sched.lock)
    if mp == nil {
        // 如果获取不到空闲的 m,则创建一个 mspining = true 的 m,并将 p 绑定到 m 上,直接返回
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        newm(fn, _p_)
        return
    }
    // 判断获取到的空闲 m 是否是 spining 状态
    if mp.spinning {throw("startm: m is spinning")
    }
    // 判断获取到的 m 是否有 p
    if mp.nextp != 0 {throw("startm: m has p")
    }
    if spinning && !runqempty(_p_) {throw("startm: p has runnable gs")
    }
    // The caller incremented nmspinning, so set m.spinning in the new M.
    // 调用函数的父函数已经增加了 nmspinning,这里只需要设置 m.spining 就 ok 了,同时把 p 绑上来
    mp.spinning = spinning
    mp.nextp.set(_p_)
    // 唤醒 m
    notewakeup(&mp.park)
}

3.2.4.1. newm

newm 通过 allocm 函数来创建新 m

func newm(fn func(), _p_ *p) {
    // 新建一个 m
    mp := allocm(_p_, fn)
    // 为这个新建的 m 绑定指定的 p
    mp.nextp.set(_p_)
    // ... 省略 ...
    // 创建系统线程
    newm1(mp)
}

3.2.4.2. new1m

func newm1(mp *m) {
    // runtime cgo 包会把 iscgo 设置为 true,这里不分析
    if iscgo {
        var ts cgothreadstart
        if _cgo_thread_start == nil {throw("_cgo_thread_start missing")
        }
        ts.g.set(mp.g0)
        ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
        ts.fn = unsafe.Pointer(funcPC(mstart))
        if msanenabled {msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        execLock.rlock() // Prevent process clone.
        asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
        execLock.runlock()
        return
    }
    execLock.rlock() // Prevent process clone.
    newosproc(mp)
    execLock.runlock()}

3.2.4.3. newosproc

newosproc 创建一个新的系统线程,并执行 mstart_stub 函数,之后调用 mstart 函数进入调度,后面在执行流程会分析

func newosproc(mp *m) {stk := unsafe.Pointer(mp.g0.stack.hi)
    // Initialize an attribute object.
    var attr pthreadattr
    var err int32
    err = pthread_attr_init(&attr)

    // Finally, create the thread. It starts at mstart_stub, which does some low-level
    // setup and then calls mstart.
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    // 创建线程,并传入启动启动函数 mstart_stub,mstart_stub 之后调用 mstart
    err = pthread_create(&attr, funcPC(mstart_stub), unsafe.Pointer(mp))
    sigprocmask(_SIG_SETMASK, &oset, nil)
    if err != 0 {write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate)))
        exit(1)
    }
}

3.2.4.4. allocm

allocm 这里首先会释放 sched 的 freem,然后再去创建 m,并初始化 m

func allocm(_p_ *p, fn func()) *m {_g_ := getg()
    _g_.m.locks++ // disable GC because it can be called from sysmon
    if _g_.m.p == 0 {acquirep(_p_) // temporarily borrow p for mallocs in this function
    }

    // Release the free M list. We need to do this somewhere and
    // this may free up a stack we can use.
    // 首先释放掉 freem 列表
    if sched.freem != nil {lock(&sched.lock)
        var newList *m
        for freem := sched.freem; freem != nil; {
            if freem.freeWait != 0 {
                next := freem.freelink
                freem.freelink = newList
                newList = freem
                freem = next
                continue
            }
            stackfree(freem.g0.stack)
            freem = freem.freelink
        }
        sched.freem = newList
        unlock(&sched.lock)
    }

    mp := new(m)
    // 启动函数,根据 startm 调用来看,这个 fn 就是 mspinning,会将 m.mspinning 设置为 true
    mp.mstartfn = fn
    // 初始化 m,上面已经分析了
    mcommoninit(mp)
    // In case of cgo or Solaris or Darwin, pthread_create will make us a stack.
    // Windows and Plan 9 will layout sched stack on OS stack.
    // 为新的 m 创建 g0
    if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {mp.g0 = malg(-1)
    } else {mp.g0 = malg(8192 * sys.StackGuardMultiplier)
    }
    // 为 mp 的 g0 绑定自己
    mp.g0.m = mp
    // 如果当前的 m 所绑定的是参数传递过来的 p,解除绑定,因为参数传递过来的 p 稍后要绑定新建的 m
    if _p_ == _g_.m.p.ptr() {releasep()
    }

    _g_.m.locks--
    if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt
    }

    return mp
}

3.2.4.5. notewakeup

func notewakeup(n *note) {
    var v uintptr
    // 设置 m 为 locked
    for {v = atomic.Loaduintptr(&n.key)
        if atomic.Casuintptr(&n.key, v, locked) {break}
    }

    // Successfully set waitm to locked.
    // What was it before?
    // 根据 m 的原先的状态,来判断后面的执行流程,0 则直接返回,locked 则冲突,否则认为是 wating,唤醒
    switch {
    case v == 0:
        // Nothing was waiting. Done.
    case v == locked:
        // Two notewakeups! Not allowed.
        throw("notewakeup - double wakeup")
    default:
        // Must be the waiting m. Wake it up.
        // 唤醒系统线程
        semawakeup((*m)(unsafe.Pointer(v)))
    }
}

至此的话,创建完任务 g 后,将 g 放入了 p 的 local 队列或者是全局队列,然后开始获取了一个空闲的 m 或者新建一个 m 来执行 g,m, p, g 都已经准备完成了,下面就是开始调度,来运行任务 g 了

3.3. 执行

在 startm 函数分析的过程中会,可以看到,有两种获取 m 的方式

  • 新建:这时候执行 newm1 下的 newosproc,同时最终调用 mstart 来执行调度
  • 唤醒空闲 m:从休眠的地方继续执行

m 执行 g 有两个起点,一个是线程启动函数 mstart,另一个则是休眠被唤醒后的调度 schedule 了,我们从头开始,也就是mstartmstart 走到最后也是 schedule 调度

3.3.1. mstart

func mstart() {_g_ := getg()

    osStack := _g_.stack.lo == 0
    if osStack {
        // Initialize stack bounds from system stack.
        // Cgo may have left stack size in stack.hi.
        // minit may update the stack bounds.
        // 从系统堆栈上直接划出所需的范围
        size := _g_.stack.hi
        if size == 0 {size = 8192 * sys.StackGuardMultiplier}
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
        _g_.stack.lo = _g_.stack.hi - size + 1024
    }
    // Initialize stack guards so that we can start calling
    // both Go and C functions with stack growth prologues.
    _g_.stackguard0 = _g_.stack.lo + _StackGuard
    _g_.stackguard1 = _g_.stackguard0
    // 调用 mstart1 来处理
    mstart1()

    // Exit this thread.
    if GOOS == "windows" || GOOS == "solaris" || GOOS == "plan9" || GOOS == "darwin" {
        // Window, Solaris, Darwin and Plan 9 always system-allocate
        // the stack, but put it in _g_.stack before mstart,
        // so the logic above hasn't set osStack yet.
        osStack = true
    }
    // 退出 m,正常情况下 mstart1 调用 schedule() 时,是不再返回的,所以,不用担心系统线程的频繁创建退出
    mexit(osStack)
}

3.3.2. mstart1

func mstart1() {_g_ := getg()

    if _g_ != _g_.m.g0 {throw("bad runtime·mstart")
    }

    // Record the caller for use as the top of stack in mcall and
    // for terminating the thread.
    // We're never coming back to mstart1 after we call schedule,
    // so other calls can reuse the current frame.
    // 保存调用者的 pc sp 等信息
    save(getcallerpc(), getcallersp())
    asminit()
    // 初始化 m 的 sigal 的栈和 mask
    minit()

    // Install signal handlers; after minit so that minit can
    // prepare the thread to be able to handle the signals.
    // 安装 sigal 处理器
    if _g_.m == &m0 {mstartm0()
    }
    // 如果设置了 mstartfn,就先执行这个
    if fn := _g_.m.mstartfn; fn != nil {fn()
    }

    if _g_.m.helpgc != 0 {
        _g_.m.helpgc = 0
        stopm()} else if _g_.m != &m0 {
        // 获取 nextp
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    schedule()}

3.3.2.1. acquirep

acquirep 函数主要是改变 p 的状态,绑定 m p,通过吧 p 的 mcache 与 m 共享

func acquirep(_p_ *p) {
    // Do the part that isn't allowed to have write barriers.
    acquirep1(_p_)

    // have p; write barriers now allowed
    _g_ := getg()
    // 把 p 的 mcache 与 m 共享
    _g_.m.mcache = _p_.mcache
}

3.3.2.2. acquirep1

func acquirep1(_p_ *p) {_g_ := getg()

    // 让 m p 互相绑定
    _g_.m.p.set(_p_)
    _p_.m.set(_g_.m)
    _p_.status = _Prunning
}

3.3.2.3. schedule

开始进入到调度函数了,这是一个由 schedule、execute、goroutine fn、goexit 构成的逻辑循环,就算 m 是唤醒后,也是从设置的断点开始执行

func schedule() {_g_ := getg()

    if _g_.m.locks != 0 {throw("schedule: holding locks")
    }
    // 如果有 lockg,停止执行当前的 m
    if _g_.m.lockedg != 0 {
        // 解除 lockedm 的锁定,并执行当前 g
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if _g_.m.incgo {throw("schedule: in cgo")
    }

top:
    // gc 等待
    if sched.gcwaiting != 0 {gcstopm()
        goto top
    }

    var gp *g
    var inheritTime bool

    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        // 为了保证公平,每隔 61 次,从全局队列上获取 g
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        // 全局队列上获取不到待运行的 g,则从 p local 队列中获取
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {throw("schedule: spinning with local work")
        }
    }
    if gp == nil {
        // 如果 p local 获取不到待运行 g,则开始查找,这个函数会从 全局 io poll,p locl 和其他 p local 获取待运行的 g,后面详细分析
        gp, inheritTime = findrunnable() // blocks until work is available}

    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
    if _g_.m.spinning {
        // 如果 m 是自旋状态,取消自旋
        resetspinning()}

    if gp.lockedm != 0 {
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        // 如果 g 有 lockedm,则休眠上交 p,休眠 m,等待新的 m,唤醒后从这里开始执行,跳转到 top
        startlockedm(gp)
        goto top
    }
    // 开始执行这个 g
    execute(gp, inheritTime)
}
3.3.2.3.1. stoplockedm

因为当前的 m 绑定了 lockedg,而当前 g 不是指定的 lockedg,所以这个 m 不能执行,上交当前 m 绑定的 p,并且休眠 m 直到调度 lockedg

func stoplockedm() {_g_ := getg()

    if _g_.m.lockedg == 0 || _g_.m.lockedg.ptr().lockedm.ptr() != _g_.m {throw("stoplockedm: inconsistent locking")
    }
    if _g_.m.p != 0 {
        // Schedule another M to run this p.
        // 释放当前 p
        _p_ := releasep()
        handoffp(_p_)
    }
    incidlelocked(1)
    // Wait until another thread schedules lockedg again.
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park)
    status := readgstatus(_g_.m.lockedg.ptr())
    if status&^_Gscan != _Grunnable {print("runtime:stoplockedm: g is not Grunnable or Gscanrunnable\n")
        dumpgstatus(_g_)
        throw("stoplockedm: not runnable")
    }
    // 上交了当前的 p,将 nextp 设置为可执行的 p
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}
3.3.2.3.2. startlockedm

调度 lockedm 去运行 lockedg

func startlockedm(gp *g) {_g_ := getg()

    mp := gp.lockedm.ptr()
    if mp == _g_.m {throw("startlockedm: locked to me")
    }
    if mp.nextp != 0 {throw("startlockedm: m has p")
    }
    // directly handoff current P to the locked m
    incidlelocked(-1)
    // 移交当前 p 给 lockedm,并设置为 lockedm.nextp,以便于 lockedm 唤醒后,可以获取
    _p_ := releasep()
    mp.nextp.set(_p_)
    // m 被唤醒后,从 m 休眠的地方开始执行,也就是 schedule()函数中
    notewakeup(&mp.park)
    stopm()}
3.3.2.3.3. handoffp
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    if !runqempty(_p_) || sched.runqsize != 0 {
        // 调用 startm 开始调度
        startm(_p_, false)
        return
    }

    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    // 判断有没有正在寻找 p 的 m 以及有没有空闲的 p
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock)

    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {notewakeup(&sched.safePointNote)
        }
    }
    // 如果 全局待运行 g 队列不为空,尝试使用 startm 进行调度
    if sched.runqsize != 0 {unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // 把 p 放入到全局的空闲队列,放回队列就不多说了,参考 allm 的放回
    pidleput(_p_)
    unlock(&sched.lock)
}
3.3.2.3.4. execute

开始执行 g 的代码了

func execute(gp *g, inheritTime bool) {_g_ := getg()
    // 更改 g 的状态,并不允许抢占
    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    if !inheritTime {
        // 调度计数
        _g_.m.p.ptr().schedtick++}
    _g_.m.curg = gp
    gp.m = _g_.m
    // 开始执行 g 的代码了
    gogo(&gp.sched)
}
3.3.2.3.5. gogo

gogo 函数承载的作用就是切换到 g 的栈,开始执行 g 的代码,汇编内容就不分析了,但是有一个疑问就是,gogo 执行完函数后,怎么再次进入调度呢?

我们回到 newproc1 函数的 L63 newg.sched.pc = funcPC(goexit) + sys.PCQuantum,这里保存了 pc 的质地为 goexit 的地址,所以当执行完用户代码后,就会进入 goexit 函数

3.3.2.3.6. goexit0

goexit 在汇编层面就是调用 runtime.goexit1,而 goexit1 通过 mcall 调用了goexit0 所以这里直接分析了goexit0

goexit0 重置 g 的状态,并重新进行调度,这样就调度就又回到了schedule() 了,开始循环往复的调度

func goexit0(gp *g) {_g_ := getg()
    // 转换 g 的状态为 dead,以放回空闲列表
    casgstatus(gp, _Grunning, _Gdead)
    if isSystemGoroutine(gp) {atomic.Xadd(&sched.ngsys, -1)
    }
    // 清空 g 的状态
    gp.m = nil
    locked := gp.lockedm != 0
    gp.lockedm = 0
    _g_.m.lockedg = 0
    gp.paniconfault = false
    gp._defer = nil // should be true already but just in case.
    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
    gp.writebuf = nil
    gp.waitreason = 0
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    // Note that gp's stack scan is now"valid" because it has no
    // stack.
    gp.gcscanvalid = true
    dropg()

    // 把 g 放回空闲列表,以备复用
    gfput(_g_.m.p.ptr(), gp)
    // 再次进入调度循环
    schedule()}

至此,单次调度结束,再次进入调度,循环往复

3.3.2.3.7. findrunnable
func findrunnable() (gp *g, inheritTime bool) {_g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

top:
    _p_ := _g_.m.p.ptr()

    // local runq
    // 从 p local 去获取 g
    if gp, inheritTime := runqget(_p_); gp != nil {return gp, inheritTime}

    // global runq
    // 从全局的待运行 d 队列获取
    if sched.runqsize != 0 {lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {return gp, false}
    }

    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    // 看看 netpoll 中有没有已经准备好的 g
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {if gp := netpoll(false); gp != nil { // non-blocking
            // netpoll returns list of goroutines linked by schedlink.
            injectglist(gp.schedlink.ptr())
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }

    // Steal work from other P's.
    // 如果 sched.pidle == procs - 1,说明所有的 p 都是空闲的,无需遍历其他 p 了
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
        // New work can appear from returning syscall/cgocall, network or timers.
        // Neither of that submits to local run queues, so no point in stealing.
        goto stop
    }
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
    // 如果寻找 p 的 m 的数量,大于有 g 的 p 的数量的一般,就不再去寻找了
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {goto stop}
    // 设置当前 m 的自旋状态
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    // 开始窃取其他 p 的待运行 g 了
    for i := 0; i < 4; i++ {for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {goto top}
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            // 从其他的 p 偷取一般的任务数量,还会随机偷取 p 的 runnext(过分了),偷取部分就不分析了,就是 slice 的操作而已
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {return gp, false}
        }
    }

stop:
    // 对 all 做个镜像备份
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)

    if sched.runqsize != 0 {gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    if releasep() != _p_ {throw("findrunnable: wrong p")
    }
    pidleput(_p_)
    unlock(&sched.lock)

    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        // 设置非自旋状态,因为找 p 的工作已经结束了
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
    for _, _p_ := range allpSnapshot {if !runqempty(_p_) {lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }
    // poll network
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        if _g_.m.p != 0 {throw("findrunnable: netpoll with p")
        }
        if _g_.m.spinning {throw("findrunnable: netpoll with spinning")
        }
        gp := netpoll(true) // block until new work is available
        atomic.Store64(&sched.lastpoll, uint64(nanotime()))
        if gp != nil {lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {acquirep(_p_)
                injectglist(gp.schedlink.ptr())
                casgstatus(gp, _Gwaiting, _Grunnable)
                if trace.enabled {traceGoUnpark(gp, 0)
                }
                return gp, false
            }
            injectglist(gp)
        }
    }
    stopm()
    goto top
}

这里真的是无奈啊,为了寻找一个可运行的 g,也是煞费苦心,及时进入了 stop 的 label,还是不死心,又来了一边寻找。大致寻找过程可以总结为一下几个:

  • 从 p 自己的 local 队列中获取可运行的 g
  • 从全局队列中获取可运行的 g
  • 从 netpoll 中获取一个已经准备好的 g
  • 从其他 p 的 local 队列中获取可运行的 g,随机偷取 p 的 runnext,有点任性
  • 无论如何都获取不到的话,就 stopm 了
3.3.2.3.7. stopm

stop 会把当前 m 放到空闲列表里面,同时绑定 m.nextp 与 m

func stopm() {_g_ := getg()
retry:
    lock(&sched.lock)
    // 把当前 m 放到 sched.midle 的空闲列表里
    mput(_g_.m)
    unlock(&sched.lock)
    // 休眠,等待被唤醒
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park)
    // 绑定 p
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}

3.4. 监控

3.4.1. sysmon

go 的监控是依靠函数 sysmon 来完成的,监控主要做一下几件事

  • 释放闲置超过 5 分钟的 span 物理内存
  • 如果超过两分钟没有执行垃圾回收,则强制执行
  • 将长时间未处理的 netpoll 结果添加到任务队列
  • 向长时间运行的 g 进行抢占
  • 收回因为 syscall 而长时间阻塞的 p

监控线程并不是时刻在运行的,监控线程首次休眠 20us,每次执行完后,增加一倍的休眠时间,但是最多休眠 10ms

func sysmon() {lock(&sched.lock)
    sched.nmsys++
    checkdead()
    unlock(&sched.lock)

    // If a heap span goes unused for 5 minutes after a garbage collection,
    // we hand it back to the operating system.
    scavengelimit := int64(5 * 60 * 1e9)

    if debug.scavenge > 0 {
        // Scavenge-a-lot for testing.
        forcegcperiod = 10 * 1e6
        scavengelimit = 20 * 1e6
    }

    lastscavenge := nanotime()
    nscavenge := 0

    lasttrace := int64(0)
    idle := 0 // how many cycles in succession we had not wokeup somebody
    delay := uint32(0)
    for {
        // 判断当前循环,应该休眠的时间
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay)
        // STW 时休眠 sysmon
        if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {lock(&sched.lock)
            if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {atomic.Store(&sched.sysmonwait, 1)
                unlock(&sched.lock)
                // Make wake-up period small enough
                // for the sampling to be correct.
                maxsleep := forcegcperiod / 2
                if scavengelimit < forcegcperiod {maxsleep = scavengelimit / 2}
                shouldRelax := true
                if osRelaxMinNS > 0 {next := timeSleepUntil()
                    now := nanotime()
                    if next-now < osRelaxMinNS {shouldRelax = false}
                }
                if shouldRelax {osRelax(true)
                }
                // 进行休眠
                notetsleep(&sched.sysmonnote, maxsleep)
                if shouldRelax {osRelax(false)
                }
                lock(&sched.lock)
                // 唤醒后,清除休眠状态,继续执行
                atomic.Store(&sched.sysmonwait, 0)
                noteclear(&sched.sysmonnote)
                idle = 0
                delay = 20
            }
            unlock(&sched.lock)
        }
        // trigger libc interceptors if needed
        if *cgo_yield != nil {asmcgocall(*cgo_yield, nil)
        }
        // poll network if not polled for more than 10ms
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        now := nanotime()
        // 如果 netpoll 不为空,每隔 10ms 检查一下是否有 ok 的
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            // 返回了已经获取到结果的 goroutine 的列表
            gp := netpoll(false) // non-blocking - returns list of goroutines
            if gp != nil {incidlelocked(-1)
                // 把获取到的 g 的列表加入到全局待运行队列中
                injectglist(gp)
                incidlelocked(1)
            }
        }
        // retake P's blocked in syscalls
        // and preempt long running G's
        // 抢夺 syscall 长时间阻塞的 p 和长时间运行的 g
        if retake(now) != 0 {idle = 0} else {idle++}
        // check if we need to force a GC
        // 通过 gcTrigger.test() 函数判断是否超过设定的强制触发 gc 的时间间隔,if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {lock(&forcegc.lock)
            forcegc.idle = 0
            forcegc.g.schedlink = 0
            // 把 gc 的 g 加入待运行队列,等待调度运行
            injectglist(forcegc.g)
            unlock(&forcegc.lock)
        }
        // scavenge heap once in a while
        // 判断是否有 5 分钟未使用的 span,有的话,归还给系统
        if lastscavenge+scavengelimit/2 < now {mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
            lastscavenge = now
            nscavenge++
        }
        if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
            lasttrace = now
            schedtrace(debug.scheddetail > 0)
        }
    }
}

扫描 netpoll,并把 g 存放到去全局队列比较好理解,跟前面添加 p 和 m 的逻辑差不多,但是抢占这里就不是很理解了,你说抢占就抢占,被抢占的 g 岂不是很没面子,而且怎么抢占呢?

3.4.2. retake

const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
    n := 0
    // Prevent allp slice changes. This lock will be completely
    // uncontended unless we're already stopping the world.
    lock(&allpLock)
    // We can't use a range loop over allp because we may
    // temporarily drop the allpLock. Hence, we need to re-fetch
    // allp each time around the loop.
    for i := 0; i < len(allp); i++ {_p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall {// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            // pd.syscalltick 即 _p_.sysmontick.syscalltick 只有在 sysmon 的时候会更新,而 _p_.syscalltick 则会每次都更新,所以,当 syscall 之后,第一个 sysmon 检测到的时候并不会抢占,而是第二次开始才会抢占,中间间隔至少有 20us,最多会有 10ms
            t := int64(_p_.syscalltick)
            if int64(pd.syscalltick) != t {pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
            // 是否有空 p,有寻找 p 的 m,以及当前的 p 在 syscall 之后,有没有超过 10ms
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {continue}
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            // 抢占 p,把 p 的状态转为 idle 状态
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                // 把当前 p 移交出去,上面已经分析过了
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        } else if s == _Prunning {
            // Preempt G if it's running for too long.
            // 如果 p 是 running 状态,如果 p 下面的 g 执行太久了,则抢占
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {pd.schedtick = uint32(t)
                pd.schedwhen = now
                continue
            }
            // 判断是否超出 10ms, 不超过不抢占
            if pd.schedwhen+forcePreemptNS > now {continue}
            // 开始抢占
            preemptone(_p_)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

3.4.3. preemptone

这个函数的注释,作者就表明这种抢占并不是很靠谱????,我们先看一下实现吧

func preemptone(_p_ *p) bool {mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {return false}
    gp := mp.curg
    if gp == nil || gp == mp.g0 {return false}
    // 标识抢占字段
    gp.preempt = true

    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    // 更新 stackguard0,保证能检测到栈溢
    gp.stackguard0 = stackPreempt
    return true
}

在这里,作者会更新 gp.stackguard0 = stackPreempt,然后让 g 误以为栈不够用了,那就只有乖乖的去进行栈扩张,站扩张的话就用调用newstack 分配一个新栈,然后把原先的栈的内容拷贝过去,而在 newstack 里面有一段如下

if preempt {if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
        // Let the goroutine keep running for now.
        // gp->preempt is set, so it will be preempted next time.
        gp.stackguard0 = gp.stack.lo + _StackGuard
        gogo(&gp.sched) // never return
    }
}

然后这里就发现 g 被抢占了,那你栈不够用就有可能是假的,但是管你呢,你再去调度去吧,也不给你扩栈了,虽然作者和雨痕大神都吐槽了一下这个,但是这种抢占方式自动 1.5(也可能更早)就一直存在,且稳定运行,就说明还是很牛逼的了

4. 总结

在调度器的设置上,最明显的就是复用:g 的 free 链表,m 的 free 列表,p 的 free 列表,这样就避免了重复创建销毁锁浪费的资源

其次就是多级缓存:这一块跟内存上的设计思想也是一直的,p 一直有一个 g 的待运行队列,自己没有货过多的时候,才会平衡到全局队列,全局队列操作需要锁,则本地操作则不需要,大大减少了锁的创建销毁所消耗的资源

至此,g m p 的关系及状态转换大致都讲解完成了,由于对汇编这块比较薄弱,所以基本略过了,右面有机会还是需要多了解一点

5. 参考文档

  • 《go 语言学习笔记》
  • Golang 源码探索(二) 协程的实现原理
  • 【Go 源码分析】Go scheduler 源码分析

正文完
 0