乐趣区

关于flink:Trisk在Flink上实现以task为中心的流处理动态Reconfiguration的Control-Plane

摘要:本文整顿自新加坡国立大学计算机系博士在读生毛言粲在 Flink Forward Asia 2021 核心技术专场的分享。次要内容包含:

  1. 背景:流作业动静调控
  2. 挑战:兼顾普适、高效和易用
  3. 设计:以 Task 为核心的零碎设计
  4. 实现:基于 Flink 的 Barrier 机制
  5. 评估:Trisk 与已有零碎的性能比照

FFA 2021 直播回放 & 演讲 PDF 下载

一、背景:流作业动静调控

流数据处理是十分重要的一种数据处理形式,它在各个领域都有宽泛的利用,比方机器学习、数据分析和实时事件处理以及实时交易等畛域。流解决领有低提早和高吞吐量的个性,它被大规模部署成以流工作实例 stream task 形成的流作业 stream job 并行处理输出的数据流,流作业会部署成并行的流工作,这些流工作实例被两头流连贯,并造成一个有向无环图。

流数据的并行处理是通过将输出数据在并行任务之间进行分区,而后每个工作独立解决调配的分区工作实时实现的,因为流作业是长期执行且会随着工夫抖动,而不同的流作业有不同的性能需求,比方实时交易工作对提早很敏感,而一些数据分析的工作对吞吐量要求很高。为了达到不同流解决作业的性能要求,动静重配置流工作的技术很要害。

常见的数据抖动有如下几类:

  • 第一,输出速率的变动。流作业是长期执行,而数据流的输出速率会不可预测地产生动态变化,导致动态调配的资源无奈低提早高吞吐地解决数据流;
  • 第二,数据歪斜。流数据的数据分布会动态变化,比方某个数据的呈现频率增大会导致对应 stream task 的工作负载变大,提早变大;
  • 第三,新兴事件的产生。流数据中可能会呈现新兴事件或者数据,这种数据无奈被以后执行逻辑正确地执行。比方新型欺骗交易须要通过新的规定能力检测到。

针对不同的数据抖动,有不同类型的重配置技术来优化流作业,从而在保障资源利用率的同时,以高吞吐低延时的性能来解决流数据。

  • 针对输出数列的变动,能够通过 scaling 的形式动静伸缩资源,进步吞吐量、升高提早;
  • 针对数据歪斜,能够通过 load balancing 的形式来从新散布并行执行流工作之间的工作负载,以从新达到负载平衡;
  • 对于新兴事件的解决,能够通过 change of logic 的形式来更新流工作的执行逻辑,从而能够正确地解决新兴事件和数据。

有了不同类型的数据抖动和重配置技术后,须要思考下一个问题就是如何动静地检测数据抖动,并抉择适合的办法来调控流工作。为了解决这个的问题,通常是通过设计一个控制器来对工作进行动静重配置,控制器次要通过实时监听流作业剖析症状,而后针对不同的症状批改不同的流作业配置,来做性能优化。

这个过程分为三步:监听、诊断、重配置。

  1. 首先控制器能够实时监听流工作,目前流作业的控制器次要通过监听系统层面的 metrics 比方 CPU utilization,或利用层面的 metrics 比方端到端的数据处理提早、吞吐量积压等,来进行建模剖析和策略判断;
  2. 而后控制器通过控制策略来诊断症状,控制策略能够通过预约义的规定,比方 CPU 利用率高于肯定阈值就执行 scaling out,或进行模型剖析,比方预测须要达到的资源分配来诊断存在的问题;
  3. 最初控制器抉择不同类型的重配置办法去动静优化流作业。

为了缩小为不同流作业实现控制器的工程开销,须要有一个管制平台来对流作业进行托管。管制平台封装了 metrics 和重配置办法,并且对外提供相应的 API,从而开发者能够在流作业部署好之后通过在管制平台提交控制器,对流作业进行托管。这样的控制器也蕴含了自定义的控制策略,并且能够间接应用管制平台的 API 实现 metrics 的采集和重配置,暗藏了零碎底层的解决逻辑,简化了控制器的设计和开发。

大部分流解决零碎都封装了比拟成熟的 metrics 零碎,因而管制平台能够基于原有零碎 API 实现 metrics 的采集,然而动静重配置的反对仍是一个较大的挑战。

二、挑战:兼顾普适、高效和易用

动静重配置的管制平台该当具备三种性质:

  • 普适性,不同类型的控制策略须要应用不同类型的重配置办法;
  • 高效性,重配置的执行应在短时间内实现,并且尽量不阻塞原数据处理;
  • 易用性,API 应简略易用,用户调用时无需晓得零碎底层逻辑。

然而目前已有的解决方案只能满足上述局部性质,比方 Flink 反对动静地对流作业进行重配置,并提供了简略易用的 online interface 为用户实现控制器流作业的动静重配置。通过批改源代码和从新提交换作业的形式,Flink 的原生反对具备很强的普适性和应用性。然而重新部署也会带来很大的开销。比方资源重调配和全局的状态复原。

Flink 重配置的具体执行流程如下:首先 JobManager 会触发一个 Savepoint 到整个流作业的 pipeline 上,Savepoint 实现之后,以后流作业的 global snapshots 将会返回到 JobManager 中,JobManager 在收到所有的 snapshots 后,终止以后的 pipeline,而后以新的配置重新部署流作业,并从以后的 Savepoint 复原状态从新开始。

三、设计:以 Task 为核心的零碎设计

为了满足重配置的三种性质,咱们将介绍 Trisk:以 Task 为核心的流作业控制平台。

上图是 Trisk 的零碎架构,它反对对流解决的重配置进行定义和实现,提供了以 Task 为核心的配置形象,这个形象蕴含了以后流作业三个维度的执行配置,并且基于形象封装了原子操作,使得配置办法能够通过在形象上组合原子操作来定义。为了提高效率,不同于 Flink 自身提供的主旨和重启机制,Trisk 采纳了局部暂停和复原的技术来执行重配置,并且它的封装能够进一步利用 Flink 零碎中的 Checkpoint 机制来实现一致性。同时 Trisk 提供了易于应用的编程 API,有事后定义好的罕用重配置 API,还将原子操作封装为 API 来让用户自定义重配置。

Trisk 的工作流程如下:

Trisk runtime 保护了 restful API,用户能够通过接口提交管制逻辑代码。接着由 Trisk runtime 编译代码并生成对应的控制策略,它会依据以后流作业的 metrics 做诊断和重配置决策。控制策略诊断到以后运行的流作业的数据抖动后,会通过与 Trisk runtime 交互来对流作业进行重配置。

其过程如下:首先控制策略会从 Trisk runtime 中获取一个 Trisk 配置形象,用来获取以后流作业每个 task 的配置状况,而后会依据诊断后果应用不同类型的原子操作来对 Trisk 形象进行更新。比方,如果判断出了输出速率增高的问题,控制策略将会通过调配更多的资源来部署新的 task,并且重新分配 task 之间的工作量,来减少流作业的吞吐量。最初控制策略会通过把更新好的 Trisk 形象送回到 Trisk runtime 中,Trisk runtime 依据更新好的配置对流作业执行重配置优化。

Trisk 重配置的执行是通过与底层的流零碎进行交互来实现的,采纳了局部暂停与复原的办法来实现工作流程,因而能够防止终止整个流作业的状况下放弃一致性,并且只会对局部 task 进行更新来升高工夫开销。整个过程能够分为三步:prepare-sync-update。

其流程如下:prepare 阶段,流零碎基于更新后的 Trisk 形象,找出被更新的受影响的 task,并筹备这些 task 更新后的理论配置;sync 阶段,为保证数据一致性,执行期间需全局同步流作业并暂停受影响的 task,不受影响的 task 能够继续执行。这里通过 Flink 的 checkpoint barrier 机制实现这个同步过程;update 阶段,受影响的 task 将被独立更新,并在更新实现后继续执行。

Trisk 的三维形象源自于流工作的三个步骤:

  • 第一步:流作业提交到流零碎时,会被封装成一个 Logical Graph 外面蕴含了流工作的执行逻辑,其中顶点 operator 里蕴含了 User Defined Function,边示意 operator 之间的两头数据流,每个 operator 会应用 UDF 来解决输出的数据流,并生成输入流,流入前面的 operator。
  • 第二步:Logical Graph 的每一个 operator 会并行运行肯定数量的 stream task,且输出数据流会被调配到不同的 stream task 并行执行。每个 stream task 调配到的输出数据流被称为该 task 的工作负载配置。
  • 第三步,这些并行的 stream task 会被部署到服务器中物理执行,每个 stream task 都会在一台机器上调配到肯定的资源比方 CPU 和内存,这样的资源分配形容了 stream task 的 resource 配置。

因而 Trisk 的三维形象就是蕴含了以 task 为核心的 execution logic,workload、resources 配置,最终造成了一个有向无环图,寄存在了 Trisk runtime 中。

咱们对形象中的每一个维度的更新都封装了原子操作,通过对三维形象中每一个维度执行原子操作,能够细粒度地重配置流作业,从而满足重配置的普适性。比方 scaling 能够通过调配 resources 来重配置新的 task,并重调配并行任务之间的 workload 来实现。

上图展现了一个 scaling out 的例子,因为输出速率的不平均回升导致 task2 的负载增大、提早回升,且 task3 的利用率也很高,因而咱们须要通过执行 scaling out 来调配一个新的执行工作 task5 并转移一部分 task2 的 workload 到 task5 上,来让以后流作业能持续低提早高吞吐地解决输出流数据。

Trisk 提供了罕用的重配置 API,对应着咱们之前提到的三种重配置办法:scaling、load balancing、change of logic,用户能够应用提供的 API 在 Trisk 上实现控制策略。这些控制策略能够编译为运行在 Trisk runtime 上的线程来动静治理流作业。

上图例子显示了一个能够实现在流作业动静负载平衡的控制策略 load balance 的实现。它通过每秒检测 task 的工作负载,比方监听每个 task 的解决数据量的散布,并在 task 间的散布发生变化时重新分配 task 的 workload 来实现负载平衡。

同时用户也能够通过基于三维形象的原子操作来定义新的重配置办法。咱们将三种原子操作封装成了 assignLogic、assignWorkload、assignedResource 三个 API。

上图展现了 scaling 重配置办法基于对形象执行原子操作的代码实现。通过 assignResource 来为新创建的任务分配资源,而后通过 assignWorkload 重新分配并行任务之间的工作负载来实现。

四、实现:基于 Flink 的 Barrier 机制

Trisk 管制平台是独自运行的一个后盾服务,它封装提供了重配置 API。在 Flink 零碎层中,也退出了一些新的组件来和 Trisk runtime 交互,并且高效执行对流作业的重配置。在 runtime 层中,controller 保留了用户自定义的控制策略和重配置办法。StreamManager 是 Trisk 的外围,它为用户提供了 API 并且保护了 web service 来接管新的 controller。在零碎层中,JobReconfigCoordinator 保护 Trisk 形象到 Flink 物理配置的映射,并协调执行重配置来保障流作业在重配置前后的数据一致性。

每个 StreamTask 会保护一个 TaskConfigManager,它会治理并更新对应 StreamTask 中的配置,来实现重配置。

Flink 外部的组件架构如上图。JobReconfigCoordinator 存在于 Flink 的 JobManager 中,并且在每个 StreamTask 上都保护了一个 TaskConfigManager。JobReconfigCoordinator 和 TaskConfigManager 能够通过 Flink 网络层进行近程交互,实现管制逻辑。

上图展现了重配置在 Flink 上的执行总览。

在 prepare 阶段,Coordinator 会收到 Trisk runtime 层剖析好的形象并筹备好 StreamTask 的新配置。比方对 scaling 分配资源是通过获取一个新的 resource slot 实现,重调配 workload 是通过更新上游 task 的 result partition 和上游 task input gate 来实现的。对于 stateful 的 task 来,重调配 workload 还须要更新 task state backend。

在 synchronize 阶段,Coordinator 会利用 Flink 原有的 checkpoint barrier 机制,对受影响的 task 进行同步和暂停从而保证数据的一致性,其过程次要是通过从 source task 开始向整个 pipeline 发送 barrier,受影响的 StreamTask 会在接管到 barrier 之后暂停并期待来自 Coordinator 的更新指令。

同步实现后进入 update 阶段,Coordinator 会告诉所有受影响的 task 去并行执行 update 来更新本人的配置。StreamTask 在更新完本人的配置后会主动复原执行,并与上下游从新连贯。

具体的实现细节有如下几项:

首先,对 Trisk abstraction 外部的配置和 Flink 的 JobGraph、ExecutionGraph 做了映射,因而 prepare 阶段中 Coordinator 会去更新对应的 JobGraph 和 ExecutionGraph,而后通过 Flink 的 barrier 机制实现了重配置执行中的同步来保证数据一致性。

其次,每个 task 的原子操作都尽量利用 Flink 原有的机制对 StreamTask 进行动静批改。比方 assignWorkload 是通过从新初始化一个 state backend 再从新更新上游 task 的 result partition 和以后 task 的 input gate 实现的。

重配置的具体执行流程分为以下几步:

首先在 prepare 阶段,JobReconfigCoordinator 会更新 JobGraph 和 ExecutionGraph。而后依据更新状况标记受影响的 StreamTask。prepare 实现后,Coordinator 利用 barrier 机制实现整个 pipeline 的同步,从 source task 通过 inject barrier 发送到整个 pipeline。受影响的 task 收到所有上游 task 的 barrier 后会暂停并 ack 到 Coordinator 中,再向上游 task 发送 barrier。上游 task 收到 barrier 之后也执行相似的操作,受影响的 task 暂停并 ack,而不受影响的持续放弃执行。所有 task 都 ack 到了 Coordinator 之后,同步完结。

接下来进入 update 阶段,在 update 阶段,Coordinator 会告诉 TaskConfigManager 去更新 StreamTask 的配置,更新实现后与上下游从新连贯,并继续执行。

至此,重配置流程完结。

五、评估:Trisk 与已有零碎的性能比照

咱们进行了小规模试验,次要围绕以下两点指标:

  • 第一,在 Trisk 上实现的控制器总体成果如何,是否能满足控制器的优化指标比方提早管制?
  • 第二,比照已有的重配置执行技术,如 Flink 原生反对和前沿的 Megaphone 机制,Trisk 的执行效率如何?

试验环境如下:咱们将 Trisk 实现在 Flink-1.10.0 上,并配置了 4 个节点的 Flink standalone cluster,每个节点配置了 8 个 slots。咱们应用了一个实在利用 stock-exchange 和一个合成利用 word-count 来实现。stock-exchange 是一个实时的股票交易工作,须要实时处理股票交易订单,来防止对用户的交易决策造成影响。word-count 是一个罕用于数据分析中的操作,咱们次要对输出流的每一个 key 进行 count。

咱们在 stock-exchange 上实现了一个简略但具备代表性的 latency-aware 控制器。最后 stock-exchange 作业部署了 10 个工作,输出流是股票申报订单,输出曲线如左图所示。控制器能够通过应用 scaling 和 load balancing 来管制作业的提早,次要依据输出速率和工作负载来作出决策。比方在第一百秒的时候,因为输出速率增大,所以做 scaling out;而在第四百秒的时候,因为输出速率升高,所以会做 scaling in。

在 Trisk 和 Flink 上实现的控制器都须要大略 100 行代码,次要蕴含了控制策略的逻辑。

试验后果如右图所示。为了展现控制器的优化成果,咱们次要比照了 Trisk/Flink 的原生反对 / 动态配置下的 stock-exchange 作业的延时变动状况。红线是动态配置的 stock-exchange 作业,绿线是 Flink 上的控制器对流作业的优化成果,蓝线则是 Trisk 对 stock-exchange 的优化成果。

红线结果表明,尽管动态配置在开始时运行良好,但因为输出速率的减少,它无奈实时处理 100 秒过后的数据,导致提早减少了两个数量级。相比之下,应用 Flink 原生配置实现的控制器,可能适应工作负载的变动,然而在执行重配置期间会导致高提早峰值,大略比平时的提早高出 1~2 个数量级。而 Trisk 上做出决策的控制器展现了毫秒级的重配置实现工夫,且只有能够忽略不计的提早增量。这次要归功于 Trisk 的局部暂停与复原技术。

再将 Trisk 重配置执行期间的运行成果与两个现有的办法进行比拟,一个是 Flink 的终止和重启机制,以及 megaphone 提出了 fluid state migration 机制,能够在 key 层面对重配置进行同步和更新。试验中咱们对 word-count 应用了 load balancing,初始配置有 20 个工作,并在第 50 秒时触发 load balancing。整个过程会重调配所有并行任务之间的 workload。

为了理解他们的行为,咱们比拟了执行重配置时的提早和吞吐量。

从提早图能够看出,Trisk 比 Flink 重配置带来的提早低,而与 megaphone 相比,Trisk 具备最短的实现工夫,但峰值提早绝对较高。从吞吐量图中能够看出,在重配置过程中 Trisk 的吞吐量降落了,但复原得比 Flink 快。对于 megaphone 来说,fluid state migration 须要更长的工夫来实现重配置,但在重配置阶段会有更低的峰值提早和更高的吞吐量。

总的来说,咱们提出了 Trisk:以 Task 为核心的管制平台,能够普适、高效、和易用地反对重配置办法。在将来的工作中,咱们也将持续摸索在 Trisk 上实现更多样的控制策略,来更好地利用 Trisk 上的重配置办法。


FFA 2021 直播回放 & 演讲 PDF 下载

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

退出移动版