本文整顿自阿里巴巴高级技术专家朱翥、阿里巴巴高级技术专家贺小令在 9 月 24 日 Apache Flink Meetup 的演讲。次要内容包含:
- Adaptive Batch Scheduler
- Speculative Execution
- Hybrid Shuffle
- Dynamic Partition Pruning
点击查看原文视频 & 演讲 PPT
Flink 是流批一体计算框架,早些年次要用于流计算场景。近些年随着流批一体概念的推广,越来越多的企业开始应用 Flink 解决批业务。
尽管 Flink 在框架层面人造反对批处理,但在理论生产应用中仍然存在问题。因而在近几个版本中,社区也始终在继续改良 Flink 批处理问题,这些改良体现在 API、执行与运维三个层面。
在 API 层面,咱们始终在改良 SQL,欠缺其语法,并使其可能兼容 HIVE SQL;咱们也在欠缺 DataStream 接口来更好的反对批处理作业开发。在运维层面,咱们心愿 Flink batch 可能更易于在生产中应用,所以咱们欠缺了 history server,以更好地展现作业在运行中以及完结后的状态,同时也引入了兼容 Hive 生态的 SQLGateway。在执行层面,咱们对于算子、执行打算、调度、网络层都进行了性能与稳定性的改良。
其中一个次要思路是依据运行时信息,比方数据量、数据模式、执行工夫、可用资源等,自适应地优化作业执行,包含依据数据量主动为作业节点设置适合的并发度,通过预测执行来发现与缓解慢节点对作业的影响,引入自适应数据传输方式来进步资源利用率与解决性能,对多分区表进行动静分区裁剪来进步解决效率。
这些改良,有的使得 Flink 批处理更易于应用,有的对批处理作业的稳定性提供了保障,有的晋升了作业执行性能,或是兼而有之。
一、Adaptive Batch Scheduler
此前,作业上线前都须要进行并发度调优。对于批处理作业的用户而言,他们遇到的状况是这样的:
- 批处理作业往往十分多,对作业进行并发度调优会是十分大的工作量,费时费力。
- 每日数据量可能都在变动,特地是大促期间数据会有数倍乃至数十倍的增长,因而很难预估数据,导致调优艰难。同时,如果要在流动前后更改并发度配置,也会更加消耗人力。
- Flink 由多个计算节点串联而成的执行拓扑组成。两头节点因为算子复杂性以及数据自身的特质,难以预判数据量,很难进行节点细粒度的并发度配置。而一个全局对立的并发,则可能导致资源节约,乃至额定的调度部署、网络传输的开销。
- 此外,SQL 作业,除了 source 和 sink 外,只能配置全局对立的并行度,没法进行细粒度并行度设置,因而也会面临资源节约与额定开销的问题。
为了解决问题,Flink 引入了自适应批处理调度器,用户能够配置心愿每个并发实例解决的数据量,Flink 会依据运行过程中理论各个节点的数据量主动决定各个逻辑节点的理论并发度,从而保障每个执行并发解决的数据量大抵合乎用户预期。
以上配置形式的特点是配置与数据作业的数据量无关,因而比拟通用,一套配置能够实用于很多作业,不须要独自为每个作业进行调优。其次,主动设置的并行度可能适配每天不同的数据量。同时,因为能够在运行时采集到每个节点理论须要解决的数据量,所以可能进行节点粒度的并行度设置,实现更优的成果。
其流程如上图所示:当上游逻辑节点 A 的所有执行节点执行完并产出数据结束后,能够采集产出数据总量,即节点 B 要生产的数据量。而后将信息交给并行度计算策略组件计算适合的并行度,动静生成执行节点拓扑进行调度与部署。
在传统 Flink 执行中,执行拓扑是动态的,作业提交过程中即已知所有节点的并行度,因而上游在执行时即可为上游每一个生产它的执行节点划分独自的数据子分区。上游启动时只需读取对应数据子分区即可获取数据。然而在动静并发度的状况下,上游执行时上游并发度还未确定,因而须要解决的次要问题是使上游节点的执行与上游节点的并发度解耦。
为了反对动静执行拓扑,咱们进行了以下改良:上游节点产出的数据分区数不禁上游并发度决定,而是依据上游最大分并发度来决定。
如上图右侧所示,上游可能有四个并发,能够将 A 产出的数据分为四份,则上游理论决定的并发可能会有一个、两个、三个、四个,而后再为每个节点调配生产的分区范畴。比方上游并发为 2 时,各自生产两个数据分区;上游并发为 3 时,可能有的生产一个数据分区,有的生产两个。最终使得上游节点执行不再受到上游并发度的制约,可能进行灵便的数据调配,动静执行拓扑的理念也得以实现。
主动并发度可能实现两方面的成果:其一,用户不再须要为每个作业独自配置并行度,Flink batch 的应用更简略;其二,细粒度并发度设置能够进步对资源的利用率,防止无意义的大并发度。咱们通过多 client TPC-DS 尽可能打满集群进行测试,开启了自适应并发度设置后,总执行工夫缩短 7.9%。
自适应批处理调度也为后续优化提供了很好的根底。基于灵便的数据分区与调配形式,可能采集各个数据分区的理论数据量,从而在比方有数据歪斜导致各个分区大小不一的状况下,能够将小分区合并,交给同一个上游解决,使上游节点解决的数据比拟平衡。
其次,因为引入了动静执行拓扑的能力,能够依据执行时的信息来动静制订更优的执行打算。比方能够依据 join 节点两端各自的数据量大小来决定应该采纳何种 join 形式。
二、Speculative Execution
生产中的热点机器无奈防止。比方用户生产中作业会跑在混部集群或批作业的密集回刷等都可能导致某些机器负载特地高,使得运行在该节点上的工作远远慢于其余节点上的工作,从而拖慢整个作业的执行工夫。同时,偶发的机器异样也会导致同样的问题。这些迟缓的工作会影响整个作业的执行工夫,使得作业的产出基线无奈失去保障。成为了局部用户应用 Flink 来进行批处理的妨碍。
因而,咱们在 Flink 1.16 中引入了预测执行机制。
开启预测执行之后,如果 Flink 发现批处理作业中有工作显著慢于其余工作,则会为其拉起新的执行实例。这些新执行实例会与原来的慢工作实例生产同样的数据并且产出同样的后果,而原先慢工作的执行实例也会被保留下来。最先实现的实例会被调度器认可为惟一实现的实例,其数据也会被上游发现与生产。而其余实例会被被动勾销,数据会被革除掉。
为了实现预测执行,咱们为 Flink 引入了以下组件:
Slow Task Detector 次要用于定期检测与汇报慢工作。
在目前的实现中,当逻辑节点的某个执行节点特地慢,超过其大部分节点执行时长中位数的某个阈值后,则会被认为是慢节点。预测执行调度器会接管到慢节点,并将慢工作所在机器节点辨认为热点机器。通过黑名单机制(Blocklist Handler),将热点机器加黑,使得后续调度的新工作不会落到加黑的机器上。黑名单机制目前反对 Yarn、K8s 与 standalone 等 Flink 目前最常的用部署形式。
如果慢节点运行中的执行实例数量没有达到配置下限,则会为其拉起预测执行实例直至数量下限,并部署到没有被加黑的机器上。任何执行实例完结后,调度器会辨认是否有其余相干的执行实例也在运行中,如果有,则将其被动勾销。
完结的实例产出的数据会被展示给上游,并触发上游节点调度。
咱们框架层面反对了 Source 节点的预测执行,保障同一个 Source 并发的不同执行实例总是能够读取到雷同的数据。基本思路是引入缓存来记录各个 Source 并发曾经获取到的数据分片以及每个执行实例曾经解决的分片。当一个执行实例解决完该 Source 并发以后被调配的所有分片之后,能够申请新分片,新分片也会被退出缓存中。
因为在框架层面进行了对立的反对,目前大部分曾经存在的 Source 不须要进行额定批改即可反对预测执行。只有在应用了新版 Source 并且其应用了自定义 SourceEvent 的状况下,须要 SourceEnumerator 实现额定接口,否则在开启预测执行时会抛出异样。该接口次要用于保障用户自定义的事件能够被交给正确的执行实例。因为开启了预测执行后,一个并发可能会有多个执行实例同时运行。
咱们在 Rest 与 WebUI 层面也对预测执行进行了反对。预测执行产生时,能够在作业节点具体界面看到预测执行并发的所有执行实例。同时也能在资源总览卡片上看到被加黑的 TaskManager 数量,以及没有被占用然而被加黑所以也无奈被应用的 slot 数量,用户能够借此评判以后资源的应用状况。此外,在 TaskManager 界面可能查看以后被加黑的 TaskManager。
以后版本中,Sink 暂不反对预测执行,后续咱们会优先实现对 Sink 节点预测执行的反对。其中须要解决的问题为保障每个 Sink 只会 commit 一份数据,并且其余被勾销的 Sink 产生的数据能够被清理掉。
此外,咱们也在打算进一步改良慢工作检测策略。以后,一旦产生数据歪斜,个别执行并发的数据量可能会大于其余执行并发,因而执行时长也会大于其余节点,但此节点可能并不是慢工作。因而须要可能正确辨认解决该状况,从而防止拉起有效预测执行实例浪费资源。目前的初步思路为:依据各个执行实例理论解决的数据量对工作执行时长进行归一化,这也依赖于前文提到的 Adaptive Batch Scheduler 对各个节点产出的的数据量的采集。
三、Hybrid Shuffle
Flink 次要有两种数据交换形式:
- 流式 Pipeline Shuffle:其特点为上下游会同时启动,空对空传输数据,不须要落盘,因而在性能上具备肯定劣势。然而它对资源需求量比拟大,往往须要作业可能同时获取到数倍于单节点并行度的资源方能运行,而这对于生产批处理作业而言难以满足。同时,因其有批量资源的需要,没有同时获取到则作业无奈运行,多个作业同时争夺资源时,可能会产生资源死锁。
- 批式 Blocking Shuffle:数据会间接落盘,上游间接从上游的落盘数据中读取。替换形式使得作业对于资源的自适应能力比拟强,实践上不须要上下游同时运行,只有有一个 slot 则整个作业都能够执行实现。然而性能绝对较差,须要等到上游 stage 运行完之后能力运行上游 stage,同时数据全副落盘会产生 IO 开销。
因而,咱们心愿有一种 Shuffle 模式可能将两者劣势联合,在资源短缺时,能够施展流式 shuffle 的性能劣势;而在资源受限的状况下,能够让作业具备批式 shuffle 的资源自适应能力,即便只有一个 slot 也能运行。同时,适配资源的能力自适应切换,用户无需感知,无需进行独自调优。
为此,咱们引入了 Hybrid Shuffle。
在该模式下,上游产出后果的 Result Partition 接管到 shuffle 数据时,会将其缓存在内存中。如果上游曾经启动并且与上游建设了连贯,内存中的数据即可通过网络层空对空间接传输给上游,无需进行落盘;而如果上游还未启动并且上游产出的数据曾经将内存填满,数据也能够 Spill 到磁盘上,使上游能够持续产出数据,不会造成反压影响上游进而导致上游无奈持续解决。
Hybrid Shuffle 模式不再要求上下游必须同时运行,同时,如果上游连贯时上游数据曾经落盘,上游依然能够在上游往 partition 中写数据的同时读取曾经落盘的数据。如果上游解决性可能高,比上游产出速度更快,落盘数据读完之后能够持续从上游内存区读取数据,又回退到空对空的数据传输方式,达到一种较优的性能。
通过这样的形式,上游无需期待上游数据产出后再进行调度,上游产出数据的同时即可将上游拉起,只有有短缺的资源即可与上游同时运行并读取其产出的数据。在资源有闲暇的状况下,能够进步整个集群的资源利用率。须要留神,上游依然须要在所有上游都已部署之后能力部署,一旦上游先于上游部署实现,可能还是会产生调度死锁。
Hybrid Shuffle 引入了两种落盘策略:
- 全落盘:升高异常情况下的工作重启开销。出现异常后,只需重启呈现问题的节点与其上游节点即可,无需重跑上游节点,适宜集群不稳固或非常容易触发 failover 的场景。
- 选择性落盘:升高落盘开销。如果上游能够先拉起,数据则毋庸落盘走空对空传输;如果上游未拉起,则数据能够 spill 到磁盘上。比拟适宜对作业性能要求较高或集群资源数比拟多而用户又心愿批作业可能尽快解决实现的场景。
在 Flink1.16 中,Hybrid Shuffle 相比 Blocking Shuffle,TPC-DS 执行工夫缩小 7.2%。但 Hybrid Shuffle 对于播送解决场景的性能有待优化,预计在 Flink1.17 中将解决该问题,整体执行工夫预计将比 Blocking Shuffle 缩小 17%。
其次,以后 Hybrid Shuffle 是基于 Default Scheduler 实现的,因而不兼容主动并发设置以及预测执行。为了更好地反对批处理,还须要进行整合。
四、Dynamic Partition Pruning
优化器很重要的工作就是防止有效计算和冗余计算。Partition 表在生成中被宽泛应用,这里咱们将介绍在分区表中如何缩小有效分区的读取。
咱们以几个从 TPC-DS 模型中简化的例子来介绍该优化。如上图所示,有一张 sales 表,partition 字段名为 slod_date,该表共有 2000 个分区。
上图中 SQL 语句指从 sales 表外面抉择 slod_date=2 的数据。没有分区裁剪的状况下,须要读取所有 partition 数据,再做 filter;有动态分区裁剪的状况下,在优化阶段即可通过 filter pushdown 等各种优化将确定的分区告知 Scan 节点。Scan 在执行过程中,只需读取特定分区,大大减少了读 IO,放慢了作业执行。
上图有两张表,别离是事实表 sales 表和维度表 date_dim,两张表做 join。有一个 filter 条件作用在维度表上,因而无奈执行动态分区裁剪的优化。
维度表 date_dim 会读取所有数据并做 filter,事实表 sales 表会读取所有 partition 再做 join。这里只有 year = 2000 并且 sold_date = date_sk 相干数据能够被输入,能够推导出知很多 partition 数据都是有效的,但这些分区没法在动态优化阶段剖析进去,须要在运行阶段依据维度表的数据动态分析进去,因而叫动静分区裁剪。
动静分区裁剪的思路如下:
- 第一步,执行 join 维度表测算子,比方 Scan(date_dim)、Filter。
- 第二步,将第一步的 Filter 后果发给分区表算子 Scan。
- 第三步:将步骤二的数据过滤掉有效分区,只读取无效数据。
- 第四步:依据步骤一和三的后果实现 Join。
动静分区裁剪与动态分区裁剪的区别在于,动静分区裁剪无奈在优化阶段确定哪些 partition 数据无效,必须在作业执行之后方能确定。
Flink 上的动静分区裁剪实现步骤如下:
首先会在 Physical Plan 上加一个非凡节点 DynamicFilterDataCollector(以下简称 DataCollector),作用为将 Filter 数据进行收集并去重,只保留相干字段并发给分区表 Scan。分区表 Scan 获取到数据之后进行分区裁剪,最初实现 Join。在 Streaming Graph 上,Source 算子(对应 Scan 节点)没有 input,但咱们心愿 Source 算子可能接管 DataCollector 算子传来的数据,同时维表侧 data_dim Scan 和 year=2000 的 Filter 与左边 sales Scan 调度上没有依赖关系,可能会导致左边算子先被执行,右边算子后被执行,从而无奈实现动静分区裁剪优化。
因而,咱们引入了 OrderEnforce 节点,其目标是为了告知调度器它们之间的数据依赖关系,从而确保右边算子优先被调度,以确保动静分区裁剪优化可能被正确执行。
后续,咱们也打算从框架层面来解决上述调度依赖的问题使 Streaming Graph 变得更优雅。
上图为具体执行图。
左侧 data_dim Scan 与 Filter 先执行,将数据发给 DataCollector 和 Join。为了解决 Source 算子没有 input 边的问题,咱们应用了 Flink Coordinator 机制,DataCollector 收集完数据之后会发给 Coordinator 并实现有效分区的裁剪,分区表 Scan 再从 Coordinator 获取无效的分区信息。Sales Scan 节点执行完后,再进行最初的 Join。
DataCollector 与 OrderEnforce 两头也有一条数据边,数据边内不会有实在的数据传输,仅用于告诉调度器 DataCollector 比 OrderEnforce 先被调取起来。
上图为基于 TPC-DS 10T 数据集优化前后的性能比照,其中蓝色是非分区表,红色是分区表。优化后工夫节俭约 30%。
更多 Flink Batch 相干技术问题,可扫码退出钉钉交换群~
Q&A
Q:热点机器产生慢工作时,会调配其余机器拉起实例,再从新执行慢工作。再次拉起实例时,是否还会产生热点?
A:实践上有可能,因为预测执行自身是通过资源换工夫的一种策略。然而生产实践证明这种机制无效,相比额定资源开销以及进一步引发热点,trade off 仍然是划算的。
Q:揣测执行是依据数据量来判断吗?
A:以后的策略是依据执行时长来判断。比方大部分工作的执行工夫中位数是一分钟,有工作执行超过了 1.5 分钟,则会被认为是慢工作。具体数值可配。
Q:慢节点检测是可配的吗?
A:该策略目前是硬编码,临时还不反对配置策略。后续等策略稳固后,可能会凋谢给用户,用户可通过二次开发或插件的模式更改慢工作检测策略。
Q:揣测执行机制对 DataStream API 与 Flink SQL 都能提供反对吗?
A:是的。
Q:拉黑机制上,是否会存在拉黑太多或拉白不及时而导致资源节约?
A:目前预测执行下的加黑比拟激进,加黑默认只会继续 1 分钟。然而如果慢工作继续呈现,则会一直刷新加黑工夫,因而慢工作所在的慢机器节点也会始终在加黑列表中。
点击查看原文视频 & 演讲 PPT
更多内容
Flink Forward Asia 2022
本届 Flink Forward Asia 更多精彩内容,可点击浏览原文或扫描图片二维码观看全副议题的视频回放及获取 FFA 2022 峰会材料!
PC 端观看:https://flink-forward.org.cn/「 倡议返回 FFA 2022 大会官网观看全副议题的视频回放 」
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…