共计 44012 个字符,预计需要花费 111 分钟才能阅读完成。
1 Runtime 简介
Go 语言是互联网时代的 C,因为其语法简洁易学,对高并发领有语言级别的亲和性。而且不同于虚拟机的计划。Go 通过在编译时嵌入平台相干的零碎指令可间接编译为对应平台的机器码,同时嵌入 Go Runtime,在运行时实现本身的调度算法和各种并发管制计划,防止进入操作系统级别的过程 / 线程上下文切换,以及通过原子操作、自旋、信号量、全局哈希表、期待队列多种技术防止进入操作系统级别锁,以此来晋升整体性能。
Go 的 runtime 是与用户代码一起打包在一个可执行文件中,是程序的一部分,而不是向 Java 须要独自装置,与程序独立。所以用户代码与 runtime 代码在执行时没有界线都是函数调用。在 Go 语言中的关键字编译时会变成 runtime 中的函数调用。
Go Runtime 外围次要波及三大部分:内存调配、调度算法、垃圾回收;本篇文章咱们次要介绍 GMP 调度原理。对于具体应该叫 GPM 还是 GMP,我更偏向于成为 GMP,因为在 runtime 代码中常常看到如下调用:
1 buf := &getg().m.p.ptr().wbBuf
其中 getg 代表获取以后正在运行的 g 即 goroutine,m 代表对应的逻辑处理器,p 是逻辑调度器;所以咱们还是称为 GMP。
(以上局部图文来自:https://zhuanlan.zhihu.com/p/…)
2 GMP 概览
上面这个图尽管有些形象(不如花花绿绿的图片),确是目前我看到对整个调度算法设计的重要概念笼罩最全的。
1 +-------------------- sysmon ---------------//------+ | |
2 | | | |
3 | | | |
4 +---+ +---+-------+ +--------+ +---+---+ | |
5 go func() ---> | G | ---> | P | local | <=== balance ===> | global | <--//--- | P | M | | |
6 +---+ +---+-------+ +--------+ +---+---+ | |
7 | | | | |
8 | +---+ | | | |
9 +----> | M | <--- findrunnable ---+--- steal <--//--+ | |
10 +---+ | |
11 | | |
12 mstart | |
13 | | |
14 +--- execute <----- schedule | |
15 | | | |
16 | | | |
17 +--> G.fn --> goexit --+ |
咱们来看下其中的三大次要概念:
- G:Groutine 协程,领有运行函数的指针、栈、上下文(指的是 sp、bp、pc 等寄存器上下文以及垃圾回收的标记上下文),在整个程序运行过程中能够有无数个,代表一个用户级代码执行流(用户轻量级线程);
- P:Processor,调度逻辑处理器,同样也是 Go 中代表资源的调配主体(内存资源、协程队列等),默认为机器核数,能够通过 GOMAXPROCS 环境变量调整
- M:Machine,代表理论工作的执行者,对应到操作系统级别的线程;M 的数量会比 P 多,但不会太多,最大为 1w 个。
其中 G 分为三类:
- 主协程,用来执行用户 main 函数的协程
- 主协程创立的协程,也是 P 调度的次要成员
- G0,每个 M 都有一个 G0 协程,他是 runtime 的一部分,G0 是跟 M 绑定的,次要用来执行调度逻辑的代码,所以不能被抢占也不会被调度(一般 G 也能够执行 runtime_procPin 禁止抢占),G0 的栈是零碎调配的,比一般的 G 栈(2KB)要大,不能扩容也不能缩容
- sysmon 协程,sysmon 协程也是 runtime 的一部分,sysmon 协程间接运行在 M 不须要 P,次要做一些查看工作如:查看死锁、查看计时器获取下一个要被触发的计时工作、查看是否有 ready 的网络调用以复原用户 G 的工作、查看一个 G 是否运行工夫太长进行抢占式调度。
M 分为两类:
- 一般 M,用来与 P 绑定执行 G 中工作
- m0:Go 程序是一个过程,过程都有一个主线程,m0 就是 Go 程序的主线程,通过一个与其绑定的 G0 来执行 runtime 启动加载代码;一个 Go 程序只有一个 m0
- 运行 sysmon 的 M,次要用来运行 sysmon 协程。
方才说道 P 是用来调度 G 的执行,所以每个 P 都有本人的一个 G 的队列,当 G 队列都执行结束后,会从 global 队列中获取一批 G 放到本人的本地队列中,如果全局队列也没有待运行的 G,则 P 会再从其余 P 中窃取一部分 G 放到本人的队列中。而调度的机会个别有三种:
- 被动调度,协程通过调用
runtime.Goshed
办法被动让渡本人的执行权力,之后这个协程会被放到全局队列中,期待后续被执行 - 被动调度,协程在休眠、channel 通道阻塞、网络 I / O 梗塞、执行垃圾回收时被暂停,被动式让渡本人的执行权力。大部分场景都是被动调度,这是 Go 高性能的一个起因,让 M 永远不停歇,不处于期待的协程让出 CPU 资源执行其余工作。
- 抢占式调度,这个次要是 sysmon 协程上的调度,当发现 G 处于零碎调用(如调用网络 io)超过 20 微秒或者 G 运行工夫过长(超过 10ms),会抢占 G 的执行 CPU 资源,让渡给其余协程;避免其余协程没有执行的机会;(零碎调用会进入内核态,由内核线程实现,能够把以后 CPU 资源让渡给其余用户协程)
Go 的协程调度与操作系统线程调度区别次要存在四个方面:
- 调度产生地点:Go 中协程的调度产生在 runtime,属于用户态,不波及与内核态的切换;一个协程能够被切换到多个线程执行
- 上下文切换速度:协程的切换速度远快于线程,不须要通过内核与用户态切换,同时须要保留的状态和寄存器非常少;线程切换速度为 1 - 2 微秒,协程切换速度为 0.2 微秒左右
- 调度策略:线程调度大部分都是抢占式调度,操作系统通过收回中断信号强制线程切换上下文;Go 的协程根本是被动和被动式调度,调度机会可预期
- 栈大小:线程栈个别是 2MB,而且运行时不能更改大小;Go 的协程栈只有 2kb,而且能够动静扩容(64 位机最大为 1G)
以上根本是整个调度器的概括,不想看原理的同学能够不必往下看了,上面会进行源码级介绍;
3 GMP 的源码构造
源码局部次要波及三个文件:
1 runtime/amd_64.s 波及到过程启动以及对 CPU 执行指令进行管制的汇编代码,过程的初始化局部也在这外面 | |
2 runtime/runtime2.go 这里次要是运行时中一些重要数据结构的定义,比方 g、m、p 以及波及到接口、defer、panic、map、slice 等外围类型 | |
3 runtime/proc.go 一些外围办法的实现,波及 gmp 调度等外围代码在这里 |
这里咱们次要关怀 gmp 中与调度相干的代码;
3.1 G 源码局部
3.1.1 G 的构造
先来看下 g 的构造定义:
1 // runtime/runtime2.go | |
2 type g struct { | |
3 // 记录协程栈的栈顶和栈底地位 | |
4 stack stack // offset known to runtime/cgo | |
5 // 次要作用是参加一些比拟计算,当发现容量要超过栈调配空间后,能够进行扩容或者膨胀 | |
6 stackguard0 uintptr // offset known to liblink | |
7 stackguard1 uintptr // offset known to liblink | |
8 | |
9 // 以后与 g 绑定的 m | |
10 m *m // current m; offset known to arm liblink | |
11 // 这是一个比拟重要的字段,外面保留的一些与 goroutine 运行地位相干的寄存器和指针,如 rsp、rbp、rpc 等寄存器 | |
12 sched gobuf | |
13 syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc | |
14 syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc | |
15 stktopsp uintptr // expected sp at top of stack, to check in traceback | |
16 | |
17 // 用于做参数传递,睡眠时其余 goroutine 能够设置 param,唤醒时该 g 能够读取这些 param | |
18 param unsafe.Pointer | |
19 // 记录以后 goroutine 的状态 | |
20 atomicstatus uint32 | |
21 stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus | |
22 // goroutine 的惟一 id | |
23 goid int64 | |
24 schedlink guintptr | |
25 | |
26 // 标记是否能够被抢占 | |
27 preempt bool // preemption signal, duplicates stackguard0 = stackpreempt | |
28 preemptStop bool // transition to _Gpreempted on preemption; otherwise, just deschedule | |
29 preemptShrink bool // shrink stack at synchronous safe point | |
30 | |
31 // 如果调用了 LockOsThread 办法,则 g 会绑定到某个 m 上,只在这个 m 上运行 | |
32 lockedm muintptr | |
33 sig uint32 | |
34 writebuf []byte | |
35 sigcode0 uintptr | |
36 sigcode1 uintptr | |
37 sigpc uintptr | |
38 // 创立该 goroutine 的语句的指令地址 | |
39 gopc uintptr // pc of go statement that created this goroutine | |
40 ancestors *[]ancestorInfo // ancestor information goroutine(s) that created this goroutine (only used if debug.tracebackancestors) | |
41 // goroutine 函数的指令地址 | |
42 startpc uintptr // pc of goroutine function | |
43 racectx uintptr | |
44 waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order | |
45 cgoCtxt []uintptr // cgo traceback context | |
46 labels unsafe.Pointer // profiler labels | |
47 timer *timer // cached timer for time.Sleep | |
48 selectDone uint32 // are we participating in a select and did someone win the race? | |
49 } |
跟 g 相干的还有两个数据结构比拟重要:
stack 是协程栈的地址信息,须要留神的是 m0 绑定的 g0 是在过程被调配的零碎栈上调配协程栈的,而其余协程栈都是在堆上进行调配的。
gobuf 中保留了协程执行的上下文信息,这里也能够看到协程切换的上下文信息极少;sp 代表 cpu 的 rsp 寄存器的值,pc 代表 CPU 的 rip 寄存器值、bp 代表 CPU 的 rbp 寄存器值;ret 用来保留零碎调用的返回值,ctxt 在 gc 的时候应用。
其中几个寄存器作用如下:
- SP:永远指向栈顶地位
- BP:某一时刻的栈顶地位,当新函数调用时,把以后 SP 地址赋值给 BP、SP 指向新的栈顶地位
- PC:代表代码通过编译为机器码后,以后执行的机器指令(能够了解为以后语句)
1 // Stack describes a Go execution stack. | |
2 // The bounds of the stack are exactly [lo, hi), | |
3 // with no implicit data structures on either side. | |
4 // goroutine 协程栈的栈顶和栈底 | |
5 type stack struct { | |
6 lo uintptr // 栈顶,低地址 | |
7 hi uintptr // 栈底,高地址 | |
8 } | |
9 | |
10 // gobuf 中保留了十分重要的上下文执行信息,11 type gobuf struct { | |
12 // 代表 cpu 的 rsp 寄存器的值,永远指向栈顶地位 | |
13 sp uintptr | |
14 // 代表代码通过编译为机器码后,以后执行的机器指令(能够了解为以后语句)15 pc uintptr | |
16 // 指向所保留执行上下文的 goroutine | |
17 g guintptr | |
18 // gc 时候应用 | |
19 ctxt unsafe.Pointer | |
20 // 用来保留零碎调用的返回值 | |
21 ret uintptr | |
22 lr uintptr | |
23 // 某一时刻的栈顶地位,当新函数调用时,把以后 SP 地址赋值给 BP、SP 指向新的栈顶地位 | |
24 bp uintptr // for framepointer-enabled architectures | |
25 } |
3.1.2 G 的状态
就像线程有本人的状态一样,goroutine 也有本人的状态,次要记录在 atomicstatus 字段上:
1 // defined constants | |
2 const ( | |
3 // 代表协程刚开始创立时的状态,当新创建的协程初始化后,为变为_Gdead 状态,_Gdread 也是协程被销毁时的状态;4 // 刚创立时也被会置为_Gdead 次要是思考 GC 能够去用去扫描 dead 状态下的协程栈 | |
5 _Gidle = iota // 0 | |
6 // 代表协程正在运行队列中,期待被运行 | |
7 _Grunnable // 1 | |
8 // 代表以后协程正在被运行,曾经被调配了逻辑解决的线程,即 p 和 m | |
9 _Grunning // 2 | |
10 // 代表以后协程正在执行零碎调用 | |
11 _Gsyscall // 3 | |
12 // 示意以后协程在运行时被锁定,陷入阻塞,不能执行用户代码 | |
13 _Gwaiting // 4 | |
14 | |
15 _Gmoribund_unused // 5 | |
16 // 新创建的协程初始化后,或者协程被销毁后的状态 | |
17 _Gdead // 6 | |
18 | |
19 // _Genqueue_unused is currently unused. | |
20 _Genqueue_unused // 7 | |
21 // 代表在进行协程栈扫描时发现须要扩容或者缩容,将协程中的栈转移到新栈时的状态;这个时候不执行用户代码,也不在 p 的 runq 中 | |
22 _Gcopystack // 8 | |
23 | |
24 // 代表 g 被抢占后的状态 | |
25 _Gpreempted // 9 | |
26 | |
27 // 这几个状态是垃圾回收时波及,后续文章进行介绍 | |
28 _Gscan = 0x1000 | |
29 _Gscanrunnable = _Gscan + _Grunnable // 0x1001 | |
30 _Gscanrunning = _Gscan + _Grunning // 0x1002 | |
31 _Gscansyscall = _Gscan + _Gsyscall // 0x1003 | |
32 _Gscanwaiting = _Gscan + _Gwaiting // 0x1004 | |
33 _Gscanpreempted = _Gscan + _Gpreempted // 0x1009 | |
34 ) |
这里是利用常量定义的枚举。
Go 的状态变更能够看下图:
3.1.3 G 的创立
当咱们应用 go 关键字新建一个 goroutine 时,编译器会编译为 runtime 中对应的函数调用(newproc,而 go 关键字前面的函数成为协程的工作函数),进行创立,整体步骤如下:
- 用 systemstack 切换到零碎堆栈,调用 newproc1,newproc1 实现 g 的获取。
- 尝试从 p 的本地 g 闲暇链表和全局 g 闲暇链表找到一个 g 的实例。
- 如果下面未找到,则调用 malg 生成新的 g 的实例,且调配好 g 的栈和设置好栈的边界,接着增加到 allgs 数组外面,allgs 保留了所有的 g。
- 保留 g 切换的上下文,这里很要害,g 的切换依赖 sched 字段。
- 生成惟一的 goid,赋值给该 g。
- 调用 runqput 将 g 插入队列中,如果本地队列还有残余的地位,将 G 插入本地队列的尾部,若本地队列已满,插入全局队列。
- 如果有闲暇的 p 且 m 没有处于自旋状态 且 main goroutine 曾经启动,那么唤醒或新建某个 m 来执行工作。
这里对应的是 newproc 函数:
1 func newproc(siz int32, fn *funcval) {2 argp := add(unsafe.Pointer(&fn), sys.PtrSize) | |
3 gp := getg() | |
4 // 获取调用者的指令地址,也就是调用 newproc 时又 call 指令压栈的函数返回地址 | |
5 pc := getcallerpc() | |
6 // systemstack 的作用是切换到 m0 对应的 g0 所属的零碎栈 | |
7 // 应用 g0 所在的零碎栈创立 goroutine 对象 | |
8 // 传递参数包含 goroutine 的工作函数、argp 参数起始地址、siz 是参数长度、调用方的 pc 指针 | |
9 systemstack(func() {10 newg := newproc1(fn, argp, siz, gp, pc) | |
11 // 创立实现后将 g 放到创建者(某个 g,如果是过程初始化启动阶段则为 g0)所在的 p 的队列中 | |
12 _p_ := getg().m.p.ptr() | |
13 runqput(_p_, newg, true) | |
14 | |
15 if mainStarted {16 wakep() | |
17 } | |
18 }) | |
19 } |
其中 systemstack 是一段汇编代码,位于 asm_amd64.s 文件中,次要是寄存器指令的操作,笔者不懂汇编这里先不做介绍。
newproc1 是获取 newg 的函数,次要步骤:
1、首先避免以后 g 被抢占,绑定 m
2、对传入的参数占用的内存空间进行对齐解决
3、从 p 的闲暇队列中获取一个闲暇的 g,如果么有就创立一个 g,并在堆上创立协程栈,并设置状态为_Gdead 增加到全局 allgs 中
4、计算整体协程工作函数的参数空间大小,并设置 sp 指针
5、执行参数从 getg 的堆栈到 newg 堆栈的复制
6、设置 newg 的 sched 和 startpc、gopc 等跟上下文相干的字段值
7、设置 newg 状态为 runable 并设置 goid
8、接触 getg 与 m 的防抢占状态
代码正文如下:
1 func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g { | |
2 ..... | |
3 // 如果是初始化时候这个代表 g0 | |
4 _g_ := getg() | |
5 | |
6 if fn == nil { | |
7 _g_.m.throwing = -1 // do not dump full stacks | |
8 throw("go of nil func value") | |
9 } | |
10 // 使_g_.m.locks++,来避免这个时候 g 对应的 m 被抢占 | |
11 acquirem() // disable preemption because it can be holding p in a local var | |
12 // 参数的地址,上面一句目标是为了做到内存对齐 | |
13 siz := narg | |
14 siz = (siz + 7) &^ 7 | |
15 | |
16 // We could allocate a larger initial stack if necessary. | |
17 // Not worth it: this is almost always an error. | |
18 // 4*PtrSize: extra space added below | |
19 // PtrSize: caller's LR (arm) or return address (x86, in gostartcall). | |
20 if siz >= _StackMin-4*sys.PtrSize-sys.PtrSize {21 throw("newproc: function arguments too large for new goroutine") | |
22 } | |
23 | |
24 _p_ := _g_.m.p.ptr() | |
25 newg := gfget(_p_) // 首先从 p 的 gfree 队列中看看有没有闲暇的 g,有则应用 | |
26 if newg == nil { | |
27 // 如果没找到就应用 new 关键字来创立一个 g 并在堆上调配栈空间 | |
28 newg = malg(_StackMin) | |
29 // 将 newg 的状态设置为_Gdead,因为这样 GC 就不会去扫描一个没有初始化的协程栈 | |
30 casgstatus(newg, _Gidle, _Gdead) | |
31 // 增加到全局的 allg 切片中(须要加锁拜访)32 allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack. | |
33 } | |
34 // 上面是查看协程栈的创立状况和状态 | |
35 if newg.stack.hi == 0 {36 throw("newproc1: newg missing stack") | |
37 } | |
38 | |
39 if readgstatus(newg) != _Gdead {40 throw("newproc1: new g is not Gdead") | |
41 } | |
42 // 计算运行空间大小并进行内存对齐 | |
43 totalSize := 4*sys.PtrSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame | |
44 totalSize += -totalSize & (sys.StackAlign - 1) // align to StackAlign | |
45 // 计算 sp 寄存器指针的地位 | |
46 sp := newg.stack.hi - totalSize | |
47 // 确定参数入栈地位 | |
48 spArg := sp | |
49 ......... | |
50 if narg > 0 { | |
51 // 将参数从 newproc 函数的栈复制到新的协程的栈中,memove 是一段汇编代码 | |
52 // 从 argp 地位移动 narg 大小的内存到 sparg 地位 | |
53 memmove(unsafe.Pointer(spArg), argp, uintptr(narg)) | |
54 // 因为波及到从栈到堆栈上的复制,go 在垃圾回收中应用了三色标记和写入屏障等伎俩,所以这里要思考屏障复制 | |
55 // 指标栈可能会有垃圾存在,所以设置屏障并且标记为灰色 | |
56 if writeBarrier.needed && !_g_.m.curg.gcscandone { // 如果启用了写入屏障并且源堆栈为灰色(指标始终为彩色),则执行屏障复制。57 f := findfunc(fn.fn) | |
58 stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps)) | |
59 if stkmap.nbit > 0 { | |
60 // We're in the prologue, so it's always stack map index 0. | |
61 bv := stackmapdata(stkmap, 0) | |
62 bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata) | |
63 } | |
64 } | |
65 } | |
66 // 把 newg 的 sched 构造体成员的所有字段都设置为 0,其实就是初始化 | |
67 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) | |
68 newg.sched.sp = sp | |
69 newg.stktopsp = sp | |
70 // pc 指针示意当 newg 被调度起来时从这个地位开始执行 | |
71 // 这里是先设置为 goexit,在 gostartcallfn 中会进行解决,更改 sp 为这里的 pc,将 pc 改为真正的协程工作函数 fn 的指令地位 | |
72 // 这样使得工作函数执行结束后,会继续执行 goexit 中相干的清理工作 | |
73 newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function | |
74 newg.sched.g = guintptr(unsafe.Pointer(newg)) // 保留以后的 g | |
75 gostartcallfn(&newg.sched, fn) // 在这里实现 g 启动时所有相干上下文指针的设置,次要为 sp、pc 和 ctxt,ctxt 被设置为 fn | |
76 newg.gopc = callerpc // 保留 newproc 的 pc,即调用者创立时的指令地位 | |
77 newg.ancestors = saveAncestors(callergp) | |
78 // 设置 startpc 为工作函数,次要用于函数调用栈的 trackback 和栈膨胀工作 | |
79 // newg 的执行开始地位并不依赖这个字段,而是通过 sched.pc 确定 | |
80 newg.startpc = fn.fn | |
81 if _g_.m.curg != nil { | |
82 newg.labels = _g_.m.curg.labels | |
83 } | |
84 // 判断 newg 的工作函数是不是 runtime 零碎的工作函数,是则 sched.ngsys+1;85 // 主协程则代表 runtime.main 函数,在这里就为判断为真 | |
86 if isSystemGoroutine(newg, false) {87 atomic.Xadd(&sched.ngsys, +1) | |
88 } | |
89 // Track initial transition? | |
90 newg.trackingSeq = uint8(fastrand()) | |
91 if newg.trackingSeq%gTrackingPeriod == 0 { | |
92 newg.tracking = true | |
93 } | |
94 // 更改以后 g 的状态为_Grunnable | |
95 casgstatus(newg, _Gdead, _Grunnable) | |
96 // 设置 g 的 goid,因为 p 会每次批量生成 16 个 id,每次 newproc 如果新建一个 g,id 就会加 1 | |
97 // 所以这里 m0 的 g0 的 id 为 0,而主协程的 goid 为 1,其余的顺次递增 | |
98 if _p_.goidcache == _p_.goidcacheend { | |
99 // Sched.goidgen is the last allocated id, | |
100 // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch]. | |
101 // At startup sched.goidgen=0, so main goroutine receives goid=1. | |
102 // 应用原子操作批改全局变量,这里的 sched 是在 runtime2.go 中的一个全局变量类型为 schedt | |
103 // 原子操作具备多线程可见性,同时比加锁性能更高 | |
104 _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch) | |
105 _p_.goidcache -= _GoidCacheBatch - 1 | |
106 _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch | |
107 } | |
108 newg.goid = int64(_p_.goidcache) | |
109 _p_.goidcache++ | |
110 if raceenabled {111 newg.racectx = racegostart(callerpc) | |
112 } | |
113 if trace.enabled {114 traceGoCreate(newg, newg.startpc) | |
115 } | |
116 // 开释 getg 与 m 的绑定 | |
117 releasem(_g_.m) | |
118 | |
119 return newg | |
120 } |
其中有几个要害中央须要强调
3.1.4 协程栈在堆空间的调配
malg 函数,用来创立一个新 g 和对应的栈空间调配,这个函数次要强调的是栈空间调配局部,通过切换到零碎栈上进行空间调配,调配完后设置栈底和栈顶的两个地位的爱护字段,当栈上进行调配变量空间发现超过 stackguard1 时,会进行扩容,同时在某些条件下也会进行缩容
1 // Allocate a new g, with a stack big enough for stacksize bytes. | |
2 func malg(stacksize int32) *g {3 newg := new(g) | |
4 if stacksize >= 0 {5 stacksize = round2(_StackSystem + stacksize) | |
6 systemstack(func() {7 newg.stack = stackalloc(uint32(stacksize)) | |
8 }) | |
9 newg.stackguard0 = newg.stack.lo + _StackGuard | |
10 newg.stackguard1 = ^uintptr(0) | |
11 // Clear the bottom word of the stack. We record g | |
12 // there on gsignal stack during VDSO on ARM and ARM64. | |
13 *(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0 | |
14 } | |
15 return newg | |
16 } |
stackalloc 代码位于 runtime/stack.go 文件中;
协程栈首先在过程初始化时会创立栈的治理构造:
1、栈池 stackpool,这个栈池次要用来对大小为 2、4、8kb 的小栈做缓存应用,应用的同样是 mspan 这种构造来存储;
2、为大栈调配的 stackLarge
1 OS | FixedStack | NumStackOrders | |
2 -----------------+------------+--------------- | |
3 linux/darwin/bsd | 2KB | 4 | |
4 windows/32 | 4KB | 3 | |
5 windows/64 | 8KB | 2 | |
6 plan9 | 4KB | 3 | |
7 | |
8 // Global pool of spans that have free stacks. | |
9 // Stacks are assigned an order according to size. | |
10 // order = log_2(size/FixedStack) | |
11 // There is a free list for each order. | |
12 var stackpool [_NumStackOrders]struct { | |
13 item stackpoolItem | |
14 _ [cpu.CacheLinePadSize - unsafe.Sizeof(stackpoolItem{})%cpu.CacheLinePadSize]byte | |
15 } | |
16 | |
17 //go:notinheap | |
18 type stackpoolItem struct { | |
19 mu mutex | |
20 span mSpanList | |
21 } | |
22 | |
23 // Global pool of large stack spans. | |
24 var stackLarge struct { | |
25 lock mutex | |
26 free [heapAddrBits - pageShift]mSpanList // free lists by log_2(s.npages) | |
27 } | |
28 | |
29 func stackinit() { | |
30 if _StackCacheSize&_PageMask != 0 {31 throw("cache size must be a multiple of page size") | |
32 } | |
33 for i := range stackpool {34 stackpool[i].item.span.init() | |
35 lockInit(&stackpool[i].item.mu, lockRankStackpool) | |
36 } | |
37 for i := range stackLarge.free {38 stackLarge.free[i].init() | |
39 lockInit(&stackLarge.lock, lockRankStackLarge) | |
40 } | |
41 } |
stackalloc 会首先判断栈空间大小,是大栈还是固定空间的小栈,
1、对于小栈,如果是还没有调配栈缓存空间,则进入 stackpoolalloc 函数进行调配空间(须要加锁),这里最终是从全局的 mheap 也就是堆空间中获取内存空间;如果有栈缓存空间,则从 g 对应的 mcache 中的 stackcache 上获取内存空间(无锁),如果 stackcache 上没有足够空间则调用 stackcacherefill 办法为 stackpool 进行扩容(也是从 mheap 中拿取,加锁)而后调配给协程
2、对于大栈,先从 stackLarge 中获取,如果没有则从 mheap 中获取,两个步骤都须要加载拜访;
3、最初创立 stack 构造返回给 newg
1 func stackalloc(n uint32) stack { | |
2 // Stackalloc must be called on scheduler stack, so that we | |
3 // never try to grow the stack during the code that stackalloc runs. | |
4 // Doing so would cause a deadlock (issue 1547). | |
5 thisg := getg() | |
6 ......... | |
7 | |
8 // Small stacks are allocated with a fixed-size free-list allocator. | |
9 // If we need a stack of a bigger size, we fall back on allocating | |
10 // a dedicated span. | |
11 var v unsafe.Pointer | |
12 if n < _FixedStack<<_NumStackOrders && n < _StackCacheSize {13 order := uint8(0) | |
14 n2 := n | |
15 for n2 > _FixedStack { | |
16 order++ | |
17 n2 >>= 1 | |
18 } | |
19 var x gclinkptr | |
20 if stackNoCache != 0 || thisg.m.p == 0 || thisg.m.preemptoff != "" { | |
21 // thisg.m.p == 0 can happen in the guts of exitsyscall | |
22 // or procresize. Just get a stack from the global pool. | |
23 // Also don't touch stackcache during gc | |
24 // as it's flushed concurrently. | |
25 lock(&stackpool[order].item.mu) | |
26 x = stackpoolalloc(order) | |
27 unlock(&stackpool[order].item.mu) | |
28 } else {29 c := thisg.m.p.ptr().mcache | |
30 x = c.stackcache[order].list | |
31 if x.ptr() == nil {32 stackcacherefill(c, order) | |
33 x = c.stackcache[order].list | |
34 } | |
35 c.stackcache[order].list = x.ptr().next | |
36 c.stackcache[order].size -= uintptr(n) | |
37 } | |
38 v = unsafe.Pointer(x) | |
39 } else { | |
40 var s *mspan | |
41 npage := uintptr(n) >> _PageShift | |
42 log2npage := stacklog2(npage) | |
43 | |
44 // Try to get a stack from the large stack cache. | |
45 lock(&stackLarge.lock) | |
46 if !stackLarge.free[log2npage].isEmpty() {47 s = stackLarge.free[log2npage].first | |
48 stackLarge.free[log2npage].remove(s) | |
49 } | |
50 unlock(&stackLarge.lock) | |
51 | |
52 lockWithRankMayAcquire(&mheap_.lock, lockRankMheap) | |
53 | |
54 if s == nil { | |
55 // Allocate a new stack from the heap. | |
56 s = mheap_.allocManual(npage, spanAllocStack) | |
57 if s == nil {58 throw("out of memory") | |
59 } | |
60 osStackAlloc(s) | |
61 s.elemsize = uintptr(n) | |
62 } | |
63 v = unsafe.Pointer(s.base()) | |
64 } | |
65 | |
66 if raceenabled {67 racemalloc(v, uintptr(n)) | |
68 } | |
69 if msanenabled {70 msanmalloc(v, uintptr(n)) | |
71 } | |
72 if stackDebug >= 1 {73 print("allocated", v, "\n") | |
74 } | |
75 return stack{uintptr(v), uintptr(v) + uintptr(n)} | |
76 } |
非 g0 的 g 为什么要在堆上调配空间?
尽管堆不如栈快,然而 goroutine 是 go 模仿的线程,具备动静扩容和缩容的能力,而零碎栈是线性空间,在零碎栈上产生缩容和扩容会存在空间有余或者栈空间碎片等问题;所以 go 这里在堆上调配协程栈;因为是在堆空间也就意味着这部分空间也须要进行垃圾回收和开释;所以 Go 的 GC 是多线程并发标记时,内存屏障是对整个协程栈标记灰色,来让回收器进行扫描。
3.1.5 G 的上下文设置和切换
协程栈的切换次要是在两个中央,由执行调度逻辑的 g0 切换到执行用户逻辑的 g 的过程,以及执行用户逻辑的 g 退出或者被抢占切换为 g0 执行调度的过程,抢占在下文中介绍
下面代码中当 newg 被初始化时,会初始化 sched 中的 pc 和 sp 指针,其中会把 pc 先设置为 goexit 函数的第二条指令。
1 // 把 newg 的 sched 构造体成员的所有字段都设置为 0,其实就是初始化 | |
2 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) | |
3 newg.sched.sp = sp | |
4 newg.stktopsp = sp | |
5 // pc 指针示意当 newg 被调度起来时从这个地位开始执行 | |
6 // 这里是先设置为 goexit,在 gostartcallfn 中会进行解决,更改 sp 为这里的 pc,将 pc 改为真正的协程工作函数 fn 的指令地位 | |
7 // 这样使得工作函数执行结束后,会继续执行 goexit 中相干的清理工作 | |
8 newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function | |
9 newg.sched.g = guintptr(unsafe.Pointer(newg)) // 保留以后的 g | |
10 gostartcallfn(&newg.sched, fn) // 在这里实现 g 启动时所有相干上下文指针的设置,次要为 sp、pc 和 ctxt,ctxt 被设置为 fn | |
11 newg.gopc = callerpc // 保留 newproc 的 pc,即调用者创立时的指令地位 |
而后进入 gostartcallfn 函数,最终是在 gostartcall 函数中进行解决
1 // gostartcallfn 位于 runtime/stack.go 中 | |
2 | |
3 func gostartcallfn(gobuf *gobuf, fv *funcval) { | |
4 var fn unsafe.Pointer | |
5 if fv != nil {6 fn = unsafe.Pointer(fv.fn) | |
7 } else {8 fn = unsafe.Pointer(funcPC(nilfunc)) | |
9 } | |
10 gostartcall(gobuf, fn, unsafe.Pointer(fv)) | |
11 } | |
12 | |
13 // gostartcall 位于 runtime/sys_x86.go 中 | |
14 | |
15 func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) { | |
16 // newg 的栈顶,目前 newg 栈上只有 fn 函数的参数,sp 指向的是 fn 的第一个参数 | |
17 sp := buf.sp | |
18 // 为返回地址预留空间 | |
19 sp -= sys.PtrSize | |
20 // buf.pc 中设置的是 goexit 函数中的第二条指令 | |
21 // 因为栈是自顶向下,先进后出,所以这里假装 fn 是被 goexit 函数调用的,goexit 在前 fn 在后 | |
22 // 使得 fn 返回后到 goexit 继续执行,以实现一些清理工作。23 *(*uintptr)(unsafe.Pointer(sp)) = buf.pc | |
24 buf.sp = sp // 从新设置栈顶 | |
25 // 将 pc 指向 goroutine 的工作函数 fn,这样当 goroutine 取得执行权时,从工作函数入口开始执行 | |
26 // 如果是主协程,那么 fn 就是 runtime.main,从这里开始执行 | |
27 buf.pc = uintptr(fn) | |
28 buf.ctxt = ctxt | |
29 } |
能够看到在 newg 初始化时进行的一系列设置工作,将 goexit 先压入栈顶,而后伪造 sp 地位,让 cpu 看起来是从 goexit 中调用的协程工作函数,而后将 pc 指针指向工作函数,当协程被执行时,从 pc 处开始执行,工作函数执行结束后执行 goexit;
这里是设置工作,具体的切换工作,须要经由 schedule 调度函数选中一个 g,进入 execute 函数设置 g 的相干状态和栈爱护字段等信息,而后进入 gogo 函数,通过汇编语言,将 CPU 寄存器以及函数调用栈切换为 g 的 sched 中相干指针和协程栈。gogo 函数源码如下:
1 // gogo 的具体汇编代码位于 asm_amd64.s 中 | |
2 | |
3 // func gogo(buf *gobuf) | |
4 // restore state from Gobuf; longjmp | |
5 TEXT runtime·gogo(SB), NOSPLIT, $0-8 | |
6 // 0(FP)示意第一个参数,即 buf=&gp.sched | |
7 MOVQ buf+0(FP), BX // gobuf | |
8 // DX = gp.sched.g,DX 代表数据寄存器 | |
9 MOVQ gobuf_g(BX), DX | |
10 MOVQ 0(DX), CX // make sure g != nil | |
11 JMP gogo<>(SB) | |
12 | |
13 TEXT gogo<>(SB), NOSPLIT, $0 | |
14 // 将 tls 保留到 CX 寄存器 | |
15 get_tls(CX) | |
16 // 上面这条指令把以后要运行的 g(下面第 9 行中曾经把 go_buf 中的 g 放入到了 DX 中),17 // 放入 CX 寄存器的 g 地位即 tls[0]这个地位,也就是线程的本地存储中,18 // 这样下次 runtime 中调用 getg 时获取的就是这个 g | |
19 MOVQ DX, g(CX) | |
20 MOVQ DX, R14 // set the g register | |
21 // 把 CPU 的 SP 寄存器设置为 g.sched.sp 这样就实现了栈的切换,从 g0 切换为 g | |
22 MOVQ gobuf_sp(BX), SP // restore SP | |
23 // 将 ret、ctxt、bp 别离存入对应的寄存器,实现了 CPU 上下文的切换 | |
24 MOVQ gobuf_ret(BX), AX | |
25 MOVQ gobuf_ctxt(BX), DX | |
26 MOVQ gobuf_bp(BX), BP | |
27 // 清空 sched 的值,相干值曾经存入到寄存器中,这里清空后能够缩小 GC 的工作量 | |
28 MOVQ $0, gobuf_sp(BX) // clear to help garbage collector | |
29 MOVQ $0, gobuf_ret(BX) | |
30 MOVQ $0, gobuf_ctxt(BX) | |
31 MOVQ $0, gobuf_bp(BX) | |
32 // 把 sched.pc 放入 BX 寄存器 | |
33 MOVQ gobuf_pc(BX), BX | |
34 // JMP 把 BX 的值放入 CPU 的 IP 寄存器,所以这时候 CPU 从该地址开始继续执行指令 | |
35 JMP BX |
AX、BX、CX、DX 是 8086 处理器的 4 个数据寄存器,能够简略认为相当于 4 个硬件的变量;
上文总体来说,将 g 存入到 tls 中(线程的本地存储),设置 SP 和相干寄存器为 g.sched 中的字段(SP、ret、ctxt、bp),而后跳转到 pc 指针地位执行指令
3.1.6 G 的退出解决
协程栈的退出须要分为两种状况,即运行 main 函数的主协程和一般的用户协程;
主协程的 fn 工作函数位于 proc.go 中的 main 函数中,对于主协程 g.shched.pc 指向的也是这个地位,这里会调用用户的 mian 函数(main_main),main_main 运行结束后,会调用 exit(0)间接退出,而不会跑到 goexit 函数中。
1 // runtime/proc.go 中的 main 函数 | |
2 // The main goroutine. | |
3 func main() {4 g := getg() | |
5 | |
6 ................. | |
7 fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime | |
8 fn() | |
9 .................. | |
10 .................. | |
11 exit(0) | |
12 .................. | |
13 } |
用户协程因为将 goexit 作为协程栈栈底,所以当执行完协程工作函数时,会执行 goexit 函数,goexit 是一段汇编指令:
1 // The top-most function running on a goroutine | |
2 // returns to goexit+PCQuantum. | |
3 TEXT runtime·goexit(SB),NOSPLIT|TOPFRAME,$0-0 | |
4 BYTE $0x90 // NOP | |
5 CALL runtime·goexit1(SB) // does not return | |
6 // traceback from goexit1 must hit code range of goexit | |
7 BYTE $0x90 // NOP |
这里间接调用 goexit1,goexit1 位于 runtime/proc.go 中
1 // Finishes execution of the current goroutine. | |
2 func goexit1() { | |
3 if raceenabled {4 racegoend() | |
5 } | |
6 if trace.enabled {7 traceGoEnd() | |
8 } | |
9 mcall(goexit0) | |
10 } |
通过 mcall 调用 goexit0,mcall 是一段汇编代码它的作用是把执行的栈切换到 g0 的栈
1 TEXT runtime·mcall<ABIInternal>(SB), NOSPLIT, $0-8 | |
2 MOVQ AX, DX // DX = fn | |
3 | |
4 // save state in g->sched | |
5 // mcall 返回地址放入 BX 中 | |
6 MOVQ 0(SP), BX // caller's PC | |
7 // 上面局部是保留 g 的执行上下文,pc、sp、bp | |
8 // g.shced.pc = BX | |
9 MOVQ BX, (g_sched+gobuf_pc)(R14) | |
10 LEAQ fn+0(FP), BX // caller's SP | |
11 MOVQ BX, (g_sched+gobuf_sp)(R14) | |
12 MOVQ BP, (g_sched+gobuf_bp)(R14) | |
13 | |
14 // switch to m->g0 & its stack, call fn | |
15 // 将 g.m 保留到 BX 寄存器中 | |
16 MOVQ g_m(R14), BX | |
17 // 这段代码意思是从 m 构造体中获取 g0 字段保留到 SI 中 | |
18 MOVQ m_g0(BX), SI // SI = g.m.g0 | |
19 CMPQ SI, R14 // if g == m->g0 call badmcall | |
20 // goodm 中实现了从 g 的栈切换到 g0 的栈 | |
21 JNE goodm | |
22 JMP runtime·badmcall(SB) | |
23 goodm: | |
24 MOVQ R14, AX // AX (and arg 0) = g | |
25 MOVQ SI, R14 // g = g.m.g0 | |
26 get_tls(CX) // Set G in TLS | |
27 MOVQ R14, g(CX) | |
28 MOVQ (g_sched+gobuf_sp)(R14), SP // sp = g0.sched.sp | |
29 PUSHQ AX // open up space for fn's arg spill slot | |
30 MOVQ 0(DX), R12 | |
31 // 这里意思是调用 goexit0(g) | |
32 CALL R12 // fn(g) | |
33 POPQ AX | |
34 JMP runtime·badmcall2(SB) | |
35 RET |
goexit0 代码位于 runtime/proc.go 中,他次要实现最初的清理工作:
1、把 g 的状态从——Gruning 变为 Gdead
2、清空 g 的一些字段
3、接触 g 与 m 的绑定关系,即 g.m = nil;m.currg = nil
4、把 g 放入 p 的 freeg 队列中,下次创立 g 能够间接获取,而不必从内存调配
5、调用 schedule 进入下一次调度循环
1 // 这段代码执行在 g0 的栈上,gp 是咱们要解决退出的 g 的构造体指针 | |
2 // goexit continuation on g0. | |
3 func goexit0(gp *g) {4 _g_ := getg() // 获取 g0 | |
5 // 更改 g 的状态为_Gdead | |
6 casgstatus(gp, _Grunning, _Gdead) | |
7 if isSystemGoroutine(gp, false) {8 atomic.Xadd(&sched.ngsys, -1) | |
9 } | |
10 // 清空 g 的一些字段 | |
11 gp.m = nil | |
12 locked := gp.lockedm != 0 | |
13 gp.lockedm = 0 | |
14 _g_.m.lockedg = 0 | |
15 gp.preemptStop = false | |
16 gp.paniconfault = false | |
17 gp._defer = nil // should be true already but just in case. | |
18 gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data. | |
19 gp.writebuf = nil | |
20 gp.waitreason = 0 | |
21 gp.param = nil | |
22 gp.labels = nil | |
23 gp.timer = nil | |
24 | |
25 if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 { | |
26 // Flush assist credit to the global pool. This gives | |
27 // better information to pacing if the application is | |
28 // rapidly creating an exiting goroutines. | |
29 assistWorkPerByte := float64frombits(atomic.Load64(&gcController.assistWorkPerByte)) | |
30 scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes)) | |
31 atomic.Xaddint64(&gcController.bgScanCredit, scanCredit) | |
32 gp.gcAssistBytes = 0 | |
33 } | |
34 // 接触 g 与 m 的绑定关系 | |
35 dropg() | |
36 | |
37 if GOARCH == "wasm" { // no threads yet on wasm | |
38 gfput(_g_.m.p.ptr(), gp) | |
39 schedule() // never returns | |
40 } | |
41 | |
42 if _g_.m.lockedInt != 0 {43 print("invalid m->lockedInt =", _g_.m.lockedInt, "\n") | |
44 throw("internal lockOSThread error") | |
45 } | |
46 // 将 g 退出 p 的闲暇队列 | |
47 gfput(_g_.m.p.ptr(), gp) | |
48 if locked { | |
49 // The goroutine may have locked this thread because | |
50 // it put it in an unusual kernel state. Kill it | |
51 // rather than returning it to the thread pool. | |
52 | |
53 // Return to mstart, which will release the P and exit | |
54 // the thread. | |
55 if GOOS != "plan9" { // See golang.org/issue/22227. | |
56 gogo(&_g_.m.g0.sched) | |
57 } else { | |
58 // Clear lockedExt on plan9 since we may end up re-using | |
59 // this thread. | |
60 _g_.m.lockedExt = 0 | |
61 } | |
62 } | |
63 // 执行下一轮调度 | |
64 schedule() | |
65 } |
3.2 P 源码局部
3.2.1 P 的构造
1 // runtime/runtime2.go | |
2 | |
3 type p struct { | |
4 // 全局变量 allp 中的索引地位 | |
5 id int32 | |
6 // p 的状态标识 | |
7 status uint32 // one of pidle/prunning/... | |
8 link puintptr | |
9 // 调用 schedule 的次数,每次调用 schedule 这个值会加 1 | |
10 schedtick uint32 // incremented on every scheduler call | |
11 // 零碎调用的次数,每次进行零碎调用加 1 | |
12 syscalltick uint32 // incremented on every system call | |
13 // 用于 sysmon 协程记录被监控的 p 的零碎调用工夫和运行工夫 | |
14 sysmontick sysmontick // last tick observed by sysmon | |
15 // 指向绑定的 m,p 如果是 idle 状态这个值为 nil | |
16 m muintptr // back-link to associated m (nil if idle) | |
17 // 用于调配渺小对象和小对象的一个块的缓存空间,外面有各种不同等级的 span | |
18 mcache *mcache | |
19 // 一个 chunk 大小(512kb)的内存空间,用来对堆上内存调配的缓存优化达到无锁拜访的目标 | |
20 pcache pageCache | |
21 raceprocctx uintptr | |
22 | |
23 deferpool [5][]*_defer // pool of available defer structs of different sizes (see panic.go) | |
24 deferpoolbuf [5][32]*_defer | |
25 | |
26 // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen. | |
27 // 能够调配给 g 的 id 的缓存,每次会一次性申请 16 个 | |
28 goidcache uint64 | |
29 goidcacheend uint64 | |
30 | |
31 // Queue of runnable goroutines. Accessed without lock. | |
32 // 本地可运行的 G 队列的头部和尾部,达到无锁拜访 | |
33 runqhead uint32 | |
34 runqtail uint32 | |
35 // 本地可运行的 g 队列,是一个应用数组实现的循环队列 | |
36 runq [256]guintptr | |
37 // 下一个待运行的 g,这个 g 的优先级最高 | |
38 // 如果以后 g 运行完后还有残余可用工夫,那么就应该运行这个 runnext 的 g | |
39 runnext guintptr | |
40 | |
41 // Available G's (status == Gdead) | |
42 // p 上的闲暇队列列表 | |
43 gFree struct { | |
44 gList | |
45 n int32 | |
46 } | |
47 | |
48 ............ | |
49 // 用于内存对齐 | |
50 _ uint32 // Alignment for atomic fields below | |
51 ....................... | |
52 // 是否被抢占 | |
53 preempt bool | |
54 | |
55 // Padding is no longer needed. False sharing is now not a worry because p is large enough | |
56 // that its size class is an integer multiple of the cache line size (for any of our architectures). | |
57 } |
通过这里的构造能够看出,尽管 P 叫做逻辑处理器 Processor,实际上它更多是资源的管理者,其中蕴含了可运行的 g 队列资源、内存调配的资源、以及对调度循环、零碎调用、sysmon 协程的相干记录。通过 P 的资源管理来尽量实现无锁拜访,晋升利用性能。
3.2.2 P 的状态
当程序刚开始运行进行初始化时,所有的 P 都处于_Pgcstop 状态,随着的 P 的初始化(runtime.procresize),会被设置为_Pidle 状态。
当 M 须要运行时会调用 runtime.acquirep 来使 P 变为_Prunning 状态,并通过 runtime.releasep 来开释,从新变为_Pidele。
当 G 执行时须要进入零碎调用,P 会被设置为_Psyscall,如果这个时候被系统监控争夺(runtime.retake),则 P 会被从新批改为_Pidle。
如果在程序中产生 GC,则 P 会被设置为_Pgcstop,并在 runtime.startTheWorld 时从新调整为_Prunning。
(这部分文字来自《Go 程序员面试宝典》,图片来自这里)
3.2.3 P 的创立
P 的初始化是在 schedinit 函数中调用的,schedinit 函数是在 runtime 的汇编启动代码里调用的。
1 ........................... | |
2 CALL runtime·args(SB) | |
3 CALL runtime·osinit(SB) | |
4 CALL runtime·schedinit(SB) | |
5 ........................... |
shcedinit 中通过调用 procresize 进行 P 的调配。P 的个数默认等于 CPU 核数,如果设置了 GOMAXPROCS 环境变量,则会采纳设置的值来确定 P 的个数。所以 runtime.GOMAXPROCS 是限度的并行线程数量,而不是零碎线程即 M 的总数,M 是按需创立。
1 func schedinit() { | |
2 ................. | |
3 lock(&sched.lock) | |
4 sched.lastpoll = uint64(nanotime()) | |
5 procs := ncpu | |
6 if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { | |
7 procs = n | |
8 } | |
9 if procresize(procs) != nil {10 throw("unknown runnable goroutine during bootstrap") | |
11 } | |
12 unlock(&sched.lock) | |
13 | |
14 // World is effectively started now, as P's can run. | |
15 worldStarted() | |
16 ..................... | |
17 } |
下面获取 ncpu 的个数,而后传递给 procresize 函数。
无论是初始化时的调配,还是前期调整,都是通过 procresize 来创立 p 以及初始化
1 func procresize(nprocs int32) *p { | |
2 ............................. | |
3 old := gomaxprocs | |
4 ...................... | |
5 if nprocs > int32(len(allp)) { | |
6 // Synchronize with retake, which could be running | |
7 // concurrently since it doesn't run on a P. | |
8 lock(&allpLock) | |
9 if nprocs <= int32(cap(allp)) { | |
10 // 如果须要的 p 小于 allp 这个全局变量(切片)的 cap 能力,取其中的一部分 | |
11 allp = allp[:nprocs] | |
12 } else { | |
13 // 否则创立 nprocs 数量的 p,并把 allp 的中复制给 nallp | |
14 nallp := make([]*p, nprocs) | |
15 // Copy everything up to allp's cap so we | |
16 // never lose old allocated Ps. | |
17 copy(nallp, allp[:cap(allp)]) | |
18 allp = nallp | |
19 } | |
20 .................................... | |
21 unlock(&allpLock) | |
22 } | |
23 | |
24 // 进行 p 的初始化 | |
25 for i := old; i < nprocs; i++ {26 pp := allp[i] | |
27 if pp == nil {28 pp = new(p) | |
29 } | |
30 pp.init(i) | |
31 atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) | |
32 } | |
33 ............................... | |
34 return runnablePs | |
35 } |
在启动时候会依据 CPU 核数和 runtime.GOMAXPROCS 来设置 p 的个数,并由一个叫做 allp 的切片来为主,前期能够通过设置 GOMAXPROCS 来调整 P 的个数,然而性能耗费很大,会进行 STW;
3.3 M 源码局部
3.3.1 M 的构造
M 即 Machine,代表一个过程中的工作线程,构造体 m 保留了 M 本身应用的栈信息、以后正在 M 上执行的 G,以及绑定 M 的 P 信息等。
咱们来看下 m 的构造体:
1 type m struct { | |
2 // 每个 m 都有一个对应的 g0 线程,用来执行调度代码,3 // 当须要执行用户代码的时候,g0 会与用户 goroutine 产生协程栈切换 | |
4 g0 *g // goroutine with scheduling stack | |
5 morebuf gobuf // gobuf arg to morestack | |
6 ........................ | |
7 // tls 作为线程的本地存储 | |
8 // 其中能够在任意时刻获取绑定到以后线程上的协程 g、构造体 m、逻辑处理器 p、非凡协程 g0 等信息 | |
9 tls [tlsSlots]uintptr // thread-local storage (for x86 extern register) | |
10 mstartfn func() | |
11 // 指向正在运行的 goroutine 对象 | |
12 curg *g // current running goroutine | |
13 caughtsig guintptr // goroutine running during fatal signal | |
14 // 与当前工作线程绑定的 p | |
15 p puintptr // attached p for executing go code (nil if not executing go code) | |
16 nextp puintptr | |
17 oldp puintptr // the p that was attached before executing a syscall | |
18 id int64 | |
19 mallocing int32 | |
20 throwing int32 | |
21 // 与禁止抢占相干的字段,如果该字段不等于空字符串,要放弃 curg 始终在这个 m 上运行 | |
22 preemptoff string // if != "", keep curg running on this m | |
23 // locks 也是判断 g 是否被抢占的一个标识 | |
24 locks int32 | |
25 dying int32 | |
26 profilehz int32 | |
27 // spining 为 true 标识以后 m 正在处于本人找工作的自旋状态,28 // 首先查看全局队列看是否有工作,而后查看 network poller,尝试执行 GC 工作 | |
29 // 或者偷一部分工作,如果都没有则会进入休眠状态 | |
30 spinning bool // m is out of work and is actively looking for work | |
31 // 示意 m 正阻塞在 note 上 | |
32 blocked bool // m is blocked on a note | |
33 ......................... | |
34 doesPark bool // non-P running threads: sysmon and newmHandoff never use .park | |
35 // 没有 goroutine 须要运行时,工作线程睡眠在这个 park 成员上 | |
36 park note | |
37 // 记录所有工作线程的一个链表 | |
38 alllink *m // on allm | |
39 schedlink muintptr | |
40 lockedg guintptr | |
41 createstack [32]uintptr // stack that created this thread. | |
42 ............................. | |
43 } |
3.3.2 M 的状态
M 的状态并没有向 P 和 G 那样有多个状态常量,它只有自旋和非自旋两种状态
1 mstart | |
2 | | |
3 v | |
4 +------+ 找不到可执行工作 +-------+ | |
5 |unspin| ----------------------------> |spining| | |
6 | | <---------------------------- | | | |
7 +------+ 找到可执行工作 +-------+ | |
8 ^ | stopm | |
9 | wakep v | |
10 notewakeup <------------------------- notesleep |
当 M 没有工作时,它会自旋的来找工作,首先查看全局队列看是否有工作,而后查看 network poller,尝试执行 GC 工作,或者偷一部分工作,如果都没有则会进入休眠状态。当被其余工作线程唤醒,又会进入自旋状态。
3.3.3 M 的创立
runtime/proc.go 中的 newm 函数用来新建 m,而最终是依据不同的零碎,通过零碎调用来创立线程。
1 // newm 创立一个新的 m,将从 fn 或者调度程序开始执行 | |
2 func newm(fn func(), _p_ *p, id int64) { | |
3 // 这里实现 m 的创立 | |
4 mp := allocm(_p_, fn, id) | |
5 mp.doesPark = (_p_ != nil) | |
6 mp.nextp.set(_p_) | |
7 mp.sigmask = initSigmask | |
8 if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" { | |
9 // 咱们处于锁定的 M 或可能由 C 启动的线程。此线程的内核状态可能很奇怪(用户可能已将其锁定为此目标)。10 // 咱们不想将其克隆到另一个线程中。相同,申请一个已知良好的线程为咱们创立线程。11 lock(&newmHandoff.lock) | |
12 if newmHandoff.haveTemplateThread == 0 {13 throw("on a locked thread with no template thread") | |
14 } | |
15 mp.schedlink = newmHandoff.newm | |
16 newmHandoff.newm.set(mp) | |
17 if newmHandoff.waiting { | |
18 newmHandoff.waiting = false | |
19 notewakeup(&newmHandoff.wake) | |
20 } | |
21 unlock(&newmHandoff.lock) | |
22 return | |
23 } | |
24 // 这里调配真正的的操作系统线程 | |
25 newm1(mp) | |
26 } |
allocm 函数中实现 m 的创立,以及对应的 g0 协程的设置
1 func allocm(_p_ *p, fn func(), id int64) *m { | |
2 // 获取以后运行的 g | |
3 _g_ := getg() | |
4 // 将_g_对应的 m 的 locks 加 1,避免被抢占 | |
5 acquirem() // disable GC because it can be called from sysmon | |
6 if _g_.m.p == 0 {7 acquirep(_p_) // temporarily borrow p for mallocs in this function | |
8 } | |
9 | |
10 // 查看是有有闲暇的 m 能够开释,次要目标是开释 m 上的 g0 占用的零碎栈 | |
11 if sched.freem != nil {12 lock(&sched.lock) | |
13 var newList *m | |
14 for freem := sched.freem; freem != nil; { | |
15 if freem.freeWait != 0 { | |
16 next := freem.freelink | |
17 freem.freelink = newList | |
18 newList = freem | |
19 freem = next | |
20 continue | |
21 } | |
22 // stackfree must be on the system stack, but allocm is | |
23 // reachable off the system stack transitively from | |
24 // startm. | |
25 systemstack(func() {26 stackfree(freem.g0.stack) | |
27 }) | |
28 freem = freem.freelink | |
29 } | |
30 sched.freem = newList | |
31 unlock(&sched.lock) | |
32 } | |
33 // 创立一个 m 构造体 | |
34 mp := new(m) | |
35 mp.mstartfn = fn // 将 fn 设置为 m 启动后执行的函数 | |
36 mcommoninit(mp, id) | |
37 | |
38 // In case of cgo or Solaris or illumos or Darwin, pthread_create will make us a stack. | |
39 // Windows and Plan 9 will layout sched stack on OS stack. | |
40 if iscgo || mStackIsSystemAllocated() {41 mp.g0 = malg(-1) | |
42 } else { | |
43 // 设置 m 对应的 g0,并设置对应大小的 g0 协程栈,g0 是 8kb | |
44 mp.g0 = malg(8192 * sys.StackGuardMultiplier) | |
45 } | |
46 // 设置 g0 对应的 m | |
47 mp.g0.m = mp | |
48 | |
49 if _p_ == _g_.m.p.ptr() {50 releasep() | |
51 } | |
52 // 解除_g_的 m 的禁止抢占状态。53 releasem(_g_.m) | |
54 | |
55 return mp | |
56 } |
为什么 m.locks 加 1 能够禁止抢占,避免 GC;因为判断是否能够抢占,其中一个因素就是要 m.locks=0
1 func canPreemptM(mp *m) bool {2 return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning | |
3 } |
allocm 函数实现了 m 的创立,然而这只是 runtime 层面的一个数据结构,还并没有在零碎中有真正的线程。再来看 newm1 函数:
1 func newm1(mp *m) { | |
2 .............. | |
3 execLock.rlock() // Prevent process clone. | |
4 // 创立一个零碎线程,并且传入该 mp 绑定的 g0 的栈顶指针 | |
5 // 让零碎线程执行 mstart 函数,前面的逻辑都在 mstart 函数中 | |
6 newosproc(mp) | |
7 execLock.runlock() | |
8 } |
理论是通过 newostproc 来创立零碎线程;
能够看到这个函数在不同的零碎中有不同的实现,咱们次要看 linux 零碎,即 os_linux.go 文件代码:
1 func newosproc(mp *m) {2 stk := unsafe.Pointer(mp.g0.stack.hi) | |
3 ......................... | |
4 var oset sigset | |
5 sigprocmask(_SIG_SETMASK, &sigset_all, &oset) | |
6 ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) | |
7 sigprocmask(_SIG_SETMASK, &oset, nil) | |
8 ......................... | |
9 } |
在 linux 平台,是通过 clone 这个零碎调用来创立的线程;值得注意的是,这个零碎线程的栈是在堆上;因为其中的 stk 指向 mp.go.stack.hi,所以 g0 的堆栈也就是这个零碎线程的堆栈。
3.3.4 M 的休眠
M 的自旋指的是 m.spining 为 true,这个时候它会在 P 的本地 G 队列、全局 G 队列、nerpoller、偷其余 P 的 G,始终循环找可运行的 G 的过程中。
自旋状态用 m.spinning 和 sched.nmspinning 示意。其中 m.spinning 示意以后的 M 是否为自旋状态,sched.nmspinning 示意 runtime 中一共有多少个 M 在自旋状态。
当自旋了一段时间后,发现依然找不到工作,就会进入 stopm 函数中,使 M 对应的线程进行休眠。
1 func stopm() {2 _g_ := getg() | |
3 ..................... | |
4 lock(&sched.lock) | |
5 // 首先将 m 放到全局闲暇链表中,这里要加锁拜访全局链表 | |
6 mput(_g_.m) | |
7 unlock(&sched.lock) | |
8 // 进入睡眠状态 | |
9 mPark() | |
10 // 将 m 与 p 解绑 | |
11 acquirep(_g_.m.nextp.ptr()) | |
12 _g_.m.nextp = 0 | |
13 } | |
14 | |
15 func mPark() {16 g := getg() | |
17 for { | |
18 // 使工作线程休眠在 park 字段上 | |
19 notesleep(&g.m.park) | |
20 noteclear(&g.m.park) | |
21 if !mDoFixup() { | |
22 return | |
23 } | |
24 } | |
25 } |
理论将线程进行休眠的代码,是通过汇编语言进行 Futex 零碎调用来事项的。Futex 机制是 Linux 提供的一种用户态和内核态混合的同步机制。Linux 底层也是应用 futex 机制实现的锁。
1 //uaddr 指向一个地址,val 代表这个地址期待的值,当 *uaddr==val 时,才会进行 wait | |
2 int futex_wait(int *uaddr, int val); | |
3 // 唤醒 n 个在 uaddr 指向的锁变量上挂起期待的过程 | |
4 int futex_wake(int *uaddr, int n); |
能够看到在 futex_wait 中当一个地址等于某个期待值时,就会进行 wait;所以当 m 中的 park 的 key 等于某个值时则进入休眠状态。
3.3.5 M 的唤醒
M 的唤醒是在 wakep 函数中解决的。当一个新的 goroutine 创立或者有多个 goroutine 进入_Grunnable 状态时,会先判断是否有自旋的 M,如果有则不会唤醒其余的 M 而应用自旋的 M,当没有自旋的 M,但 m 闲暇队列中有闲暇的 M 则会唤醒 M,否则会创立一个新的 M
1 // Tries to add one more P to execute G's. | |
2 // Called when a G is made runnable (newproc, ready). | |
3 func wakep() {4 if atomic.Load(&sched.npidle) == 0 { | |
5 return | |
6 } | |
7 // be conservative about spinning threads | |
8 // 如果有其余的 M 处于自旋状态,那么就不论了,间接返回, 因为自旋的 M 会拼命找 G 来运行的 | |
9 if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) { | |
10 return | |
11 } | |
12 startm(nil, true) | |
13 } |
startm 先判断是否有闲暇的 P,如果没有则返回,如果有闲暇的 P,在尝试看看有没有闲暇的 M,有则唤醒该 M 来工作。如果没有闲暇 M,则新建一个 M,而后也进入唤醒操作。
1 func startm(_p_ *p, spinning bool) { | |
2 // 禁止抢占,避免 GC 垃圾回收 | |
3 mp := acquirem() | |
4 lock(&sched.lock) | |
5 // 如果 P 为 nil,则尝试获取一个闲暇 P | |
6 if _p_ == nil {7 _p_ = pidleget() | |
8 if _p_ == nil { // 没有闲暇的 P,则解除禁止抢占,间接返回 | |
9 unlock(&sched.lock) | |
10 if spinning {11 if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {12 throw("startm: negative nmspinning") | |
13 } | |
14 } | |
15 releasem(mp) | |
16 return | |
17 } | |
18 } | |
19 // 获取一个闲暇的 M | |
20 nmp := mget() | |
21 if nmp == nil { | |
22 // 如果没有闲暇的 m 则新建一个 | |
23 id := mReserveID() | |
24 unlock(&sched.lock) | |
25 | |
26 var fn func() | |
27 if spinning { | |
28 // The caller incremented nmspinning, so set m.spinning in the new M. | |
29 fn = mspinning | |
30 } | |
31 newm(fn, _p_, id) | |
32 // Ownership transfer of _p_ committed by start in newm. | |
33 // Preemption is now safe. | |
34 releasem(mp) | |
35 return | |
36 } | |
37 unlock(&sched.lock) | |
38 ...................... | |
39 // 标记该 M 是否在自旋 | |
40 nmp.spinning = spinning | |
41 // 暂存 P | |
42 nmp.nextp.set(_p_) | |
43 // 唤醒 M | |
44 notewakeup(&nmp.park) | |
45 // Ownership transfer of _p_ committed by wakeup. Preemption is now | |
46 // safe. | |
47 releasem(mp) | |
48 } |
唤醒线程的底层操作同样是依赖 futex 机制
1 // 唤醒 n 个在 uaddr 指向的锁变量上挂起期待的过程 | |
2 int futex_wake(int *uaddr, int n); |
4 启动过程
4.1 Go scheduler 构造
在 runtime 中全局变量 sched 代表全局调度器,数据结构为 schedt 构造体,保留了调度器的状态信息、全局可运行 G 队列
1 type schedt struct { | |
2 // 用来为 goroutine 生成惟一 id,须要以原子拜访模式进行拜访 | |
3 // 放在 struct 顶部,以便在 32 位零碎上能够进行对齐 | |
4 goidgen uint64 | |
5 ................... | |
6 lock mutex | |
7 // 闲暇的 m 组成的链表 | |
8 midle muintptr // idle m's waiting for work | |
9 // 闲暇的工作线程数量 | |
10 nmidle int32 // number of idle m's waiting for work | |
11 // 闲暇的且被 lock 的 m 的数量 | |
12 nmidlelocked int32 // number of locked m's waiting for work | |
13 mnext int64 // number of m's that have been created and next M ID | |
14 // 示意最多可能创立的工作线程数量 | |
15 maxmcount int32 // maximum number of m's allowed (or die) | |
16 nmsys int32 // number of system m's not counted for deadlock | |
17 nmfreed int64 // cumulative number of freed m's | |
18 // 整个 goroutine 的数量,可能主动放弃更新 | |
19 ngsys uint32 // number of system goroutines; updated atomically | |
20 // 由闲暇的 p 构造体对象组成的链表 | |
21 pidle puintptr // idle p's | |
22 // 闲暇的 p 构造体对象的数量 | |
23 npidle uint32 | |
24 // 自旋的 m 的数量 | |
25 nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go. | |
26 | |
27 // Global runnable queue. | |
28 // 全局的的 g 的队列 | |
29 runq gQueue | |
30 runqsize int32 | |
31 | |
32 disable struct { | |
33 // user disables scheduling of user goroutines. | |
34 user bool | |
35 runnable gQueue // pending runnable Gs | |
36 n int32 // length of runnable | |
37 } | |
38 | |
39 // Global cache of dead G's. | |
40 // 闲暇的 g 队列,这外面 g 的状态为_Gdead | |
41 gFree struct { | |
42 lock mutex | |
43 stack gList // Gs with stacks | |
44 noStack gList // Gs without stacks | |
45 n int32 | |
46 } | |
47 ................. | |
48 // 闲暇的 m 队列 | |
49 freem *m | |
50 ..................... | |
51 // 上次批改 gomaxprocs 的工夫 | |
52 procresizetime int64 // nanotime() of last change to gomaxprocs | |
53 ...................... | |
54 } |
这外面大部分是记录一些闲暇的 g、m、p 等,在 runtime2.go 中还有很多相干的全局变量:
1 // runtime/runtime2.go | |
2 var ( | |
3 // 保留所有的 m | |
4 allm *m | |
5 // p 的最大个数,默认等于 cpu 核数 | |
6 gomaxprocs int32 | |
7 // cpu 的核数,程序启动时会调用 osinit 获取 ncpu 值 | |
8 ncpu int32 | |
9 // 调度器构造体对象,记录了调度器的工作状态 | |
10 sched schedt | |
11 newprocs int32 | |
12 | |
13 allpLock mutex | |
14 // 全局的 p 队列 | |
15 allp []*p | |
16 ) | |
17 | |
18 // runtime/proc.go | |
19 var ( | |
20 // 代表过程主线程的 m0 对象 | |
21 m0 m | |
22 // m0 的 g0 | |
23 g0 g | |
24 // 全局的 mcache 对象,治理各种类型的 span 队列 | |
25 mcache0 *mcache | |
26 raceprocctx0 uintptr | |
27 ) |
4.2 启动流程
调度器的初始化和启动调度循环是在过程初始化是解决的,整个过程初始化流程如下:
Go 过程的启动是通过汇编代码进行的,入口函数在 asm_amd64.s 这个文件中的 runtime.rt0_go 局部代码;
1 // runtime·rt0_go | |
2 | |
3 // 程序刚启动的时候必然有一个线程启动(主线程)4 // 将以后的栈和资源保留在 g0 | |
5 // 将该线程保留在 m0 | |
6 // tls: Thread Local Storage | |
7 // set the per-goroutine and per-mach "registers" | |
8 get_tls(BX) | |
9 LEAQ runtime·g0(SB), CX | |
10 MOVQ CX, g(BX) | |
11 LEAQ runtime·m0(SB), AX | |
12 | |
13 // m0 和 g0 相互绑定 | |
14 // save m->g0 = g0 | |
15 MOVQ CX, m_g0(AX) | |
16 // save m0 to g0->m | |
17 MOVQ AX, g_m(CX) | |
18 // 解决 args | |
19 CALL runtime·args(SB) | |
20 // os 初始化,os_linux.go | |
21 CALL runtime·osinit(SB) | |
22 // 调度零碎初始化, proc.go | |
23 CALL runtime·schedinit(SB) | |
24 | |
25 // 创立一个 goroutine,而后开启执行程序 | |
26 // create a new goroutine to start program | |
27 MOVQ $runtime·mainPC(SB), AX // entry | |
28 PUSHQ AX | |
29 PUSHQ $0 // arg size | |
30 CALL runtime·newproc(SB) | |
31 POPQ AX | |
32 POPQ AX | |
33 | |
34 // start this M | |
35 // 启动线程,并且启动调度零碎 | |
36 CALL runtime·mstart(SB) |
能够看到通过汇编代码:
1、将 m0 与 g0 相互绑定,而后调用 runtime.osinit 函数,这个函数不同的操作系统有不同的实现;
2、而后调用 runtime.schedinit 进行调度零碎的初始化;
3、而后创立主协程;主 goroutine 创立实现后被退出到 p 的运行队列中,期待调度;
4、在 g0 上启动调用 runtime.mstart 启动调度循环,首先能够被调度执行的就是主 goroutine,而后主协程取得运行的 cpu 则执行 runtime.main 进而执行到用户代码的 main 函数。
初始化过程和堆栈图能够参考下图:
堆栈上,g0、m0、p0 与主协程关系如图所示:
4.3 调度器初始化
调度器初始化
1 func schedinit() { | |
2 ................ | |
3 // g0 | |
4 _g_ := getg() | |
5 if raceenabled {6 _g_.racectx, raceprocctx0 = raceinit() | |
7 } | |
8 // 最多启动 10000 个工作线程 | |
9 sched.maxmcount = 10000 | |
10 | |
11 // The world starts stopped. | |
12 worldStopped() | |
13 | |
14 moduledataverify() | |
15 // 初始化协程堆栈,包含专门调配小栈的 stackpool 和调配大栈的 stackLarge | |
16 stackinit() | |
17 // 整个堆内存的初始化调配 | |
18 mallocinit() | |
19 fastrandinit() // must run before mcommoninit | |
20 // 初始化 m0 | |
21 mcommoninit(_g_.m, -1) | |
22 cpuinit() // must run before alginit | |
23 alginit() // maps must not be used before this call | |
24 modulesinit() // provides activeModules | |
25 typelinksinit() // uses maps, activeModules | |
26 itabsinit() // uses activeModules | |
27 | |
28 sigsave(&_g_.m.sigmask) | |
29 initSigmask = _g_.m.sigmask | |
30 | |
31 if offset := unsafe.Offsetof(sched.timeToRun); offset%8 != 0 {32 println(offset) | |
33 throw("sched.timeToRun not aligned to 8 bytes") | |
34 } | |
35 | |
36 goargs() | |
37 goenvs() | |
38 parsedebugvars() | |
39 gcinit() | |
40 // 这部分是初始化 p,41 // cpu 有多少个核数就初始化多少个 p | |
42 lock(&sched.lock) | |
43 sched.lastpoll = uint64(nanotime()) | |
44 procs := ncpu | |
45 if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { | |
46 procs = n | |
47 } | |
48 if procresize(procs) != nil {49 throw("unknown runnable goroutine during bootstrap") | |
50 } | |
51 unlock(&sched.lock) | |
52 | |
53 // World is effectively started now, as P's can run. | |
54 worldStarted() | |
55 } |
4.4 启动调度零碎
调度零碎时在 runtime.mstart0 函数中启动的。这个函数是在 m0 的 g0 上执行的。
1 func mstart0() { | |
2 // 这里获取的是 g0 在零碎栈上执行 | |
3 _g_ := getg() | |
4 ............. | |
5 mstart1() | |
6 ............. | |
7 } | |
8 | |
9 func mstart1(dummy int32) {10 _g_ := getg() | |
11 // 确保 g 是零碎栈上的 g0 | |
12 // 调度器只在 g0 上执行 | |
13 if _g_ != _g_.m.g0 {14 throw("bad runtime·mstart") | |
15 } | |
16 ... | |
17 // 初始化 m,次要是设置线程的备用信号堆栈和信号掩码 | |
18 minit() | |
19 // 如果以后 g 的 m 是初始 m0,执行 mstartm0() | |
20 if _g_.m == &m0 { | |
21 // 对于初始 m,须要一些非凡解决,次要是设置零碎信号量的处理函数 | |
22 mstartm0() | |
23 } | |
24 // 如果有 m 的起始工作函数,则执行,比方 sysmon 函数 | |
25 // 对于 m0 来说,是没有 mstartfn 的 | |
26 if fn := _g_.m.mstartfn; fn != nil {27 fn() | |
28 } | |
29 if _g_.m.helpgc != 0 { | |
30 _g_.m.helpgc = 0 | |
31 stopm() | |
32 } else if _g_.m != &m0 { // 如果不是 m0,须要绑定 p | |
33 // 绑定 p | |
34 acquirep(_g_.m.nextp.ptr()) | |
35 _g_.m.nextp = 0 | |
36 } | |
37 // 进入调度循环,永不返回 | |
38 schedule() | |
39 } |
4.5 runtime.main 函数
当通过初始的调度,主协程获取执行权后,首先进入的就是 runtime.main 函数:
1 // The main goroutine. | |
2 func main() { | |
3 // 获取 main goroutine | |
4 g := getg() | |
5 ... | |
6 // 在零碎栈上运行 sysmon | |
7 systemstack(func() { | |
8 // 调配一个新的 m,运行 sysmon 零碎后盾监控 | |
9 //(定期垃圾回收和调度抢占)10 newm(sysmon, nil) | |
11 }) | |
12 ... | |
13 // 确保是主线程 | |
14 if g.m != &m0 {15 throw("runtime.main not on m0") | |
16 } | |
17 // runtime 外部 init 函数的执行,编译器动静生成的。18 runtime_init() // must be before defer | |
19 ... | |
20 // gc 启动一个 goroutine 进行 gc 打扫 | |
21 gcenable() | |
22 ... | |
23 // 执行 init 函数,编译器动静生成的,24 // 包含用户定义的所有的 init 函数。25 // make an indirect call, | |
26 // as the linker doesn't know the address of | |
27 // the main package when laying down the runtime | |
28 fn := main_init | |
29 fn() | |
30 ... | |
31 // 真正的执行 main func in package main | |
32 // make an indirect call, | |
33 // as the linker doesn't know the address of | |
34 // the main package when laying down the runtime | |
35 fn = main_main | |
36 fn() | |
37 ... | |
38 // 退出程序 | |
39 exit(0) | |
40 // 为何这里还须要 for 循环?41 // 上面的 for 循环肯定会导致程序崩掉,这样就确保了程序肯定会退出 | |
42 for { | |
43 var x *int32 | |
44 *x = 0 | |
45 } | |
46 } |
runtime.main 函数中须要留神的是在零碎栈上创立了一个新的 m,来执行 sysmon 协程,这个协程是用来定期执行垃圾回收和调度抢占。
其中能够看到首先获取了 main_init 函数,来执行 runtime 包中的 init 函数,而后是获取 main_main 来执行用户的 main 函数。
接着 main 函数执行实现后,调用 exit 让主过程退出,如果过程没有退出,就让 for 循环始终拜访非法地址,让操作系统把过程杀死,这样来确保过程肯定会退出。
5 协程的调度策略
调度循环启动之后,便会进入一个有限循环中,一直的执行 schedule->execute->gogo->goroutine 工作 ->goexit->goexit1->mcall->goexit0->schedule;
其中调度的过程是在 m 的 g0 上执行的,而 goroutine 工作 ->goexit->goexit1->mcall 则是在 goroutine 的堆栈空间上执行的。
5.1 调度策略
其中 schedule 函数解决具体的调度策略;execute 函数执行一些具体的状态转移、协程 g 与构造体 m 之间的绑定;gogo 函数是与操作系统相干的函数,执行汇编代码实现栈的切换以及 CPU 寄存器的复原。
先来看下 schedule 的代码:
1 func schedule() {2 _g_ := getg() | |
3 ... | |
4 top: | |
5 // 如果以后 GC 须要进行整个世界(STW), 则调用 gcstopm 休眠以后的 M | |
6 if sched.gcwaiting != 0 { | |
7 // 为了 STW,进行以后的 M | |
8 gcstopm() | |
9 // STW 完结后回到 top | |
10 goto top | |
11 } | |
12 ... | |
13 var gp *g | |
14 var inheritTime bool | |
15 ... | |
16 if gp == nil { | |
17 // 为了避免全局队列中的 g 永远得不到运行,所以 go 语言让 p 每执行 61 调度,18 // 就须要优先从全局队列中获取一个 G 到以后 P 中,并执行下一个要执行的 G | |
19 if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {20 lock(&sched.lock) | |
21 gp = globrunqget(_g_.m.p.ptr(), 1) | |
22 unlock(&sched.lock) | |
23 } | |
24 } | |
25 if gp == nil { | |
26 // 从 p 的本地队列中获取 | |
27 gp, inheritTime = runqget(_g_.m.p.ptr()) | |
28 if gp != nil && _g_.m.spinning {29 throw("schedule: spinning with local work") | |
30 } | |
31 } | |
32 if gp == nil { | |
33 // 想尽办法找到可运行的 G,找不到就不必返回了 | |
34 gp, inheritTime = findrunnable() // blocks until work is available | |
35 } | |
36 ... | |
37 // println("execute goroutine", gp.goid) | |
38 // 找到了 g,那就执行 g 上的工作函数 | |
39 execute(gp, inheritTime) | |
40 } |
findrunnalbe 中首先还是从本地队列中查看,而后从全局队列中寻找,再从就绪的网络协程,如果这几个没有就去其余 p 的本地队列偷一些工作。
1 func findrunnable() (gp *g, inheritTime bool) {2 _g_ := getg() | |
3 | |
4 top: | |
5 ............................ | |
6 // 本地队列中查看 | |
7 if gp, inheritTime := runqget(_p_); gp != nil { | |
8 return gp, inheritTime | |
9 } | |
10 // 从全局队列中寻找 | |
11 if sched.runqsize != 0 {12 lock(&sched.lock) | |
13 gp := globrunqget(_p_, 0) | |
14 unlock(&sched.lock) | |
15 if gp != nil { | |
16 return gp, false | |
17 } | |
18 } | |
19 // 从就绪的网络协程中查找 | |
20 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {21 if list := netpoll(0); !list.empty() { // non-blocking | |
22 gp := list.pop() | |
23 injectglist(&list) | |
24 casgstatus(gp, _Gwaiting, _Grunnable) | |
25 if trace.enabled {26 traceGoUnpark(gp, 0) | |
27 } | |
28 return gp, false | |
29 } | |
30 } | |
31 | |
32 // 进入自旋状态 | |
33 procs := uint32(gomaxprocs) | |
34 if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { | |
35 if !_g_.m.spinning { | |
36 _g_.m.spinning = true | |
37 atomic.Xadd(&sched.nmspinning, 1) | |
38 } | |
39 // 从其余 p 的本地队列中偷工作 | |
40 gp, inheritTime, tnow, w, newWork := stealWork(now) | |
41 .............................. | |
42 } | |
43 } |
整个调度的优先级如下:
5.2 调度机会
5.1 讲了调度的策略,什么机会产生调度呢,次要有三种形式,被动调度、被动调度、抢占式调度。
5.2.1 被动调度
协程能够抉择被动让渡本人的执行权力,大多数状况下不须要这么做,但通过 runtime.Goched 能够做到被动让渡。
1 func Gosched() {2 checkTimeouts() | |
3 mcall(gosched_m) | |
4 } | |
5 | |
6 // Gosched continuation on g0. | |
7 func gosched_m(gp *g) { | |
8 if trace.enabled {9 traceGoSched() | |
10 } | |
11 goschedImpl(gp) | |
12 } | |
13 | |
14 func goschedImpl(gp *g) {15 status := readgstatus(gp) | |
16 if status&^_Gscan != _Grunning {17 dumpgstatus(gp) | |
18 throw("bad g status") | |
19 } | |
20 // 更改 g 的运行状态 | |
21 casgstatus(gp, _Grunning, _Grunnable) | |
22 // 接触 g 和 m 的绑定关系 | |
23 dropg() | |
24 // 将 g 放入全局队列中 | |
25 lock(&sched.lock) | |
26 globrunqput(gp) | |
27 unlock(&sched.lock) | |
28 | |
29 schedule() | |
30 } |
5.2.2 被动调度
大部分状况下的调度都是被动调度,当协程在休眠、channel 通道阻塞、网络 IO 阻塞、执行垃圾回收时会暂停,被动调度能够保障最大化利用 CPU 资源。被动调度是协程发动的操作,所以调度机会绝对明确。
首先从以后栈切换到 g0 协程,被动调度不会将 G 放入全局运行队列,所以被动调度须要一个额定的唤醒机制。
这外面波及的函数次要是 gopark 和 ready 函数。
gopark 函数用来实现被动调度,有_Grunning 变为_Gwaiting 状态;
1 func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { | |
2 if reason != waitReasonSleep {3 checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy | |
4 } | |
5 // 禁止抢占和 GC | |
6 mp := acquirem() | |
7 gp := mp.curg // 过来以后 m 上运行的 g | |
8 status := readgstatus(gp) | |
9 if status != _Grunning && status != _Gscanrunning {10 throw("gopark: bad g status") | |
11 } | |
12 // 设置相干的 wait 字段 | |
13 mp.waitlock = lock | |
14 mp.waitunlockf = unlockf | |
15 gp.waitreason = reason | |
16 mp.waittraceev = traceEv | |
17 mp.waittraceskip = traceskip | |
18 releasem(mp) | |
19 // can't do anything that might move the G between Ms here. | |
20 mcall(park_m) | |
21 } | |
22 | |
23 // park continuation on g0. | |
24 func park_m(gp *g) {25 _g_ := getg() | |
26 // 变更 g 的状态 | |
27 casgstatus(gp, _Grunning, _Gwaiting) | |
28 // 接触 g 与 m 的绑定关系 | |
29 dropg() | |
30 // 依据被动调度不同起因,执行不同的 waitunlockf 函数 | |
31 if fn := _g_.m.waitunlockf; fn != nil {32 ok := fn(gp, _g_.m.waitlock) | |
33 _g_.m.waitunlockf = nil | |
34 _g_.m.waitlock = nil | |
35 if !ok { | |
36 if trace.enabled {37 traceGoUnpark(gp, 2) | |
38 } | |
39 casgstatus(gp, _Gwaiting, _Grunnable) | |
40 execute(gp, true) // Schedule it back, never returns. | |
41 } | |
42 } | |
43 // 进入下一轮调度 | |
44 schedule() | |
45 } |
当协程要被唤醒时,会进入 ready 函数中,更改协程状态由_Gwaiting 变更为_Grunnable,放入本地运行队列期待被调度。
1 func ready(gp *g, traceskip int, next bool) { | |
2 .............. | |
3 status := readgstatus(gp) | |
4 | |
5 // Mark runnable. | |
6 _g_ := getg() | |
7 mp := acquirem() // disable preemption because it can be holding p in a local var | |
8 ............... | |
9 // 变更状态之后,放入 p 的部分运行队列中 | |
10 casgstatus(gp, _Gwaiting, _Grunnable) | |
11 runqput(_g_.m.p.ptr(), gp, next) | |
12 wakep() | |
13 releasem(mp) | |
14 } |
5.2.3 抢占式调度
如果一个 g 运行工夫过长就会导致其余 g 难以获取运行机会,当进行零碎调用时也存在会导致其余 g 无奈运行状况;当呈现这两种状况时,为了让其余 g 有运行机会,则会进行抢占式调度。
是否进行抢占式调度次要是在 sysmon 协程上判断的。sysmon 协程是一个不须要 p 的协程,它作用次要是运行后盾监控,进行 netpool(获取 fd 事件)、retake(抢占式调度)、forcegc(按工夫强制执行 GC)、scavenge heap(强制开释闲置堆内存,缩小内存占用)
其中与抢占式调度相干的就是 retake 函数,
这里咱们须要晓得的是间断运行 10ms 则认为工夫过长,进行抢占
产生零碎调用时,以后正在工作的线程会陷入期待状态,期待外部实现零碎调用并返回,所以也须要让渡执行权给其余 g,但这里当只有满足几种状况才会进行调度:
1、如果 p 的运行队列中有期待运行的 g 则抢占
2、如果没有闲暇的 p 则进行抢占
3、零碎调用工夫超过 10ms 则进行抢占
1 func retake(now int64) uint32 { | |
2 n := 0 | |
3 lock(&allpLock) | |
4 // 遍历所有的 P | |
5 for i := 0; i < len(allp); i++ {6 _p_ := allp[i] | |
7 if _p_ == nil { | |
8 // This can happen if procresize has grown | |
9 // allp but not yet created new Ps. | |
10 continue | |
11 } | |
12 pd := &_p_.sysmontick | |
13 s := _p_.status | |
14 sysretake := false | |
15 | |
16 if s == _Prunning || s == _Psyscall { | |
17 // 判断如果 g 的运行工夫过长则抢占 | |
18 t := int64(_p_.schedtick) | |
19 if int64(pd.schedtick) != t {20 pd.schedtick = uint32(t) | |
21 pd.schedwhen = now | |
22 } else if pd.schedwhen+forcePreemptNS <= now { | |
23 // 如果间断运行 10ms 则进行抢占 | |
24 preemptone(_p_) | |
25 sysretake = true | |
26 } | |
27 } | |
28 // 针对零碎调用状况进行抢占 | |
29 // 如果 p 的运行队列中有期待运行的 g 则抢占 | |
30 // 如果没有闲暇的 p 则进行抢占 | |
31 // 零碎调用工夫超过 10ms 则进行抢占 | |
32 if s == _Psyscall {33 // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us). | |
34 t := int64(_p_.syscalltick) | |
35 if !sysretake && int64(pd.syscalltick) != t {36 pd.syscalltick = uint32(t) | |
37 pd.syscallwhen = now | |
38 continue | |
39 } | |
40 // On the one hand we don't want to retake Ps if there is no other work to do, | |
41 // but on the other hand we want to retake them eventually | |
42 // because they can prevent the sysmon thread from deep sleep. | |
43 if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { | |
44 continue | |
45 } | |
46 // Drop allpLock so we can take sched.lock. | |
47 unlock(&allpLock) | |
48 // Need to decrement number of idle locked M's | |
49 // (pretending that one more is running) before the CAS. | |
50 // Otherwise the M from which we retake can exit the syscall, | |
51 // increment nmidle and report deadlock. | |
52 incidlelocked(-1) | |
53 if atomic.Cas(&_p_.status, s, _Pidle) { | |
54 if trace.enabled {55 traceGoSysBlock(_p_) | |
56 traceProcStop(_p_) | |
57 } | |
58 n++ | |
59 _p_.syscalltick++ | |
60 handoffp(_p_) | |
61 } | |
62 incidlelocked(1) | |
63 lock(&allpLock) | |
64 } | |
65 } | |
66 unlock(&allpLock) | |
67 return uint32(n) | |
68 } |
在进行抢占式调度,Go 还波及到利用操作系统信号形式来进行抢占,这里就不在介绍,感兴趣能够本人去钻研。
另外,本文图片没有一张原创,画图十分消耗工夫,我没有那么多工夫,所以只能到处借用大家的图片,侵权请分割。
6 参考资料
- Go 语言的调度模型:https://www.cnblogs.com/lvpen…
- 深度揭秘 Go 语言 sync.Pool:https://www.cnblogs.com/qcrao…
- https://toutiao.io/posts/2gic…
- 万字长文深入浅出 go runtime:https://zhuanlan.zhihu.com/p/…
- 深刻 go runtime 的调度:https://zboya.github.io/post/…
- 字节 Go 面试:https://leetcode-cn.com/circl…
- golang 调度学习 - 调度过程:https://blog.csdn.net/diaosss…
- Go 调度器中的三种构造:G、P、M:https://blog.csdn.net/chenxun…
- Go 语言的调度模型:https://www.cnblogs.com/lvpen…
- Go GMP 的调度模型:https://zhuanlan.zhihu.com/p/…
- 具体分析 Go 语言调度模型的设计:https://www.elecfans.com/d/16…
- Go 的外围 goroutine sysmon:https://blog.csdn.net/RA681t5…
- 一文教你搞懂 Go 中栈操作:https://zhuanlan.zhihu.com/p/…
- 详解 Go 语言调度循环代码实现:https://www.luozhiyun.com/arc…
- goroutine 的创立、休眠与复原:https://zhuanlan.zhihu.com/p/…