乐趣区

关于flink:FLIP147支持包含结束任务的-Checkpoint-操作与作业结束流程修正

作者|高赟(云骞)

点击进入 Flink 中文学习网

第一局部

简介

Flink 能够同时反对无限数据集和有限数据集的分布式解决。在最近几个版本中,Flink 逐渐实现了流批一体的 DataStream API 与 Table / SQL API。大部分用户都同时有流解决与批处理的需要,流批一体的开发接口能够帮忙这些用户减小开发、运维与保障两类作业处理后果一致性等方面的复杂度,例如阿里巴巴双十一的场景 [1]

图 1. 流执行模式与批执行模式的比照。以 Count 算子为例,在流模式下,达到的数据是无序的,算子将会读写与该元素对应的状态并进行增量计算。而在批模式下,算子将首先对数据排序,雷同 Key 的数据将被对立解决。

在流批一体的接口之下,Flink 提供了两种不同的执行模式,即流执行模式与批执行模式。流执行模式下 Flink 基于中间状态增量的解决达到的数据,它能够同时反对无限数据与有限数据的解决。批执行模式则是基于按拓扑序顺次执行作业中的所有工作,并通过事后对数据进行排序来防止对状态的随机拜访,因而它只能用于无限数据的解决,然而个别状况下能够获得更好的性能。尽管许多场景下用户间接采纳批处理模式来解决无限数据集,然而也存在许多场景用户依然依赖流解决模式来解决无限数据集。例如,用户可能想要应用 SQL 的 Retraction 性能或者用户可能依赖于流模式下数据近似按工夫有序的性质(例如 Kappa+ 架构 [2])。此外,许多用户须要执行同时包含有限数据流与无限维表的作业,这类作业也必须采纳流执行模式。

在流执行模式下,Checkpointing [3] 是保障 Exactly-once 语义的外围机制。通过定期的保留作业的状态,当产生谬误时 Flink 能够从最新的保留点复原并继续执行。然而,在之前的版本中,Flink 不反对当局部工作执行完结之后进行 Checkpoint 操作。对于同时包含有限和无限数据输出的作业,这个问题将导致当无限数据输出解决实现后作业无奈持续进行 Checkpoint 操作,从而导致当产生谬误时须要从很久之前从新计算。

此外,无奈对于局部工作完结后的作业进行 Checkpoint 操作也会影响应用两阶段提交 Sink 来保障端到端一致性 [4] 的作业。为了保障端到端一致性,两阶段提交的 Sink 通常首先将数据写入临时文件或者启用内部零碎的事务,而后在 Checkpoint 胜利实现后提交 Checkpoint 之前写入的数据,从而防止在产生谬误后重放这部分数据导致数据反复。然而,如果作业中蕴含无限数据源,在这部分源节点工作完结后作业将无奈持续提交数据。特地是对于所有数据源均为无限数据源的状况,作业总是无奈提交最初一次 Checkpoint 到作业执行完结两头的这部分数据。在之前的实现中,Flink 在作业完结时间接疏忽这部分数据,这对于用户带来了极大的困扰,在邮件列表中也有不少用户在询问这个问题。

因而,为了欠缺流执行模式对无限数据流的反对,Flink 须要:

  1. 反对工作完结后持续进行 Checkpoint 操作。
  2. 修改作业完结的流程,保障所有数据都能够被失常提交。

下文咱们将首先简要形容针对这两个指标所进行的改变。在第二局部,咱们也将分享更具体的实现。

反对蕴含结束任务的 Checkpoint

总体来说,反对蕴含结束任务的 Checkpoint 操作的外围思路是给曾经执行实现的算子打标,从而在重启后能够跳过这部分算子的执行。如图 2 所示,在 Flink 中,Checkpoint 是由所有算子的状态来组成的。如果一个算子的所有并发都曾经执行实现,那们咱们就能够将该算子标记为『执行实现』,并在重启后跳过。对于其它算子,算子的状态是由所有以后还在运行的并发的状态组成,当重启后,算子状态将在所有并发中从新划分。

图 2. 扩大的 Checkpoint 格局

为了可能在有工作执行实现的状况下实现上述 Checkpoint 操作,咱们批改了 Checkpoint 操作的流程。之前在进行 Checkpoint 时,JobManager 中的 Checkpoint 控制器将首先告诉所有的源节点保留以后状态,而后源节点将通过 Barrier 事件告诉后续的算子。因为当初源节点可能曾经执行实现,Checkpoint 控制器须要改为告诉那些自身尚未执行完结、然而所有的前驱工作都曾经执行实现的工作。最初,如果一个算子所有工作要么在开始 Checkpoint 的时候曾经变为『实现』状态、要么在保留以后状态时曾经解决实现所有数据,该算子就会被标记为『执行实现』。

除了在 Checkpoint 时的确有工作执行实现的状况下咱们限度作业降级时对作业拓扑构造的批改,上述批改对用户是通明的。具体来说,咱们不容许用户在一个被标记为『执行实现』的算子前减少新的算子,因为这将导致一个『执行实现』的算子有一个尚未『执行实现』的前驱,而这违反了 Flink 中算子按拓扑序完结的语义。

修改作业完结的流程

基于上述对蕴含结束任务的作业进行 Checkpoint 的能力,咱们当初能够解决两阶段提交的算子在流模式下无奈提交最初一部分数据的问题。总的来说,Flink 作业完结有两种可能的形式:

  1. 所有数据源都是无限的,这种状况下作业会在解决完所有输出数据并提交所有输入到内部零碎后完结。
  2. 用户显式执行 stop-with-savepoint [–drain] 操作。这种状况下作业会创立一个 Savepoint 后完结。如果指定了 –drain,作业将永恒完结,这种状况下须要实现所有内部零碎中长期数据的提交。另一方面,如果没有指定该参数,那么作业预期后续会基于该 Savepoint 重启,这种状况下则不须要肯定实现所有长期数据的提交,只有放弃 Savepoint 中记录的状态与内部零碎中长期数据的状态统一即可。

咱们首先看一下所有数据源无限的状况。为了可能实现端到端的一致性,应用两阶段提交的算子只在 Checkpoint 实现之后才会提交该 Checkpoint 之前的数据。然而,在之前的实现中,对于最初一个周期性的 Checkpoint 到作业执行完结期间所产生的数据,作业是没有一个适合的机会来进行提交的,从而导致数据失落。须要留神的是咱们在作业完结的时候间接提交这部分数据也是不可取的:如果在某个工作实现提交之后,因为其它工作产生谬误导致产生了重启,那么从最初一次 Checkpoint 开始的数据就会被重放,从而导致数据反复。

用户通过 stop-with-savepoint [–drain] 来进行作业的状况同样存在问题。在之前的实现中,Flink 将首先阻塞所有的工作,而后创立一个 Savepoint。在 Savepoint 胜利之后,所有的数据源工作将被动进行运行,从而使整个作业执行完结。只管看起来咱们能够通过这次 Savepoint 来提交所有数据,然而在当初的实现中,还有一些逻辑实际上是在作业进行运行的过程中执行的,如果这些逻辑产生了新的数据,这些数据最终会失落。例如,在之前的实现中 endInput() 办法就是在作业进行过程中执行的,一些算子可能在该办法中发送数据,例如用于异步操作的 AsyncWaitOperator。

最初,只管不指定 drain 参数时,执行 stop-with-savepoint 不须要提交所有的数据,然而咱们还是心愿这种状况下作业完结的流程能够与前两种状况对立,从而保障代码的可维护性。

为了解决现有实现中存在的问题,咱们须要批改作业完结的流程来保障在须要的时候所有的数据都能够保障提交。如图 3 所示,一个间接的想法是咱们能够在工作生命周期减少一步,让工作在完结之前期待下一个 Checkpoint 实现。然而,如下文所述,这种形式依然不能解决所有问题。

图 3. 两种保障工作完结前实现数据提交的办法比照。第一种办法间接在工作的生命周期中插入一步,即期待下一个 Checkpoint 完结,但这种形式下不同工作无奈期待同一个 Checkpoint / Savepoint。第二种形式解耦了『实现执行逻辑』与『工作完结』,从而容许所有工作首先实现数据处理,而后它们有机会期待同一个 Checkpoint / Savepoint。

对于所有数据源都是无限的状况,这种间接的形式能够解决数据无奈提交的问题,然而它可能导致比较严重的性能问题。如图 4 所示,如果有多个级连的工作,每个工作都蕴含两阶段提交的 Sink,那么每个工作都须要在完结之前期待下一次 Checkpoint 实现,这样整个作业须要期待 3 个 Checkpoint 能力完结,这将对作业的执行工夫有较大的影响。

图 4. 一个有多级工作并且每个工作都蕴含两阶段提交算子的例子

对于 stop-with-savepoint [–drain] 的状况,这种间接的想法就不能施行了,因为这种状况下因为不同的工作必须期待不同的 Checkpoint / Savepoint,最终作业无奈失去一个残缺的 Savepoint。

因而,咱们无奈采纳这种间接的想法。咱们采纳的形式是将『作业实现所有执行逻辑』与『作业完结』解耦:咱们首先让所有工作实现所有的执行逻辑,包含 调用“endInput()”这些生命周期办法在内,而后所有的工作就能够并行的期待下一个 Checkpoint / Savepoint 了。此外,对于 stop-with-savepoint [–drain] 的状况,咱们也相似的反转以后实现:所有任首先实现所有的执行逻辑,而后它们就能够期待下一个 Savepoint 实现后完结。能够看出,通完这这种形式,咱们能够以同样的流程来对立所有作业完结的状况。

基于这一思维,如图 3 的右半局部所示,为理解耦『作业实现所有执行逻辑』与『作业完结』,咱们引入了一个新的 EndOfData 事件。对于每一个工作,在完所有的执行逻辑后,它将首先向所有上游发送一个 EndOfData 事件,这样上游也能够明确推断出本人实现了所有的执行逻辑。而后所有的工作就能够并行的期待下一次 Checkpoint 或者指定的 Savepoint 实现后,此时这些工作能够向内部零碎提交所有数据后完结。

最初,在批改过程中,咱们还重新整理和重命名了『close()』和『dispose()』两个算子生命周期算法。这两个办法的语义是有所区别的,因为 close() 实际上只在作业失常完结的状况下调用,而 dispose() 在失常完结和异样退出的状况下都会调用。然而,用户很难从这两个名子上看出这个语义。因而,咱们将这两上办法重命名为了『finish()』和『close()』:

  1. finish() 标记着所有的算子曾经执行实现,并且不会再产生新的数据。因而,只有当作业失常完结并且曾经齐全执行实现(即所有数据源执行完结或者用户应用了 stop-with-savepoint –drain)的状况下才会调用。
  2. close() 在所有状况下都会调用,用于开释工作占用的资源。

第二局部

在上述第一局部中,咱们曾经简要介绍了为反对蕴含结束任务的 Checkpoint 以及优化作业完结流程所做的工作。在这一部分中咱们将更介绍更多实现细节,包含蕴含结束任务时 Checkpoint 的具体流程与作业完结的具体流程。

蕴含结束任务的 Checkpoint 实现

如第一局部所述,反对蕴含结束任务的 Checkpoint 操作的核心思想是对曾经齐全执行实现的算子打标,并在重启后跳过这些算子的执行。为了实现这一思维,咱们须要批改当初 Checkpoint 的流程来创立这些标记并且在复原时应用这些标记。本节将介绍这一流程的细节实现。

在之前的实现中,只有当所有工作都在运行状态时才能够进行 Checkpoint 操作。如图 5 所示,在这些状况下 Checkpoint 协调器将首先告诉所有的数据源工作,数据源工作在实现状态保留后再持续告诉后续工作。相似的,在局部工作执行完结的状况下,咱们须要首先找到以后仍在运行的局部中的新的『源工作』,也就是那些正在运行然而所有前驱工作都曾经执行实现的工作,而后通过告诉这些工作来启动 Checkpoint 负责。Checkpoint 协调器在 JobManaer 中工作记录的最新状态原子的计算以后的『源工作』列表。

告诉这些源工作的过程可能存在 状态竞争:当 Checkpoint 协调器选中一个工作进行告诉的过程中,这个工作可能恰好执行实现并汇报完结状态,从而导致告诉失败。在这种状况下,咱们抉择终止这次 Checkpoint。

图 5. 局部工作完结后 Checkpoint 的 Trigger 形式

为了在 Checkpoint 中记录算子的完结状态,咱们须要扩大 Checkpoint 的格局。一个 Checkpoint 是由所有有状态算子的状态组成的,而每个算子的状态则是由它所有并发实例的状态组成。这里须要指出的是,工作(Task)这一概念并不在 Checkpoint 中反映。工作更多的是一个物理执行的窗口,用于驱动它所蕴含的所有算子并发实例的执行。然而在一个作业的屡次执行中,因为用户可能批改作业拓扑构造,从而使工作的划分发生变化,因而工作在两次执行中可能不是一一对应的。因而,执行完结的标记须要附加在 Checkpoint 中的算子状态上。

如第一局部图 2 所示,在进行 Checkpoint 时依据算子以后的执行状态能够将算子分为三类:

  1. 齐全执行完结:如果一个算子的所有并发实例都执行实现,该算子能够被认为齐全执行完结,在重启后能够跳过该算子的执行。咱们就须要对这些算子打标。
  2. 局部执行完结:如果一个算子的局部实例执行实现,那么它在作业重启后须要继续执行残余的逻辑。整体上咱们能够认为这种状况下算子的状态是由所有仍在执行的并发实例的状态组成的,这些状态能够代表尚未执行实现的逻辑。
  3. 没有实现的实例:这种状况下算子状态与现有实现雷同。

后续作业从 Checkpoint 中重启时,咱们能够跳过齐全执行完结的算子,并且继续执行其它两种类型的算子。

然而,对于局部执行完结的算子,理论状况会更加简单。在重启时,局部执行完结算子的残余状态将会被从新散发到所有实例中,这一流程与算子并发批改的状况相似。对于所有类型的状态,Keyed State [5] 与一般的 Oeprator State [6] 的状态能够失常散发,然而 Broacast State [7] 与 Union Operator State [8] 存在问题:

  1. Broadcast State 在重启后总是将第一个并发实例的状态播送给所有的新的并发实例。然而,如果第一个并发实例曾经执行完结,那么它的状态将为空,这将导致所有并发实例的状态变为空,算子将从头执行,这是不合乎预期的。
  2. Union Operator State 在重启后会将所有算子的状态聚合后分发给所有的新的并发实例。基于这一行为,许多算子可能会抉择其中一个并发实例来存储所有并发实例共享的状态。相似的,如果所选中的并发实例曾经执行完结,那么这部分状态就失落了。

这理论批改并发的场景中,这两个问题是不会产生的,因为这种状况下并不存在已执行实现的子工作。为了解决上述问题,对于 Broadcast State,咱们抉择任意一个运行状态的子工作做为播送状态的起源;对于 Union Operator State,咱们须要保障可能收集所有子工作的状态,因而目前如果咱们察看到一个应用了 Union Operator State 的算子局部执行完结的话,咱们勾销这次 Checkpoint,后续等到该算子所有子工作执行实现,Checkpoint 将能够持续。

原则上,用户能够在两次执行期间对拓扑进行批改。然而,思考到工作完结的状况,对拓扑批改有肯定的限度:用户不能在一个齐全完结的算子之前减少新的算子。Flink 将在作业重启时进行检测并在有此类批改时报错。

修改后的作业完结流程

如第一局部所述,基于在局部工作完结后持续做 Checkpoint 的能力,咱们能够对现有作业完结流程进行修改,从而保障两阶段提交的算子总是能够失常提交数据。本节将详细描述批改前后的完结流程。

原作业完结流程

如前文所述,作业完结包含两种状况:所有数据源完结或用户执行 stop-with-savepoint –drain。咱们首先来看一下之前的作业完结流程。

所有数据源完结

如果所有的数据源都是无限的,那么作业将在所有数据处理实现后完结,并且所有数据都须要提交。在这种状况下,数据源工作将首先发送一个 MAX_WATERMARK (Long.MAX_VALUE) 而后开始结束任务。在完结过程中,工作将顺次对所有算子调用 endOfInput()、close()、和 dispose(),而后向上游发送 EndOfPartitionEvent 事件。后续工作在收所有输出边中都读到 EndOfPartitionEvent 事件后,也会开始执行完结流程,这一过程一直反复直到所有工作都完结。

  1. Source operators emit MAX_WATERMARK
  1. On received MAX_WATERMARK for non-source operators

    a. Trigger all the event-time timers

    b. Emit MAX_WATERMARK

  1. Source tasks finished

    a. endInput(inputId) for all the operators

    b. close() for all the operators

    c. dispose() for all the operators

    d. Emit EndOfPartitionEvent

    e. Task cleanup

  1. On received EndOfPartitionEvent for non-source tasks

    a. endInput(int inputId) for all the operators

    b. close() for all the operators

    c. dispose() for all the operators

    d. Emit EndOfPartitionEvent

    e. Task cleanup

用户执行 stop-with-savepoint –drain

用户能够对无限或有限数据流作业执行 stop-with-savepoint [–drain] 操作来完结作业。在这种状况下,作业将首先触发一个同步 Savepoint 操作,并且阻塞所有工作直到该 Savepoint 实现。如果 Savepoint 胜利实现,那么所有的数据源工作将被动执行完结流程,后续流程与所有数据源无限的状况相似。

  1. Trigger a savepoint
  1. Sources received savepoint trigger RPC

    a. If with –-drain

    ​ i. source operators emit MAX_WATERMARK

    b. Source emits savepoint barrier

  1. On received MAX_WATERMARK for non-source operators

    a. Trigger all the event times

    b. Emit MAX_WATERMARK

  1. On received savepoint barrier for non-source operators

    a. The task blocks till the savepoint succeed

  1. Finish the source tasks actively

    a. If with –-drain

    ​ ii. endInput(inputId) for all the operators

    b. close() for all the operators

    c. dispose() for all the operators

    d. Emit EndOfPartitionEvent

    e. Task cleanup

  1. On received EndOfPartitionEvent for non-source tasks

    a. If with –-drain

    ​ i. endInput(int inputId) for all the operators

    b. close() for all the operators

    c. dispose() for all the operators

    d. Emit EndOfPartitionEvent e. Task cleanup

该命令有一个可选的 –drain 的参数,如果未指定该参数,后续作业能够从 Savepoint 复原执行,否则用户预期作业永恒完结。因而,只有用户指定该参数的状况下,作业才会发送 MAX_WATERMARK 并且对所有算子调用 endInput()。

修改后作业完结流程

如第一局部所述,在修改后的完结流程中,咱们通过减少一个新的 EndOfData 事件解耦了『工作实现执行逻辑』与『工作完结』。每个工作将首先在实现全副执行逻辑后向上游发送一个 EndOfData 事件,这样上游工作也能够先实现所有执行逻辑,而后所有工作就能够并行的期待下一个 Checkpoint 或 指定的 Savepoint 来实现提交所有数据。

本节将详细描述修改后的执行流程。因为咱们将 close() / dispose() 办法重命名了 finish() / close(),咱们将在后续形容中保持应用这一术语。

修改后的执行流程如下:

  1. Source tasks finished due to no more records or stop-with-savepoint.

    a. if no more records or stop-with-savepoint –-drain

    ​ i. source operators emit MAX_WATERMARK

    ​ ii. endInput(inputId) for all the operators

    ​ iii. finish() for all the operators

    ​ iv. emit EndOfData[isDrain = true] event

    b. else if stop-with-savepoint

    ​ i. emit EndOfData[isDrain = false] event

    c. Wait for the next checkpoint / the savepoint after operator finished complete

    d. close() for all the operators

    e. Emit EndOfPartitionEvent

    f. Task cleanup

  1. On received MAX_WATERMARK for non-source operators

    a. Trigger all the event times

    b. Emit MAX_WATERMARK

  1. On received EndOfData for non-source tasks

    a. If isDrain

    ​ i. endInput(int inputId) for all the operators

    ​ ii. finish() for all the operators

    b. Emit EndOfData[isDrain = the flag value of the received event]

  1. On received EndOfPartitionEvent for non-source tasks

    a. Wait for the next checkpoint / the savepoint after operator finished complete

    b. close() for all the operators

    c. Emit EndOfPartitionEvent

    d. Task cleanup

图 6. 一个应用修改后的完结流程的作业的例子

一个例子如图 6 所示。咱们首先来看一下所有数据源都是无限的状况。

如果工作 C 首先在解决完所有数据后完结,它将首先发送 MAX_WATERMARK,而后对所有算子执行对应完结的生命周期办法并发送 EndOfData 事件。在这之后,它首先期待下一个 Checkpoint 实现,而后发送 EndOfPartitionEvent 事件。

工作 D 在收到 EndOfData 事件后将首先对算子执行完结对应对应完结的生命周期办法。因为任何在算子执行完结后开始的 Checkpoint 都能够提交残余的数据,而工作 C 提交数据所依赖的 Checkpoint 的 Barrier 事件在 EndOfData 事件后达到,因而工作 D 实际上能够与工作 C 应用同样的 Checkpoint 实现数据提交。

工作 E 略有不同,因为它有两个输出,而工作 A 可能持续运行一段时间。因而,工作 E 必须等到它从两个输出都读取到 EndOfData 事件后才能够开始完结算子执行,并且它须要依赖一个不同的 Checkpoint 来实现数据提交。

另一方面,当应用 stop-with-savepoint [–drain] 来完结作业时,整个流程与数据源无限的状况雷同,只是所有工作不是期待任意的下一个 Checkpoint,而是期待一个指定的 Savepoint 来实现数据提交。此外,这种状况下因为工作 C 与工作 A 肯定同时完结,因而咱们能够保障工作 E 也能够在完结前等到这个特定的 Savepoint。

论断

通过反对在局部工作完结后的 Checkpoint 作业并且修改作业完结的流程,咱们能够反对同时应用无限数据源与有限数据源的作业,并且能够保障所有数据源都是无限的状况下最初一部分数据能够失常提交。这部分批改保障了数据一致性与完结完整性,并且反对了蕴含无限数据源的作业的谬误复原。这一机制次要在 1.14 实现,并且在 1.15 中默认关上。如果遇到任何问题,欢送在 dev 或 user / user-zh 邮件列表中发动探讨或提出问题。

[1] https://www.ververica.com/blo…

[2] https://www.youtube.com/watch…

[3] https://nightlies.apache.org/…

[4] https://flink.apache.org/feat…

[5] https://nightlies.apache.org/…

[6] https://nightlies.apache.org/…

[7] https://nightlies.apache.org/…

[8] https://nightlies.apache.org/…


点击进入 Flink 中文学习网

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…

退出移动版