Author: Sun Wei
1. Basic Concepts: Processes, Threads, and Coroutines
A process can have many threads. Each thread typically gets a fixed block of memory (commonly 2 MB) as its stack, which holds the local variables of the functions currently being called or suspended. What the CPU switches between when scheduling is threads: if the next thread belongs to the current process, only a thread switch is needed and it completes quickly; if it belongs to another process, a full process switch is required, and that takes noticeably longer.
Threads come in two flavors: kernel-level threads and user-level threads. A user-level thread must be bound to a kernel-level thread, because the CPU knows nothing about user-level threads; it only knows it is running one thread, and that thread is a kernel-level one.
User-level threads actually have a name of their own: coroutines (co-routines). To keep the terms apart, from here on "coroutine" means a user-level thread and "thread" means a kernel-level thread.
Coroutines differ from threads in how they are scheduled: threads are scheduled preemptively by the kernel, while coroutines are scheduled cooperatively in user space; only when one coroutine yields the CPU does the next coroutine run.
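Cooperative yielding is visible in Go itself: runtime.Gosched() makes the calling goroutine give up the processor voluntarily, much like a classic coroutine yield. A minimal sketch (not from the original article; GOMAXPROCS(1) makes the interleaving easy to observe):
package main

import (
	"fmt"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(1) // one P, so goroutines must take turns
	done := make(chan struct{})
	go func() {
		fmt.Println("worker runs first")
		close(done)
	}()
	runtime.Gosched() // cooperatively yield so the worker can run
	<-done
	fmt.Println("main resumes after the yield")
}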
There are three ways coroutines can be bound to threads:
N:1 — N coroutines are bound to a single thread. The advantage is that coroutine switches happen entirely in user space, never trapping into the kernel, which makes them extremely lightweight and fast. The drawbacks are severe, though: since all of a process's coroutines sit on one thread, the program cannot use the hardware's multiple cores, and if any coroutine blocks, the thread blocks with it and none of the process's other coroutines can run — there is no concurrency at all.
1:1 — one coroutine is bound to one thread. This is the easiest to implement; scheduling is handled entirely by the kernel, so the N:1 drawbacks disappear. The downside is that creating, destroying, and switching coroutines now all cost as much as the corresponding thread operations, which is rather expensive.
M:N — M coroutines are bound to N threads. This combines N:1 and 1:1 and overcomes the drawbacks of both, but it is the most complex to implement.
2. An Overview of Golang
2.1 The Goroutine Concept
Because a thread switch drags along a large context and burns a lot of CPU time, Go's unit of concurrency is not the traditional thread but the much lighter coroutine, the goroutine. This greatly raises the achievable degree of parallelism, which is why Go is sometimes billed as "the most parallel language".
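As a minimal illustration of the model (an added sketch, standard library only): each go statement below spawns a goroutine, and a sync.WaitGroup keeps main alive until they all finish.
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) { // each iteration runs concurrently as its own goroutine
			defer wg.Done()
			fmt.Println("hello from goroutine", id)
		}(i)
	}
	wg.Wait() // block until every goroutine has called Done
}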
2.2 Comparison with Other Concurrency Models
Interpreted languages such as Python tend to use a multi-process concurrency model. A process has the largest context of all, so switching is hugely expensive, and because processes can only talk over sockets or specially arranged shared memory, programming becomes a lot more awkward.
Languages such as C++ usually use a multi-threaded model. A thread's context is much smaller than a process's, and threads already share memory, so programming is considerably easier. But starting, destroying, and switching threads still costs plenty of CPU time. Thread pools were introduced to keep a fixed number of threads on hand and avoid the cost of constantly creating and destroying them, yet this basic technique has problems of its own: a thread stuck in blocking IO occupies its slot indefinitely, so tasks queued behind it cannot get a thread to run on.
Go's concurrency machinery is more elaborate. Go replaces the thread with a lighter data structure that has its own stack and switches faster. Real execution still happens on threads: the Go scheduler places goroutines onto threads, creating and releasing threads as appropriate, and when a running goroutine blocks (most commonly while waiting on IO) it is detached from its thread so that other runnable goroutines can take its place there. Through this fairly sophisticated scheduling, the whole system achieves a very high degree of parallelism without burning large amounts of CPU.
2.3 Characteristics of Goroutines
Non-blocking. Goroutines exist to make highly concurrent programs easy to write. When a goroutine performs a blocking operation (a system call, for instance), the other goroutines on its thread are moved to other threads and keep running, so the program as a whole never stalls.
Its own scheduler. Golang has garbage collection (GC), and goroutines must be stopped while GC runs; because Go implements its own scheduler, this is straightforward to arrange. Building concurrency out of goroutines gives you the advantages of asynchronous IO together with the programming convenience of threads and processes.
Self-managed stacks. Goroutines do, of course, bring real complexity. A goroutine must carry not only the code it runs but also the stack it runs on, a program counter (PC), and a stack pointer (SP), and the saved stack and pointers must keep the program consistent in every execution mode.
Since every goroutine has its own stack, creating a goroutine means creating its stack as well, and that stack keeps growing while the goroutine executes. Stacks traditionally grow contiguously, and because all the threads of a process share one virtual address space, each thread's stack needs its own start address, which forces stack sizes to be estimated before allocation. With a very large number of threads, stack overflow becomes easy to hit.
Split Stacks technology was invented to solve this: at creation time only a small block is allocated, and when a function call would overflow the stack, a new block is allocated somewhere else; the new space need not be contiguous with the old. The call's arguments are copied into the new block and execution continues there. Golang manages stacks in a similar spirit, but for higher efficiency it uses contiguous stacks: it likewise starts with a fixed small stack, and when space runs out it allocates a larger stack and copies the entire old stack into it. This avoids the frequent allocation and deallocation that Split Stacks can cause.
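A small sketch that exercises contiguous-stack growth (an added example; the initial stack size and the copying are runtime internals, this program merely recurses deeply enough that the runtime must repeatedly move to a bigger stack):
package main

import "fmt"

// grow recurses with a roughly 1 KB frame, so a handful of calls outgrow
// the small initial goroutine stack and force the runtime to allocate a
// larger contiguous stack, copying the old contents each time.
func grow(n int) byte {
	var frame [1024]byte
	frame[0] = byte(n)
	if n == 0 {
		return frame[0]
	}
	return grow(n - 1)
}

func main() {
	done := make(chan byte)
	// Roughly 10 MB of stack is needed; this only works because
	// goroutine stacks grow on demand.
	go func() { done <- grow(10000) }()
	fmt.Println("deep recursion finished:", <-done)
}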
Goroutine execution can be preempted. If a goroutine keeps occupying the CPU and has not been rescheduled for a long time, the runtime preempts it and hands the CPU time to other goroutines (see the retake discussion in 2.5.8 for how this is triggered through the stack-growth check).
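A hedged demonstration of why preemption matters (an added example whose behavior is version-dependent, which is itself instructive): with a single P, the busy loop below would starve main forever if the runtime never intervened. Under the cooperative scheme this article describes (pre-Go 1.14), the loop makes no function calls, so the preemption request planted in its stackguard0 is never noticed and the program can hang; from Go 1.14 on, signal-based asynchronous preemption interrupts even this loop and the program prints and exits.
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1) // a single P: only one goroutine runs at a time

	go func() {
		for { // a tight loop with no function calls and no blocking
		}
	}()

	time.Sleep(100 * time.Millisecond) // hands the P to the busy goroutine
	fmt.Println("main was scheduled again, so the loop was preempted")
}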
2.4 The Core Structs
M: a worker thread in Go, the unit that actually executes code;
P: a context for scheduling goroutines; a goroutine depends on a P to be scheduled, and P is the true unit of parallelism;
G: a goroutine, i.e., a piece of Go code presented as a function; the smallest unit of parallelism;
A P must be bound to an M to run, and an M must be bound to a P to run Go code. Normally there are at most GOMAXPROCS P's (usually equal to the number of CPUs), but there can be many M's; only the P's bound to an M are actually running, which is why P is the true unit of parallelism.
Every P keeps its own runnable-G queue and can pop a G from it to run; there is also a global runnable-G queue, and a G executes on an M by way of a P. The reason the global runnable queue is not used exclusively is that distributed queues shrink the critical section: picture many threads requesting a runnable G at once — if the only resource were global, its lock would leave most of those threads waiting.
If a running G blocks — waiting on IO is the classic case — it stays parked together with its M, while the context P is handed over to another available M, so the blockage does not reduce the program's parallelism.
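These counts can be poked at from a program with standard runtime calls (an added sketch): runtime.GOMAXPROCS reports or changes the number of P's, and runtime.NumGoroutine counts live G's. Running any Go binary with the environment variable GODEBUG=schedtrace=1000 also makes the runtime print a line of scheduler state (G, M, and idle-P counts) every second.
package main

import (
	"fmt"
	"runtime"
)

func main() {
	fmt.Println("CPUs:", runtime.NumCPU())     // cores visible to the runtime
	fmt.Println("P's:", runtime.GOMAXPROCS(0)) // passing 0 queries without changing
	for i := 0; i < 8; i++ {
		go func() { select {} }() // eight G's parked forever
	}
	fmt.Println("G's:", runtime.NumGoroutine()) // main plus the parked goroutines
}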
The G struct
type g struct {
// Stack parameters.
// stack describes the actual stack memory: [stack.lo, stack.hi).
// stackguard0 is the stack pointer compared in the Go stack growth prologue.
// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
// stackguard1 is the stack pointer compared in the C stack growth prologue.
// It is stack.lo+StackGuard on g0 and gsignal stacks.
// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
stack stack // offset known to runtime/cgo // describes the actual stack memory, including its bounds
stackguard0 uintptr // offset known to liblink
stackguard1 uintptr // offset known to liblink
_panic *_panic // innermost panic - offset known to liblink
_defer *_defer // innermost defer
m *m // current m; offset known to arm liblink // the current M
sched gobuf // saves the g's context across goroutine switches
syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
stktopsp uintptr // expected sp at top of stack, to check in traceback
param unsafe.Pointer // passed parameter on wakeup; while this goroutine sleeps, another goroutine can set param, and this one reads it on wakeup
atomicstatus uint32
stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
goid int64 // the goroutine's ID
waitsince int64 // approx time when the g became blocked
waitreason string // if status==Gwaiting
schedlink guintptr
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
paniconfault bool // panic (instead of crash) on unexpected fault address
preemptscan bool // preempted g does scan for gc
gcscandone bool // g has scanned stack; protected by _Gscan bit in status
gcscanvalid bool // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
throwsplit bool // must not split stack
raceignore int8 // ignore race detection events
sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine
sysexitticks int64 // cputicks when syscall has returned (for tracing)
traceseq uint64 // trace event sequencer
tracelastp puintptr // last P emitted an event for this goroutine
lockedm muintptr // the G is locked to run only on this M
sig uint32
writebuf []byte
sigcode0 uintptr
sigcode1 uintptr
sigpc uintptr
gopc uintptr // pc of go statement that created this goroutine
startpc uintptr // pc of goroutine function
racectx uintptr
waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
cgoCtxt []uintptr // cgo traceback context
labels unsafe.Pointer // profiler labels
timer *timer // cached timer for time.Sleep
selectDone uint32 // are we participating in a select and did someone win the race?
// Per-G GC state
// gcAssistBytes is this G's GC assist credit in terms of
// bytes allocated. If this is positive, then the G has credit
// to allocate gcAssistBytes bytes without assisting. If this
// is negative, then the G must correct this by performing
// scan work. We track this in bytes to make it fast to update
// and check for debt in the malloc hot path. The assist ratio
// determines how this corresponds to scan work debt.
gcAssistBytes int64
}
The most important field above is certainly sched, which saves the goroutine's context. When goroutines switch, this state is not kept by the OS, as it is for threads, but in a gobuf object, which makes the switch far more lightweight. The gobuf struct looks like this:
The gobuf struct
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer
ret sys.Uintreg
lr uintptr
bp uintptr // for GOEXPERIMENT=framepointer
}
The M struct
type m struct {
g0 *g // the goroutine that owns the scheduling stack
gsignal *g // the goroutine that handles signals
tls [6]uintptr // thread-local storage
mstartfn func()
curg *g // the goroutine currently running
caughtsig guintptr
p puintptr // the P bound to this m for executing Go code
nextp puintptr
id int32
mallocing int32 // status
spinning bool // whether the m is out of work
blocked bool // whether the m is blocked
inwb bool // whether the m is executing a write barrier
printlock int8
incgo bool // whether the m is executing a cgo call
fastrand uint32
ncgocall uint64 // total number of cgo calls
ncgo int32 // number of cgo calls currently in flight
park note
alllink *m // links this m onto allm
schedlink muintptr
mcache *mcache // the m's current memory cache
lockedg *g // a g locked to this m: it runs only here, never on another m
createstack [32]uintptr // the stack on which the thread was created
}
Two G fields in struct M deserve attention:
One is curg, the G currently bound to this M.
The other is g0, the goroutine that owns the scheduling stack — a rather special goroutine. An ordinary goroutine's stack is a growable stack allocated on the heap, whereas g0's stack is the stack of the thread underlying the M. All scheduling-related code first switches onto g0's stack and then runs. In other words, the thread's own stack is also wrapped in a g, rather than used through the OS directly.
The P struct
type p struct {
lock mutex
id int32
status uint32 // the P's status: pidle/prunning/...
link puintptr
schedtick uint32 // incremented once per scheduling round
syscalltick uint32 // incremented once per system call
sysmontick sysmontick
m muintptr // back-link to the associated m
mcache *mcache
racectx uintptr
goidcache uint64 // cache of goroutine IDs
goidcacheend uint64
// the queue of runnable goroutines
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr // the next g to run
sudogcache []*sudog
sudogbuf [128]*sudog
palloc persistentAlloc // per-P to avoid mutex
pad [sys.CacheLineSize]byte
}
A P's status can be Pidle, Prunning, Psyscall, Pgcstop, or Pdead. Its local queue (runqhead/runqtail/runq) holds runnable goroutines, and a P prefers taking a G from this internal queue, which improves efficiency.
The schedt struct
type schedt struct {
goidgen uint64
lastpoll uint64
lock mutex
midle muintptr // idle m's
nmidle int32 // number of idle m's
nmidlelocked int32 // number of locked m's
mcount int32 // total number of m's created
maxmcount int32 // maximum number of m's allowed
ngsys uint32 // number of system goroutines, updated automatically
pidle puintptr // idle p's
npidle uint32
nmspinning uint32
// the global queue of runnable g's
runqhead guintptr
runqtail guintptr
runqsize int32
// the global cache of dead G's
gflock mutex
gfreeStack *g
gfreeNoStack *g
ngfree int32
// the central cache of sudog structs
sudoglock mutex
sudogcache *sudog
}
Most of the information that matters already lives in the M, G, and P structs; schedt is little more than a shell. It holds the idle M list, the idle P list, and the global queue of runnable G's. The lock in schedt is indispensable: whenever an M or P performs a non-local operation, it generally has to lock the scheduler first.
2.5 Key Functions
The goroutine scheduler's code lives in /src/runtime/proc.go; some of the key functions are analyzed below.
2.5.1 The schedule function
schedule runs whenever the runtime needs to schedule: it finds a runnable G for the current P and executes it. The search proceeds as follows:
1) call runqget to take a runnable G from the P's own runnable-G queue;
2) if 1) fails, call findrunnable to hunt for a runnable G (this call blocks until work is available);
3) once a G is obtained, scheduling ends and that G is executed from where it last left off.
Note: once in a while the global runnable queue is checked first, to ensure fairness — otherwise two goroutines could completely occupy the local runqueue by constantly respawning each other. This check is gated by schedtick%61.
The code:
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("schedule: holding locks")
}
if _g_.m.lockedg != 0 {
stoplockedm()
execute(_g_.m.lockedg.ptr(), false) // Never returns.
}
// We should not schedule away from a g that is executing a cgo call,
// since the cgo call is using the m's g0 stack.
if _g_.m.incgo {
throw("schedule: in cgo")
}
top:
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _g_.m.p.ptr().runSafePointFn != 0 {
runSafePointFn()
}
var gp *g
var inheritTime bool
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
}
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
if _g_.m.spinning {
resetspinning()
}
if gp.lockedm != 0 {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
startlockedm(gp)
goto top
}
execute(gp, inheritTime)
}
2.5.2 The findrunnable function
findrunnable finds a runnable G for a P, searching in this order:
1) call runqget to take a G from the P's own runnable-G queue;
2) if 1) fails, call globrunqget to take a G from the global runnable-G queue;
3) if 2) fails, call netpoll (non-blocking) to pick up a G whose asynchronous callback is ready;
4) if 3) fails, try to steal half of the G's from some other P;
5) if 4) fails, call globrunqget on the global runnable-G queue once more;
6) if 5) fails, call netpoll (blocking);
7) if 6) still produces no G, call stopm to stop this M.
The code:
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
// The conditions here and in handoffp must agree: if
// findrunnable would return a G to run, handoffp must start
// an M.
top:
_p_ := _g_.m.p.ptr()
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _p_.runSafePointFn != 0 {
runSafePointFn()
}
if fingwait && fingwake {
if gp := wakefing(); gp != nil {
ready(gp, 0, true)
}
}
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}
// local runq
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// global runq
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if gp := netpoll(false); gp != nil {// non-blocking
// netpoll returns list of goroutines linked by schedlink.
injectglist(gp.schedlink.ptr())
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// Steal work from other P's.
procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 {
// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
// New work can appear from returning syscall/cgocall, network or timers.
// Neither of that submits to local run queues, so no point in stealing.
goto stop
}
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
stop:
// We have nothing to do. If we're in the GC mark phase, can
// safely scan and blacken objects, and have work to do, run
// idle-time marking rather than give up the P.
if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
gp := _p_.gcBgMarkWorker.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
// Before we drop our P, make a snapshot of the allp slice,
// which can change underfoot once we no longer block
// safe-points. We don't need to snapshot the contents because
// everything up to cap(allp) is immutable.
allpSnapshot := allp
// return P and block
lock(&sched.lock)
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
pidleput(_p_)
unlock(&sched.lock)
// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see “Worker thread parking/unparking” comment at the top of the file.
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// check all runqueues once again
for _, _p_ := range allpSnapshot {
if !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
// Check for idle-priority GC work again.
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
lock(&sched.lock)
_p_ = pidleget()
if _p_ != nil && _p_.gcBgMarkWorker == 0 {
pidleput(_p_)
_p_ = nil
}
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
// Go back to idle GC check.
goto stop
}
}
// poll network
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
gp := netpoll(true) // block until new work is available
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if gp != nil {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
injectglist(gp.schedlink.ptr())
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
injectglist(gp)
}
}
stopm()
goto top
}
2.5.3 The newproc function
newproc creates a runnable G and puts it on the current P's runnable-G queue; it is what a statement like "go func() { …}" really translates into after compilation. The core work happens in newproc1, which runs as follows:
1) get the P of the current G, then try to take a G off the free-G list;
2) if 1) produced a G, set up its parameters; otherwise allocate a new G;
3) put the G on the P's runnable-G queue.
The code:
// In Go 1.10.8 the default stack size is 2 KB
_StackMin = 2048
// Create a g object, then put it on a g queue
// where it waits to be executed.
// Create a new g running fn with narg bytes of arguments starting
// at argp. callerpc is the address of the go statement that created
// this. The new g is put on the queue of g's waiting to run.
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
_g_ := getg()
if fn == nil {
_g_.m.throwing = -1 // do not dump full stacks
throw("go of nil func value")
}
_g_.m.locks++ // disable preemption because it can be holding p in a local var
siz := narg
siz = (siz + 7) &^ 7
// We could allocate a larger initial stack if necessary.
// Not worth it: this is almost always an error.
// 4*sizeof(uintreg): extra space added below
// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
throw("newproc: function arguments too large for new goroutine")
}
_p_ := _g_.m.p.ptr()
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn’t look at uninitialized stack.
}
if newg.stack.hi == 0 {
throw("newproc1: newg missing stack")
}
if readgstatus(newg) != _Gdead {
throw("newproc1: new g is not Gdead")
}
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
sp := newg.stack.hi - totalSize
spArg := sp
if usesLR {
// caller's LR
*(*uintptr)(unsafe.Pointer(sp)) = 0
prepGoExitFrame(sp)
spArg += sys.MinFrameSize
}
if narg > 0 {
memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
// This is a stack-to-stack copy. If write barriers
// are enabled and the source stack is grey (the
// destination is always black), then perform a
// barrier copy. We do this *after* the memmove
// because the destination stack may have garbage on
// it.
if writeBarrier.needed && !_g_.m.curg.gcscandone {
f := findfunc(fn.fn)
stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
// We're in the prologue, so it's always stack map index 0.
bv := stackmapdata(stkmap, 0)
bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
}
}
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
if isSystemGoroutine(newg) {
atomic.Xadd(&sched.ngsys, +1)
}
newg.gcscanvalid = false
casgstatus(newg, _Gdead, _Grunnable)
if _p_.goidcache == _p_.goidcacheend {
// Sched.goidgen is the last allocated id,
// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
// At startup sched.goidgen=0, so main goroutine receives goid=1.
_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
_p_.goidcache -= _GoidCacheBatch - 1
_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
}
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
if raceenabled {
newg.racectx = racegostart(callerpc)
}
if trace.enabled {
traceGoCreate(newg, newg.startpc)
}
runqput(_p_, newg, true)
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
_g_.m.locks--
if _g_.m.locks == 0 && _g_.preempt {// restore the preemption request in case we've cleared it in newstack
_g_.stackguard0 = stackPreempt
}
}
2.5.4 The goexit0 function
goexit is invoked when a G exits. After some bookkeeping on the G, this function puts it on the free-G list for later reuse and then calls schedule to pick the next goroutine.
// goexit continuation on g0.
func goexit0(gp *g) {
_g_ := getg()
// flip the g's status from _Grunning to _Gdead
casgstatus(gp, _Grunning, _Gdead)
if isSystemGoroutine(gp) {
atomic.Xadd(&sched.ngsys, -1)
}
// release the g's fields, resetting them mostly to nil / 0
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
_g_.m.lockedg = 0
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = ""
gp.param = nil
gp.labels = nil
gp.timer = nil
if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
// Flush assist credit to the global pool. This gives
// better information to pacing if the application is
// rapidly creating an exiting goroutines.
scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
gp.gcAssistBytes = 0
}
// Note that gp's stack scan is now "valid" because it has no
// stack.
gp.gcscanvalid = true
dropg()
if _g_.m.lockedInt != 0 {
print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
throw("internal lockOSThread error")
}
_g_.m.lockedExt = 0
// push this g onto the free-G list
gfput(_g_.m.p.ptr(), gp)
if locked {
// The goroutine may have locked this thread because
// it put it in an unusual kernel state. Kill it
// rather than returning it to the thread pool.
// Return to mstart, which will release the P and exit
// the thread.
if GOOS != "plan9" {// See golang.org/issue/22227.
gogo(&_g_.m.g0.sched)
}
}
schedule()
}
2.5.5 The handoffp function
handoffp hands a P away from an M that is entering a system call or is otherwise blocked. If the P still has runnable G's (or there is other pending work), a new M is started via startm, and that M does not spin.
// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
// handoffp must start an M in any situation where
// findrunnable would return a G to run on _p_.
// if this P's local queue or the global run queue is non-empty, call startm (non-spinning) and return
if !runqempty(_p_) || sched.runqsize != 0 {
startm(_p_, false)
return
}
// likewise if GC mark work is pending
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
startm(_p_, false)
return
}
// no work to do: check whether any M is already idle or spinning;
// if so, there is no need for us to start a spinning one
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) {// TODO: fast atomic
startm(_p_, true)
return
}
// lock the scheduler and detach this P
lock(&sched.lock)
if sched.gcwaiting != 0 {
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
return
}
if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
sched.safePointFn(_p_)
sched.safePointWait--
if sched.safePointWait == 0 {
notewakeup(&sched.safePointNote)
}
}
if sched.runqsize != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
pidleput(_p_)
unlock(&sched.lock)
}
2.5.6 The startm function
startm schedules an M (creating one if necessary) to run the given P.
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
// lock the scheduler
lock(&sched.lock)
if _p_ == nil {
_p_ = pidleget()
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// The caller incremented nmspinning, but there are no idle Ps,
// so it's okay to just undo the increment and give up.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return
}
}
mp := mget()
unlock(&sched.lock)
if mp == nil {
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_)
return
}
if mp.spinning {
throw("startm: m is spinning")
}
if mp.nextp != 0 {
throw("startm: m has p")
}
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
// The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning
mp.nextp.set(_p_)
notewakeup(&mp.park)
}
2.5.7 The sysmon function
sysmon is created when the Go runtime starts. It monitors the state of all goroutines, decides whether GC is needed, runs netpoll, and so on. Inside sysmon, retake is called to perform preemptive scheduling.
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
lock(&sched.lock)
sched.nmsys++
checkdead()
unlock(&sched.lock)
// If a heap span goes unused for 5 minutes after a garbage collection,
// we hand it back to the operating system.
scavengelimit := int64(5 * 60 * 1e9)
if debug.scavenge > 0 {
// Scavenge-a-lot for testing.
forcegcperiod = 10 * 1e6
scavengelimit = 20 * 1e6
}
lastscavenge := nanotime()
nscavenge := 0
lasttrace := int64(0)
idle := 0 // how many cycles in succession we had not wokeup somebody
delay := uint32(0)
for {
if idle == 0 {// start with 20us sleep...
delay = 20
} else if idle > 50 {// start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 {// up to 10ms
delay = 10 * 1000
}
usleep(delay)
if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
lock(&sched.lock)
if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
atomic.Store(&sched.sysmonwait, 1)
unlock(&sched.lock)
// Make wake-up period small enough
// for the sampling to be correct.
maxsleep := forcegcperiod / 2
if scavengelimit < forcegcperiod {
maxsleep = scavengelimit / 2
}
shouldRelax := true
if osRelaxMinNS > 0 {
next := timeSleepUntil()
now := nanotime()
if next-now < osRelaxMinNS {
shouldRelax = false
}
}
if shouldRelax {
osRelax(true)
}
notetsleep(&sched.sysmonnote, maxsleep)
if shouldRelax {
osRelax(false)
}
lock(&sched.lock)
atomic.Store(&sched.sysmonwait, 0)
noteclear(&sched.sysmonnote)
idle = 0
delay = 20
}
unlock(&sched.lock)
}
// trigger libc interceptors if needed
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}
// poll network if not polled for more than 10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
gp := netpoll(false) // non-blocking – returns list of goroutines
if gp != nil {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
injectglist(gp)
incidlelocked(1)
}
}
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
// check if we need to force a GC
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
lock(&forcegc.lock)
forcegc.idle = 0
forcegc.g.schedlink = 0
injectglist(forcegc.g)
unlock(&forcegc.lock)
}
// scavenge heap once in a while
if lastscavenge+scavengelimit/2 < now {
mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
lastscavenge = now
nscavenge++
}
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
}
}
2.5.8 The retake function
retake enumerates all the P's. If a P is in a system call (_Psyscall) and a full sysmon cycle (20us~10ms) has elapsed, the P is retaken: handoffp is called to sever the association between the M and the P. If a P is running (_Prunning), a full sysmon cycle has elapsed, and its G has been running for longer than forcePreemptNS (10ms), that P's G is preempted by setting g.preempt = true and g.stackguard0 = stackPreempt.
Why does setting stackguard achieve preemption? Because this value is what gets checked to decide whether the current stack is big enough: the prologue of a Go function compares against it to decide whether the stack must grow. When newstack sees that g.stackguard0 equals stackPreempt, it knows the check was tripped by a preemption request rather than a real overflow, and it re-examines whether to preempt. This mechanism guarantees that no single G can run so long that other G's never get a chance.
func retake(now int64) uint32 {
n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world.
lock(&allpLock)
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick
s := _p_.status
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(_p_.syscalltick)
if int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// Drop allpLock so we can take sched.lock.
unlock(&allpLock)
// Need to decrement number of idle locked M's
// (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock.
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_)
}
incidlelocked(1)
lock(&allpLock)
} else if s == _Prunning {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
if pd.schedwhen+forcePreemptNS > now {
continue
}
preemptone(_p_)
}
}
unlock(&allpLock)
return uint32(n)
}
3. Scheduler Summary
3.1 The Scheduler's Two Big Ideas
Reuse threads: coroutines run on top of a set of threads, so instead of frequently creating and destroying threads, the scheduler reuses them. This reuse shows up in two more ways: 1) work stealing — when the current thread has no runnable G, it tries to steal G's from the P bound to another thread rather than destroying itself; 2) handoff — when the current thread blocks in a system call on behalf of a G, it releases the P it is bound to and hands that P to another idle thread.
Exploit parallelism: GOMAXPROCS sets the number of P's. When GOMAXPROCS is greater than 1, up to GOMAXPROCS threads can be running Go code at once, potentially spread across several CPU cores, so concurrency turns into parallelism. GOMAXPROCS also caps the degree of parallelism: with GOMAXPROCS = number of cores / 2, for instance, at most half the cores run in parallel.
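A rough way to see the cap in action (an added sketch, not a benchmark; absolute timings vary by machine): the same CPU-bound work is run first with one P and then with one P per core, and wall-clock time shrinks as more parallelism is allowed.
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// burn is a small CPU-bound task with no blocking and no allocation.
func burn() {
	s := 0
	for i := 0; i < 50000000; i++ {
		s += i
	}
	_ = s
}

func main() {
	for _, procs := range []int{1, runtime.NumCPU()} {
		runtime.GOMAXPROCS(procs) // cap the number of P's
		start := time.Now()
		var wg sync.WaitGroup
		for i := 0; i < 8; i++ {
			wg.Add(1)
			go func() { defer wg.Done(); burn() }()
		}
		wg.Wait()
		fmt.Printf("GOMAXPROCS=%d: %v\n", procs, time.Since(start))
	}
}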
3.2 The Scheduler's Two Smaller Policies
Preemption: with classic coroutines, the next coroutine runs only once the current one voluntarily gives up the CPU. In Go, a goroutine can occupy the CPU for at most 10ms, which keeps other goroutines from being starved; this is one place where goroutines differ from classic coroutines.
Global G queue: the new scheduler still has a global G queue, but its role is much weakened; when an M's work stealing fails to find a G in any other P, it can take one from the global G queue.
4. References
Golang source repository: https://github.com/golang/go
"Scalable Go Scheduler": https://docs.google.com/docum…
"Go Preemptive Scheduler": https://docs.google.com/docum…
Online articles: https://studygolang.com/artic… https://studygolang.com/artic… https://studygolang.com/artic… https://studygolang.com/artic… https://studygolang.com/artic… ; a scheduling case study: https://www.cnblogs.com/sunsk… ; on preemptive scheduling: https://blog.csdn.net/u010853… — a dissection of schedule that is right on target; worth reading carefully several times, since its diagrams are very vivid.