乐趣区

关于flink:自适应批作业调度器为-Flink-批作业自动推导并行度

作者|王立杰 & 朱翥

点击进入 Flink 中文学习网

一、引言

对大部分用户来说,为 Flink 算子配置适合的并行度并不是一件容易的事。对于批作业,小的并行度会导致作业运行工夫长,故障复原慢,而不必要的大并行度会导致资源节约,工作部署和数据 shuffle 开销也会变大。

为了管制批作业的执行时长,算子的并行度应该和其须要解决的数据量成正比。用户须要通过预估算子须要解决的数据量来配置并行度。但精确预估算子须要解决的数据量是一件很艰难的事件:须要解决的数据量可能每天都在变动,作业中可能会存在大量的 UDF 和简单算子导致难以判断其产出的数据量。

为了解决这个问题,咱们在 Flink 1.15 中引入了一种新的调度器:自适应批作业调度器(Adaptive Batch Scheduler)。自适应批作业调度器会在作业运行时依据每个算子须要解决的理论数据量来主动推导并行度。它会带来以下益处:

  1. 大大降低批处理作业并发度调优的繁琐水平;
  2. 能够依据解决的数据量为不同的算子配置不同的并行度,这对于之前只能配置全局并行度的 SQL 作业尤其无益;
  3. 能够更好的适应每日变动的数据量。

二、用法

使 Flink 主动推导算子的并行度,须要进行以下配置:

  1. 启用自适应批作业调度器;
  2. 配置算子的并行度为 -1。

2.1 启用自适应批作业调度器

启用自适应批作业调度器,须要进行以下配置:

  1. 配置 jobmanager.scheduler: AdaptiveBatch
  2. execution.batch-shuffle-mode 配置为 ALL-EXCHANGES-BLOCKING (默认值)。因为目前自适应批作业调度器只反对 shuffle mode 为 ALL-EXCHANGES-BLOCKING 的作业。

此外,还有一些相干配置来指定主动推导的算子并行度的上上限、预期每个算子解决的数据量以及 source 算子的默认并行度,详情请参阅 Flink 文档 [1]

2.2 配置算子的并行度为 -1

自适应批作业调度器只会为用户未指定并行度的算子(即并行度为默认值 -1)推导并行度。所以须要进行以下配置:

  1. 配置 parallelism.default: -1
  2. 对于 SQL 作业,须要配置 table.exec.resource.default-parallelism: -1
  3. 对于 DataStream/DataSet 作业,防止在作业中通过算子的 setParallelism() 办法来指定并行度;
  4. 对于 DataStream/DataSet 作业,防止在作业中通过 StreamExecutionEnvironment/ExecutionEnvironment 的 setParallelism() 办法来指定并行度。

三、实现细节

接下来咱们将介绍自适应批作业调度器的实现细节。在此之前,咱们简要介绍一下波及到的一些术语概念:

  1. 逻辑节点(JobVertex)[2] 和逻辑拓扑(JobGraph)[3]:逻辑节点是为了更优的性能而将几个算子链接到一起造成的算子链,逻辑拓扑则是多个逻辑节点连贯组成的数据流图。
  2. 执行节点(ExecutionVertex)[4] 和执行拓扑(ExecutionGraph)[5]:执行节点对应一个可部署物理工作,是逻辑节点依据并行度进行开展生成的。例如,如果一个逻辑节点的并行度为 100,就会生成 100 个对应的执行节点。执行拓扑则是所有执行节点连贯组成的物理执行图。

以上概念的介绍能够参见 Flink 文档 [6]。须要留神的是,自适应批作业调度器是通过推导逻辑节点的并行度来决定该节点蕴含的算子的并行度的。

实现细节次要包含以下几局部:

  1. 使调度器可能收集执行节点产出数据的大小;
  2. 引入一个新组件 VertexParallelismDecider [7] 来负责依据逻辑节点须要解决的数据量计算其并行度;
  3. 反对动静构建执行拓扑,即执行拓扑从一个空的执行拓扑开始,而后随着作业调度逐步增加执行节点;
  4. 引入自适应批作业调度器来更新和调度执行拓扑。

后续章节会对以上内容进行具体介绍。

图 1 – 主动推导并行度的整体构造

3.1 收集执行节点产出的数据量

自适应批作业调度器是依据逻辑节点须要解决的数据量来决定其并行度的,因而须要收集上游节点产出的数据量。为此,咱们引入了一个 numBytesProduced 计数器来记录每个执行节点产出的数据分区 (ResultPartition) 的数据量,并在执行节点运行实现时将累计值发送给调度器。

3.2 为逻辑节点决定适合的并行度

咱们引入了一个新组件 VertexParallelismDecider 来负责为逻辑节点计算并行度。计算算法如下:

假如

  1. V 是用户配置的冀望每个执行节点解决的数据量;
  2. totalBytesnon-broadcast 是逻辑节点须要解决的非播送数据的总量;
  3. totalBytesbroadcast 是逻辑节点须要解决的播送数据的总量;
  4. maxBroadcastRatio 是每个执行节点解决的播送数据的比例下限;
  5. normalize(x) 是一个输入与 x 最靠近的 2 的幂的函数。

计算并行度的公式如下:

值得注意的是,咱们在这个公式中引入了两个非凡解决:

  1. 限度每个执行节点解决的播送数据的比例;
  2. 将并行度调整为 2 的幂。

此外,上述公式不能间接用来决定 source 节点的并行度,因为 source 节点不会生产数据。为了解决这个问题,咱们引入了配置选项 jobmanager.adaptive-batch-scheduler.default-source-parallelism,容许用户手动配置 source 节点的并行度。请留神,并非所有 source 都须要此选项,因为某些 source 能够本人推导并行度(例如,HiveTableSource,详情请参阅 HiveParallelismInference),对于这些 source,更举荐由它们本人推导并行度。

3.2.1 限度每个执行节点解决的播送数据的比例

咱们在公式限度每个执行节点解决的播送数据下限比例为 maxBroadcastRatio。即每个执行节点解决的非播送数据至多为 (1-maxBroadcastRatio) V。如果不这样做,当播送数据的数据量靠近 V* 时,即便非播送数据的量十分小,也可能会被计算出很大的并行度,这是不必要的,会导致资源节约和工作部署的开销变大。

通常状况下,一个执行节点须要解决的播送数据量会小于要解决的非播送数据。因而,咱们将 maxBroadcastRatio 默认设置为 0.5。目前,这个值是硬编码在代码中的,咱们后续会思考将其改为可配置的。

3.2.2 将并行度调整为 2 的幂

normalize 函数会将并行度调整为最近的 2 的幂,这样做是为了防止引入数据歪斜。为了更好的了解本节,咱们建议您先浏览 子分区动静映射 局部。

以图 4(b)为例,A1/A2 产生 4 个子分区,B 最终被决定的并行度为 3。这种状况下,B1 将生产 1 个子分区,B2 将生产 1 个子分区,B3 将生产 2 个子分区。咱们假如不同子分区的数据量都雷同,这样 B3 须要生产的数据量是 B1/B2 的 2 倍,从而导致了数据歪斜。

为了解决这个问题,咱们须要让所有上游执行节点生产的子分区数量都一样,也就是说上游产出的子分区数量应该是上游逻辑节点并行度的整数倍。为简略起见,咱们心愿用户指定的最大并行度为 2^N(如果不是则会被主动调整到不超过配置值的 2^N),而后将上游逻辑节点的并行度调整到最靠近的 2^M(M <= N),这样就能够保障子分区被上游平均生产。

不过这只是一个长期的解决方案,最终应该通过 主动负载平衡 来解决,咱们将在后续版本中实现。

3.3 动静构建执行拓扑

在引入自适应批作业调度器之前,执行拓扑是以动态形式构建的,也就是在调度开始前执行拓扑就被齐全创立进去了。为了使逻辑节点并行度能够在运行时决定,执行拓扑须要反对动静构建。

3.3.1 向执行拓扑动静增加节点和边

动静构建执行拓扑是指一个 Flink 作业从一个空的执行拓扑开始,而后随着调度逐渐附加执行节点,如图 2 所示。

执行拓扑由执行节点和执行边(ExecutionEdge)组成。只有在以下状况下,才会将逻辑节点开展创立执行节点并将其增加到执行拓扑:

  1. 对应逻辑节点的并行度曾经被确定(以便 Flink 晓得应该创立多少个执行节点);
  2. 所有上游逻辑节点都曾经被开展(以便 Flink 通过执行边将新创建的执行节点和上游执行节点连接起来)。

图 2 – 动静构建执行拓扑

3.3.2 子分区动静映射

在引入自适应批作业调度器之前,在部署执行节点时,Flink 须要晓得其上游逻辑节点的并行度。因为上游逻辑节点的并行度决定了上游执行节点须要产出的子分区数量。以图 3 为例,上游 B 的并行度为 2,因而上游的 A1/A2 须要产生 2 个子分区,索引为 0 的子分区被 B1 生产,索引为 1 的子分区被 B2 生产。

<img src=”https://img.alicdn.com/imgextra/i3/O1CN01wH5chh1C6F2eR1G4O_!!6000000000031-0-tps-860-924.jpg” alt=”img” style=”zoom:33%;” />

图 3 – 动态执行拓扑生产子分区的形式

但显然,这不适用于动态图,因为当部署上游执行节点时,上游逻辑节点的并行度可能尚未确定(即部署 A1/A2 时,B 的并行度还未确定)。为了解决这个问题,咱们须要使上游执行节点产生的子分区数量与上游逻辑节点的并行度解耦。

咱们通过以下办法实现解耦:将上游执行节点产生子分区的数量设置为上游逻辑节点的最大并行度(最大并行度是一个可配置的固定值),而后在上游逻辑节点并行度被确定后,将这些子分区均分给不同的上游执行节点进行生产。也就是说,部署上游执行节点时,每个上游执行节点都会被调配到一个子分区范畴来生产。假如 N 是上游逻辑节点并行度,P 是子分区的数量。对于第 k 个上游执行节点,生产的子分区范畴应该是:

以图 4 为例,B 的最大并行度为 4,因而 A1/A2 有 4 个子分区。而后如果 B 的确定并行度为 2,则子分区映射将为图 4(a),如果 B 的确定并行度为 3,则子分区映射将为图 4(b)。

图 4 – 动静执行拓扑生产子分区的形式

3.4 动静更新并调度执行拓扑

自适应批作业调度器调度作业的形式和默认调度器基本相同,惟一的区别是:自适应批作业调度器是从一个空的执行拓扑开始调度,在解决任何调度事件之前,都会尝试决定所有逻辑节点的并行度,而后尝试为逻辑节点生成对应的执行节点,并通过执行边连贯上游节点,更新执行拓扑。

调度器会在每次调度之前尝试依照拓扑程序决定所有逻辑节点的并行度:

  1. 对于 source 节点,其并行度会在开始调度之前就进行确定;
  2. 对于非 source 节点,须要在其所有上游节点数据产出实现后能力确定其并行度。

而后,调度程序将尝试依照拓扑程序将逻辑节点开展生成执行节点。一个能够被开展的逻辑节点应该满足以下条件:

  1. 该逻辑节点并行度已确定;
  2. 所有上游逻辑节点都曾经被开展。

四、将来瞻望 – 主动负载平衡

运行批作业时,可能会呈现数据歪斜(某个执行节点须要解决的数据远多于其余执行节点),这会导作业呈现长尾景象,拖慢作业的实现速度。如果 Flink 能够主动改善或者解决这个问题,能够给用户很大的帮忙。

一种典型的数据歪斜状况是某些子分区的数据量显著大于其余子分区。这种状况能够通过划分更细粒度的子分区,并依据子分区大小来均衡工作负载来解决(如图 5)。自适应批作业调度器的工作能够被认为是迈向它的第一步,因为主动从新均衡的要求相似于自适应批作业调度器,它们都须要动态图的反对和后果分区大小的采集。

基于自适应批作业调度器的实现,咱们能够通过减少最大并行度(为了更细粒度的子分区)和简略地更改子分区范畴划分算法(为了均衡工作负载)来解决上述问题。在目前的设计中,子分区范畴是依照子分区的个数来划分的,咱们能够改成依照子分区中的数据量来划分,这样每个子分区范畴内的数据量能够大致相同,从而均衡上游执行节点的工作量。

图 5 – 主动负载平衡

正文

[1] https://nightlies.apache.org/…

[2] https://github.com/apache/fli…

[3] https://github.com/apache/fli…

[4] https://github.com/apache/fli…

[5] https://github.com/apache/fli…

[6] https://nightlies.apache.org/… 数据结构

[7] https://github.com/apache/fli…


点击进入 Flink 中文学习网

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…

退出移动版