共计 5270 个字符,预计需要花费 14 分钟才能阅读完成。
最近在致力同步保护 SegmentFault 的文章积攒~不便后续继续更新~
原文是 2019 年 7 月底开源后陆续 po 的,这里对近况进行了调整和补充。
心愿本人和我的项目都能够继续提高 (๑╹ヮ╹๑)ノ 欢送多多交换!!
尽管我更新本博客比较慢,然而 github 上的 workflow 我的项目自身在继续更新中~这一年多以来改了好多好多。这里仅列出一些根本的特点:
- 包含通信、计算、文件 IO、定时器、计数器等异步资源
- 创新性引入的工作流概念,各种异步工作都能够被对等地组装起来实现简单的业务逻辑
- 外部自带多种通用协定,包含
http
,redis
,mysql
和kafka
等 - 跨平台、反对多种操作系统
- 自带命名服务,包含过程内服务治理与负载平衡,轻松构建微服务零碎
- 实用于实现任何计算与通信关系非常复杂的高性能高并发的后端服务
附上 github 地址:👉 https://github.com/sogou/workflow
明天我还是要抱着跟大家学习的心态,急不可待要从整体的角度来写一下 workflow 的架构。并且郑重声明一下,本篇只是自己作为开发者之一、想尽快和大家学习交换而写的集体梳理,咱们并没有官网号或公司内其余技术号去发文章,所以这里就当做是其中一个开发者的一丢丢分享了~
架构设计必然是从底向上开始,所以咱们间接从 kernel 目录的设计思路开始聊。
1. 封装调度器
上次说到,咱们作为异步调度框架,目前反对的异步调度资源分为 6 种:
这里能够举大家平时接触最多的网络通信框架和计算调度框架作为重点解说一下。
咱们须要封装调度器去操作这些系统资源,简略来说就是操作一批网络连接或者说线程。留神这里说的“操作”,也就是说调度器远不止连接池和线程池那么简略,咱们要做的事件是:
- 蕴含与治理资源池
- 实现如何对一批连贯尽可能高性能地响应其读写、如何尽可能快且尽可能通用地给出一个足够灵便的机制去让各线程执行各种计算
- 提供申请接口给下层应用
咱们以线程执行器 Executor 为例来看看具体怎么做。以上第二点尽可能快又足够灵便的机制,就是咱们设计的 ExecQueue,在以下代码得以体现:
class Executor
{
public:
// 一次要执行的接口,对于线程执行器来说,就是把一个执行工作扔进某个队列中
int request(ExecSession *session, ExecQueue *queue);
private:
// 执行器和系统资源,是一个蕴含关系
thrdpool_t *thrdpool;
};
2. 封装调度的根本单位
构思完了调度器,咱们须要构思一下被调度的根本单位。
对应每种能够调度对象的零碎接口,咱们必须封装本人的构造,作为每次与系统资源交互的根本单位,通过调度器提供的申请接口,扔到调度器里被调度。
具体来说,这显然是一次网络交互、或者一次线程须要执行的计算工作。而后每个根本单位上,能够有上下文、供子类做具体实现的接口 / 函数指针等等。
咱们以网络交互为例:
class CommSession
{
// 往连贯上要发的数据
virtual CommMessageOut *message_out() = 0;
// 连贯上收到数据流,如何切下一个数据包
virtual CommMessageIn *message_in() = 0;
// 本次网络事件被响应的机会
virtual void handle(int state, int error) = 0;
…
// 个别咱们的上下文是存在派生类上
};
阶段性总结一下,写到这里,咱们就能够欢快地做网络收发或者线程调度了~这些模块都曾经是能够独自拆出来用的。
作为框架,咱们基于上述的多种调度器和调度单位,能够给用户封装各种具体网络协议和计算算法。然而这还不是咱们的串并行任务零碎的外围价值。
3. 工作流
咱们想要实现工作流(无论是 DAG 还是串并连),意味着咱们须要一套机制去按程序触发具体的子工作执行、并接管其执行完之后要做的事件。实现的形式有很多,咱们做了一套子工作零碎来满足形象的任务调度,而这个工作自身是网络通信还是计算,都不重要。
因为咱们的子工作是要给异步框架用的,所以每个工作你不能只有一个接口:execute()
之类,咱们必须有开始执行的 dispatch()
和执行结束的 done()
两个须要实现,而工作流零碎自身只是做按程序调起你的开始和完结这两个接口的事件。
class SubTask
{
// 子工作被调起的机会
virtual void dispatch() = 0;
// 子工作执行实现的机会
virtual SubTask *done() = 0;
// 外部实现,决定了工作流走向
void subtask_done();
…
};
对于工作流,之后会具体介绍其概念,有做相似事件的小伙伴欢送多多交换互相学习,我也会多翻阅一些材料再写,这是十分十分有意思的一个主题。
4. 能够被工作流执行的根本调度单位
让每个根本单位能够被工作流执行上来,并且被某些调度器调度,做法很简略,从执行单位和子工作独特派生进去就能够了:
class CommRequest : public SubTask, public CommSession
{
// 咱们来实现以下 SubTask 的 dispatch 接口
// 这个网络工作被调起,咱们要做的事件,就是发送网络申请
// 这个通过调用具体通信器的 request 去发消息
void dispatch()
{
if (this->scheduler->request(this, this->object, this->wait_timeout,
&this->target) < 0)
{…}
}
// 而后是 CommSession 的 handle 接口
// 这个接口的意思是网络事件被响应的机会
// 假如咱们作为一个 client,发送完申请后,咱们关注的事件是这个 fd 上的写事件
// 所以这里被调起意味着有回复了(当然也可能超时
void handle(int state, int error)
{
// 解决各种谬误
…
// 咱们在这里调用一下 Subtask 的 subtask_done,让后续工作自身得以执行上来
this->subtask_done();}
};
学习委员划重点:每一个能够被调度的根本单位,想同时具备子工作的属性,则必须子类里执行这个subtask_done()
,以此买通工作流。
5. 根本工作
咱们目前为止,介绍的都是 kernel 的内容,当初咱们来接触一下更为具体的概念:工作。
咱们须要一层 infrastructure 的根本工作层,对接每一种具体的系统资源,比方:
- ExecRequest 封装进去的工作是个 WFThreadTask
- CommRequest 封装进去的工作就应该是个 WFNetworkTask
这里能够看到,资源和工作都是一一对应的,这是目前集体认为框架外部做得比拟好的形象之一。
持续以网络申请看看,派生进去的工作应该长怎么样。
看过咱们的 tutorial 的小伙伴应该晓得(后面文章也介绍过),咱们有工作流 Series 的概念。所以这一层的根本工作,都须要做的事件是:
- 治理好所在的 series(没有的话,默默创立一个,这样他人能力串到你后边~
- 异步所须要的上下文
- 异步所须要的回调函数
template<class REQ, class RESP>
class WFNetworkTask : public CommRequest
{void start()
{assert(!series_of(this));
Workflow::start_series_work(this, nullptr);
}
// 这个 user_data 是给开发者用的
void *user_data;
// 这是网络工作自身的上下文:要发送的申请和要接管的回复
REQ req;
RESP resp;
// 回调函数
std::function<void (WFNetworkTask<REQ, RESP> *)> callback;
};
6. 用户接口
方才看到的曾经是具体资源所对应的工作了~那么,咱们在这些资源上,能够做什么?
- 对于网络工作,咱们须要做协定;
- 对于计算工作,咱们须要写算法;
网络工作的协定方才看到,是两个模版类型,即咱们通过某种特化就能够指定一种具体协定的网络工作了(显然没有那么简略!然而先这样介绍哈哈哈 ^_^
using WFHttpTask = WFNetworkTask<protocol::HttpRequest,
protocol::HttpResponse>;
using http_callback_t = std::function<void (WFHttpTask *)>;
using WFRedisTask = WFNetworkTask<protocol::RedisRequest,
protocol::RedisResponse>;
using redis_callback_t = std::function<void (WFRedisTask *)>;
using WFMySQLTask = WFNetworkTask<protocol::MySQLRequest,
protocol::MySQLResponse>;
using mysql_callback_t = std::function<void (WFMySQLTask *)>;
using __WFKafkaTask = WFNetworkTask<protocol::KafkaRequest,
protocol::KafkaResponse>;
using __kafka_callback_t = std::function<void (__WFKafkaTask *)>;
而后,因为咱们是不依赖任何第三方协定库的,所以这些协定都是亲手解析的~写好了具体的 HttpMessage,咱们就能够特化出一个 Http 工作了。
所有用户通过工厂创立进去的工作,拿到的类型都在图二的 User Interface 层。
7. 具体实现
每种资源所对应的做法都是十分对称的,让咱们能够看到计算机世界的美,和巴赫的均匀律一样精妙~
- 网络对应的是协定、申请、回复
- 计算对应的则是算法、输出、输入
这里以算法工作来讲一下吧。咱们一个排序算法,用户拿到的是个 WFSortTask:
// 排序工作是线程的排序算法的特化,输入输出
template<typename T>
using WFSortTask = WFThreadTask<algorithm::SortInput<T>,
algorithm::SortOutput<T>>;
template<typename T>
using sort_callback_t = std::function<void (WFSortTask<T> *)>;
// 算法工厂
class WFAlgoTaskFactory
{
public:
// workflow 的所有工作都是要由工厂来 create 的~
template<typename T, class CB = sort_callback_t<T>>
static WFSortTask<T> *create_sort_task(const std::string& queue_name,
T *first, T *last,
CB callback);
…
// 这个接口能够创立一个具体用来做并行排序算法的工作
template<typename T, class CB = sort_callback_t<T>>
static WFSortTask<T> *create_psort_task(const std::string& queue_name,
T *first, T *last,
CB callback);
…
};
然而,具体到底是创立一个繁多的排序工作,还是我能够并行排序,是由调用 create_sort_task()
还是 create_psort_task()
接口来决定的。这是咱们设计框架时老大说得最多的一句话:
“一切都是行为派生!”
(P.S. 第二多的话有可能是 ” 颖欣你这里写得不对啊 ”😭。。。anyway…
咱们就能够看到图二,最上边的这层 Implementation,是外部针对不同 api 所生成的具体实现,然而返回给用户的都是同一类 task,这样用户在应用 callback 的时候,都是同一种参数,比方排序工作,大家都是:
std::function<void (WFSortTask<T> *)>;
8. 过程级资源管理
回到图一最上层:Instance Manager。
方才说到的执行器,申请接口是把一个要执行的工作扔到一个队列里。这个队列是在哪里创立的呢?
咱们全局会有过程级的一些资源,个别是应用单例模式,用户应用到的时候才会创立对应的资源管理器。上周有热心小伙伴提到过各种资源的纵向拆分问题,不便用户只用某种资源的异步调度,然而因为自身如果只用到网络,那么计算调度器是不会被创立的,所以一般来说编译到一起也没问题。如果小伙伴想编译时就拆开,目前来说能够本人改 cmake~
以上是横向介绍的一些档次,当前会有具体每个纵向资源的更具体的设计想法与大家交换~
1. C++ Workflow 异步调度框架 – 根本介绍篇(上一篇)
3. C++ Workflow 异步调度框架 – 性能优化上篇(下一篇)