摘要:本文整顿自阿里云高级开发工程师 Apache Flink Committer、Flink 1.16 Release Manager 黄兴勃(断尘),在 FFA 2022 核心技术专场的分享。本篇内容次要分为四个局部:
- 综述
- 继续当先的流解决
- 更稳固易用高性能的批处理
- 蓬勃发展的生态
点击查看直播回放和演讲 PPT
一、综述
Flink 1.16 同 Flink 1.15 相比,在 Commits、Issues、Contributors 上,放弃了较高的水准。最大的不同是,咱们在 Flink 1.16 中大部分的性能和代码,次要由中国开发者主导实现。
非常感谢二百四十多位中国 Contributors 对 Flink 1.16 的奉献。接下来,咱们具体的看一下 Flink 1.16 在三个方面的改良。
二、继续当先的流解决
Flink 作为流式计算引擎的规范,在 Flink 1.16 的流解决方面,仍然做了许多的改良和摸索。
State 是 Flink 中十分重要的概念。有了 State 的存在,才使得 Flink 在流解决上可能保障端到端的 Exactly Once 的语义。State 通过多年的继续倒退,在 Flink 1.15 提出了 Changelog State Backend,为了解决 RocksDB 因为 TM 须要同时进行 Compaction 和 Upload,导致它的 CPU 和带宽呈现周期性抖动的问题。这部分在引入 Changelog 后失去了解决。
它的基本原理是,当 TM 做 State 操作时,会同时双写。一部分数据会像原来一样,写到原来的本地 State Table 当中。与此同时,会将 State 数据以 Append Only 的模式,写到本地 Changelog 中。Changelog 中的这部分信息,会周期性的上传到远端 DFS。
因为这部分 Changelog 信息绝对固定,而且是一种周期定量的模式,所以使得在做 Checkpoint 时,长久化的数据变少,放慢了作业 Failover 的速度。其次,因为这份数据绝对较少,做 Checkpoint 时的速度会更快。
除此之外,它还改善了 Checkpoint 的问题,解决了 CPU 和带宽抖动的问题。因为它的速度更快,低延时,使得端到端的数据新鲜度更好,放弃在分钟级别以内。
这部分性能在 Flink 1.16 实现了全面生产可用,整个集群变得更加稳固。
咱们在 Flink 1.16 的 RocksDB 上,也做了很多改良,咱们引入了 Rescaling Benchmark。能够观测 RocksDB 的 Rescaling 耗时,以及耗时的局部在哪里。
除此之外,咱们也在 RocksDB Rescaling 上做了一些改良,大幅晋升了 RocksDB Rescaling 的性能。如上图所示,咱们能看到在左图中,对于一个 WordCount 作业,晋升了 3~4 倍的性能。
除了 RocksDB Rescaling 的改良,咱们对现有的 State Metrics 和 Monitor 也进行了改良。咱们将原来 RocksDB 的一些 log 信息,重定向到了 Flink log 中。其次,咱们将 RocksDB 基于 Database 级别的 Metric 信息引入到了 Flink 零碎中。用户能够通过 Flink Metric 零碎,查看 RocksDB 的状况。
方才讲了咱们在 Flink 1.16 中,对 State 和 RocksDB 的改良。除此之外,咱们在 Checkpoint 上也做了很多改良。
在 Flink 1.11 中,咱们引入了 Unaligned Checkpoint,并在 Flink 1.13 实现了生产可用。自此,很多公司开始在他们的生产环境中应用 Unaligned Checkpoint。但在应用过程中,也发现了一些问题。某公司的小伙伴在本人的生产环境中,应用了 Unaligned Checkpoint 后,发现了一些问题,并进行了改良,回馈给了社区。
咱们简略看一下 Unaligned Checkpoint 做的一些改良。
第一个是反对透支 buffer。这个性能的引入是为了解决咱们 Unaligned Checkpoint 因为 Flink 的执行流程是基于 Mailbox 的解决流程带来的可能的问题。咱们晓得,做 Checkpoint 的流程是在主线程中进行的,而当主线程在解决 Process Function 逻辑时,Process Function 输入的数据是须要往上游 buffer 输入须要申请一些 output buffer 的。
在上述过程当中,可能呈现在反压很重大的状况时,因为难以申请到 buffer,导致主线程卡在了 Process Function 逻辑中。在 Flink 1.16 之前,社区也做了相干的改良:当主线程须要进入 Process Function 逻辑前,须要事后申请 output buffer 里的 buffer。有了这个 buffer 后,它才会进入 Process Function 的逻辑,从而防止卡在外面。
上述计划解决了局部问题,然而仍没有解决其余的一类 Case。如果输出 / 输入的数据太大,或者 Process Function 是一个 Flatmap Function,须要输入多条数据的状况,一个 buffer 将无奈满足,主线程仍然会卡死在 Process Function 里。
在 Flink 1.16 中,引入了透支 buffer 的形式。如果 TM 上有额定的一些 buffer 的话,你就能够申请这部分内存。而后,通过透支这部分的 buffer,让主线程不会卡死在 Process Function 中,就能失常跳出 Process Function。主线程就能接管到 Unaligned Checkpoint Barrier。
之前,Unaligned Checkpoint 引入了一个 Timeout Aligned 机制。如果你的 Input Channel 接管到一个 Checkpoint Barrier 时,在指定时间段内没有实现 Barrier 对齐的话,Task 将会切换到 Unaligned Checkpoint。然而如果 Barrier 卡在了 output buffer 外面,上游的 Task 仍然是 Aligned Checkpoint。在 Flink 1.16 中,解决了 Barrier 卡在输入队列的状况。
通过以上这两个改良,Unaligned Checkpoint 失去了更大的晋升,稳定性也更高。
咱们在 Flink 1.16 中,对维表局部的加强。
- 咱们引入了一种缓存机制,晋升了维表的查问性能。
- 咱们引入了一种异步查问机制,晋升了整个吞吐。
- 咱们引入一种重试机制,次要为了解决维表查问时,遇到的内部零碎更新过慢,导致后果不正确,以及稳定性问题。
通过上述改良,咱们的维表查问能力失去了极大晋升。
在 Flink 1.16 中,咱们反对了更多的 DDL。比方 CREATE FUNCTION USING JAR,反对动静加载用户的 JAR,不便平台用户治理用户的 UDF。
其次,咱们反对 CREATE TABLE AS SELECT,让用户便捷的通过已有 Table 创立一个新 Table。
最初,ANALYZE TABLE 是 Table 的一种新语法。帮忙用户生成更高效的统计信息。这些统计信息会让优化器产生更好的执行图,晋升整个作业的性能。
除此之外,咱们在流上做了很多优化。这里只列举了几个比拟重要的优化。
咱们改良了流解决零碎中,非确定性问题。这部分非确定性的问题次要蕴含两局部,一个是维表查问上的非确定性问题,另一个是用户的 UDF 是非确定性的 UDF。
- 咱们在 Flink 1.16 提供了一套十分齐备的系统性解决方案。首先,咱们可能自动检测 SQL 中,是否有一些非确定性的问题。其次,引擎帮用户解决了维表查问的非确定性问题。最初,提供了一些文档,用户能依据这些文档,更好的发现和解决本人作业中非确定性的问题。
- 咱们终于解决了咱们 Protobuf Format 的反对。在 Flink 1.16 中,反对用户应用 PB 格局数据。
- 咱们引入了 RateLimitingStrategy。之前这部分的 Strategy 是定制化,不可配的。在 Flink 1.16 中,咱们把它变成可配置。用户可能依据本人的网络梗塞策略,实现本人的一套配置。
三、更稳固易用高性能的批处理
方才咱们聊的都是 Flink 在流解决方面的改良。Flink 不仅是一个流式计算引擎,而且是一个流批一体的计算引擎。所以咱们在批处理方面,也做了十分多的工作。Flink 1.16 的指标是,使批处理计算够达到更稳固地利用和高性能。
在易用性方面,现有的批生态中,很多用户的作业仍运行在 Hive 生态。在 Flink 1.16 中,咱们心愿 Hive 的 SQL 可能以十分高价的形式,迁徙到 Flink 上。Flink 1.16 的 Hive 生态兼容度达到了 94%。如果扣除掉一些 acid 操作,Hive 生态兼容度达到了 97%。与此同时,配合 Catalog,Hive SQL 在 Flink 引擎上, 可能运行联邦查问的能力。
在 Flink 1.16 中,咱们还引入了一个十分重要的组件 SQL Gateway。通过 SQL Gateway,以及反对的 HiveServer2,使得 Hive 生态中支流的生态产品能够十分天然的接入 Flink 生态。
在 Flink 1.16 SQL Gateway 中,咱们反对多租户,兼容 HiveServer2 协定,以及 HiveServer2 带来的 Hive 生态。有了 HiveServer2 的配合,整个 Hive 生态就能十分便捷的迁徙到 Flink 生态上。
接下来,看看 Flink 引擎自身为批做了哪些优化。首先,讲一讲与调度相干的优化。Flink 批作业常常会遇到这样的问题。因为一些热点机器的 IO 忙碌或 CPU 高负载,导致机器上运行的工作连累 Flink 批作业端到端的执行工夫。
在 Flink 1.16 中,为了解决这个问题,咱们引入了 Speculative Execution, 一种揣测执行的形式。它的基本原理是,在每个阶段,如果咱们检测到某一个机器,它是一种热点机器,它下面运行的工作被称为慢工作。所谓慢工作就是,它的慢工作执行工夫比同一阶段其余工作的执行工夫要长的多,从而把这台机器定义为热点机器。
有了热点机器之后,为了升高整个作业的执行工夫。咱们心愿把热点机器上运行的慢工作,通过一个备份工作,让它可能运行在其余非热点机器上。从而使得整个工作的总执行工夫缩短。
接下来,咱们简略看一下它的具体细节。首先,有一个叫 Slow Task Detector 的组件。这个组件会周期性的查看是否有一些慢工作以及慢工作对应的热点机器。
收集到了这些信息后,它会汇报给 Speculative Scheduler。Scheduler 会把这些信息汇报给 Blocklist Handler。而后 Blocklist Handler 会把这些机器加黑。
有了这些加黑机器之后,加黑机器上慢工作的备份工作会被调度到集群当中其余非热点的机器之上,让这些慢工作和备份工作同时运行。谁先实现就抵赖哪个工作的后果。被抵赖的那个实例,它的输入也能作为上游算子的输出。没能实现的工作将会被 cancel 掉。
在 Speculative Execution 中,咱们也引入了一些 Rest 和 Web UI 的反对。如上左图所示,能够察看到哪些慢工作被勾销,哪些是其备份工作。通过右图,能够实时看到哪些 TM 机器被标成了 black 机器。
对于 Speculative Execution 的后续工作,咱们以后不反对在 Sink 上的 Speculative Execution。其次,咱们现有的检测策略比拟毛糙。咱们并没有无效的思考到,因为数据歪斜导致一些慢工作的检测谬误。把一些自身是失常的机器,标成了热点机器。这都是咱们在 Flink 1.17 之后,须要做的工作。
咱们在 Shuffle 上做的工作。家喻户晓,Flink 有两个 Shuffle 策略,一个是 Pipelined Shuffle,另一个是 Blocking Shuffle。
流式 Pipelined Shuffle 的上下游 Task,可能同时调度运行。它的数据传输是一种空对空的传输,数据不落盘,性能更好。但它的毛病是,会占用更多的资源。因为它须要上下游的 Task,同时调度。在一些资源比拟缓和的状况下,可能导致作业难以拉起,或者整个作业因为资源索取,造成死锁。
在 Blocking Shuffle 方面,因为它在每个阶段,Task 都会把它的后果写到磁盘里。而后,上游的 Task 再通过磁盘,读取它的数据。这样是益处是,实践上只须要一个 Slot,就能实现整个批作业的运行。但它的毛病也不言而喻。因为每个阶段都要把后果数据落盘,下一步还须要读磁盘,所以它的性能较差。
基于这种思考,咱们在 Flink 1.16 提出了一种新的 Shuffle 策略,即 Hybrid Shuffle。其目是同时利用上述两套 Shuffle 的长处。在资源短缺时,咱们利用 Pipelined Shuffle 的性能劣势。在资源有余时,咱们利用 Blocking Shuffle 的资源优势。整套 Shuffle 策略是自适应切换的,这是 Hybrid Shuffle 的根本思维。
Hybrid Shuffle 在数据落盘方面,有两套策略。一套是全落盘,另一套是选择性落盘。
选择性落盘的益处是,咱们写更少的数据落盘,整个性能绝对更高。而全落盘的益处是在 Failover 时的性能会更好。依据用户场景是哪种不同,抉择到底用 Hybrid Shuffle 的哪种落盘策略。
在性能方面,Flink 1.16 的 Hybrid Shuffle 相比 Blocking Shufle,TPC-DS 执行工夫缩小了 7.2%。如果加上播送方面的优化,优化后的 TPC-DS 执行工夫比会比 Blocking Shuffle 缩小 17%。
对于 Hybrid Shuffle 的后续工作,一个是播送方面的性能优化。另一个是,与 Flink 1.15 提出的 Adaptive Batch Scheduler 适配,以及 Speculative Execution 的适配。
如上图所示,咱们在批处理方面还有许多其余优化。咱们简略列了一些比拟重要的优化。
- 首先,咱们反对了 Dynamic Partition Pruning,即动静分支裁剪。在 1.16 以前的分支裁剪策略都是基于执行图上的动态裁剪。但在批处理上,齐全能够利用 Runtime 的一些统计信息,更加高效的进行分支裁剪策略。这套实现让 Flink 1.16 在 TPC-DS 上有 30% 的性能晋升。
- 咱们引入了 Adaptive Hash Join,一种自适应策略。咱们利用 Runtime 的一些统计信息,自适应的将 Hash Join 回退到 Sort Merge Join,晋升 Join 的稳定性。
- 咱们在批处理之前的 Blocking Shuffle 上做了一些改良。咱们引入了更多的压缩算法 (LZO 和 ZSTD)。新的压缩算法是为了解决在数据大小以及 CPU 耗费上的均衡。
- 咱们优化了现有的 Blocking Shuffle 的实现。通过自适应的 buffer 调配,IO 程序读取,以及 Result Partition 共享,在 TPC-DS 上有 7% 的性能晋升。
咱们在 Batch SQL 上,反对 Join Hints。Join Hints 让用户能手动干涉 Join 策略。用户将会晓得生成更加高效的执行打算,晋升整个作业的性能。
四、蓬勃发展的生态
接下来,介绍一些蓬勃发展的生态产品。如上图所示,PyFlink 是咱们生态中十分重要的产品。PyFlink 通过 Flink 1.9 的一路倒退到了 Flink 1.16。
- Python API 的覆盖度达到了 95% 以上。一方面,咱们优化内置 Window 的反对。在 Flink 1.16 以前,咱们在 Flink 1.15 反对了自定义 Window。但对于须要自定义的 Window,用户的实现老本仍然较高,难以使用。另一方面,咱们在 Flink 1.16 引入了 side output、broadcast state 等反对。
- PyFlink 反对反对所有的内置 Connector&Format。裁减了 PyFlink 对接各种零碎的能力。
- PyFlink 反对 M1 和 Python 3.9。有了这两局部能力,升高了用户的上手老本。与此同时,Deprecate Python 3.6, 将在 Flink 1.17 里移除对 Python3.6 的反对,引入 Python3.10 的反对。
- PyFlink 搭建了本人的用户网站。提供了各种执行环境下的装置教程,能够在线运行 QuickStart 例子。这些例子间接挂载在在线的 notebook 网站。与此同时,咱们总结了许多用户常见的问题答疑,不便新用户上手。同时咱们整顿了 PyFlink 端到端的场景案例。这些局部内容实质上是为了升高新用户的门槛。
在性能方面,咱们在 PyFlink 1.15 时,引入了 Thread Mode。Thread Mode 绝对于 Process Mode 最大的不同是,它解决了 Python 过程和 Java 过程间的通信问题。如果是过程间通信,将会有一些序列化 / 反序列化的开销,而 Thread Mode 将不再有这种问题。
在 Flink 1.16,咱们对 Thread Mode 进行了残缺的反对。绝对于 Process Mode,它的性能会更好,端到端的提早会更低。
如上图所示,在 JSON 计算的场景下,Thread Mode 的端到端提早只有 Process Mode 的 1/500。它的性能在通用的典型场景,以及数据比拟常见的场景之下,Thread Mode 的性能根本追平了 Java。
由此可见,在 Flink 1.16 中,PyFlink 在性能和性能上, 曾经达到全面生产可用。除此之外,CEP 也是 Flink 生态中很重要的一部分。咱们在 Flink 1.16,对 CEP 的性能进行裁减。
- 咱们在 Batch SQL 上,反对了 CEP 能力。
- 咱们裁减了原有只反对首尾工夫距离的性能,反对了定义事件、事件间隔。
方才讲的是 Flink 外部重要的生态产品, 在 Flink 我的项目外, 咱们还有一些重要的生态我的项目。比方 Flink Table Store、Flink CDC、Flink ML、Feathub。
- Flink Table Store 配合 Changelog State Backend,实现端到端数据的新鲜度达到分钟级别内。
- 咱们在数据的正确性问题上,做了一些改良。使得 CDC 流进来后做 Join 和聚合会更加晦涩。
- 咱们在 DataStream 上反对了 Cache 性能,使 Flink ML 在实现内置算子时,可能失去更高性能。
- 前一段刚刚开源的 Feathub 我的项目。Feathub 依赖 PyFlink 作为它的计算引擎底座。随着 PyFlink 性能的晋升,Feathub 应用 Python Function 的性能靠近 Java Function 的性能,不再有劣势。
点击查看直播回放和演讲 PPT
更多内容
<p style=”text-align:center”><img src=”https://img.alicdn.com/imgextra/i3/O1CN0102Wuzs1dUVfQKlv59_!!6000000003739-2-tps-1920-675.png” alt=”img” style=”zoom:100%;” /></p>
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/product/bigdata/sc