关于c++:zbh的协程学习总结

4次阅读

共计 21236 个字符,预计需要花费 54 分钟才能阅读完成。

IO 倒退历程1

从同步 IO 谈起

​ 长期以来,应用 C /C++ 编写服务器程序的时候,应用的往往是多过程模式:一个父过程负责 accept 传入连贯,而后 fork 一个子过程解决;或者是一个父过程创立了一个 socket 之后,fork 出多个子过程同时执行 accept 和解决,(例如 Apache Http Server 2.4 版本前,采纳 per request per thread 模型2)。

​ 这时候个别应用同步 IO 来解决,也就是应用阻塞的 readwrite,当过程被 IO 阻塞的时候挂起过程。益处是这种同步开发方式和人类思维相近,编写简略。害处之一是效率低:随着 C10K 问题3 的提出,在高并发场景下,cpu 忙于过程或线程的切换,大量 cpu 工夫被节约。

异步 IO 降临

​ 所谓异步 IO,就是发动 IO 申请的线程不等 IO 操作实现,就继续执行随后的代码,IO 后果用其余形式告诉发动 IO 申请的程序。

​ 最简略的情景就是通过设置 O_NONBLOCK 标记应用非阻塞的readwrite,定时轮询资源是否可用。如果资源不可用,会返回相应的谬误如EAGAIN

​ 然而在并发量很大时,cpu 同样失陷于轮询中,效率很低(次要是每次都要进行零碎调用,开销大)。有没有什么办法让咱们被动接管资源就绪的告诉,而不是一一被动轮询呢?在 Linux 下,咱们能够应用 select/poll/epoll 来同时监听多个文件描述符,看似将非阻塞 IO 阻塞在了 select/poll/epoll 中,然而这种形式监听了多个描述符,而且防止了大部分无用的零碎调用开销。

​ 益处是高效:没有切换上下文的开销、没有过程间同步、通信问题,然而害处是开发不便。异步 IO 须要配合基于事件驱动的异步开发模式。在 EventLoop 中,什么事件到来,就调用相应的回调函数进行解决,和传统同步思维有很大不同。(std::function满天飞~,容易毁坏逻辑的连续性)。

协程退场

​ 有种说法是协程是一种轻量级的线程,这是针对它不须要操作系统调度染指、开销小而言,然而我感觉不够精确。实质上,协程就是一个能够暂停的函数,遇到阻塞的时候,就让出 cpu,当条件满足了就持续复原运行。

​ 从另一个角度来说,协程和过程、线程的区别就是两点 – 是否隔离变量,是否主动切换运行上下文。满足这两点区别的,就是协程。

  • 过程:变量隔离,主动切换运行上下文
  • 线程:不变量隔离,主动切换运行上下文切换
  • 协程:不变量隔离,不主动切换运行上下文切换

在不隔离变量的状况下,主动切换运行上下文,几乎就是 bug 之源,这两件事基本就不该同时反对。当前线程这个货色,应该被扫进历史的垃圾堆,线程带来的便当,比起它挖出的坑相比,有余一提。4

​ 之所以在协程的总结小文中上来就谈到这么多 IO 相干内容,是因为 C ++ 中的协程和 IO 有很大关系。协程实用于 IO 密集型利用,而非 CPU 密集型利用。协程的 暂停期待条件满足后复原执行 的特点人造适宜 IO。当文件描述符不可读 / 不可写时,让出 cpu;可读 / 可写后被复原执行,持续解决相应逻辑。让出 cpu 的动作与同步 IO 中过程 / 线程被阻塞而挂起相似,然而防止了过程 / 线程调度开销,而保留了同步 IO 的同步开发方式。这正是协程的弱小之处: 并发 + 同步5


协程分类6

协程能够依照以下 3 个角度进行分类

  • 对称 / 不对称:前者提供让出执行权的原语,后者提供唤起协程和把执行权转交给调用者的原语
  • 第一类 / 受限:协程是不是第一类对象
  • 有栈 / 无栈:协程能在调用栈的任意处挂起,还是只能在协程的函数体内挂起

这里看第三个分类的区别:

此处「有栈」和「无栈」的含意不是指协程在运行时是否须要栈,对于大多数语言来说,一个函数调用另一个函数,总是存在调用栈的;

而是指协程是否能够在其任意嵌套函数中被挂起,此处的嵌套函数能够了解为子函数、匿名函数等。显然有栈协程是能够的,而无栈协程则不能够。

它们的次要区别是:有栈协程能在调用栈的任意处挂起,无栈协程只能在协程的函数体内挂起

有栈协程

很多中央又把协程称为 subroutine,subroutine 是什么,就是函数。上古期间的计算机科学家们早就给出了概念,coroutine 就是能够中断并复原执行的 subroutine,从这个角度来看协程领有调用栈并不是一个奇怪的事件。

咱们再来思考 coroutine 与 subroutinue 相比有什么区别,你会发现区别仅有一个,就是 coroutinue 能够中断并复原,对应的操作就是 yield/resume,这样看来 subroutinue 不过是 coroutinue 的一个子集罢了。

也就是说把协程当做一个非凡的函数调用,有栈协程就是咱们现实中协程该有的模样。

既然把其当做一个非凡的函数调用,对咱们来说最严厉的挑战就是如何像切换函数一样去切换协程,难点在于除了像函数一样切换进来,还要在某种条件满足的时候切换回来,咱们的做法能够是 在协程外部存储本身的上下文,并在须要切换的时候把上下文切换就能够了。

Go 有栈协程

Go 语言的呈现提供了一种新的思路。

Go 语言的协程则相当于提供了一种很低成本的相似于多线程的执行体。

在 Go 语言中,协程的实现与操作系统多线程十分类似。操作系统个别应用抢占的形式来调度零碎中的多线程(即主动调配调度),而 Go 语言中,依靠于操作系统的多线程,在运行时刻库中实现了一个合作式的调度器。

这里的调度真正实现了上下文的切换,简略地说,Go 零碎调用执行时,调度器可能会保留以后执行协程的上下文到堆栈中。而后将以后协程设置为睡眠,转而执行其余的协程。

这里须要留神,所谓的 Go 零碎调用并不是真正的操作系统的零碎调用,而是 Go 运行时刻库提供的对底层操作系统调用的一个封装。

举例说明:Socket recv。咱们晓得这是一个零碎调用,Go 的运行时刻库也提供了简直截然不同的调用形式,但这只是建设在 epoll 之上的模仿层,底层的 socket 是工作在非阻塞的形式,而模仿层提供给咱们了看上去是阻塞模式的 socket。读写这个模仿的 socket 会进入调度器,最终导致协程切换。

目前 Go 调度器实现在用户空间,实质上是一种合作式的调度器。这也是为什么如果写了一个死循环在协程里,则协程永远没有机会被换出,一个 Processor 相当于就被节约掉了。

有栈的协程和操作系统多线程是很类似的。思考以下伪代码:

Go
func routine() int
{
    var a = 5
    sleep(1000)
    a += 1
    return a
}

sleep 调用时,会产生上下文的切换,以后的执行体被挂起,直到约定的工夫再被唤醒。

局部变量 a 在切换时会被保留在栈中,切换回来后从栈中复原,从而得以持续运行。

所谓有栈就是指执行体自身的栈。每次创立一个协程,须要为它调配栈空间。到底调配多大的栈的空间是一个技术活。分的多了,节约,分的少了,可能会溢出。Go 在这里实现了一个协程栈扩容的机制,绝对比拟优雅的解决了这个问题。

无栈协程

相比于有栈协程间接切换栈帧的思路,无栈协程在不扭转函数调用栈的状况下,采纳相似生成器(generator)的思路实现了上下文切换。

那么所谓的无栈协程是什么呢?其实 无栈协程的实质就是一个状态机(state machine),能够看做是有状态的函数(generator),每次执行时会依据以后的状态和输出参数,失去(generate)输入,但不肯定为最终后果。

即:将由 yield 切割的代码组成一个状态机(其实就是迭代器),切换状态 next() 就是执行下一块,如下:

Js
function* anotherGenerator(i) {
    yield i + 1;
    yield i + 2;
    yield i + 3;
}

Rust 的无栈协程7

无栈协程顾名思义就是不应用栈和上下文切换来执行异步代码逻辑的机制。这里异步代码尽管是异步的,但执行起来看起来是一个同步的过程。

从这一点上来看 Rust 协程与 Go 协程也没什么两样。举例说明:

Rust
async fn routine() 
{
    let mut a = 5;
    sleep(1000).await;
    a = a + 1;
    a
}

简直是一样的流程。Sleep 会导致睡眠,当工夫已到,从新返回执行,局部变量 a 内容应该还是 5。

Go 协程是有栈的,所以这个局部变量保留在栈中,而 Rust 是怎么实现的呢?答案就是 Generator 生成的状态机。

Generator 和闭包相似,可能捕捉变量 a,放入一个匿名的构造中,在代码中看起来是局部变量的数据 a,会被放入构造,保留在全局(线程)栈中。

另外值得一提的是,Generator 生成了一个状态机以保障代码正确的流程。从 sleep.await 返回之后会执行 a= a + 1 这行代码。async routine() 会依据外部的 .await 调用生成这样的状态机,驱动代码依照既定的流程去执行。

依照个别的说法。无栈协程有很多益处。首先不必调配栈。因为到底给协程调配多大的栈是个大问题。特地是在 32 位的零碎下,地址空间是无限的。每个协程都须要专门的栈,很显著会影响到能够创立的协程总数。其次,没有上下文切换,性能也更好一些。

总结8

  1. 有栈协程:用 rsp 栈寄存器来索引局部变量,上下文是协程公有的栈。拜访上下文数据也就是局部变量的时候,咱们无需显式的应用栈寄存器 + 偏移量来拜访,而是间接拜访变量名。
  2. 无栈协程:把栈指针寄存器换成 this 指针,将局部变量捕捉作为对象成员,整个上下文打包成一个 generator 构造体对象,能够称之为 面向对象的上下文

    用 this 来索引对象的成员变量,上下文就是对象本人。拜访上下文数据也就是成员变量的时候,咱们无需显式的应用 this+ 成员偏移量(或者变量名)来拜访(通过语法糖),而是间接拜访变量名。


C++ 协程设计9

​ 因为目前 C ++ 规范对协程反对还比拟简陋(C++20 中引入了协程,然而较为底层 + 没有好用的库,间隔可用还有较远距离,乐观来看在 C ++23 中 Asio 进入规范库后有很大施展空间),临时没有一个规范协程库,因而市面上诞生了很多不同类型协程库,设计指标不同,实现形式也有差别。上面就进行一个简短的总结。

协程设计的不同档次

首先,依据协程库的欠缺水平,总体上能够分为四个类型:

1.API 级:

这一级别实现协程上下文切换 api,或增加一些便于应用的封装;特点:没有协程调度。

代表作:boost.context, boost.coroutine, ucontext(unix), fiber(windows)

这一档次的协程库,仅仅提供了一个底层 api,要想拿来做我的项目,还有十分十分边远的间隔;不过这些协程 api 能够为咱们实现本人的协程库提供一个良好的根底。

2. 玩具级:

这一级别实现了协程调度,无需用户手动解决协程上下文切换;特点:没有 HOOK

代表作:libmill

这一档次的协程库,实现了协程调度(相似于操作系统有了过程调度机制); 稍好一些的意识到了阻塞网络 io 与协程的不协调之处,本人实现了一套网络 io 相干函数;

然而这也意味着波及网络的第三方库全副不可用了。hiredis 不能用了,mysqlclient 不能用了。放弃整个 C/C++ 生态全副本人轮,这个玩笑开的有点大,所以只能称之为“玩具级”。

3. 工业框架级:

这一级别以正确的形式 HOOK 了网络 io 相干的 syscall,能够不改代码的兼容大多数第三方库

代表作:libco、libgo

这一档次的协程库,可能模仿被 hook 的 syscall 的行为,可能兼容任何网络 io 行为的同步模型的第三方库。但因为 C++ 的灵活性,用户行为是不受限的,所以仍然存在几个边边角角的难点须要开发者留神:没有 gc(开发者要理解协程的调度机会和生命期),TLS 的问题,用户不按套路出牌、把逻辑代码 run 在协程之外,粗粒度的线程锁等等。

ps : libco 的开源社区很不沉闷,我还提了个 pr 10

4. 语言级

代表作:golang

这一档次的协程库,开发者的所有行为都是受限行为,能够实现无死角的欠缺的协程。

C++ 协程实现计划

回忆协程的暂停而后复原执行的实质,咱们能够总结下协程函数的执行特点(协程函数指协程上运行的函数,相似于线程执行函数,即 pthread_create 中传入的函数)11

  1. 首次调用协程函数,会从堆中调配一个协程上下文,调用方的返回地址、入口函数、交出控制权等信息保留在协程上下文中

    ​ 实际上,这个协程上下文很值得一提。对于一个管制流程而言,所须要的环境不外乎 CPU 上下文(寄存器信息)、栈、堆以及所要执行的代码 四个因素,代码执行地位信息由 RIP 寄存器决定(以 x86-64 为例),而且从堆中调配出的内存地址信息个别也会间接或间接的保留在运行栈中,所以,这四个因素能够简化为寄存器和栈两个因素(这也正是垃圾回收算法之一的可达性剖析算法的原理,以栈中和寄存器中对象作为垃圾回收的终点,遍历所有援用链,不可达的对象就视作垃圾予以回收。而另一种垃圾回收算法是咱们相熟的援用计数算法12)。

    上面是 libco 中 (x86-64 架构) 协程上下文的设计,也正是咱们刚刚所探讨的情景。

    struct coctx_t
    {void *regs[ 14]; // 寄存器信息
        size_t ss_size;   // 协程栈的大小
        char *ss_sp;      // 协程栈低地址

    };
  1. 当协程中途交出控制权后,协程上下文不会删除(相似于线程切换时候保留上下文环境)。
  2. 当协程再次取得控制权后,会主动从协程上下文复原调用环境,而后从上一次交出控制权的下一条语句继续执行。
  3. 协程函数运行结束后,协程上下文将被删除。

联合协程函数的运行特点,依据 C ++ 中协程的不同实现形式,能够分为 有栈协程 无栈协程

有栈协程:

​ 技术路线:一个线程能够创立大量协程,每个协程都会保留一个公有栈(实际上,不肯定每个协程都有一个独立栈,也可能是若干个协程共用一个栈以节俭内存),协程一旦执行到异步 IO 处,就会被动交出控制权。同一线程的所有协程以串行形式合作执行,没有数据争用的问题。

​ 一般来说,一个线程能够蕴含多个有栈协程,而且线程自身也不外乎一个控制流,也能够视作一个协程,线程栈即为协程栈,个别称作主协程。

​ 相比无栈协程,有栈协程无需批改语言规范和编译器,利用操作系统提供的 API 即可实现(例如 Linux 下的 ucontext 一族的 {get,set,make,swap}context 零碎调用,然而这个实现计划性能堪忧,次要是 ucontext 须要须要多一次零碎调用来保留信号掩码 13,须要几百~上千个 cycle,或者也能够采纳手写汇编实现,例如 libco,也能够利用弱小的 boost 库中的 coroutine21415)。另外,有栈协程反对 协程嵌套,也就是在协程函数中能够持续调用其余协程函数,这是因为有了协程栈,调用信息能够入栈失去保留。正是因为有栈协程的这些有点,目前支流的协程库都是以有栈协程为主。

协程栈的实现计划

协程栈的实现也有很多计划可供选择。

  1. 动态栈(Static Stack)

    动态栈就是固定大小的栈,个别每个协程有一个固定大小的独立栈。这种形式实现起来最为简便,然而理论应用中,栈的大小往往难以当时确定。太大了内存利用率低,造成内存节约,且因为内存瓶颈,无奈创立更多的协程,并发不够;太小了则容易造成栈溢出。

  2. 分段栈(Segmented Stack)

    gcc 提供的“黄金链接器”反对一种容许栈内存不间断的编译参数16,实现原理是在每个函数调用结尾都插入一段栈内存检测的代码,如果栈内存不够用了就申请一块新的内存,作为栈内存的连续。

    这种计划本应是最佳的实现,但如果遇到的第三方库没有应用这种形式来编译(留神:glibc 也是这里提到的”第三方库 ”),那就无奈在其中检测栈内存是否须要扩大,栈溢出的危险很大。

  3. 拷贝栈(Copy Stack)

    每次检测到栈内存不够用时,申请一块更大的新内存,将现有的栈内存 copy 过来,就像 std::vector 那样扩大内存。

    在某些语言上是能够实现这样的机制,但 C++ 是有指针的,栈内存的 Copy 会导致指向其内存地址的指针生效;又因为其指针的灵活性(能够加减运算),批改对应的指针成为了一种简直不可能实现的事件,这也正是 C ++ 难以实现追踪式垃圾回收的起因。

    如上面例子所示17,对指针 p 做了自加和自减操作。这在 C /C++ 中是被认为正当的。因为指针有所指向的对象,所以自加或自减能够找到 ” 下一个 ” 同样的对象。

    int main(){
        int *p = new int;
        p += 10;              // 挪动指针
        p -= 10;
        *p = 10;            
    }
  1. 共享栈(Shared Stack)

    申请一块大内存作为共享栈(比方:8MB),每次开始运行协程之前,先把协程栈的内存 copy 到共享栈中,运行完结后再计算协程栈真正应用的内存,copy 进去保存起来,这样每次只需保留真正应用到的栈内存量即可。

    这种计划极大水平上防止了内存的节约,做到了用多少占多少,等同内存条件下,能够启动的协程数量更多,libco 中采纳的就是这种办法。不过毛病是 memcpy 保留复原协程栈须要额定的工夫。

协程调度计划

依据调度形式能够将调度计划分为对称调度和非对称调度。

非对称调度

非对称调度是指所有创立的协程都必须和调用者进行切换,有着明确的调用和返回关系,门路如下:

主协程 -> 协程 A -> 协程 B -> 协程 A -> 主协程 -> 协程 C.....

如果不进行协程嵌套的话,所有协程都必须和主协程进行切换,因而,主协程也被称作调度协程。

对称调度

对称调度是指所有协程之间能够自在切换,个别须要调度器的参加,让出 CPU 后由调度器决定下一个运行哪个协程,如 golang 中的 goroutine 调度。

这两种形式仅仅是单个线程上进行的最根本的协程调度形式,如果谋求多核扩展性还能够配合工作窃取 (Work Stealing18) 设计更简单的调度形式。

无栈协程

​ 技术路线:将异步 IO 封装到协程函数中,协程函数发动异步 IO 后,立刻保留执行环境,而后吧控制权交给调用方(Caller),调用方持续往下执行;异步 IO 实现后,负责解决 IO 实现事件的回调函数取得控制权,回调函数再把控制权转交给发动 IO 的协程,发动 IO 的协程首先复原执行环境,而后从上一次交出控制权的下一条语句持续往下执行

​ 无栈协程中每个协程的执行环境,仅须要蕴含调用栈 (Call Stack) 的顶层栈帧(Top Call Stack Frame),而非整个调用栈,空间开销极小,然而因而不能嵌套调用。另外,无栈协程须要语言规范和编译器的反对(编译器将协程翻译成状态机),C++20 中引入的就是无栈协程。


HOOK 模块

基本原理与实现形式

​ hook 意为钩子,在 Linux 下,能够通过 hack 掉 glibc 的 readwrite 等 IO 操作函数,将其替换本人的实现,达到额定进行某些操作的目标。例如,咱们实现本人的协程框架的时候,个别心愿当文件描述符不可读时主动让出 CPU,而不是将协程阻塞(协程阻塞会阻塞运行协程的线程!),咱们就能够通过劫持 read 函数,替换为本人的实现达到该目标。

​ 基本原理是应用 LD_PRELOAD 环境变量,影响程序运行时的链接。因为 linux 下的动静连接器存在规定:当一个符号退出全局符号表的时候,如果雷同的符号名曾经存在,则前面退出的符号被疏忽。因为 glic 是最初链接的,那么通过定义在程序运行前优先加载的动态链接库,就能够达到用本人的.so 函数替换原有的函数。

​ 因为 hook 的目标都是在原有的函数行为上减少一些额定操作,并不需要扭转函数的行为(例如通过 hook malloc 和 free 函数,咱们能够查看是否所有调配的内存都失去了开释),所以咱们个别须要获取被 hook 的函数。这一操作是通过 dlsym 函数实现的(Linux 下的 dlfcn.h 给予了咱们运行时加载和解决动态链接库的能力)。

​ 如下所示,应用 RTLD_NEXT 来获取零碎 glibc 的 malloc 函数地址,因为待会应用 LD_PRELOAD 来优先加载咱们创立的 so 文件,因此零碎的 libc.so.6 排在第二位,也就是 next 19

#include<dlfcn.h>
void* (*g_sys_malloc)(size_t) = dlsym(RTLD_NEXT, "malloc");

void* malloc(size_t size)
{void *p = g_sys_malloc(size);
    printf("in malloc hook function ...\n");
    return p;
}

根本守则与具体实际

​ HOOK 中有一条重要的根本守则:HOOK 接口体现进去的行为与被 HOOK 的接口放弃 100% 统一。因为在协程中还是须要兼容应用第三方的同步网络库,接口行为不统一会导致第三方库莫名其妙的谬误。

​ HOOK 的根本思维是但凡有可能引起阻塞的零碎调用或库函数都要被 hook 住,因为协程自身是不能被阻塞的,一旦阻塞运行协程的线程也会跟着阻塞,其余协程也就无奈失去运行。

​ 以 libco 为例,罕用的 read、recv、write、send、connect 等都失去了 hook,与 epoll 配合应用。以 read 函数为例,咱们本人的 read 函数的行为应该如下(将 glibc 中的 read 函数记为g_sys_read

ssize_t read(int fd, void *buf, size_t count){将以后协程注册到定时器上,注册回调函数用于未来解决 g_sys_read() 函数的读超时
      
  调用 epoll_ctl() 将本人 (op = EPOLLIN, fd = current fd) 注册到以后执行环境的 epoll 实例上,注册回调函数用于解决可读事件
      
  调用 co_yield_env 函数让出 CPU
      
  等到该协程被主协程从新“唤醒”后继续执行
}

此外,为了满足用户的不同需要,咱们还须要进行判断

  1. 如果用户没有enable hook,那么不应用协程,那么间接返回g_sys_read
  2. 如果用户enable hook,然而将文件描述符设置为NonBlock,这代表用户本人解决 IO,返回非阻塞的g_sys_read
  3. 其余状况,应用本人的 read 函数 hookg_sys_read函数。

补充5

纤程

​ 事实上,仅仅有了协程还不够,如果想要真正应用,还须要额定的调度器。否则须要使用者本人 yield、resume 十分不敌对。后面咱们无心中将协程和调度机制绑定了在一起,然而实际上并不是这样的。除了咱们目前始终探讨的协程 (coroutine) 外,还有另外一种叫做 fiber 的纤程,fiber 能够视作是实现了调度机制的协程。在 boost 库中,boost::fiber = boost::coroutine2+ 调度机制。

正确应用姿态

​ 协程实用于 IO 密集的场景,次要是一直的创立协程,让调度器调度运行,在协程运行过程对于一些会阻塞的条件,做一个非阻塞的检测中, 发现条件不满足就让出 cpu,这就是常见轮转 + 非阻塞同步。

无栈协程的实现参考

​ 无栈协程因为每个中断点都是确定,那其实只须要将函数的代码再进行细分,保留好局部变量,做好调用过程的状态变动, 上面就将一个协程函数 fn 进行切分后变成一个 Struct,这样的实现绝对于有栈协程而言应用的内存更少。

​ 从上面代码就不难理解,为什么咱们说无栈协程实质上是一个状态机了,也可能了解,为什么无栈协程须要语言和编译器的反对(须要编译器将代码翻译成状态机)。

void fn(){
    int a, b, c;
    a = b + c;
    yield();
    b = c + a;
    yield();
    c = a + b;
}

---------------------------- 分割线 ---------------------------------
Struct fn{
    int a, b, c;
    int __state = 0;
    
    void resume(){switch(__state) {
        case 0:
             return fn1();
        case 1:
             return fn2();
        case 2:
             return fn3();}
    }
    
    void fn1(){a = b + c;}
    
    void fn2(){b = c + a;}
    
    void fn3(){c = a + b;}
};

libco 实现剖析

以 libco 源码为例,作一个简短的剖析

libco 采纳共享栈作为协程栈,栈的大小 128KB,每次运行协程前将保留的协程栈拷贝到共享栈下面运行。

如下是共享栈的构造,能够看到,一个共享栈中实际上并非只有一个栈,而是能够指定数目。如果有多个共享栈,在调配的时候采纳 round-robin 算法抉择一个。

struct stShareStack_t
{
    unsigned int alloc_idx;  // 以后下标,用于 round-robin 算法
    int stack_size;    // 共享栈大小
    int count;    // 共享栈数量
    stStackMem_t** stack_array; // 若干个共享栈
};

static stStackMem_t* co_get_stackmem(stShareStack_t* share_stack)
{if (!share_stack)
    {return NULL;}
    int idx = share_stack->alloc_idx % share_stack->count;  // 采纳 round-robin 算法调配共享栈
    share_stack->alloc_idx++;

    return share_stack->stack_array[idx];
}

接下来去看共享栈具体内存的构造,也没有什么非凡之处。

struct stStackMem_t
{
    stCoRoutine_t* occupy_co; // 以后运行的协程              stack_bp ---  高地址
    int stack_size;    // 栈大小,默认 128KB                                            |
    char* stack_bp; //stack_buffer + stack_size                    |
    char* stack_buffer; // 栈顶(低地址)stack_buffer --- 低地址
};

上面就是协程的主体构造,基本上都是最根本的信息,没有什么额定性能(例如信号掩码之类)。将必要信息加在了正文中不便了解,不再一一赘述。

struct stCoRoutine_t
{
    stCoRoutineEnv_t *env;         // 以后线程的协程运行环境
    pfn_co_routine_t pfn;          // 协程函数   ==  typedef void* (*pfn_co_routine_t)(void*);
    void *arg;                     // 函数参数
    coctx_t ctx;                // 协程上下文

    char cStart;                // 标记位,1 为协程曾经启动,否则为 0
    char cEnd;                    // 标记为,1 为协程曾经完结,否则为 0
    char cIsMain;                // 是否是主协程
    char cEnableSysHook;        // 是否启用 hook
    char cIsShareStack;            // 是否应用的是共享栈

    void *pvEnv;                // 零碎环境变量无关

    //char sRunStack[1024 * 128];
    stStackMem_t* stack_mem;     // 以后线程应用的共享栈


    //save satck buffer while confilct on same stack_buffer;
    char* stack_sp;             // 保留协程栈
    unsigned int save_size;
    char* save_buffer;

    stCoSpec_t aSpec[1024];
};
struct stCoRoutineEnv_t
{stCoRoutine_t *pCallStack[ 128];             // 作为协程调用栈应用,反对协程嵌套调用。能够看到,最多反对 128 个协程嵌套
    int iCallStackSize;                            // 记录入栈的协程数量, 便于 resume()(总是 resume()栈顶的协程)stCoEpoll_t *pEpoll;                        // 封装的 epoll 构造体,每个线程一个,阻塞时将以后协程监听的事件注册到该 epoll 下面,同时让出 CPU

    //for copy stack log lastco and nextco
    stCoRoutine_t* pending_co;                    // 要执行的下个协程
    stCoRoutine_t* occupy_co;                    // 要执行的上个协程
};
struct coctx_t
{void *regs[ 14];                            // 保留寄存器组
    size_t ss_size;                                // 协程理论用到的栈的大小
    char *ss_sp;                                // 保留协程栈的首地址
    
};

上面以一个最简略的、什么都不做、只输入 echo 信息的 demo 来看 libco 的次要执行流程

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <unistd.h>
#include <sys/time.h>
#include <errno.h>
#include <string.h>
#include "../coctx.h"
#include "../co_routine.h"
#include "../co_routine_inner.h"


void* RoutineFunc(void* args){co_enable_hook_sys();
    int* routineid = (int*)args;
    while (true)
    {printf("echo from routine %d\n", *routineid + 1);
        co_yield_ct();                                                // 协程暂停执行}
    return NULL;
}


int main()
{stShareStack_t* share_stack= co_alloc_sharestack(1, 1024 * 128);  // 1. 创立共享栈
    stCoRoutineAttr_t attr;
    attr.stack_size = 0;
    attr.share_stack = share_stack;

    stCoRoutine_t* co[2];                                            
    int routineid[2];
    for (int i = 0; i < 2; i++)
    {routineid[i] = i;
        co_create(&co[i], &attr, RoutineFunc, routineid + i);          // 2. 创立协程
    }

    while(true){
        std::cout << "before resume1" << std::endl;
        co_resume(co[0]);                                              // 3. 执行协程 0
        std::cout << "after resume1 and before resume2" << std::endl;
        co_resume(co[1]);                                              // 4. 执行协程 1
        std::cout << "after resume2" << std::endl;
    }
    return 0;
}

输入为

before resume1
echo from routine 1
after resume1 and before resume2
echo from routine 2
after resume2
...

第一步是创立共享栈

stShareStack_t* co_alloc_sharestack(int count, int stack_size)
{stShareStack_t* share_stack = (stShareStack_t*)malloc(sizeof(stShareStack_t));             // 1. 创立共享栈构造体
    share_stack->alloc_idx = 0;
    share_stack->stack_size = stack_size;

    //alloc stack array
    share_stack->count = count;
    stStackMem_t** stack_array = (stStackMem_t**)calloc(count, sizeof(stStackMem_t*));        // 2. 创立共享栈数组
    for (int i = 0; i < count; i++)
    {stack_array[i] = co_alloc_stackmem(stack_size);                                        // 3. 创立共享栈理论空间
    }
    share_stack->stack_array = stack_array;
    return share_stack;
}

第二步就是创立协程了, 进入 co_create 函数

int co_create(stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg)
{if( !co_get_curr_thread_env() ) 
    {co_init_curr_thread_env();                                                    // 如果是第一次创立协程,尚未初始化以后线程的协程运行环境,}                                                                                // 那么首先初始化协程运行环境
    stCoRoutine_t *co = co_create_env(co_get_curr_thread_env(), attr, pfn,arg );   // 创立协程
    *ppco = co;
    return 0;
}

首先判断是不是曾经筹备好了以后线程环境(第一个协程负责初始化线程环境)

线程环境是一个动态 TLS 变量stCoRoutineEnv_t,再次回顾一下这个构造

struct stCoRoutineEnv_t
{stCoRoutine_t *pCallStack[ 128];             // 作为协程调用栈应用,反对协程嵌套调用。能够看到,最多反对 128 个协程嵌套
    int iCallStackSize;                            // 记录入栈的协程数量, 便于 resume()(总是 resume()栈顶的协程)stCoEpoll_t *pEpoll;                        // 封装的 epoll 构造体,每个线程一个,阻塞时将以后协程监听的事件注册到该 epoll 下面,同时让出 CPU

    //for copy stack log lastco and nextco
    stCoRoutine_t* pending_co;                    // 要执行的下个协程
    stCoRoutine_t* occupy_co;                    // 要执行的上个协程
};

如果是创立第一个协程,那么首先初始化协程运行环境

void co_init_curr_thread_env()
{gCoEnvPerThread = (stCoRoutineEnv_t*)calloc(1, sizeof(stCoRoutineEnv_t) );
    stCoRoutineEnv_t *env = gCoEnvPerThread;

    env->iCallStackSize = 0;
    struct stCoRoutine_t *self = co_create_env(env, NULL, NULL,NULL);                // 创立主协程(即调度协程),也就是将线程自身视作协程
    self->cIsMain = 1;

    env->pending_co = NULL;
    env->occupy_co = NULL;

    coctx_init(&self->ctx);                                                        // 初始化协程构造体  ==>  memset(ctx, 0, sizeof(*ctx));

    env->pCallStack[env->iCallStackSize++] = self;                                // 将主协程入栈,因而任何时候,无论如何出栈,最终都会返回主协程,而主协程永不退出(永不 yield)

    stCoEpoll_t *ev = AllocEpoll();                                                    // 注册线程的 epoll 实例
    SetEpoll(env,ev);
}

接下来真正开始创立协程

struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
        pfn_co_routine_t pfn,void *arg )
{

    // 设置协程属性,次要是设置协程栈的大小
    stCoRoutineAttr_t at;
    if(attr)
    {memcpy( &at,attr,sizeof(at) );
    }
    if(at.stack_size <= 0)
    {at.stack_size = 128 * 1024;                                            // 栈最小为 128KB}
    else if(at.stack_size > 1024 * 1024 * 8)
    {at.stack_size = 1024 * 1024 * 8;                                    // 栈最大为 8MB}

    if(at.stack_size & 0xFFF)                                             // 栈大小 4K 对齐
    {
        at.stack_size &= ~0xFFF;
        at.stack_size += 0x1000;
    }

    stCoRoutine_t *lp = (stCoRoutine_t*)malloc(sizeof(stCoRoutine_t) );
    
    memset(lp,0,(long)(sizeof(stCoRoutine_t))); 


    lp->env = env;                                                            // 设置协程环境
    lp->pfn = pfn;                                                            // 设置协程函数
    lp->arg = arg;                                                            // 设置协程参数

    stStackMem_t* stack_mem = NULL;
    if(at.share_stack)
    {stack_mem = co_get_stackmem( at.share_stack);                        // 应用 round-robin 算法调配一个共享栈
        at.stack_size = at.share_stack->stack_size;
    }
    else
    {stack_mem = co_alloc_stackmem(at.stack_size);                        // 或者应用独立栈
    }
    lp->stack_mem = stack_mem;

    lp->ctx.ss_sp = stack_mem->stack_buffer;                                // 给协程上下文注册栈地址以及栈大小
    lp->ctx.ss_size = at.stack_size;                                        

    lp->cStart = 0;
    lp->cEnd = 0;
    lp->cIsMain = 0;
    lp->cEnableSysHook = 0;
    lp->cIsShareStack = at.share_stack != NULL;

    lp->save_size = 0;
    lp->save_buffer = NULL;

    return lp;
}

创立协程至此就临时告一段落,不过创立后协程还并没有跑起来,须要 co_resume 使协程跑起来。

void co_resume(stCoRoutine_t *co)
{
    stCoRoutineEnv_t *env = co->env;
    stCoRoutine_t *lpCurrRoutine = env->pCallStack[env->iCallStackSize - 1];   // 获得以后运行的协程(如果是第一次运行协程,那么是主协程)if(!co->cStart)
    {coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );                     // 如果是第一次运行,那么初始化要运行的协程
        co->cStart = 1;
    }
    env->pCallStack[env->iCallStackSize++] = co;                                 // 要运行的协程存入协程调用栈
    co_swap(lpCurrRoutine, co);                                                 // 将要运行的协程和以后协程进行切换
}

在咱们持续摸索之前,须要回顾一下函数调用和汇编的常识。同时看一下 14 个寄存器的规定20

push param // 压入函数参数
call // 压入调用函数的下一条指令的地址
push ebp  // 栈帧开始
mov ebp, esp // esp 指向栈顶元素的凑近栈的的地址
push ... // 保留寄存器 (分为调用者保留和被调用者保留两局部,依据指令集的不同)
// 创立局部变量
pop ... // 弹出寄存器
mov esp, ebp
pop ebp
ret 
    
// low | regs[0]: r15 |
//    | regs[1]: r14 |
//    | regs[2]: r13 |
//    | regs[3]: r12 |
//    | regs[4]: r9  |
//    | regs[5]: r8  |
//    | regs[6]: rbp |
//    | regs[7]: rdi |
//    | regs[8]: rsi |
//    | regs[9]: ret |  //ret func addr
//    | regs[10]: rdx |
//    | regs[11]: rcx |
//    | regs[12]: rbx |
// hig | regs[13]: rsp |
 
    在 64 位下保留 14 个寄存器。咱们晓得 X86 架构下有 8 个通用寄存器,X64 则有 16 个寄存器,那么为什么 64 位只应用保留 14 个寄存器呢?咱们能够在 coctx_swap.S 中看到 64 位下短少了对 %r10, %r11 寄存器的备份,x86-64 的 16 个 64 位寄存器别离是:%rax, %rbx, %rcx, %rdx, %esi, %edi, %rbp, %rsp, %r8-%r15。其中:%rax 作为函数返回值应用
%rsp 栈指针寄存器,指向栈顶
%rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函数参数,顺次对应第 1 参数,第 2 参数
%rbx,%rbp,%r12,%r13,%14,%15 用作数据存储,遵循被调用者爱护规定,简略说就是轻易用,调用子函数之前要备份它,以防被批改
%r10,%r11 用作数据存储,遵循调用者爱护规定,简略说就是应用之前要先保留原值
咱们来看看两个生疏的名词调用者爱护 & 被调用者爱护:调用者爱护:示意这些寄存器上存储的值,须要调用者 (父函数) 本人想方法先备份好,否则过会子函数间接应用这些寄存器将有情的笼罩。如何备份?当然是实现压栈(pushl), 等子函数调用实现,再通过栈复原(popl)
被调用者爱护:即示意须要由被调用者 (子函数) 想方法帮调用者 (父函数) 进行备份

这些就足够了!

int coctx_make(coctx_t* ctx, coctx_pfn_t pfn, const void* s, const void* s1) {char* sp = ctx->ss_sp + ctx->ss_size - sizeof(void*);
  sp = (char*)((unsigned long)sp & -16LL);                                            // 获得栈底地址(高地址,栈是从高向低成长)memset(ctx->regs, 0, sizeof(ctx->regs));
  void** ret_addr = (void**)(sp);                                                    
  *ret_addr = (void*)pfn;                                                            // 为什么 coctx_make()中须要将返回函数的地址赋值给 rsp?// 前面会进行解答
  ctx->regs[kRSP] = sp;                                                                // 设置寄存器

  ctx->regs[kRETAddr] = (char*)pfn;                                                    // 设置返回地址,即协程函数地址

  ctx->regs[kRDI] = (char*)s;                                                        // 设置函数参数
  ctx->regs[kRSI] = (char*)s1;
  return 0;    
}

接下来就真正开始进行切换!

void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{stCoRoutineEnv_t* env = co_get_curr_thread_env();

    //get curr stack sp
    char c;                                                    // 这里是想取得以后协程的应用的栈的大小,为此,咱们须要取得栈顶的地址。curr->stack_sp= &c;                                        // 这里用了一个小 trick,在栈上申明了一个 char 变量,其地址对齐后就是栈顶地址。// 如果不了解,请看一下下面的函数调用栈帧图
    
    // get curr stack sp 以及对应的汇编
    // char c;                                                // lea 0x7(%rsp), %rdx    这里 rsp + 7 是因为 char 变量无需对齐,而 rsp 须要 8 字节对齐
    // curr->stack_sp= &c;                                    // mov %rdx, 0xb0(%rbp)
    
    
    
    if (!pending_co->cIsShareStack)                            // 如果应用独立栈就没必要保留栈了
    {
        env->pending_co = NULL;
        env->occupy_co = NULL;
    }
    else 
    {
        env->pending_co = pending_co;                    
        //get last occupy co on the same stack mem        
        stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
        //set pending co to occupy thest stack mem;
        pending_co->stack_mem->occupy_co = pending_co;

        env->occupy_co = occupy_co;
        if (occupy_co && occupy_co != pending_co)
        {save_stack_buffer(occupy_co);                        // 保留要被笼罩的栈
        }
    }

    //swap context
    coctx_swap(&(curr->ctx),&(pending_co->ctx) );                // 进行寄存器上下文的切换

    //stack buffer may be overwrite, so get again;
    stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
    stCoRoutine_t* update_occupy_co =  curr_env->occupy_co;
    stCoRoutine_t* update_pending_co = curr_env->pending_co;
    
    if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
    {
        //resume stack buffer
        if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
        {memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);  // 复原要运行的协程的栈
        }
    }
}
void save_stack_buffer(stCoRoutine_t* occupy_co)
{
    ///copy out
    stStackMem_t* stack_mem = occupy_co->stack_mem;
    int len = stack_mem->stack_bp - occupy_co->stack_sp;              // 栈底(高地址)- 栈顶(方才通过定义 char 变量取得的低地址)即为理论应用的大小

    if (occupy_co->save_buffer)
    {free(occupy_co->save_buffer), occupy_co->save_buffer = NULL;
    }

    occupy_co->save_buffer = (char*)malloc(len); //malloc buf;
    occupy_co->save_size = len;

    memcpy(occupy_co->save_buffer, occupy_co->stack_sp, len);
}
.globl coctx_swap
#if !defined(__APPLE__)
.type  coctx_swap, @function
#endif
coctx_swap:
    leaq (%rsp),%rax                    // 首先保留以后寄存器上下文到 current->ctx 中,current->ctx 也就是第一个参数,依据传参规定第一个参数通过 rdi 传入 
    movq %rax, 104(%rdi)
    movq %rbx, 96(%rdi)
    movq %rcx, 88(%rdi)
    movq %rdx, 80(%rdi)
    movq 0(%rax), %rax
    movq %rax, 72(%rdi) 
    movq %rsi, 64(%rdi)
    movq %rdi, 56(%rdi)
    movq %rbp, 48(%rdi)
    movq %r8, 40(%rdi)
    movq %r9, 32(%rdi)
    movq %r12, 24(%rdi)
    movq %r13, 16(%rdi)
    movq %r14, 8(%rdi)
    movq %r15, (%rdi)
    xorq %rax, %rax

    movq 48(%rsi), %rbp                     // 从要运行的 pending->ctx 中复原寄存器的值,pending->ctx 也就是第二个参数,依据传参规定通过 rsi 传入
    movq 104(%rsi), %rsp
    movq (%rsi), %r15
    movq 8(%rsi), %r14
    movq 16(%rsi), %r13
    movq 24(%rsi), %r12
    movq 32(%rsi), %r9
    movq 40(%rsi), %r8
    movq 56(%rsi), %rdi
    movq 80(%rsi), %rdx
    movq 88(%rsi), %rcx
    movq 96(%rsi), %rbx
    leaq 8(%rsp), %rsp
    pushq 72(%rsi)                            // regs[9] 中保留了协程函数运行地址,即压入栈,ret 时即可胜利跳转

    movq 64(%rsi), %rsi
    ret

读到这里,你可能会有一个疑难,为什么保留寄存器上下文的时候将 rsp 保留到了 regs[kRETAddr]? 换言之,为什么 rsp 保留了返回地址?同时,我在 coctx_make()的正文中也留下了问题“为什么 coctx_make()中须要将返回函数的地址赋值给 rsp?“这须要函数调用的常识来解答。

---------
ret addr
---------
old rbp          <--- RBP
---------
local var1 
---------
local var2 
---------
fcn param #2
---------
fcn param #1
---------
ret addr          <--- RSP
---------
old rbp
---------

如图是 coctx_swap 执行前的栈帧,能够看到,RBP 依然指向上个栈帧的基地址,RSP 指向函数返回地址。

如果作为失常的函数调用,那么第一件事件就是要开拓本人的栈帧,也就是

push rbp
mov rbp, rsp

这两个动作由编译器主动插入,而后才将 rsp 增长到栈底。然而如果没有认真思考的话,很容易误以为 rsp 默认就指向栈顶,从而产生为什么 rsp 指向的是函数地址的纳闷。

显然在 coctx_swap 中,咱们并没有开拓本人的栈帧,因而 rsp 指向的依然是函数的返回地址,所有都失去了解释!

在协程中让出 cpu,操作就简略的多啦

void co_yield_env(stCoRoutineEnv_t *env)
{stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2];    // 实际上是主协程
    stCoRoutine_t *curr = env->pCallStack[env->iCallStackSize - 1];     // 以后正在运行的协程

    env->iCallStackSize--;

    co_swap(curr, last);
}

参考资料


  1. [基于汇编的 C/C++ 协程]: https://cloud.tencent.com/dev… ↩
  2. [IO 模型和基于事件驱动的 IO 多路复用模式]: https://zhuanlan.zhihu.com/p/… ↩
  3. [C10K problem]: https://en.wikipedia.org/wiki… ↩
  4. [Node.js 真的有协程吗?]https://www.zhihu.com/questio… ↩
  5. [深入浅出 c ++ 协程]:https://www.cnblogs.com/ishen… ↩
  6. [编程中的“协程”思维]https://alsritter.icu/posts/4… ↩
  7. [浅谈 Rust 和 Golang 协程设计]https://computeinit.com/archi… ↩
  8. [Node.js 真的有协程吗?]https://www.zhihu.com/questio… ↩
  9. [C++ 协程的近况、设计与实现]: https://www.jianshu.com/p/837… ↩
  10. [libco 中内存节约的 pr]:https://github.com/Tencent/li… ↩
  11. [C++ 协程综述]:http://qiusuoge.com/16296.html ↩
  12. [GC 垃圾回收的原理和波及的几种算法]:https://segmentfault.com/a/11… ↩
  13. [Why ucontext is so slow]https://news.ycombinator.com/… ↩
  14. [What is the difference between Coroutine, Coroutine2 and Fiber?]https://stackoverflow.com/que… ↩
  15. [boost.coroutine2]https://www.boost.org/doc/lib… ↩
  16. [GCC split stacks]https://gcc.gnu.org/wiki/Spli… ↩
  17. [深刻了解 C ++11:C++11 新个性解析与利用 5.2.4 节]https://book.douban.com/subje… ↩
  18. [Brpc bthread 简介]https://zhuanlan.zhihu.com/p/… ↩
  19. [linux hook 机制钻研]https://zhuanlan.zhihu.com/p/… ↩
  20. [libco 源码解析 1]https://blog.csdn.net/weixin_… ↩
正文完
 0