关于c++:万字好文从无栈协程到C异步框架

61次阅读

共计 39521 个字符,预计需要花费 99 分钟才能阅读完成。

导语 | 本文咱们将尝试对整个 C++ 的协程做深入浅出的分析,不便大家的了解。再联合下层的封装,最终给出一个 C++ 异步框架理论业务应用的一种状态,不便大家更好的在理论我的项目中利用无栈协程。

浅谈协程

在开始开展协程前,咱们先来看一下一些非 C++ 语言中的协程实现。

(一)其余语言中的协程实现

很多语言外面,协程是作为 “ 一类公民 ” 间接退出到语言个性中的, 比方:

  • Dart1.9 示例代码
Future<int> getPage(t) async {var c = new http.Client();
 try {var r = await c.get('http://xxx');
  print(r);
  return r.length();} finally {await c.close();
 }
}
  • Python 示例代码
async def abinary(n):
  if n <= 0:
    return 1
  l = await abinary(n-1)
  r = await abinary(n-1)
  return l + 1 + r
  • C# 示例代码
aysnc Task<string> WaitAsync()
{await Task.Delay(10000);
 return "Finished";
}
  • 小结

泛滥语言都实现了本人的协程机制, 通过下面的例子, 咱们也能看到, 相干的机制使函数的执行特殊化了, 变成了能够屡次中断和重入的构造. 那么如果 C++ 要反对这种机制, 会是一个什么状况呢? 接下来咱们将先从最根本的原理逐渐开展相干的探讨。

(二) 从操作系统的调度说起

咱们接触的支流的操作系统, 如 Windows, 或者 Linux, 或者 MacOS, 都是抢占式多任务的操作系统, 所以大家对抢占式多任务的操作系统会比拟相熟. 相干的概念就是 过程 -> 线程 这些, 基本上各种语言通过操作系统提供的 Api, 都能间接获取操作系统提供的这些能力了. 其实操作系统按工作的调度形式来辨别, 有以下两种模式:

  • 合作式多任务操作系统
  • 抢占式多任务操作系统

抢占式多任务操作系统咱们刚刚说过了, 而协程自身的个性, 跟合作式多任务操作系统所提供的机制基本一致, 对于每个 Task, 咱们能够屡次的中断和继续执行, 说到这里, 相熟 Dos 开发的同学必定就会想到 “INT 21H” 了, 这个其实就是咱们晚期利用相干机制来实现多任务协同目标的一种形式了, 咱们也能够看成这是协程最早的雏形。

聊到中断, 其中比拟重要的就是执行环境的保留和复原了, 而上下文的保留能力能够是操作系统间接提供的, 也能够是程序机制本身所提供的了, 综上所述, 咱们大抵能够将 c++ 中的协程的实现计划的迭代看成如下状况:

  • 最早利用 setjump 来实现的合作式任务调度器
  • 零碎级实现, 如 linux 提供的 ucontext 相干 API, Windows 提供的 Fiber 相干的 Api
  • 由零碎级实现所衍生出的高性能计划, 个别是借签零碎级的实现, 移除一些非必须的操作所达成的, 代表的计划有大家熟知的 libco 和 boost::context, 也就是咱们通常所说的有栈协程实现
  • 无栈实现, 最开始是纯正应用 duff device hack 进去的计划, 后续被 MS 规整, 局部个性依赖 compiler 实现, 逐渐演化成当初的 c++20 coroutine 机制了。

(三)协程的执行简介

理解了协程在 C++ 中的局部历史, 咱们来简略理解一下协程的执行机制, 这里咱们间接以 C++20 为例, 先来看一下概览图:

对于协程的执行, 咱们次要关注以下这些中央:

  • 中断点和重入点的定义

有栈协程和无栈协程定义中断点和重入点的形式和机制略有差别, 执行到中断点和重入点的时候大家应用的保留和复原机制不太一样, 但以 Host App 的视角来看, 整体的执行过程其实是比拟统一的。

这里咱们是以 C++20 的无栈协程来举例的, 通过图中的关键字 co\_await,咱们定义了 point1 和 point2 两个成对的中断点和重入点。

咱们来看一下协程执行到中断点和重入点的时候具体产生的事件: 中断点: 协程中断执行的时候, 咱们须要对以后的执行状态:

  • 协程执行到哪了。
  • 协程以后应用的 context 进行保留, 并将程序的执行权归还给外界. 此时咱们也能够返回必要的值给外界, 不便外界更好的对协程的后续执行进行管制。

重入点: 重入点是由中断点带进去的概念, 既然函数的执行可能被中断 (suspend), 那咱们必定也须要提供机制相干的机制复原协程的执行了, 在简单执行的时候, 咱们须要对协程保留的执行状态进行复原:

  • 复原到上次挂起执行的中央继续执行
  • 复原保留的 context
  • 传递必要的值到协程

整个协程的执行区别于一般函数的单次执行返回后果,个别都会有屡次的中断与重入,直到协程执行实现或者被外界强行停止。

而有栈协程和无栈协程的实现,差别最大的中央就是如下两点了:

  • 怎么保留和复原以后的执行地位。
  • 怎么保留和复原以后协程援用到的内存 (变量等) 本篇次要偏重无栈协程, 无栈协程相干的机制后续会具体开展. 对有栈协程相干机制感兴趣的能够翻阅 libco 或 boost.context 相干的内容进行理解。

(四)小议无栈协程的呈现

其实之前介绍 C++ 协程历史的时候, 咱们有一个问题没有开展, 为啥有了像 libco, 与 boost.context 这样的高性能有栈协程实现机制后, 规范委员会还会持续寻求无栈协程的解决方案, 并最终将其作为 C++ 协程的实现机制呢, 这里剖析次要的起因是为了解决有栈协程人造存在的限度:

  • 业务复杂度收缩带来的爆栈问题
  • 应用过大的栈, 又会导致协程自身的切换开销回升或者占用内存过多.

而无栈协程解决这些问题的形式也十分间接, 既然栈会导致问题, 那么咱们就间接去除对栈的依赖, 通过其余形式来解决数据存储拜访的问题。

目前次要的计划是如下两种:

  • Duff Device Hack 实现

咱们前面介绍的 C++17 的实现就是基于这种计划, 因为仅仅是框架级的实现, 咱们可能应用的实现形式会受到限制, 计划自身存在如栈变量的应用有严格的限度等问题, 但对于一些非凡的场合, 如基于寄存器实现的 lua vm, 这种形式会比拟符合。

  • C++20 的 Coroutine

通过前面的剖析, 咱们其实会发现这与 Duff Device Hack 实现是一脉相承的, 只是通过 compiler 的配合, 像栈变量的主动解决等机制, 保障了用户能够低心智累赘的应用它. 但同时, 绝对其余语言的实现, 因为相干个性的设计是 ” 面向库作者的实现 ”, 理论应用根本都须要二次封装, 也就带来了社区很多负面的声音。

(五)小结

后面咱们对 C++ 中协程的历史做了简略的铺垫, 接下来咱们将对 C++17 中基于 Duff Device Hack 的无栈协程实现, 以及 C++20 中的无栈协程做更深刻的介绍。

C++17 Stackless Coroutine 实现

在异步操作比拟多的状况下, 咱们就思考用协程来取代原来的 Callback 设计. 但过后的 GCC 用的是 8.3 版本, 并不反对 coroutine20, 所以咱们最终采纳的是一个基于 C++17 的无栈协程实现计划, 也就是应用后面介绍的 Duff Device Hack 形式实现的无栈协程. 咱们先来看下过后的我的项目背景。

(一)我的项目的背景介绍

过后的状况也比较简单, R 工作室内有多个我的项目处于预研的状态, 所以大家打算协同共建一个工作室内的后盾 C++ Framework, 用于工作室内几个预研我的项目中. 其中比拟重要的一部分就是协程了, 过后引入协程的形式和目标都比拟间接, 首先是应用 Duff Device Hack 的机制来实现整个无栈协程. 另外就是整个外围指标是心愿通过引入协程和相干的调度器来帮忙简化多节点的异步编程反对. 整个框架蕴含的几大部分如下图所示, Coroutine 机制以及相干的 Scheduler 封装是在 app\_service 中作为 C++ 微服务的基础设施存在的。

理论应用下来, 协程和调度器次要带来了以下这些长处:

  • 防止大量两头类的定义和应用。
  • 基于逻辑过程自身用串行的形式实现相干代码即可 (可参考后续切场景的例子)
  • 更容易写出数据驱动向的实现。
  • 还有比拟要害的一点, 能够无效防止过多的异步 Callback 导致的逻辑凌乱和难于跟踪调试的问题。

(二)为何从 C++17 说起

咱们为什么先从 C++17 的无栈协程开始介绍, 这是因为 C++17 的实现与 20 的实现一脉相承. 如果咱们剖析 C++ 20 通过 Compiler 加工后的代码, 就会发现这点. 相比于 C++20 协程大量的细节暗藏在 Compiler 的解决中 (当然咱们前面也会介绍怎么查看 Compiler 解决的这部分逻辑), C++17 的计划, 整个组织都在咱们本人的代码层面, 用于了解无栈协程的整体实现显然是更适合的. 另外, 相干的调度器的实现, 与 C++17 和 C++20 都是兼容的, 像咱们我的项目过后的实现, 是能够很好的做到 C++20 与 C++17 的协程混用的, 也样也不便在过渡阶段, 我的项目能够更平滑的从 C++17 向 C++20 迁徙. 另外, 对于一些不反对 C++20 的受限应用场景, C++17 仍然具备它的实用性.

(三)实现概述

咱们先来看一下整个机制的概览图:

从上图中咱们可能理解到, 整个基于 Duff Device Hack 的无栈协程实现的形式. 首先咱们通过 CoPromise 对象来保留用作协程的 std::function 对象, 另外咱们也会记录协程以后的执行状态, 其次, 咱们还会在 CoPromise 中内置一个 std::tuple<> 用于保留咱们须要在协程挂起和复原时保留的状态值。

另外, 整个外围的执行机制是依赖于几个外围宏所组成的 switch case 状态机来驱动的. 联合上非凡的 LINE 宏, 咱们能够在每个 co\_await() 对象调用的时候, 设置 CoPromise 对象以后的执行状态为 LINE**, 而下次跳转的时候, 通过 switch(state) 就能正确跳转到上次执行中断的中央持续往下执行了. 当然, 咱们会看到咱们的 case **LINE** 其实被交叉到了 do{} while(0) 两头, 这个其实就利用到了 duff device 个性, 容许你通过 case 疾速的跳转到 for 循环或者 while 循环的外部, C 语言一个很非凡的个性. 利用这点, 首先咱们能够实现 **co\_awiat() 宏的封装, 其次, 咱们也能在逻辑代码的 for 循环以及 while 循环中, 正确的利用 co\_await(), 所以说 Duff Device 个性对于整个机制来说, 还是比拟要害的.

如上例中所述的 Test Code 代码, co\_begin() 和 co\_end() 开展后形成了 switch() {} 的开始和完结局部, 而两头咱们退出的 \_\_co\_await() 宏, 则会开展成用于实现中断点和重入点的 case 逻辑, 整体的封装还是很奇妙的.

(四)执行流程概述

整体的执行流程通过下面的剖析咱们也能比较简单的整理出来:

  • 宏开展造成一个逾越协程函数首尾的大的 swith case 状态机。
  • 协程执行时构建新的 CoPromise 对象, 正确的解决输出参数, 输出参数会被存储在 CoPromise 对象的 std::tuple<> 上, 并且每次重入时作为函数的入口参数以援用的形式转入函数外部
  • 每次 Resume() 时依据以后 CoPromise 记录的 state, 跳转到正确的 case label 持续往下执行.
  • 执行到下一个挂终点返回控制权到调度器

反复上述操作直到执行完结。

从整体机制上, 咱们也能简略看到 C++17 对应实现的一些限度:

\_\_co\_begin() 前不能有逻辑代码, 相干的代码会因为函数的从新执行被重复调用.

栈变量的应用, 因为自身机制的起因, 并不能正确的保留栈变量的值, 咱们须要透过机制自身提供的机制来解决状态值 – 这个指的是被当成 std::tuple<> 成员存储在 CoPromise 对象中的那些值, 每次函数执行会以援用的形式作为参数传递给协程函数.

(五)另外一个示例代码

mScheduler.CreateTask([](int& c, LocalStruct& locals) -> logic::CoTaskForScheduler {rco_begin();
  {
    locals.local_i = 1024;
    auto* task = rco_self_task();
    printf("step1 %d\n", locals.local_i);
  }
  rco_yield_next_frame();
  {
    c = 0;
    while(c < 5) {printf("in while loop c = %d\n", c);
      rco_yield_sleep(1000);
      c++;
    }
    rco_yield_next_frame();}
  rco_end();}, 3, LocalStruct{});

从上例能够看出, 尽管存在上一节中咱们提到的一些限度, 按照设定的规定进行编码实现, 整体应用还是比较简单易懂的。下面的 rco\_yield\_next\_frame() 和 rco\_yield\_sleep() 是利用 Scheduler 的调度能力封装进去的挂起到下一帧持续复原执行和休眠这两个异步操作语义。

(六)绕开栈变量限度的办法

提到栈变量的限度, 必定有同学会想到, 是否有办法绕开栈变量的限度, 用一种更灵便的形式解决协程中长期值的存取, 使其在逾越中断点和重入点的状况仍然无效?

答案是必定的. 因为咱们有明确的与协程关联的状态存储对象 CoPromise, 所以如果框架中有实现反射或者适应任意类型值存取的类型擦除机制, 咱们当然可能很简略的对原有的实现进行扩大.

在 rstudio 的框架实现中, 咱们通过在 CoPromise 对象上多存储一个额定的 std::map<std::string, reflection::Value> 的成员, 再配合适当的包装, 就很容易实现如下示例代码所展现的性能了:

rco_begin();
{rco_set_value("id", 35567);
}
rco_yield_next_frame();
{
  {int64_t& val = rco_ref_value("id", int64_t);
    val = 5;
  }
  locals.local_i = rco_to_value("id", int);
}
rco_end();

通过额定扩大的 rco\_set\_value(), rco\_ref\_value(), rco\_to\_value(), 咱们即实现了一个比较简单易用的通过 name 对各类型值进行存取的实现, 当然, 实际操作的其实都是在 CoPromise 上存储的 std::map<std::string, reflection::Value> 成员。

这块是反射的一个简略利用, 对于类型擦除的细节, 与本篇关联不大, 这里不进行具体的开展了。

(七)一个外部我的项目中后盾切场景的代码示例

本章的结尾咱们以一个具体的业务实例作为参考, 不便大家理解相干的实现在具体业务中的大抵工作情景。

一个原来参加的我的项目的后盾服务器是多节点的设计, 对于切场景来说, 须要拜访多个节点来实现相干的操作, 大抵的切场景时序图如下所示:

删减细节代码之后的次要异步代码如下图所示:

rco_begin();
{
    locals.clientReq = req;
    locals.session = CServerUtil::GetSessionObj(sessionId);
 // ...
    SSTbusppInstanceKey emptyInstKey;
    emptyInstKey.Init();
    if (locals.session->GetTargetGameSvrID() != emptyInstKey) {
        // ...
        rco_await(locals.gameSceneService->CheckChangeScene(locals.playerId, locals.checkChangeSceneReq));
        // ...
        // 保留大世界信息
        // ...
        rco_await(locals.gameSceneService->ExitMainland(locals.playerId, locals.exitMainlandReq));
        // ...
    }
    auto gameMgrClient = GServer->GetRpcClient(TbusppInstanceKey{TBUSPP_SERVER_GAMEMGRSVR, ""});
    locals.gameMgrService = rstudio::rpc_proxy::GameMgrService_Proxy::Create(gameMgrClient, GServer->GetRpcScheduler());
 // ...
    LOG_DEBUG(locals.playerId, "[CHANGE SCENE] ready to Queryline group");
}
rco_await(locals.gameMgrService->QueryMainland2(locals.playerId, locals.querySpaceReq));
{
    // ...
    rco_await(locals.gameSceneService->ChangeMainland(locals.playerId, locals.localInstanceKey, locals.changeMainlandReq));
    // ...
}
// ...
LOG_DEBUG(locals.playerId, "[CHANGE SCENE] send change mainland_conf");
rco_emit_finish_event(rstudio::logic::CoRpcFinishEvent(rstudio::reflection::Value(locals.clientRes)));

rco_return;
rco_end();

通过 rco\_await() 发动的多个异步 Rpc 调用, 咱们很好的实现了上述时序图对应的逻辑性能实现。

Rpc 相干的协程化封装在 C++20 中会有个相干的示例, 此处就不反复开展 C++17 的实现了。

C++20 Coroutine 机制简介

理解了 C++17 的 Stackless Coroutine 实现机制后, 咱们接着来看一下 C++20 Coroutine 的实现. 首先咱们先来通过外围对象概览图来简略理解一下 C++20 Coroutine:

如图所示, C++ Coroutine20 的外围对象有如下这些:

  • Function Body: 通常一般函数增加 co\_await 等协程关键字解决返回值就能够作为一个协程函数。
  • coroutine\_handle<>: 对协程的生命周期进行管制。
  • promise\_type: 异样解决, 后果接管, 同时也能够对协程局部行为进行配置, 如协程初始化时的状态, 完结时的状态等。
  • Awaitable 对象: 业务侧的中断重入点定义和数据传输定制点, 联合 co\_await 关键字, 咱们就能借助 compiler 实现正确的中断, 重入语义了。

从图上也能看到, 比照其它语言较精简的 Coroutine 实现, C++20 这套实现, 还是偏简单的, 这也是咱们常调侃的 “ 库作者向 ” 实现, 尽管整体应用很灵便, 也能跟泛型很好的搭配, 但咱们还是须要在框架层做大量的包装, 同时业务个别须要一个中央对利用中所有的协程做治理, 不便监控利用的整体运行状况等, 这也使得 C++ 这套个性没法很简略的间接在业务侧进行应用, 后续咱们讲到 Coroutine Scheduler 的时候会进一步开展相干的内容。

此处咱们只须要对 Coroutine 的外围对象的形成和作用有个简略的认知, 接下来咱们会联合相干的示例代码来深刻理解 C++20 Coroutine 的整体运作机制, 理解更多细节。

联合代码了解 Coroutine

(一)一个简略的示例 – 并不简略

#include <iostream>
#include <coroutine>

using namespace std;

struct resumable_thing
{
  struct promise_type
  {resumable_thing get_return_object()
    {return resumable_thing(coroutine_handle<promise_type>::from_promise(*this));
    }
    auto initial_suspend() { return suspend_never{}; }
    auto final_suspend() noexcept { return suspend_never{}; }
    void return_void() {}

    void unhandled_exception() {}
  };
  coroutine_handle<promise_type> _coroutine = nullptr;
  resumable_thing() = default;
  resumable_thing(resumable_thing const&) = delete;
  resumable_thing& operator=(resumable_thing const&) = delete;
  resumable_thing(resumable_thing&& other)
    : _coroutine(other._coroutine) {other._coroutine = nullptr;}
  resumable_thing& operator = (resumable_thing&& other) {if (&other != this) {
      _coroutine = other._coroutine;
      other._coroutine = nullptr;
    }
  }
  explicit resumable_thing(coroutine_handle<promise_type> coroutine) : _coroutine(coroutine)
  { }
  ~resumable_thing()
  {if (_coroutine) {_coroutine.destroy(); }
  }
  void resume() { _coroutine.resume(); }
};

resumable_thing counter() {
  cout << "counter: called\n";
  for (unsigned i = 1; ; i++)
  {co_await std::suspend_always{};
    cout << "counter:: resumed (#" << i << ")\n";
  }
}

int main()
{
  cout << "main:    calling counter\n";
  resumable_thing the_counter = counter();
  cout << "main:    resuming counter\n";
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  cout << "main:    done\n";
  return 0;
}

从下面的代码咱们也能看出, 尽管协程函数 counter() 的定义是简略的, 应用也是简略的, 但其实蕴含 promise\_type 定义的 resumable\_thing 的定义并不简略, 相比其余语言, C++ 的应用显著简单很多.

相干代码的输入如下:

main:    calling counter
counter: called
main:    resuming counter
counter: resumed (#1)
counter: resumed (#2)
counter: resumed (#3)
counter: resumed (#4)
counter: resumed (#5)
main:    done

(二)Coroutine20 的实现猜测

后面咱们说过, C++17 下对应的实现机制大抵如下:

那么对于 C++20 来说, 它的整体运作机制又是什么样子的呢? 显然, 咱们从示例代码和后面简略介绍的外围对象, 并不能推导出它的运作机制, 编译器帮咱们做了很多额定的解决, 这也导致咱们没有方法间接从代码了解它理论的执行状况.

这其实也是 C++20 Coroutine 应用的一大难点, 除了前文提到的, 个性通过 Awaitable 定制点凋谢给你的中央, 整体的运作机制, 咱们是很难间接得出的. 另外, 在一些多线程协程混用的简单状况下, 整体运作机制对于咱们实现正确的框架, 正确的剖析解决碰到的问题至关重要. 那么咱们当初的问题就变成了, 怎么去补全出蕴含编译器解决的整体代码?

(三)借助 “cppinsights”

因为 C++ 各种简单的 compiler 解决机制, 曾经有相干的 compiler 预处理剖析的工具被开发进去了, 咱们这里用的是一个叫 cppinsights 的工具, 这是一个基于 web 的工具, 所以咱们关上网页即可应用它, 网址是 cppinsights.io 工具的截图如下:

cppinsights 自身是基于 clang 的, 提供了多种 clang compiler 预处理信息的查看, 比方咱们当初须要用到的 coroutine transformation:

对于后面的示例代码, 咱们通过 cppinsights 解决后生成的代码如下:

/*************************************************************************************
 * NOTE: The coroutine transformation you've enabled is a hand coded transformation! *
 *       Most of it is _not_ present in the AST. What you see is an approximation.   *
 *************************************************************************************/
#include <iostream>
#include <coroutine>

using namespace std;

struct resumable_thing
{
  struct promise_type
  {inline resumable_thing get_return_object()
    {return resumable_thing(resumable_thing(std::coroutine_handle<promise_type>::from_promise(*this)));
    }

    inline std::suspend_never initial_suspend()
    {return std::suspend_never{};
    }

    inline std::suspend_never final_suspend() noexcept
    {return std::suspend_never{};
    }

    inline void return_void()
    { }

    inline void unhandled_exception()
    { }

    // inline constexpr promise_type() noexcept = default;};

  std::coroutine_handle<promise_type> _coroutine;
  inline constexpr resumable_thing() /* noexcept */ = default;
  // inline resumable_thing(const resumable_thing &) = delete;
  // inline resumable_thing & operator=(const resumable_thing &) = delete;
  inline resumable_thing(resumable_thing && other)
  : _coroutine{std::coroutine_handle<promise_type>(other._coroutine)}
  {other._coroutine.operator=(nullptr);
  }

  inline resumable_thing & operator=(resumable_thing && other)
  {if(&other != this) {this->_coroutine.operator=(other._coroutine);
      other._coroutine.operator=(nullptr);
    }

  }

  inline explicit resumable_thing(std::coroutine_handle<promise_type> coroutine)
  : _coroutine{std::coroutine_handle<promise_type>(coroutine)}
  { }

  inline ~resumable_thing() noexcept
  {if(static_cast<bool>(this->_coroutine.operator bool())) {this->_coroutine.destroy();
    }

  }

  inline void resume()
  {this->_coroutine.resume();
  }

};



struct __counterFrame
{void (*resume_fn)(__counterFrame *);
  void (*destroy_fn)(__counterFrame *);
  std::__coroutine_traits_impl<resumable_thing>::promise_type __promise;
  int __suspend_index;
  bool __initial_await_suspend_called;
  unsigned int i;
  std::suspend_never __suspend_44_17;
  std::suspend_always __suspend_48_14;
  std::suspend_never __suspend_44_17_1;
};

resumable_thing counter()
{
  /* Allocate the frame including the promise */
  __counterFrame * __f = reinterpret_cast<__counterFrame *>(operator new(__builtin_coro_size()));
  __f->__suspend_index = 0;
  __f->__initial_await_suspend_called = false;

  /* Construct the promise. */
  new (&__f->__promise)std::__coroutine_traits_impl<resumable_thing>::promise_type{};

  resumable_thing __coro_gro = __f->__promise.get_return_object() /* NRVO variable */;

  /* Forward declare the resume and destroy function. */
  void __counterResume(__counterFrame * __f);
  void __counterDestroy(__counterFrame * __f);

  /* Assign the resume and destroy function pointers. */
  __f->resume_fn = &__counterResume;
  __f->destroy_fn = &__counterDestroy;

  /* Call the made up function with the coroutine body for initial suspend.
     This function will be called subsequently by coroutine_handle<>::resume()
     which calls __builtin_coro_resume(__handle_) */
  __counterResume(__f);


  return __coro_gro;
}

/* This function invoked by coroutine_handle<>::resume() */
void __counterResume(__counterFrame * __f)
{
  try
  {
    /* Create a switch to get to the correct resume point */
    switch(__f->__suspend_index) {
      case 0: break;
      case 1: goto __resume_counter_1;
      case 2: goto __resume_counter_2;
    }

    /* co_await insights.cpp:44 */
    __f->__suspend_44_17 = __f->__promise.initial_suspend();
    if(!__f->__suspend_44_17.await_ready()) {__f->__suspend_44_17.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());
      __f->__suspend_index = 1;
      __f->__initial_await_suspend_called = true;
      return;
    }

    __resume_counter_1:
    __f->__suspend_44_17.await_resume();
    std::operator<<(std::cout, "counter: called\n");
    for(__f->i = 1; ; __f->i++) {

      /* co_await insights.cpp:48 */
      __f->__suspend_48_14 = std::suspend_always{};
      if(!__f->__suspend_48_14.await_ready()) {__f->__suspend_48_14.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());
        __f->__suspend_index = 2;
        return;
      }

      __resume_counter_2:
      __f->__suspend_48_14.await_resume();
      std::operator<<(std::operator<<(std::cout, "counter:: resumed (#").operator<<(__f->i), ")\n");
    }

    goto __final_suspend;
  } catch(...) {if(!__f->__initial_await_suspend_called) {throw ;}

    __f->__promise.unhandled_exception();}

  __final_suspend:

  /* co_await insights.cpp:44 */
  __f->__suspend_44_17_1 = __f->__promise.final_suspend();
  if(!__f->__suspend_44_17_1.await_ready()) {__f->__suspend_44_17_1.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());
  }

  ;
}

/* This function invoked by coroutine_handle<>::destroy() */
void __counterDestroy(__counterFrame * __f)
{
  /* destroy all variables with dtors */
  __f->~__counterFrame();
  /* Deallocating the coroutine frame */
  operator delete(__builtin_coro_free(static_cast<void *>(__f)));
}



int main()
{std::operator<<(std::cout, "main:    calling counter\n");
  resumable_thing the_counter = counter();
  std::operator<<(std::cout, "main:    resuming counter\n");
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  the_counter.resume();
  std::operator<<(std::cout, "main:    done\n");
  return 0;
}

cppinsights 自身也跟 Compiler Explorer 做了拉通, 做代码深度剖析的时候, 更多的联合这些开源工具, 很多时候还是十分有帮忙的.

那么有了 compiler 预处理后的代码, 再来剖析 C++20 Coroutine 的机制, 就变得简略了.

(四)Coroutine20 根本构造 – Compiler 视角

对于 compiler 预处理后的代码, 咱们间接联合结构图来剖析:

咱们会发现, couter 被编译器解决后根本就只是一个空壳函数了, 原来的实现逻辑被整体搬入了一个编译器帮咱们定义的函数 \_\_coutnerResume() 中, 而后呈现了一个编译器帮咱们定义的对象 \_\_couterFrame, 通过剖析代码很容易晓得, \_\_counterFrame 构造次要实现几局部的事件:

  • virtual table 局部, 正确的告知你协程应用的 resume 函数以及 destroy 函数
  • 主动解决的栈变量, 如下图中所示的 i
  • 各种应用到的 awaitable object, 这是因为 awaitable object 自身也是有状态的, 须要正确记录
  • 以后执行到的地位, 这个是通过整形的 \_\_suspend\_index 来记录的.

当咱们察看 \_\_counterResume() 的实现, 乏味的事件来了, 咱们发现, 其实 C++20 也是应用一个大的 switch-case 来作为协程执行的全局状态机, 只不过每个 case lablel 前面, 接的是 goto, 而不是像咱们在 C++17 上面那样, 间接嵌入的业务代码.

整体 C++20 的实现思路, 基本上与 17 的实现思路是一脉相承的, 只不过得益于 compiler 的反对, 很多事件咱们都由被动解决 -> 主动解决.

(五)Compiler 视角从新剖析示例代码

  • couter() – Function Body

咱们晓得, couter() 会被编译器改写, 最终其实是变成了三个函数:

  • 单纯负责生命周期以及生成正确的 \_\_counterFrame 对象的 counter(), 只是一个协程入口函数.
  • 负责真正执行逻辑的 \_\_counterResume() 函数, 它的输出参数就是 \_\_counterFrame 对象.
  • 负责删除 **counterFrame 对象的 **counterDestroy() 函数.

通过一拆三, 编译器很好的解决了协程的入口, 协程的中断重入, 和协程以及相干对象的销毁的问题.

  • coroutine\_handle<>

局部 coroutine\_handle<> 的定义代码如下

template <> struct coroutine_handle<void>{constexpr coroutine_handle() noexcept;
  constexpr coroutine_handle(nullptr_t) noexcept;
  coroutine_handle& operator=(nullptr_t) noexcept;
  constexpr void* address() const noexcept;
  constexpr static coroutine_handle from_address(void* addr);
  constexpr explicit operator bool() const noexcept;
  bool done() const;
  void operator()();
  void resume();
  void destroy();
private:
  void* ptr;// exposition only
};

咱们联合后面开展的代码, 曾经很好了解 coroutine\_handle<> 为何会有协程生命周期管制的能力了, 因为它关联了 xxxFrame 对象, 而通过后面的剖析, xxxFrame 的是虚表记录了协程 resume 和 destroy 的函数, 所以这个中央的 ptr, 其实就是一个 xxxFrame 对象, 正确的关联了 xxxFrame 对象, 透过它, 咱们天然可能领有 resume(), destroy() 等一系列的能力了, 这里并没有任何魔法的存在.

template <typename Promise>
struct coroutine_handle
: coroutine_handle<void>
{Promise& promise() const noexcept;
  static coroutine_handle from_promise(Promise&) noexcept;
};

另外通过继承的形式, coroutine\_handle<> 实现了与 Promise 对象关联和转换的性能.

  • promise\_type

同样, 咱们联合预处理后的代码:

/* This function invoked by coroutine_handle<>::resume() */
void __counterResume(__counterFrame * __f)
{
  try
  {
    /* Create a switch to get to the correct resume point */
    switch(__f->__suspend_index) {
      case 0: break;
      case 1: goto __resume_counter_1;
      case 2: goto __resume_counter_2;
    }
    /* initial suspend handle here~~ */
    __f->__suspend_44_17 = __f->__promise.initial_suspend();
__resume_counter_1:
    /* do somthing for yield~~ */
__resume_counter_2:
    /* do somthing for resume~~ */
    goto __final_suspend;
  } catch(...) {if(!__f->__initial_await_suspend_called) {throw ;}
    __f->__promise.unhandled_exception();}
__final_suspend:
  /* final suspend here~~ */
  __f->__suspend_44_17_1 = __f->__promise.final_suspend();}

通过 \_\_counterResume() 的逻辑实现, promise 为何能够对协程的初始化和完结行为进行管制, 也很高深莫测了, 因为 \_\_counterFrame 对象中关联了咱们定义的 promise\_type 类型, 所以咱们也能很间接的通过 \_\_counterFrame 拜访到 promise\_type 类型, 一方面充当配置项的角色, 如管制 initial\_suspend, final\_suspend. 另外, promise\_type 也作为一个 Wrapper, 对如 co\_yield 等进行本义执行, 以及异样的转发解决, 也是十分好了解的机制。

  • Awaitable 对象

常见的 awaitable 对象如咱们示例中看到的, 零碎预约义的:

  • std:suspend\_always
  • std::suspend\_never 另外咱们也能通过多种形式定义 awaitable 对象
  • 通过重载 promise\_type 的 await\_transform() – 这是 asio 所应用的形式, 侵入性比拟强
  • 通过为对象实现 operator co\_await()
  • 通过实现 awaitable 对象须要的三个子函数 await\_ready(), await\_suspend(), await\_resume() – 举荐的形式 那么当咱们调用 co\_await awaitable 的时候, 产生的事件是什么呢, 咱们同样通过预处理的代码来进行理解:
__resume_counter_1:
    __f->__suspend_44_17.await_resume();
    std::operator<<(std::cout, "counter: called\n");
    for(__f->i = 1; ; __f->i++) {

      /* co_await insights.cpp:48 */
      __f->__suspend_48_14 = std::suspend_always{};
      if(!__f->__suspend_48_14.await_ready()) {__f->__suspend_48_14.await_suspend(coroutine_handle);
        __f->__suspend_index = 2;
        return;
      }
__resume_counter_2:
      __f->__suspend_48_14.await_resume();
      std::cout << "counter:: resumed (#" << __f->i << ")\n";
    }

对于每一次的 co\_await, 编译器解决后的代码, 都会造成一个中断点 和一个重入点, 其实对应的是两个状态, 刚开始执行的时候, 进入的是中断点的逻辑, 也就是咱们看到的 \_\_resume\_counter\_1 对应 label 的代码, 而重入点则是 \_\_resume\_counter\_2 对应 label 的代码, 联合此处开展的实例代码, 咱们也能很好的了解 awaitable 三个子函数的具体作用了:

  • await\_ready() – 判断是否须要挂起, 如不须要挂起, 则间接执行后续逻辑, 这里也就是持续到 \_\_resume\_counter\_2 这个 label 执行重入点的逻辑
  • await\_suspend() – 中断点触发的时候执行的逻辑, 业务中咱们个别在此处发动异步操作
  • await\_resume() – 重入点触发的时候执行的逻辑. 整体的机制是不是清晰了很多?

(六)小结 – C++20 协程的特点总结

咱们总结 C++20 协程的特点:

  • 一套了解上稍显简单, 须要联合 cppinsights 等工具能力理解整体的运行机制
  • 适当封装, 还是可能很好的满足业务需要
  • 比照 17 版本的实现, 20 版基本上没有什么应用上的限度
  • 主动栈变量的解决, 能够让业务侧以更低的心智累赘来进行开发
  • 通过 Awaitable 对象, 咱们可能扩大 co\_await 反对的业务, 这种实现侵入性低, 理论应用累赘小
  • 对于异步操作较多, 多节点较多, 特地是多个异步操作级联的应用场景, 很值得实装.
  • 最初咱们解说应用的是 clang, 但对于 gcc, msvc, 这些同样实用, 规范的提案起源是统一的, 都是 msvc 发动的那份, compiler 实现上存在一些轻微的差别, 但根本不影响应用.

Coroutine Scheduler

(一)Sheduler 实现的动机

后面咱们也提到了, 要做到 “ 库作者向个性 ” => 面向业务的异步框架, 咱们还须要一些额定的工作, 这就是咱们马上要介绍的 Coroutine Scheduler – 协程调度器。

(二)Scheduler 外围机制

如上图所示, Scheduler 次要提供对 SchedTask 的治理, 以及两个根底机制(比照 17 版的三个 ) 不便协程相干业务机制的实现:

  • Awaitable 机制: 后面也介绍了利用 c++20 的 co\_await 关键字和 awaitable 对象,咱们能够很好的定义挂终点,以及替换协程和内部零碎的数据。
  • Return Callback 机制: 局部协程执行完后须要向外界反馈执行后果 (如协程模式执行的 Rpc Service).

(三)Scheduler 外围对象

  • ISchedTask & SchedTaskCpp20
using CoReturnFunction = std::function<void(const CoReturnObject*)>;class ISchedTask{friend class Scheduler;  public:    ISchedTask() = delete;    ISchedTask(const SchedTaskCpp17&) = delete;    ISchedTask(uint64_t taskId, Scheduler* manager);    virtual ~ISchedTask();    uint64_t GetId() const;    virtual int Run() = 0;    virtual bool IsDone() const = 0;    virtual CO_TASK_STATE GetCoState() const = 0;    void BindSleepHandle(uint64_t handle);    AwaitMode GetAwaitMode() const;    int GetAwaitTimeout() const;    template<typename AwaitEventType>    auto BindResumeObject(AwaitEventType&& awaitEvent)->std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value>;    template<typename AwaitEventType>    auto GetResumeObjectAsType()->std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value, AwaitEventType*>;    bool HasResumeObject() const noexcept;    void ClearResumeObject();    bool IsLastInvokeSuc() const noexcept;    bool IsLastInvokeTimeOut() const noexcept;    bool IsLastInvokeFailed() const noexcept;    void AddChildTask(uint64_t tid);    void AddWaitNofityTask(uint64_t tid);    const auto& GetChildTaskArray() const;    const auto& GetWaitNotifyArray() const;    void Terminate();    Scheduler* GetManager() const;    static ISchedTask* CurrentTask();    void DoYield(AwaitMode mode, int awaitTimeMs = 0);    void SetReturnFunction(CoReturnFunction&& func);    void DoReturn(const CoReturnObject& obj);    void DoReturn();  protected:    uint64_t     mTaskId;    Scheduler*      mManager;    std::vector<uint64_t>  mChildArray;    std::vector<uint64_t>  mWaitNotifyArray;    //value used to return from coroutine    AwaitMode     mAwaitMode = AwaitMode::AwaitDoNothing;    int       mAwaitTimeout = 0;    //value used to send to coroutine(now as a AwaitEvent)    reflection::UserObject  mResumeObject;    uint64_t     mSleepHandle = 0;    bool      mIsTerminate = false;    CoReturnFunction   mCoReturnFunc;};class SchedTaskCpp20: public ISchedTask{public:    SchedTaskCpp20(uint64_t taskId, CoTaskFunction&& taskFunc, Scheduler* manager);    ~SchedTaskCpp20();    int Run() override;    bool IsDone() const override;    CO_TASK_STATE GetCoState() const override;    void BindSelfToCoTask();    const CoResumingTaskCpp20& GetResumingTask() const;  protected:    CoResumingTaskCpp20   mCoResumingTask;    CoTaskFunction    mTaskFuncion;};

C++20 的 SchedTaskCpp20 次要实现对协程对象的封装,CoTaskFunction 用于存储相干的函数对象,而 CoResumingTaskCpp20 则如同后面示例中的 resumable\_thing 对象,外部有须要的 promise\_type 实现,咱们对协程的拜访也是通过它来实现的。

此处须要留神的是咱们保留了协程对象外,还额定保留了相干的函数对象,这是因为如果协程自身是一个 lambda, compiler 并不会帮咱们正确保护 lambda 的生命周期以及 lambda 所捕捉的函数,尚未分明是实现缺点还是性能就是如此,所以此处须要一个额定存在的 std::function<> 对象,来保障对应 lambda 的生命周期是正确的。

比照 17 的实现,咱们的 SchedTask 对象中次要保留了:reflection::UserObject mResumeObject: 次要用于异步期待的执行, 当一个异步期待胜利执行的时候, 向协程传递值。

原来利用事件去解决最终返回值的机制也替换成了 Return 回调的形式,相对来说更简略间接,利用 lambda 自身也能很不便的保留须要最终回传的长期值了。

  • Scheduler

Scheduler 的代码比拟多, 次要就是 SchedTask 的管理器, 另外也实现对后面提到的三种机制的反对, 文章重点剖析一下三种机制的实现代码.

  • Yield 解决
void Scheduler::Update()
{RSTUDIO_PROFILER_METHOD_INFO(sUpdate, "Scheduler::Update()", rstudio::ProfilerGroupType::kLogicJob);
    RSTUDIO_PROFILER_AUTO_SCOPE(sUpdate);

    //Handle need kill task first
    while(!mNeedKillArray.empty())
    {auto tid = mNeedKillArray.front();
        mNeedKillArray.pop();
        auto* tmpTask = GetTaskById(tid);
        if (tmpTask != nullptr)
        {DestroyTask(tmpTask);
        }
    }

    //Keep a temp queue for not excute next frame task right now
    decltype(mFrameStartTasks) tmpFrameTasks;
    mFrameStartTasks.swap(tmpFrameTasks);

    while (!tmpFrameTasks.empty())
    {auto task_id = tmpFrameTasks.front();
        tmpFrameTasks.pop();
        auto* task = GetTaskById(task_id);
        LOG_CHECK_ERROR(task);
        if (task)
        {AddToImmRun(task);
        }
    }
}

void Scheduler::AddToImmRun(ISchedTask* schedTask)
{LOG_PROCESS_ERROR(schedTask);
    schedTask->Run();

    if (schedTask->IsDone())
    {DestroyTask(schedTask);
        return;
    }

    {auto awaitMode = schedTask->GetAwaitMode();
        auto awaitTimeoutMs = schedTask->GetAwaitTimeout();
        switch (schedTask->GetAwaitMode())
        {
            case rstudio::logic::AwaitMode::AwaitNever:
                AddToImmRun(schedTask);
                break;
            case rstudio::logic::AwaitMode::AwaitNextframe:
                AddToNextFrameRun(schedTask);
                break;
            case rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout:
            case rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout:
                {HandleTaskAwaitForNotify(schedTask, awaitMode, awaitTimeoutMs);
                }
                break;
            case rstudio::logic::AwaitMode::AwaitDoNothing:
                break;
            default:
                RSTUDIO_ERROR(CanNotRunToHereError());
                break;
        }
    }
    Exit0:
    return;
}

下面是 Scheduler 的 Update() 以及 Update 用到的外围函数 AddToImmRun() 的实现代码, 在每个 task->Run() 后, 达到下一个挂终点, 返回内部代码的时候, 内部代码会依据 Task 以后的 AwaitMode 对协程后续行为进行管制, 次要是以下几种模式:

  • rstudio::logic::AwaitMode::AwaitNever: 立刻将协程退出回 mReadyTask 队列, 对应协程会被马上唤醒执行
  • rstudio::logic::AwaitMode::AwaitNextframe: 将协程退出到下一帧执行的队列, 协程将会在下一帧被唤醒执行
  • rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout: 期待外界告诉后再唤醒执行 (无超时模式), 留神该模式下如果始终没收到告诉, 相干协程会始终在队列中存在.
  • rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout: 同 3, 差异是存在一个超时工夫, 超时工夫到了也会唤醒协程, 业务方能够通过 ResumeObject 判断协程是被超时唤醒的.
  • **rstudio::logic::AwaitMode::AwaitDoNothing:** 非凡的 AwaitHandle 实现会应用该模式, 比方删除 Task 的实现, 都要删除 Task 了, 咱们必定不须要再将 Task 退出任何可唤醒队列了.
  • Resume 解决

Resume 机制次要是通过唤醒在 Await 队列中的协程的时候向关联的 Task 对象传递 ResumeObject 实现的:

//Not a real event notify here, just do need things
template <typename E>
auto ResumeTaskByAwaitObject(E&& awaitObj)
 -> std::enable_if_t<std::is_base_of<ResumeObject, E>::value>
{
    auto tid = awaitObj.taskId;
    if (IsTaskInAwaitSet(tid))
    {
        //Only in await set task can be resume
        auto* task = GetTaskById(tid);
        if (RSTUDIO_LIKELY(task != nullptr))
        {task->BindResumeObject(std::forward<E>(awaitObj));
            AddToImmRun(task);
        }

        OnTaskAwaitNotifyFinish(tid);
    }
}

而后再通过 rco\_get\_resume\_object() 宏在协程代码中获取对应的 ResumeObject. 宏的申明代码如下:

#define rco_get_resume_object(ResumeObjectType)      rco_self_task()->GetResumeObjectAsType<ResumeObjectType>()

自身就是一个简略的传值取值的过程. 留神传递 ResumeObject 后, 咱们也会马上将协程退出到 mReadTasks 队列中以不便在接下来的 Update 中唤醒它.

  • 一个 Awaitable 实现的范例

咱们以 Rpc 的协程化 Caller 实现为例,看看一个 awaitable 对象应该如何结构:

class RSTUDIO_APP_SERVICE_API RpcRequest
{
  public:
    RpcRequest() = delete;
    ////RpcRequest(const RpcRequest&) = delete;
    ~RpcRequest() = default;

    RpcRequest(const logic::GameServiceCallerPtr& proxy,
               const std::string_view funcName,
               reflection::Args&& arg, int timeoutMs) :
    mProxy(proxy)
        , mFuncName(funcName)
        , mArgs(std::forward<reflection::Args>(arg))
        , mTimeoutMs(timeoutMs)
    {}
    bool await_ready()
{return false;}
    void await_suspend(coroutine_handle<>) const noexcept
{auto* task = rco_self_task();
        auto context = std::make_shared<ServiceContext>();
        context->TaskId = task->GetId();
        context->Timeout = mTimeoutMs;
        auto args = mArgs;
        mProxy->DoDynamicCall(mFuncName, std::move(args), context);
        task->DoYield(AwaitMode::AwaitForNotifyNoTimeout);
    }
    ::rstudio::logic::RpcResumeObject* await_resume() const noexcept
{return rco_get_resume_object(logic::RpcResumeObject);
    }
  private:
    logic::GameServiceCallerPtr     mProxy;
    std::string         mFuncName;
    reflection::Args       mArgs;
    int           mTimeoutMs;
};

重点是后面说到的 await\_ready(), await\_suspend(), await\_resume() 函数的实现。

  • ReturnCallback 机制

有一些场合, 可能须要协程执行实现后向业务零碎发动告诉并传递返回值, 比方 Rpc Service 的协程反对实现等, 这个个性其实比拟相似 go 的 defer, 只是这里的实现更简略, 只反对繁多函数的指定而不是队列. 咱们间接以 RpcService 的协程反对为例来看一下这一块的具体应用.

首先是业务侧, 在创立完协程后, 须要给协程绑定后续协程执行实现后做进一步操作须要的数据:

task->SetReturnFunction(
    [this, server, entity, cmdHead, routerAddr,
     reqHead, context](const CoReturnObject* obj) {const auto* returnObj = dynamic_cast<const CoRpcReturnObject*>(obj);
    if (RSTUDIO_LIKELY(returnObj))
    {DoRpcResponse(server, entity.get(), routerAddr, &cmdHead,
                      reqHead, const_cast<ServiceContext&>(context),
                      returnObj->rpcResultType,
                      returnObj->totalRet, returnObj->retValue);
    }
});

这里将 Connection id 等信息通过 lambda 的 capture 性能间接绑定到 SchedTask 的返回函数,而后业务代码会利用 co\_return 自身的性能向 promise\_type 传递返回值:

CoTaskInfo HeartBeatService::DoHeartBeat(logic::Scheduler& scheduler, int testVal)
{
    return scheduler.CreateTask20([testVal]() -> logic::CoResumingTaskCpp20 {co_await logic::cotasks::Sleep(1000);

            printf("service yield call finish!\n");

            co_return CoRpcReturnObject(reflection::Value(testVal + 1));
        }
    );
}

最终咱们利用 promise\_type 的 return\_value() 来实现对设置的回调的调用:

void CoResumingTaskCpp20::promise_type::return_value(const CoReturnObject& obj)
{auto* task = rco_self_task();
    task->DoReturn(obj);
}

留神这个中央 task 上存储的 ExtraFinishObject 会作为 event 的一部分间接传递给业务零碎, 并在发动事件后调用删除协程工作的办法.

比照原版 17 的 Finish Event 实现,通过 Return Callback 的形式来对一些非凡的返回进行解决,这种机制是更容易应用的。

(四)示例代码

//C++ 20 coroutine
auto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");
mScheduler.CreateTask20([clientProxy]()
                        -> rstudio::logic::CoResumingTaskCpp20 {auto* task = rco_self_task();

    printf("step1: task is %llu\n", task->GetId());
    co_await rstudio::logic::cotasks::NextFrame{};

    printf("step2 after yield!\n");
    int c = 0;
    while (c < 5) {printf("in while loop c=%d\n", c);
        co_await rstudio::logic::cotasks::Sleep(1000);
        c++;
    }
    for (c = 0; c < 5; c++) {printf("in for loop c=%d\n", c);
        co_await rstudio::logic::cotasks::NextFrame{};}

    printf("step3 %d\n", c);
    auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false,
                                    []()-> logic::CoResumingTaskCpp20 {printf("from child coroutine!\n");
        co_await rstudio::logic::cotasks::Sleep(2000);
        printf("after child coroutine sleep\n");
    });
    printf("new task create in coroutine: %llu\n", newTaskId);
    printf("Begin wait for task!\n");
    co_await rstudio::logic::cotasks::WaitTaskFinish{newTaskId, 10000};
    printf("After wait for task!\n");

    rstudio::logic::cotasks::RpcRequest
        rpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3}, 5000};
    auto* rpcret = co_await rpcReq;
    if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) {assert(rpcret->totalRet == 1);
        auto retval = rpcret->retValue.to<int>();
        assert(retval == 4);
        printf("rpc coroutine run suc, val = %d!\n", retval);
    }
    else {printf("rpc coroutine run failed! result = %d \n", (int)rpcret->rpcResultType);
    }
    co_await rstudio::logic::cotasks::Sleep(5000);
    printf("step4, after 5s sleep\n");
    co_return rstudio::logic::CoNil;
} );

执行后果:

step1: task is 1
step2 after yield!
in while loop c=0
in while loop c=1
in while loop c=2
in while loop c=3
in while loop c=4
in for loop c=0
in for loop c=1
in for loop c=2
in for loop c=3
in for loop c=4
step3 5
new task create in coroutine: 2
Begin wait for task!
from child coroutine!
after child coroutine sleep
After wait for task!
service yield call finish!
rpc coroutine run suc, val = 4!
step4, after 5s sleep

比照 17 的实现,次要的益处是:

  • 代码更精简了
  • Stack 变量能够被 Compiler 主动解决,失常应用了。
  • co\_await 能够间接返回值,并有强制的类型束缚了。
  • 一个协程函数就是一个返回值为 logic::CoResumingTaskCpp20 类型的 lambda, 能够充分利用 lambda 自身的个性还实现正确的逻辑了。

Scheduler 的应用

(一)示例代码

//C++ 20 coroutineauto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");mScheduler.CreateTask20([clientProxy]()                        -> rstudio::logic::CoResumingTaskCpp20 {    auto* task = rco_self_task();    printf("step1: task is %llu\n", task->GetId());    co_await rstudio::logic::cotasks::NextFrame{};    printf("step2 after yield!\n");    int c = 0;    while (c < 5) {printf("in while loop c=%d\n", c);        co_await rstudio::logic::cotasks::Sleep(1000);        c++;    }    for (c = 0; c < 5; c++) {printf("in for loop c=%d\n", c);        co_await rstudio::logic::cotasks::NextFrame{};}    printf("step3 %d\n", c);    auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false,                                    []()-> logic::CoResumingTaskCpp20 {        printf("from child coroutine!\n");        co_await rstudio::logic::cotasks::Sleep(2000);        printf("after child coroutine sleep\n");    });    printf("new task create in coroutine: %llu\n", newTaskId);    printf("Begin wait for task!\n");    co_await rstudio::logic::cotasks::WaitTaskFinish{newTaskId, 10000};    printf("After wait for task!\n");    rstudio::logic::cotasks::RpcRequest        rpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3}, 5000};    auto* rpcret = co_await rpcReq;    if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) {assert(rpcret->totalRet == 1);        auto retval = rpcret->retValue.to<int>();        assert(retval == 4);        printf("rpc coroutine run suc, val = %d!\n", retval);    }    else {printf("rpc coroutine run failed! result = %d \n", (int)rpcret->rpcResultType);    }    co_await rstudio::logic::cotasks::Sleep(5000);    printf("step4, after 5s sleep\n");    co_return rstudio::logic::CoNil;} );

(二)小议 C++20 Coroutine 比照 C++17 Coroutine 带来的改良

通过后面的介绍, 咱们很容易得出以下几个 C++20 Coroutine 的劣势:

  • 原生关键字 co\_await, co\_return 的反对, 业务侧应用代码更加精简, 也进一步对立了大家对无栈协程的规范了解.
  • Stack 变量能够被 compiler 主动解决, 这点比照 C++17 须要自行组织状态变量来说是十分节约心智负责的.
  • co\_await 能够间接返回对应类型的值, 这样协程自身就有了强制的类型束缚, 整体业务的表白也会因为不须要从类型擦除的对象获取须要的类型, 变得更顺畅.

一个有意思的实例

咱们思考一个问题, 如果局部应用 OOP 进行设计的零碎, 应用协程的思路重构, 会是什么样子的?

刚好笔者原来的某个我的项目是应用 Python 作为脚本, 过后尝试应用 Python 的 Coroutine 实现了一版技能零碎, 明天咱们来尝试应用 C++20 Coroutine 从新实现它, 这样也可能比照一下, 在有协程调度器存在的状况下, 业务侧对协程的应用感触, 与其余语言如 Python 中的差别.

(一)一个 Python 实现的技能示例

咱们以一个原来在 python 中利用包装的协程调度器实现的技能零碎为例,先来看看相干的实现成果和外围代码。

python 的 stackless 协程实现不是咱们关注的重点,参考的第一个链接是相干的实现思路,感兴趣的能够关上相干链接具体理解,此处就不再开展细说了。

  • 实现成果

以下是相干实现的示例成果,次要是一个火球技能和实现和一个闪电链技能的实现:

  • 技能主流程代码

咱们先来看一下技能的主流程代码,能够发现应用协程形式实现,整个代码更函数式,区别于面向对象结构不同对象存储两头态数据的设计。

# handle one skill instance create
def skill_instance_run_func(instance, user, skill_data, target, target_pos, finish_func):
    # set return callback here
 yield TaskSetExitCallback(finish_func)
    # ... some code ignore here
 from common.gametime import GameTime
 init_time = GameTime.now_time
 for skill_step in step_list:
  step_start_time = GameTime.now_time
        # ... some code ignore here
        ### 1. period task handle
  if skill_step.cast_type == CastSkillStep.CAST_TYPE_PERIOD:
   #... some code ignore here
        ### 2. missle skill
  elif skill_step.cast_type == CastSkillStep.CAST_TYPE_MISSLE_TO_TARGET:
   if len(skill_step.cast_action_group_list) > 0:
    action_group = skill_step.cast_action_group_list[0]
    for i in range(skill_step.cast_count):
                    # yield for sleep
     yield TaskSleep(skill_step.cast_period)
     ret_val = do_skill_spend(skill_data, user, instance)
     if not ret_val:
      return
                    # sub coroutine(missle_handle_func)
     task_id = yield TaskNew(missle_handle_func(skill_data, instance, user, skill_step, action_group, target_id, target_pos))
     instance.add_child_task_id(task_id)
        ### 3. guide skill
  elif skill_step.cast_type == CastSkillStep.CAST_TYPE_GUIDE_TO_TARGET:
   #... some code ignore here
  now_time = GameTime.now_time
  step_pass_time = now_time - step_start_time
  need_sleep_time = skill_step.step_total_time - step_pass_time
  if need_sleep_time > 0:
   yield TaskSleep(need_sleep_time)
  instance.on_one_step_finish(skill_step)
 if skill_data.delay_end_time > 0:
  yield TaskSleep(skill_data.delay_end_time)
    # wait for child finish~~
 for task_id in instance.child_task_list:
  yield TaskWait(task_id)
 instance.task_id = 0

整体实现比较简单,整个技能是由多个 SkillStep 来配置的,整体技能的流程就是 for 循环执行所有 SkillStep,而后提供了多种 SkillStep 类型的解决,次要是以下几类:

  • CastSkillStep.CAST\_TYPE\_PERIOD:周期性触发的技能,次要应用 yield TaskSleep()
  • CastSkillStep.CAST\_TYPE\_MISSLE\_TO\_TARGET:导弹类技能,应用子协程性能
  • CastSkillStep.CAST\_TYPE\_GUIDE\_TO\_TARGET:疏导类技能,应用子协程性能

最初所有 step 利用结束会进入配置的休眠和期待子工作的阶段。

  • 子工作 – 导弹类技能相干代码

对于下面介绍的导弹类技能(火球),外围实现也比较简单,实现了一个飞行物按固定速度迫近指标的成果,具体代码如下,利用 yield 咱们能够实现在飞行物未达到目标点的时候每帧执行一次的成果:

### 1. handle for missle skill(etc: fire ball)def missle_handle_func(skill_data, instance, user, skill_step, action_group, target_id, target_pos): effect = instance.create_effect(skill_step.missle_info.missle_fx_path) effect.set_scale(skill_step.missle_info.missle_scale) cur_target_pos, is_target_valid = skill_step.missle_info.get_target_position(user, target_id, target_pos) start_pos = skill_step.missle_info.get_start_position(user, target_id, target_pos) is_reach_target = False from common.gametime import GameTime init_time = GameTime.now_time while True:  # ... some code ignore here  fly_distance = skill_step.missle_info.fly_speed*GameTime.elapse_time  if fly_distance < total_distance:   start_pos += fly_direction*math3d.vector(fly_distance, fly_distance, fly_distance)   effect.set_position(start_pos)  else:   is_reach_target = True   break        # do yield util next frame  yield effect.destroy() if is_reach_target:  target_list = skill_data.get_target_list(user.caster, target_id, target_pos)  for target in target_list:   action_group.do(user.caster, target)
  • 子工作 – 疏导类技能代码

对于下面介绍的疏导类技能(闪电链),依靠框架自身的 guide effect 实现,咱们利用 yield TaskSleep() 就能很好的实现相干的性能了:

### 2. handle for guide skill(etc: lighting chain)
def guide_handle_func(skill_data, instance, user, skill_step, start_pos, target_id, target_pos):
 effect = instance.create_effect(skill_step.guide_info.guide_fx_path)
 effect.set_scale(skill_step.guide_info.guide_scale)

 effect.set_position(start_pos)

 effect.set_guide_end_pos(target_pos - start_pos)

    # yield for sleep
 yield TaskSleep(skill_step.guide_info.guide_time)
 effect.destroy()

(二)对应的 C++ 实现

后面的 python 实现只是个引子,抛开具体的画面和细节,咱们来尝试用咱们构建的 C++20 版协程调度器来实现类似的代码(抛开显示相干的内容,纯正过程模仿):

//C++ 20 skill test coroutine
mScheduler.CreateTask20([instance]() -> rstudio::logic::CoResumingTaskCpp20 {rstudio::logic::ISchedTask* task = rco_self_task();
    task->SetReturnFunction([](const rstudio::logic::CoReturnObject*) {//ToDo: return handle code add here});

    for (auto& skill_step : step_list) {auto step_start_time = GGame->GetTimeManager().GetTimeHardwareMS();
        switch (skill_step.cast_type) {
            case CastSkillStep::CAST_TYPE_PERIOD: {//... some code ignore here}
                break;
            case CastSkillStep::CAST_TYPE_MISSLE_TO_TARGET: {if (skill_step.cast_action_group_list.size() > 0) {auto& action_group = skill_step.cast_action_group_list[0];
                        for (int i = 0; i < skill_step.cast_count; i++) {co_await rstudio::logic::cotasks::Sleep(skill_step.cast_period);
                            bool ret_val = do_skill_spend(skill_data, user, instance);
                            if (!ret_val) {co_return rstudio::logic::CoNil;}
                            auto task_id = co_await rstudio::logic::cotasks::CreateTask(true,
                             [&skill_step]()->rstudio::logic::CoResumingTaskCpp20 {
        auto cur_target_pos = skill_step.missle_info.get_target_position(user, target_id, target_pos);
                                auto start_pos = skill_step.missle_info.get_start_position(user, target_id, target_pos);
                                bool is_reach_target = false;
                                auto init_time = GGame->GetTimeManager().GetTimeHardwareMS();
                                auto last_time = init_time;
                                do {auto now_time = GGame->GetTimeManager().GetTimeHardwareMS();
                                    auto elapse_time = now_time - last_time;
                                    last_time = now_time;
                                    if (now_time - init_time >= skill_step.missle_info.long_fly_time) {break;}

                                    auto cur_target_pos = skill_step.missle_info.get_target_position(user, target_id, target_pos);

                                    rstudio::math::Vector3 fly_direction = cur_target_pos - start_pos;
                                    auto total_distance = fly_direction.Normalise();
                                    auto fly_distance = skill_step.missle_info.fly_speed * elapse_time;
                                    if (fly_distance < total_distance) {start_pos += fly_direction * fly_distance;}
                                    else {
                                        is_reach_target = true;
                                        break;
                                    }

                                    co_await rstudio::logic::cotasks::NextFrame{};} while (true);
                                if (is_reach_target) {//ToDo: add damage calculate here~~}

                             });
                            instance.add_child_task_id(task_id);
                        }
                    }
                }
                break;
            case CastSkillStep::CAST_TYPE_GUIDE_TO_TARGET: {//... some code ignore here}
                break;
            default:
                break;
        }

        auto now_time = GGame->GetTimeManager().GetTimeHardwareMS();
        auto step_pass_time = now_time - step_start_time;
        auto need_sleep_time = skill_step.step_total_time - step_pass_time;
        if (need_sleep_time > 0) {co_await rstudio::logic::cotasks::Sleep(need_sleep_time);
        }

        instance.on_one_step_finish(skill_step);
    }

    if (skill_data.delay_end_time > 0) {co_await rstudio::logic::cotasks::Sleep(skill_data.delay_end_time);
    }

    for (auto tid :instance.child_task_list) {co_await rstudio::logic::cotasks::WaitTaskFinish(tid, 10000);
    }
});

咱们能够看到, 依赖 C++20 的新个性和咱们本人封装的调度器, 咱们曾经能够很天然很顺畅的用比拟低的心智累赘来表白原来在 python 中实现的性能了,这应该算是一个非常明显的提高了。

(三)小结

通过下面两版实现的比照, 咱们不难发现:

  • 联合调度器, C++ Coroutine 的实现与脚本一样具备简洁性, 这得益于 Compiler 对 Stack 变量的主动解决, 以及规整的 co\_await 等关键字反对, 从某种程度上, 咱们能够认为这种解决提供了一个简略的类 GC 的能力, 咱们能够更低心智累赘的开发相干代码.
  • 协程的应用同时也会带来其余一些益处, 像防止多级 Callback 带来的代码扩散逻辑凌乱等问题, 这个在 C++17 协程应用的范例中曾经提到过, 此处不再反复.

RoadMap

(一)对 asio coroutine20 实现局部的思考

咱们晓得最新版的 asio 曾经在尝试应用 C++ Coroutine20 来简化它大量存在的异步操作. 先抛开具体的细节以及代码实现品质等问题, 咱们来看一下集体认为 asio 做得比拟好的两点:

  • 低应用老本的经典 callback 兼容计划

asio::awaitable<void> watchdog(asio::io_context& ctx) {asio::steady_timer timer(ctx);
  timer.expires_after(1s);
  co_await timer.async_wait(asio::use_awaitable);
  co_return;
}

这个实现比拟奇妙的中央在于, steady\_timer 的 async\_wait() 接口, 原来承受的是一个 callback 函数, 这个中央, asio 通过引入 asio::use\_awaitable 对象, 实现了 callback 语义到 co\_await 协程语义的转换, 这对于咱们兼容大量蕴含 callback 的历史代码, 是十分具备参考价值的.

asio coroutine 实现的剥析, 在笔者的另外一篇文章 asio 的 coroutine 实现剖析中有具体的开展, 感兴趣的读者能够自行翻阅.

  • 利用操作符定义复合工作
  auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
  if (!e)
  {
    co_await (
        (transfer(client, server, client_to_server_deadline) ||
          watchdog(client_to_server_deadline)
        )
        &&
        (transfer(server, client, server_to_client_deadline) ||
          watchdog(server_to_client_deadline)
        )
      );
  }

协程的应用, 不可避免的会呈现协程与子协程, 协程与协程之间的复合关系, Asio 通过重载 || 运算和 && 运算, 来尝试表白多个异步工作的组合, 具体的作用如下:

||: 用来表白两个同时开始的异步工作, 其中一个胜利执行, 则返回这个执行的后果, 并勾销另外一个异步工作的执行.&&: 用来表白两个同时执行的异步工作, 两个工作都胜利后返回蕴含这两个工作执行后果的 std::tuple<> 值, 其中任意一个工作失败, 则间接返回谬误.

通过这种机制, 咱们肯定水平领有了对工作的复合关系进行表白的能力, 比方对一个本来不反对超时的异步工作, 咱们能够非常简单的 || 上一个超时异步工作, 来解决它的超时反对问题. 这种设计也是很值得参考的.

(二)对于 executions

聊到异步, 不得不说起最近几年频繁调整提案, 直到最近提案才逐渐成熟的 executions 了. 咱们先来简略理解一下 executions:

在底层设计上, executions 与 ranges 十分类同, 都是先解决自身的 DSL 表白的问题, 再来构建更下层的利用, 区别在于 ranges 次要是应用了 CPO 以及 | 运算符来做到这一点, 而 executions 因为自身的复杂度基于 CPO 引入了更简单的 tag invoke 机制, 来组织本人的 DSL, 因为这种表白代码层面有很高的复杂度, 也被社区宽泛的戏称为 “ 存在大量的代码噪声 ”, 或者说开发了一种 ” 方言 ”. 但不可否认, 通过引入底层的 DSL 撑持个性, executions 很好的实现了结构化并发.

目前咱们能够参考学习的工程化实际, 次要是 Meta 公司开发的 libunifex 库, 在结构化并发这部分, libunfix 其实曾经做得比拟好了, 但其自身是存在一些缺点的, 一方面, libunifex 的调度器实现相比 asio, 还存在很大的落差, 另外, 一些反对工程利用的算法也有很多的缺失, 须要更长周期的倒退和稳固.

所以对此, 咱们目前的策略是放弃预研的状态, 在实现上尝试将 libunifex 的调度器更多的联合 asio 的调度器, 并实现一些咱们工程化比拟急需的算法, 逐渐引入 executions 的结构化并发, 对异步进行更好的开发与治理. 但不可否认的是, 目前综合来看, executions 的成熟度和易用性都远远比不上 C++ Coroutine20, 短时间来看, 还是基于 Coroutine 的异步框架更值得投入.

(三)对于后续的迭代

协程局部的个性目前是作为咱们自研引擎框架能力的一部分提供的, 一方面咱们会围绕 Coroutine 以及 Scheduler 补齐更多相干的个性, 如后面说到的对复合的异步工作的反对等, 另外咱们也会尝试一些 Executions 相干的摸索, 如异构并发反对等, 置信随着规范的进一步倒退, 越来越多的人对这块的投入和尝试, 整个 C++ 的异步会向着应用侧更简洁, 表达能力更强的方向进化.

参考资料:

1.asio 官网

2.libunifex 源码库

3.c++ 异步从实践到实际 – 总览篇

4.A Curious Course on Coroutines and Concurrency – David Beazley [1]

5.Marvin’s Blog【程式人生】- C++20 中的 Coroutine [2]

举荐浏览

可能是最全的数据仓库全景科普和开发方法论!

万字避坑指南!C++ 的缺点与思考(上)

看完这篇,成为 Grafana 高手!

10 大性能陷阱!每个 C ++ 工程师都要晓得

正文完
 0