乐趣区

关于flink:Flink-新一代流计算和容错阶段总结和展望

摘要:本文整顿自 Apache Flink 引擎架构师、阿里巴巴存储引擎团队负责人梅源在 Flink Forward Asia 2021 核心技术专场的演讲。本次演讲内容围绕 Flink 的高可用性探讨 Flink 新一代流计算的外围问题和技术选型,包含:

  1. Flink 高可用流计算的要害门路
  2. 容错 (Fault Tolerance) 2.0 及关键问题
  3. 数据恢复过程
  4. 稳固疾速高效的 Checkpointing
  5. 云原生下容错和弹性扩缩容

FFA 2021 直播回放 & 演讲 PDF 下载

一、高可用流计算的要害门路

上图的双向轴线是大数据利用随时间延迟的图谱,越往右边时间延迟要求越短,越往左提早要求没那么高。Flink 诞生之初大略是在上图两头,能够了解为往右对应的是流式计算,而往左边对应的是批式计算。过来一两年,Flink 的利用图谱向右边有了很大的扩大,也就是咱们常说的流批一体;与此同时咱们也素来没有进行过把图谱向更实时的方向推动。

Flink 是以流式计算起家,那么向更实时的方向推动到底是指什么?什么是更实时更极致的流式计算?

在失常解决的状况下,Flink 引擎框架自身除了定期去做 Checkpoint 的快照,简直没有其余额定的开销,而且 Checkpoint 快照很大一部分是异步的,所以失常解决下 Flink 是十分高效的,端到端的提早在 100 毫秒左右。正因为要反对高效的解决,Flink 在做容错复原和 Rescale 的时候代价都会比拟大:须要把整个作业停掉,而后从过来的快照检查点整体复原,这个过程大略须要几秒钟,在作业状态比拟大的状况下会达到分钟级。如果须要预热或启动其余服务过程,工夫就更长了。

所以,Flink 极致流计算的关键点在容错复原局部。这里说的极致的流计算是指对提早性、稳定性和一致性都有肯定要求的场景,比方风控平安。这也是 Fault Tolerance 2.0 要解决的问题。

二、容错 (Fault Tolerance) 2.0 及关键问题

容错复原是一个全链路的问题,包含 failure detect、job cancel、新的资源申请调度、状态复原和重建等。同时,如果想从已有的状态复原,就必须在失常处理过程中做 Checkpoint,并且将它做得足够轻量化才不会影响失常解决。

容错也是多维度的问题,不同的用户、不同的场景对容错都有不同需要,次要包含以下几个方面:

  • 数据一致性 (Data Consistency),有些利用比方在线机器学习是能够容忍局部数据失落;
  • 提早 (Latency),某些场景对端到端的提早要求没那么高,所以能够将失常解决和容错复原的时候要做的工作综合均匀一下;
  • 复原时的行为表现 (Recovery Behavior),比方大屏或者报表实时更新的场景下,可能并不需要迅速全量复原,更重要的在于迅速复原第一条数据;
  • 代价 (Cost),用户依据本人的需要,违心为容错付出的代价也不一样。综上,咱们须要从不同的角度去思考这个问题。

另外,容错也不仅仅是 Flink 引擎侧的问题。Flink 和云原生的联合是 Flink 将来的重要方向,咱们对于云原生的依赖形式也决定了容错的设计和走向。咱们冀望通过非常简单的弱依赖来利用云原生带来的便当,比方 across region durability,最终可能将有状态的 Flink 的利用像原生的无状态利用一样弹性部署。

基于以上思考,咱们在 Flink 容错 2.0 工作也有不同的侧重点和方向。

第一,从调度的角度来思考,每次谬误复原的时候,不会把和全局快照绝对应的所有 task 节点都回滚,而是只复原失败的单个或者局部节点,这个对须要预热或单个节点初始化工夫很长的场景是很有必要的, 比方在线机器学习场景。与此相关的一些工作比方 Approximate Task-local Recovery 已在 VVP 上线;Exactly-once Task-local Recovery,咱们也曾经获得了一些成绩。

接下来重点聊一下 Checkpoint 以及和云原生相干的局部。

三、Flink 中的数据恢复过程

那么,容错到底解决了什么?在我看来其本质是解决数据恢复的问题。

Flink 的数据能够粗略分为以下三类,第一种是元信息,相当于一个 Flink 作业运行起来所须要的最小信息汇合,包含比方 Checkpoint 地址、Job Manager、Dispatcher、Resource Manager 等等,这些信息的容错是由 Kubernetes/Zookeeper 等零碎的高可用性来保障的,不在咱们探讨的容错范畴内。Flink 作业运行起来当前,会从数据源读取数据写到 Sink 里,两头流过的数据称为解决的两头数据 Inflight Data (第二类)。对于有状态的算子比方聚合算子,解决完输出数据会产生算子状态数据 (第三类)。

Flink 会周期性地对所有算子的状态数据做快照,上传到长久稳固的海量存储中 (Durable Bulk Store),这个过程就是做 Checkpoint。Flink 作业产生谬误时,会回滚到过来的一个快照检查点 Checkpoint 复原。

咱们以后有十分多的工作是针对晋升 Checkpointing 效率来做的,因为在理论工作中,引擎层大部分 Oncall 或工单问题基本上都与 Checkpoint 相干,各种起因会造成 Checkpointing 超时。

上面简略回顾一下 Checkpointing 的流程,对这部分内容比拟相熟的同学能够间接跳过。Checkpointing 的流程分为以下几步:

第一步:Checkpoint Coordinate 从 Source 端插入 Checkpoint Barrier (上图黄色的竖条)。

第二步:Barrier 会随着两头数据处理向上游流动,流过算子的时候,零碎会给算子的以后状态做一个同步快照,并将这个快照数据异步上传到远端存储。这样一来,Barrier 之前所有的输出数据对算子的影响都已反映在算子的状态中了。如果算子状态很大,会影响实现 Checkpointing 的工夫。

第三步:当一个算子有多个输出的时候,须要算子拿到所有输出的 Barrier 之后能力开始做快照,也就是上图蓝色框的局部。能够看到,如果在对齐过程中有反压,造成两头解决数据流动迟缓,没有反压的那些线路也会被堵住,Checkpoint 会做得很慢,甚至做不进去。

第四步:所有算子的中间状态数据都胜利上传到远端稳固存储之后,一个残缺的 Checkpoint 才算真正实现。

从这 4 个步骤中能够看到,影响疾速稳固地做 Checkpoint 的因素次要有 2 个,一个是解决的两头数据流动迟缓,另一个是算子状态数据过大,造成上传迟缓,上面来讲一讲如何来解决这两个因素。

四、稳固疾速高效的 Checkpointing

针对两头数据流动迟缓,能够:

  1. 想方法不被两头数据梗塞:Unaligned Checkpoint——间接跳过阻塞的两头数据;
  2. 或者让两头的数据变得足够少:Buffer Debloating。
  3. 针对状态数据过大,咱们须要将每次做 Checkpoint 时上传的数据状态变得足够小:Generalized Log-Based Incremental Checkpoint。

上面来具体开展论述每一种解决办法。

4.1 Unaligned Checkpoint

Unaligned Checkpoint 的原理是将从 Source 插入的 Barrier 跳过两头数据刹时推到 Sink,跳过的数据一起放在快照里。所以对于 Unaligned Checkpoint 来说,它的状态数据不仅包含算子的状态数据,还包含解决的两头数据,能够了解成给整个 Flink Pipeline 做了一个残缺的刹时快照,如上图黄色框所示。尽管 Unaligned Checkpoint 能够十分疾速地做 Checkpoint,但它须要存储额定的 Pipeline Channel 的两头数据,所以须要存储的状态会更大。Unaligned Checkpoint 在去年 Flink-1.11 版本就曾经公布,Flink-1.12 和 1.13 版本反对 Unaligned Checkpoint 的 Rescaling 和动静由 Aligned Checkpoint 到 Unaligned Checkpoint 的切换。

4.2 Buffer Debloating

Buffer Debloating 的原理是在不影响吞吐和提早的前提下,缩减上下游缓存的数据。通过察看,咱们发现算子并不需要很大的 input/output buffer。缓存太多数据除了让作业在数据流动迟缓时把整个 pipeline 填满,让作业内存超用 OOM 以外,没有太大的帮忙。

这里能够做个简略的估算,对于每个 task,无论是输入还是输出,咱们总的 buffer 数目大略是每个 channel 对应的 exclusive buffer 数乘以 channel 的个数再加上专用的 floating buffer 数。这个 buffer 总数再乘以每个 buffer 的 size,失去的后果就是总的 local buffer pool 的 size。而后咱们能够把零碎默认值代进去算一下,就会发现并发略微大一点再多几次数据 shuffle,整个作业两头的流动数据很容易就会达到几个 Gigabytes。

理论中咱们并不需要缓存这么多数据,只须要足够量的数据保障算子不空转即可,这正是 Buffer Debloating 做的事件。Buffer Debloating 可能动静调整上下游总 buffer 的大小,在不影响性能的状况下最小化作业所需的 buffer size。目前的策略是上游会动静缓存上游大略一秒钟可能解决的数据。此外,Buffer Debloating 对 Unaligned Checkpoint 也是有益处的。因为 Buffer Debloating 缩小了两头流动的数据,所以 Unaligned Checkpoint 在做快照的时候,须要额定存储的两头数据也会变少。

上图是对 Buffer Debloating 在反压的状况下,Checkpointing 工夫随 Debloat Target 变动的工夫比照图。Debloat Target 是指上游缓存“预期工夫”内上游能解决的数据。这个试验中,Flink 作业共有 5 个 Network Exchange,所以总共 Checkpointing 所需的工夫大概等于 5 倍的 Debloat Target,这与试验后果也基本一致。

4.3 Generalized Log-Based Incremental Checkpoint

后面提到状态大小也会影响实现 Checkpointing 的工夫,这是因为 Flink 的 Checkpointing 过程由两个局部组成:同步的快照和异步上传。同步的过程通常很快,把内存中的状态数据刷到磁盘上就能够了。然而异步上传状态数据的局部和上传的数据量无关,因而咱们引入了 Generalized Log-Based Incremental Checkpoint 来管制每次快照时须要上传的数据量。

对于有状态的算子,它的外部状态产生扭转后,这个更新会记录在 State Table 里,如上图所示。当 Checkpointing 产生的时候,以 RocksDB 为例,这个 State Table 会被刷到磁盘上,磁盘文件再异步上传到远端存储。依据 Checkpoint 的模式,上传的局部能够是残缺的 Checkpoint 或 Checkpoint 增量局部。但无论是哪种模式,它上传文件的大小都是与 State Backend 存储实现强绑定的。例如 RocksDB 尽管也反对增量 Checkpoint,然而一旦触发多层 Compaction,就会生成很多新的文件,而这种状况下增量的局部甚至会比一个残缺的 Checkpoint 更大,所以上传工夫仍然不可控。

既然是上传过程导致 Checkpointing 超时,那么把上传过程从 Checkpointing 过程中剥离开来就能解决问题。这其实就是 Generalized Log-Based Incremental Checkpoint 想要做的事件:实质上就是将 Checkpointing 过程和 State Backend 存储 Compaction 齐全剥来到。

具体实现办法如下:对于一个有状态的算子,咱们除了将状态更新记录在 State Table 外面,还会再写一份增量到 State Changelog,并将它们都异步的刷到远端存储上。这样,Checkpoint 变成由两个局部组成,第一个局部是以后曾经物化存在远端存储上的 State Table,第二个局部是还没有物化的增量局部。因而真正做 Checkpoint 的时候,须要上传的数据量就会变得少且稳固,不仅能够把 Checkpoint 做得更稳固,还能够做得更高频。能够极大缩短端到端的提早。特地对于 Exactly Once Sink,因为须要实现残缺的 Checkpoint 当前能力实现二阶段提交。

五、云原生下容错和弹性扩缩容

在云原生的大背景下,疾速扩缩容是 Flink 的一大挑战,特地是 Flink-1.13 版本引入了 Re-active Scaling 模式后,Flink 作业须要频繁做 Scaling-In/Out,因而 Rescaling 已成为 Re-active 的次要瓶颈。Rescaling 和容错 (Failover) 要解决的问题在很大水平上是相似的:例如拿掉一台机器后,零碎须要疾速感知到,须要从新调度并且从新复原状态等。当然也有不同点,Failover 的时候只须要复原状态,将状态拉回到算子上即可;但 Rescaling 的时候,因为拓扑会导致并行度发生变化,须要重新分配状态。

状态复原的时候,咱们首先须要将状态数据从远端存储读取到本地,而后依据读取的数据重新分配状态。如上图所示,整个这个过程在状态稍大的状况下,单个并发都会超过 30 分钟。并且在理论中,咱们发现状态重新分配所须要的工夫远远大于从远端存储读取状态数据的工夫。

那么状态是如何重新分配的呢?Flink 的状态用 Key Group 作为最小单位来切分,能够了解成把状态的 Key Space 映射到一个从 0 开始的正整数集,这个正整数集就是 Key Group Range。这个 Key Group Range 和算子的所容许的最大并发度相干。如上图所示,当咱们把算子并发度从 3 变成 4 的时候,重新分配的 Task1 的状态是别离由原先的两个 Task 状态的一部分拼接而成的,并且这个拼接状态是间断且没有交加的,所以咱们能够利用这一个性做一些优化。

上图能够看到优化后,DB Rebuild 这部分优化成果还是非常明显的,但目前这部分工作还处于探索性阶段,有很多问题尚未解决,所以临时还没有明确的社区打算。

最初简略回顾一下本文的内容。咱们首先探讨了为什么要做容错,因为容错是 Flink 流计算的要害门路;而后剖析了影响容错的因素,容错是一个全链路的问题,包含 Failure Detection、Job Canceling、新的资源申请调度、状态复原和重建等,须要从多个维度去衡量思考这个问题;以后咱们的重点次要是放在如何稳固疾速做 Checkpoint 的局部,因为当初很多理论的问题都和做 Checkpoint 相干;最初咱们探讨了如何将容错放在云原生的大背景下与弹性扩缩容相结合的一些探索性工作。


FFA 2021 直播回放 & 演讲 PDF 下载

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

退出移动版