关于数据库:技术内幕-StarRocks-Pipeline-执行框架上

39次阅读

共计 5807 个字符,预计需要花费 15 分钟才能阅读完成。

导读:欢送来到 StarRocks 技术底细系列文章,咱们将为你全方位揭晓 StarRocks 背地的技术原理和实际细节,助你从 0 开始疾速上手这款明星开源数据库产品。本期 StarRocks 技术底细将次要介绍 StarRocks Pipeline 执行框架的基本概念、原理及代码逻辑。

#01

背景介绍

Pipeline 调度与 MPP 调度之间存在着显著的差别,前者是单机多核调度,后者是分布式集群的多机调度。总结下来,Pipeline 调度的目标包含三点:

  1. 升高计算节点的任务调度代价;
  2. 晋升 CPU 利用率;
  3. 充分利用多核计算能力,晋升查问性能、主动设置并行度、打消人为设置并行度的不准确性。

本文将次要介绍 Pipeline 执行框架的基本概念、原理以及代码逻辑,帮忙读者疾速入门 StarRocks 的 Pipeline 执行框架,通过浏览本文,你将把握:

  1. 如何在 Pipeline 执行框架中增加算子;
  2. Source 算子和 Sink 算子如何异步化;
  3. 怎么将单机执行打算拆分成 Pipeline;
  4. 增加新的表达式或者函数须要次要的方面。

#02

基本概念

在深刻 Pipeline 执行框架的细节之前,咱们先来理解一下整体所需的基本概念。这些基本概念,独特形成了 Pipeline 执行框架的底层,倡议大家把握分明。

1、MPP 调度基本概念

物理执行打算(ExecPlan)

物理执行打算是 FE 生成的,由物理算子形成的执行树;SQL 通过 parse、anlyze、rewrite、optimize 等阶段解决,最终生成物理执行打算。

打算碎片(PlanFragment)

PlanFragment 是物理执行打算的局部。只有当执行打算被 FE 拆分成若干个 PlanFragment 后,能力多机并行执行。PlanFragment 同样由物理算子形成,另外还蕴含 DataSink,上游的 PlanFragment 通过 DataSink 向上游 PlanFragment 的 Exchange 算子发送数据。

碎片实例(Fragment Instance)

Fragment Instance 是 PlanFragment 的一个执行实例,StarRocks 的 table 通过分辨别桶被拆分成若干 tablet,每个 tablet 以多正本的模式存储在计算节点上,能够将 PlanFragment 的实例化成多个 Fragment Instance 解决散布在不同机器上的 tablet,从而实现数据并行计算。FE 确定 Fragment Instance 的数量和执行 Fragment Instance 的指标 BE,而后 FE 向 BE 投递 Fragment Instance。在 Pipeline 执行引擎中,BE 上的 PipelineBuilder 会把 PlanFragment 进一步拆分成若干 Pipeline,每个 Pipeline 会依据 Pipeline 并行度参数而被实例化成一组 PipelineDriver,PipelineDriver 是 Pipeline 实例,也是 Pipeline 执行引擎所能调度的根本工作。

物理算子(ExecNode)

物理算子是形成物理执行打算 PlanFragment 的根本元素,例如 OlapScanNode,HashJoinNode 等等。

2、FE 负责 MPP 调度

咱们以上面的简略 SQL 为例,进一步阐明上述概念:

select A.c0, B.c1  from A, B  where A.c0  = B.c0

第一步: FE 产生物理打算并且拆分 PlanFragment,如下图所示,物理打算被拆分成三个 PlanFragment,其中 Fragment 1 蕴含 HashJoinNode,Fragment 0 为 HashJoinNode 的右孩子。

第二步: FE 确定 PlanFragment 的实例数量,创立 Fragment Instance。如果一个 Fragment Instance 把另外一个 Fragment Instance 的输入后果作为输出,则产生数据的一方为上游,输出数据的一方为上游,上游插入 DataStreamSink 用来发送数据,上游插入 ExchangeNode 算子用来接收数据。如下图所示,其中 PlanFragment 1 有 3 个 Fragment Instance。

第三步: FE 将所有 Fragment Instance,一次性 (all-at-once) 投递给 BE,BE 执行 Fragment Instance。

3、Pipeline 调度基本概念

Pipeline

Pipeline 是一组算子形成的链,开始算子为 SourceOperator,开端算子为 SinkOperator。Pipeline 两头的算子只有一个输出端和输入端。

SourceOperator 作为 Pipeline 的起始算子,为 Pipeline 后续算子产生数据,SourceOperator 获取数据的途经有:

  1. 读本地文件或者内部数据源,比方 ScanOperator;
  2. 取得上游 Fragment Instance 的输入数据,比方 ExchangeSourceOperator;
  3. 取得上游 Pipeline 的 SinkOperator 的计算结果,比方 LocalExchangeSourceOperator。

SinkOperator 作为 Pipeline 的开端算子,排汇 Pipeline 的计算结果,并输入数据,输入途经有:

  1. 把计算结果输入到磁盘或者内部数据源,” 比方 OlapTableSinkOperator, ResultSinkOperator”;
  2. 把后果发给上游 Fragment Instance,比方 ExchangeSinkOperator;
  3. 把后果发给上游 Pipeline 的 SourceOperator,比方 LocalExchangeSinkOperator;

Pipeline 的两头算子,既可取得前驱算子的输出,又能够输入数据给后继算子。

Pipeline 计算时,从前向后,先从 SourceOperator 取得 chunk,输入给下一个算子,该算子解决 chunk,产生输入 chunk,而后输入给再下一个算子,这样一直地向前解决,最终后果会输入到 SinkOperator。对于每对相邻的算子,Pipeline 执行线程调用前一个算子 pull_chunk 函数取得 chunk,调用后一个算子的 push_chunk 函数将 chunk 推给它。Pipeline 的 SinkOperator 可能须要全量物化,而其余算子,则采纳 chunk-at-a-time 的形式工作。

以 TPCH-Q5 为例,执行打算,能够划分成若干条 Pipeline,Pipeline 之间也存在上下游数据依赖。如下图所示:

  • P2 依赖 P1
  • P3 依赖 P2
  • P6 依赖 P3,P4,P5
  • P7 依赖 P6
  • P8 依赖 P7

PlanFragment 为树状构造,须要进一步转换为 Pipeline。转换工作由 BE 上的 PipelineBuilder 实现,FE 自身对 Pipeline 无感知。一个 PlanFragment 能够拆分成若干条 Pipeline,相应地,PlanFragment 中的物理算子也须要转换为 Pipeline 算子,比方物理算子 HashJoinNode 须要转换为 HashJoinBuildOperator 和 HashJoinProbeOperator。

Pipeline 算子

Pipeline 算子是组成 Pipeline 的元素,BE 的 PipelineBuilder 拆分 PlanFragment 为 Pipeline 时,物理算子须要转换为成 Pipeline 算子。

Pipeline 实例 (PipelineDriver)

PipelineDriver 是 Pipeline 实例,一条 Pipeline 能够产生多个 PipelineDriver。在代码实现中,Pipeline 由一组 OperatorFactory 形成,Pipeline 能够调用 OperatorFactory 的 create 办法,生成一组 Operator,这组 Operator 即形成 PipelineDriver。如下图所示,依据 dop=3(degree-of-parallelism),Pipeline 实例化 3 条 PipelineDriver,输出数据也被拆分成三局部,每个 PipelineDriver 各自解决一部分。

PipelineDriver 也是 Pipeline 执行引擎的根本调度单位,其本质上是一个协程,具备三种状态:Ready、Running 和 Blocked。

  1. Pipeline 执行线程从就绪队列取得处于 Ready 状态的 PipelineDriver,设置状态为 Running,并执行;
  2. PipelineDriver 本身不会阻塞并挂起执行线程,因为它的阻塞操作(比方网络收发,获取 Tablet 数据,读表面由其余的线程异步化解决。PipelineDriver 发动阻塞操作后,状态会被执行线程标记为 Blocked,并且被动让出 (yield)CPU,放回阻塞队列,执行线程从就绪队列抉择其余的 PipelineDriver 执行。
  3. 当 PipelineDriver 执行工夫超过规定的工夫片(如 20ms),则 PipelineDriver 也会 yield,此时 PipelineDriver 会被标记为 Ready 状态拜访就绪队列,切换其余 Ready 状态的 PipelineDriver 执行。如下图所示:
  • Running:PipelineDriver 在以后执行线程中执行,执行线程重复调用相邻算子的 pull_chunk/push_chunk 函数挪动 chunk。
  • Blocked:PipelineDriver 处于阻塞状态,期待就绪事件,此时 PipelineDriver 不占用执行线程,被搁置在阻塞队列中,由专门的 Poller 线程继续查看 PipelineDriver 的状态,当 PipelineDriver 期待的事件就绪后,状态设置为 Ready,放回就绪队列。
  • Ready:PipelineDriver 执行工夫超过工夫片,会被放回就绪队列;阻塞解除的 PipelineDriver 也会放回就绪队列。执行线程从就绪队列中取得 PipelineDriver 并执行。执行线程的数量为计算节点 BE 的物理核数,而同时 BE 须要调度的 PipelineDriver 可能成千上万,因而执行线程是全局资源,跨所有查问,被所有的 PipelineDriver 所复用(multiplexing)。

Pipeline 引擎中协程调度模型和传统的线程调度模型的次要区别是,前者实现了用户态的 yield 语义,而后者依赖 OS 的线程调度,在高并发场景下的频繁的上下文切换减少了调度老本,升高了 CPU 的无效利用率。如下图所示: 

阻塞操作异步化

实现 Pipeline 执行引擎的协程调度,最为要害解决是阻塞操作异步化,如果没有实现异步化,PipelineDriver 的阻塞操作会导致执行线程陷入内核挂起,进化为 OS 线程调度。为了防止执行线程的上下文切换,须要管制执行线程的数量不超过物理核数,并且执行线程为跨查问的全局资源,这种阻塞挂起会显著影响 CPU 利用率和其余 PipelineDriver 的调度。因而,波及阻塞的操作,须要异步化解决,例如:

  1. ScanOperator 读 Tablet 数据,拜访磁盘。
  2. ExchangeSinkOperator 发送数据,ExchangeSourceOperator 接收数据。
  3. HashJoinProbeOperator 所在 PipelineDriver 期待 HashJoinBuildOperator 实现 HashTable 的构建和 RuntimeFilter 的生成。
  4. 须要全量物化的物理算子拆分成一对 SinkOperator 和 SourceOperator,其中 SinkOperator 位于上游的 Pipeline,而 SourceOperator 位于上游的 Pipeline,SourceOperator 须要期待 SinkOperator 算子实现。比方物理算子 AggregateBlockingNode 转换为 Pipeline 引擎的 AggregateBlockingSinkOperator 和 AggregateBlockingSourceOperator,后者须要期待前者实现。

4、BE 负责 Pipeline 调度

BE 执行 PipelineDriver 应用两种类型的线程和两种队列,别离为 Pipeline 执行引擎的工作线程 PipelineDriverExecutor、阻塞态 PipelineDriver 的轮询线程 PipelineDriverPoller。队列别离为就绪 Driver 队列 (Ready Driver queue) 和阻塞 Driver 队列(Blocked Driver queue),如下图所示:

  • 执行线程 PipelineDriverExecutor: 一直地从就绪 Driver 队列中取得就绪态的 PipelineDriver 并执行,把被动让出 CPU 的 PipelineDriver 再次放回就绪 Driver 队列,把处于阻塞态 PipelineDriver 放入阻塞 Driver 队列。
  • 轮询线程 PipelineDriverPoller: 一直地遍历阻塞 Driver 队列,跳过依然处于阻塞态的 PipelineDriver,将解除阻塞态的 PipelineDriver,设置为 Ready 状态,放回就绪 Driver 队列。

本文次要解说了 Pipeline 执行引擎想解决的问题及一般性原理。

对于 Pipeline 执行引擎的实现,BE 端拆分 Pipeline 的逻辑,以及 Pipeline 实例 PipelineDriver 的调度执行逻辑,StarRocks Pipeline 执行框架(下)见!

读到这里,好学的你是不是又产生了一些新思考与启发?扫描下方用户群二维码退出 StarRocks 社区一起自在交换!

对于 StarRocks

面世两年多来,StarRocks 始终专一打造世界顶级的新一代极速全场景 MPP 数据库,帮忙企业建设“极速对立”的数据分析新范式,助力企业全面数字化经营。

以后曾经帮忙腾讯、携程、顺丰、Airbnb、滴滴、京东、众安保险等超过 170 家大型用户构建了全新的数据分析能力,生产环境中稳固运行的 StarRocks 服务器数目达数千台。

2021 年 9 月,StarRocks 源代码凋谢,在 GitHub 上的星数已超过 3200 个。StarRocks 的寰球社区飞速成长,至今已有超百位贡献者,社群用户冲破 7000 人,吸引几十家国内外行业头部企业参加共建。

正文完
 0