关于flink:面向流批一体的-Flink-Runtime-新进展

30次阅读

共计 7837 个字符,预计需要花费 20 分钟才能阅读完成。

摘要:本文整顿自阿里巴巴技术专家高赟 (云骞) 在 Flink Forward Asia 2021 核心技术专场的演讲。次要内容包含:

  1. 流批一体
  2. 语义欠缺与加强
  3. 性能优化
  4. Remote Shuffle
  5. 总结与瞻望

点击查看直播回放 & 演讲 PDF

一、流批一体

流批一体的指标是心愿可能为无限数据和有限数据提供一套对立的解决 API,包含 Datastream API 与 Table/SQL API,其中无限数据的解决对应离线解决,而有限数据的解决则对应在线解决。

之所以须要这么一套流批一体的解决 API,次要有以下两个起因:

  • 首先,随着实时计算的一直倒退,大多数企业数据处理的 pipeline 都是由离线解决和在线解决组成的,应用同一套开发 API 就能够缩小流解决和批处理作业开发过程中的学习与保护的老本;
  • 另外,在许多场景下,用户的流解决作业可能受限于提早数据或者线上逻辑变更的状况,例如用户可能会过很长时间才会发动评论或线上解决的逻辑可能须要进行降级,这种状况下就须要应用离线作业对之前解决的后果进行修改,即 backfill 的状况。

这种状况下如果应用两套不同的 API 会很难保护处理结果的一致性,因而用户须要一套对立的 API 来解决上述问题。

Flink 流批一体的 API 由 Datastream API 与 Table/SQL API 两局部组成,其中 Table/SQL API 是绝对比拟 high level 的 API,次要提供规范的 SQL 以及与它等价的 table 操作,datastream 则绝对 low level,用户能够显式地操作算子 time 和 state,这两套 API 能够相互转换联合应用。

针对两套 API,Flink 提供了两种不同的执行模式:

  • 其中流执行模式是通过保留算子的 state 等新数据达到的时候进行增量计算来实现的。它能够同时用于无限数据集和有限数据集的解决,能够反对任意解决逻辑,比方 salt 操作,它容许保留所有历史数据并反对 retraction,当一条新数据达到时,就能够更新保留的历史数据并对历史上收到的所有数据进行从新排序,最初对之前发送的后果进行 retraction 来实现。比方 reduce 算子,能够进一步优化来防止理论存储无限大的历史状态。另外增量计算中,因为数据达到时是无序的,因而它对 sql 的拜访也是无序的,从而可能导致随机 io。最初,流解决模式依赖于定时 checkpoint 来反对 failover,这种状况也会导致肯定的解决开销。
  • 因而对于无限数据集的解决,咱们也提供了专用的批处理模式,算子通过逐级运算的形式来解决,因而只能用于无限数据的解决。这种状况下算子实现能够进行特定的优化,比方对数据先进行排序,而后按 key 进行一一解决,从而防止无限大的状态随机 io 的问题。

Flink 可能保障,在两种执行模式下,雷同的无限输出数据的处理结果能够保持一致。此外,它对两种不同的模式也提供了对立的 pipelined region 调度器、对立的 shuffle service 插件接口以及对立的 connector 接口,为两种接口提供了对立的反对。

目前 Flink 的架构如上图所示,无论是在 API 上还是在具体实现上,曾经整体做到了流批一体的状态。

二、语义加强与欠缺

对于上述流批一体的定义,咱们在最近几个版本中也进行了继续的欠缺和优化,第一局部是对于流批一体语义的加强与欠缺。

首先是在流模式下反对局部 task 完结后持续做 checkpoint 工作。

目前流程之下作业的完结能够分为两种状况:

  • 如果 source 是无限的,作业最终会执行完结;
  • 在有限 source 的状况下,用户能够通过 stop-with-savepoint–drain 命令来终止作业,并保留一个 savepoint。如果不指定 drain 参数,就不会进行 drain 操作,这种状况个别是为了保留 savepoint 来重启作业,不属于作业终止的状况。

之前的 Flink 不反对局部 task 完结后进行 checkpoint,因为这会导致两个问题:

  • 第一,两阶段提交的 sink 在流模式下依赖于 checkpoint 实现数据端到端的一致性。这种状况下,两阶段提交的 sink 会首先将数据写入临时文件或内部事务中,只有当 Flink 外部的 checkpoint 胜利之后,在保障 checkpoint 之前的数据不会进行重放的前提下,两阶段提交的 sink 才能够释怀地通过重命名文件或提交事务的形式来进行事务理论的提交。如果局部 task 完结之后不能做 checkpoint,那么最初一部分数据总是无奈进行提交,也就无奈保障流模式与批模式处理结果的一致性。
  • 第二,对于同时包含无限数据 source 和有限数据 source 的混合作业,如果局部执行完结后不能进行 checkpoint,那么后续执行一旦产生 failover 就会因为回退导致较大的开销。

为了解决上述问题,咱们须要反对局部 task 完结之后也能够进行 checkpoint 的性能,并且批改作业完结的流程,使得应用两阶段提交的 task 最初能够期待一个 checkpoint 实现之后再退出。对于失常完结的状况,能够期待下一个 checkpoint 实现后退出;对于 drain 的状况,能够期待 savepoint 实现后退出。

为了实现局部 task 完结之后能进行 checkpoint,咱们对 checkpoint 的流程进行了批改。首先从新辨认新的 source task,就是那些前序工作都曾经终止但自身尚未终止的 task,而后从这些 task 开始发送 barrier 进行失常的 checkpoint 操作。因为 checkpoint 中 state 是以 jobvertext 为单位进行记录的,因而如果一个 jobvertext 中所有 task 都已完结,会在它的状态中记录一个非凡的标记 ver,如果是局部 task 完结,会保留所有正在运行的 task state 作为 jobvertext state,而所有其余 jobvertext 的解决流程与失常 checkpoint 保持一致。作业产生 failover 重启之后,会跳过齐全终止的 jobvertext,对其余的 task 的解决逻辑与失常的解决逻辑保持一致的。

基于上述工作,咱们对作业完结后的流程和语义也进行了从新的梳理。为了保障两阶段提交的 sink 可能在提交最初一部分数据后再退出,必须使这些 task 可能期待最初一个 checkpoint 之后再退出。目前的逻辑下,作业在天然完结的时候,首先会发送 max watermark,而后发送 EndOfPadtitionEvent。一个 task 收到 endofPadtitionEvent 之后会别离调用算子的 endOfEInput()、close() 和 dispose() 操作。如果要在最初插入一个 checkpoint,那么最好的形式插入到 close 办法之后。因为在这里,作业曾经实现了所有工作。

然而在理论场景下却有所区别,因为理论场景下会触发一个 savepoint,savepoint 胜利之后,source 会调用剖析式办法来完结执行,并发送 max watermark EndOfPadtitionEvent,后续逻辑和 checkpoint 状况下统一。因为后面曾经进行了 savepoint,如果在 close 之后再进行 checkpoint,会显得十分冗余。在这种状况下更适合的形式是先进行作业的完结,而后再进行 savepoint 操作,并且在 savepoint 操作的同时提交最初一部分数据。

然而这也存在一个问题,如果要保留最初一个 savepoint,那么所有 task 就必须期待同一个 savepoint 能力完结,在天然完结的状况下,不同的 task 能够期待不同的 checkpoint 来退出。然而在 savepoint 状况下,作业完结之前曾经发送过 EndOfPadtitionEvent,它会敞开 task 之间的网络通信,因而在作业终止之后无奈再从最开始的 source 做 savepoint。

为了解决这些问题,必须可能先告诉所有 task 进行完结但不敞开网络链接,等所有 task 完结之后再发动一个 savepoint 操作,并且在胜利之后再敞开网络链接,就能实现所有 task 期待最初同一个 savepoint 状态而完结。

为了反对这一批改,咱们引入了一条新的 EndOfDataEvent。工作收到 EndOfDataEvent 后会调用之前在 EndOfPadtitionEvent 进行的解决。之后 source 会立即发送一个 barrier 来触发 savepoint 操作,算子会它完结之后再执行退出逻辑。

此外,咱们也对之前比拟有歧义的 close() 和 dipose() 操作进行了重命名,别离改成了 finish() 和 close(),其中 finish() 只会在工作失常完结时进行调用,而 close() 会在作业失常完结和异样完结的时候都进行调用。

在语义局部,咱们进行的另外一个工作是 Hybrid source。

Hybrid source 反对用户读取历史批数据之后,再切回到无限流数据上进行解决,它实用于解决逻辑统一的状况下进行流批互转的操作。也就是在实时数据曾经落盘用户须要进行 backfill,且流批处理逻辑统一的状况下,用户能够不便地应用 hybrid source 来实现作业。

三、性能优化

除了在语义方面进行的工作之外,咱们在 runtime 层也进行了一些性能方面的优化。

3.1 调度部署性能优化

首先是对于调度局部的性能优化。Flink 因为存在 all to all 的连贯关系,两个并发为 n 的算子之间会有 n² 条边,这 n² 条边显式地存在 jm 的内存中,并且许多调度和部署逻辑也会间接依赖于它进行解决,从而导致 jm 内存空间和许多计算的工夫和空间复杂度都是 on²。因为 batch 作业个别具备更大的规模,并且调度更加细粒度,因而这会减轻调度和部署的性能问题。

为了解决这一问题,咱们利用 all to all 边的对称性,对内存中的数据结构和计算逻辑进行了重构,引入了 comsumergroup 的数据结构来代替之前的 excutionEdge 对算子之间的连贯关系进行对立形容。这种形式不再反复形容堆对称的信息,从而防止了 n² 的复杂度。基于这一新的形容形式,咱们不再在内存中保护 excutionEdge。

此外咱们调整了许多调度的算法,比方计算 pipeline region、在一个 task 完结之后计算后续须要调度的 task 等,将它们的工夫复杂度也升高到了 O(n)。

计算 pipeline region 过程中还有一部分非凡逻辑,Flink 在作业 dag 图中蕴含两种边,pipeline 边和 blocking 边。前者要求上下游的工作必须同时启动并通过网络传输数据,后者则要求上下游工作顺次启动并通过文件来传输数据。在调度前首先须要计算 pipeling region,一般来说能够依照 blocking 边进行打断,将所有通过 pipeline 边相连的 task 放到同一个 region 里,但这种逻辑存在一个问题,如上图所示能够看出,因为并发 1 的 task 和并发 2 的 task 之间是通过 blocking 边分成两个 region 的,如果间接通过 blocking 边打断将它分为两个 region。而因为 task1 和 task2 之间存在 all to all 的 shuffle 关系,最初在 region 组成的图上会存在环形依赖的问题,在调度的时候会产生死锁。

在之前的实际中,咱们通过 tarjan 强联通分支算法来辨认这种环境依赖,彼时的辨认是间接在 excutiongraph 上进行,所以它的工夫复杂度是 O(n²),因为 all to all 的边存在 n² 的连贯。通过进一步剖析发现,如果在 jobgraph 中间接进行 pipeline 的认证辨认,只有图中有 all to all 的边就肯定存在环形依赖,因而能够间接在 jobgraph 上先进行判断,辨认出所有 all to all 的边,而后在 excutiongraph 上再对非 all to all 的边进行解决。

通过这种形式,能够将环形依赖辨认的复杂度升高到 O(n)。

3.2 部署性能优化

另一部分优化是对于部署性能。

Flink 在部署 task 的时候会携带它的 shuffle descriptors。对于上游来说,shuffle descriptors 形容了数据产出的地位,而对于上游来说,它形容了须要拉取数据的地位。shuffle descriptors 与 ExcutionEdge 的数量是相等的,因而这个数量级也是 O(n²)。在内存中进行计算序列化存储的时候,shuffle descriptors 会耗费大量 CPU 和内存,卡死主线程,导致 TM 及耗尽内存的问题。

然而因为上下游存在对称性,因而有很多 shuffle descriptors 其实是反复的,咱们能够通过缓存 shuffle descriptors 的形式来升高保护它的数量。

另外为了进一步避免并发量过大导致 shuffle descriptors 过大,导致内存 oom,咱们改用 BlobServer 来传输 shuffle descriptors。

实现了上述优化当前,咱们采纳一个 10000×10000 的 all to all 两级作业进行测试,能够看出调度和内存占用缩减了 90% 以上,部署工夫缩减 65% 以上,极大进步了调度和部署的性能。

流执行模式调度和部署的优化极大地缩小作业 failover 时重新启动的工夫,然而一旦产生 failover,依然须要破费肯定的工夫来进行重新部署以及初始化、加载 state 等工作。为了进一步缩小这个工夫,咱们正在尝试在作业产生 failover 的时候,只重启出错节点。其中的难点在于如何保证数据的一致性,咱们目前正在摸索中。

另外一部分 runtime 的优化是在流模式下通过 Buffer Debloating 来动静调整 buffer 的大小,从而在反压的状况下缩小 checkpoint 的 buffer 对齐所须要的工夫,防止 checkpoint 超时。如果产生反压,当作业两头缓存的数据量过大时,能够及时缩小 buffer 的大小来管制两头缓存的数据大小,从而防止因为解决数据而阻塞 barrier 的状况。

四、Remote Shuffle

shuffle 是批处理作业执行中十分重要的一部分,因为云原生能够提供对立的运维 API、缩小运维开销,以及在离线混部和动静伸缩的状况下提供更好的反对,Flink 在最近的几个版本里也在踊跃拥抱云原生,比方提供了 Flink on k8s 的残缺的实现,以及反对动静伸缩的 schedule。然而因为 Flink shuffle 须要应用本地磁盘,如果要反对云原生的 Flink,咱们也须要实现存储计算拆散的 shuffle。存储计算拆散的架构能够使得计算资源与存储资源独自伸缩,防止 task manager 无奈在计算实现后立即退出,从而进步整个资源的利用率。同时也能够防止 task 执行失败导致 TM 退出而影响 shuffle 文件服务的稳定性,从而对上游的执行造成影响。

针对上述存储计算拆散 shuffle 的需要,咱们在外部也研发了 remote shuffle service,这一性能曾经于今年年初在外部上线。通过一段时间的试用,咱们在前段时间对这一零碎进行了开源,上面将对这一零碎进行具体介绍。

Flink 能够反对多种不同的场景,不同场景下的 shuffle 在存储介质传输方式和部署形式方面是存在较大差别的。比方流解决模式下,Flink 个别采纳的基于网络的在线传输方式,数据缓存在上游 TM 的内存中,并在上游 task 有闲暇 buffer 的时候及时进行发送。而剖析解决模式下,为了反对算子的逐级运行,Flink 还须要反对基于文件的离线传输方式,先写入离线文件中,上游 task 启动之后再通过网络发送给上游的 task,离线文件能够存在本地的 TM 中,也能够存在近程的服务中。

另外,不同的 shuffle 在生命周期治理、元数据管理和数据散发策略方面也存在许多独特需要,所有 shuffle 都须要调度器在启动上游 task 的时候,申请相应的 shuffle 资源,并对其进行记录。还须要调度器在部署上游 task 的同时,携带 shuffle 的资源描述符,从而使上游 task 能够顺利读取相应的数据。最初 shuffle 还依赖调度器在特定的生命周期比方完结或者执行失败的时候,对它的资源进行清理。

为了对不同的 shuffle 提供对立的反对,Flink 从 1.9 版本开始引入了插件化的 shuffle 架构。一个 shuffle 的插件次要由两局部组成,shuffle master 负责在 jm 端与调度器进行交互,从而实现申请和开释 shuffle 资源的性能;而 result partition 和 input gate 别离作为数据的 write 和 read 端,依据调度器提供的 shuffle 资源描述符,将数据输入到特定地位或从数据地位进行读取。而所有 shuffle 实现中,共性的局部则由 Flink 对立实现,调度器会通过 partition track 对曾经申请的 shuffle 资源进行记录,并依据作业的执行模式保护 shuffle 资源的生命周期。

通过对立的插件化 shuffle 接口,Flink 能够简化不同 shuffle 实现的复杂度,且容许不同的 shuffle 在理论存储与传输的形式上进行自由选择。

基于 Flink 对立的插件化 shuffle 接口,Flink remote shuffle 的整体架构如上图所示。它的 shuffle 服务由一个独自集群提供,其中 shuffle manager 作为整个集群的 master 节点,负责管理 worker 节点,并对 shuffle 数据集进行调配和治理。Shuffle worker 作为整个集群的 slave 节点,负责读写和清理数据集,Shuffle manager 还通过心跳对 Shuffle worker 和 Shuffle master 进行监听,在心跳超时的时候做数据删除和同步,从而使得集群中数据集的状态保持一致。

咱们对传输过程也进行了大量优化。网络局部基于 credit-based 协定来实现,它与 Flink 目前的网络传输协定相似,咱们还在其中实现了 tcp 连贯复用,以及压缩可控内存治理和零拷贝等一系列优化。io 局部咱们提供了反对 io 调度优化的 mapPartition 存储格局。通过 io 调度优化,它在 hdd 上的访问速度达到 140M/s 以上。

此外咱们目前也正在开发基于事后 merge 的 reducepartition 的存储格局,它会将数据依据上游事后进行 merge,并存储到特定的 worker 上,上游不能全副同时启动时,能够获得比 mapPartition 更好的成果。

在部署上,Remote shuffle 能够反对多种不同的部署形式,此外咱们也提供了版本间的协定兼容,使得当服务器端进行降级的时候,无需降级客户端。最初咱们还在零碎中提供了罕用 metric 的操作,更多的运维工具也正在踊跃开发中。

五、总结与瞻望

总的来说,Flink 目前曾经具备能够上线的流批一体的数据处理能力,将来咱们也将进一步晋升这项能力,并在更多的流批交融场景下提供反对,例如绝对 Hybrid source,在 backfill 的场景下,如果流批处理的逻辑不统一,咱们也在思考反对批处理完结后保留状态用于启动流解决作业的形式。

另外咱们也将进一步提高整个零碎的稳定性与性能,并更深刻地思考流模式和批处理模式的实质差别,以及流批一体更粗浅的外延。


点击查看直播回放 & 演讲 PDF

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

正文完
 0