乐趣区

解决问题-1474-个Flink-111-究竟有哪些易用性上的改善

7 月 7 日,Flink 1.11.0 正式公布了,作为这个版本的 release manager 之一,我想跟大家分享一下其中的经验感触以及一些代表性 feature 的解读。在进入深度解读前,咱们先简略理解下社区公布的个别流程,帮忙大家更好的了解和参加 Flink 社区的工作。

  • 首先在每个版本的布局初期,会从志愿者中选出 1-2 名作为 Release Manager。1.11.0 版本我作为中国这边的 Release Manager,同时还有一名来自 Ververica 的 Piotr Nowojski 作为德国方的 Release Manager,这在某种程度上也阐明中国的开发者和贡献度在整个社区的占比很重要。
  • 接下来会进行这个版本的 Feature Kickoff。在一些大的方向上,社区的布局周期可能比拟久,会分阶段、分步骤逾越多个版本实现,确保品质。每个版本的侧重点也会有所不同,比方前两个版本侧重于批处理的增强,而这个版本更侧重于流解决易用性的晋升。社区规划的 Feature 列表会在邮件列表中发动探讨,以收集更多的用户 / 开发者意见和反馈。
  • 个别的开发周期为 2-3 个月工夫,提前会明确布局出大略的 Feature Freeze 工夫,之后进行 Release Candidate 的公布和测试、以及 Bug Fix。个别通过几轮的迭代周期后会正式投票通过一个绝对稳固的 Candidate 版本,而后基于这个版本正式公布。

Flink 1.11.0 从 3 月初的性能布局到 7 月初的正式公布,历经了差不多 4 个月的工夫,对 Flink 的生态、易用性、生产可用性、稳定性等方面都进行了加强和改善,上面将一一跟大家分享。

一  综述

Flink 1.11.0 从 Feature 解冻后公布了 4 次 Candidate 才最终通过。经统计,一共有 236 个贡献者参加了这次版本开发,解决了 1474 个 Jira 问题,波及 30 多个 FLIP,提交了 2325 个 Commit。

纵观近五次版本公布,能够看出从 1.9.0 开始 Flink 进入了一个疾速倒退阶段,各个维度指标相比之前都有了简直翻倍的进步。也是从 1.9.0 开始阿里巴巴外部的 Blink 我的项目开始被开源 Flink 整合,到 1.10.0 通过两个大版本曾经全副整合结束,对 Flink 从生态建设、功能性、性能和生产稳定性上都有了大幅的加强。

Flink 1.11.0 版本的最后定位是重点解决易用性问题,晋升用户业务的生产应用体验,整体上不做大的架构调整和性能开发,偏向于疾速迭代的小版本开发。然而从下面统计的各个指标来看,所谓的“小版本”在各个维度的数据也丝毫不逊色于前两个大版本,解决问题的数量和参加的贡献者人数也在继续减少,其中来自中国的贡献者比例达到 62%。

上面咱们会深度分析 Flink 1.11.0 带来了哪些让大家期待已久的个性,从用户间接应用的 API 层始终到执行引擎层,咱们都会抉择一些有代表性的 Feature 从不同维度解读,更残缺的 Feature 列表请大家关注公布的 Release Blog。

二  生态欠缺和易用性晋升

这两个维度在某种程度上是相辅相成的,很难严格辨别开,生态兼容上的缺失经常造成应用上的不便,晋升易用性的过程往往也是不断完善相干生态的过程。在这方面用户感知最显著的应该就是 Table & SQL API 层面的应用。

1  Table & SQL 反对 Change Data Capture(CDC)

CDC 被宽泛应用在复制数据、更新缓存、微服务间同步数据、审计日志等场景,很多公司都在应用开源的 CDC 工具,如 MySQL CDC。通过 Flink 反对在 Table & SQL 中接入和解析 CDC 是一个强需要,在过往的很多探讨中都被提及过,能够帮忙用户以实时的形式解决 Changelog 流,进一步扩大 Flink 的利用场景,例如把 MySQL 中的数据同步到 PG 或 ElasticSearch 中,低延时的 Temporal Join 一个 Changelog 等。

除了思考到下面的实在需要,Flink 中定义的“Dynamic Table”概念在流上有两种模型:Append 模式和 Update 模式。通过 Append 模式把流转化为“Dynamic Table”在之前的版本中曾经反对,因而在 1.11.0 中进一步反对 Update 模式也从概念层面残缺的实现了“Dynamic Table”。

为了反对解析和输入 Changelog,如何在内部零碎和 Flink 零碎之间编解码这些更新操作是首要解决的问题。思考到 Source 和 Sink 是连接内部零碎的一个桥梁,因而 FLIP-95 在定义全新的 Table Source 和 Table Sink 接口时解决了这个问题。

在公开的 CDC 调研报告中,Debezium 和 Canal 是用户中最风行应用的 CDC 工具,这两种工具用来同步 Changelog 到其它的零碎中,如音讯队列。据此,FLIP-105 首先反对了 Debezium 和 Canal 这两种格局,而且 Kafka Source 也曾经能够反对解析上述格局并输入更新事件,在后续的版本中会进一步反对 Avro(Debezium)和 Protobuf(Canal)。

CREATE TABLE my_table (...) WITH (  
'connector'='...', -- e.g. 'kafka'  
'format'='debezium-json',  
'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)  
'debezium-json.ignore-parse-errors'='true' -- default: false
);

2  Table & SQL 反对 JDBC Catalog

1.11.0 之前,用户如果依赖 Flink 的 Source/Sink 读写关系型数据库或读取 Changelog 时,必须要手动创立对应的 Schema。而且当数据库中的 Schema 发生变化时,也须要手动更新对应的 Flink 作业以保持一致和类型匹配,任何不匹配都会造成运行时报错使作业失败。用户常常埋怨这个看似冗余且繁琐的流程,体验极差。

实际上对于任何和 Flink 连贯的内部零碎都可能有相似的上述问题,在 1.11.0 中重点解决了和关系型数据库对接的这个问题。FLIP-93 提供了 JDBC catalog 的根底接口以及 Postgres catalog 的实现,这样不便后续实现与其它类型的关系型数据库的对接。

1.11.0 版本后,用户应用 Flink SQL 时能够主动获取表的 Schema 而不再须要输出 DDL。除此之外,任何 Schema 不匹配的谬误都会在编译阶段提前进行查看报错,防止了之前运行时报错造成的作业失败。这是晋升易用性和用户体验的一个典型例子。

3  Hive 实时数仓

从 1.9.0 版本开始 Flink 从生态角度致力于集成 Hive,指标打造批流一体的 Hive 数仓。通过前两个版本的迭代,曾经达到了 Batch 兼容且生产可用,在 TPC-DS 10T Benchmark 下性能达到 Hive 3.0 的 7 倍以上。

1.11.0 在 Hive 生态中重点实现了实时数仓计划,改善了端到端流式 ETL 的用户体验,达到了批流一体 Hive 数仓的指标。同时在兼容性、性能、易用性方面也进一步进行了增强。

在实时数仓的解决方案中,凭借 Flink 的流式解决劣势做到实时读写 Hive:

  • Hive 写入:FLIP-115 欠缺扩大了 FileSystem Connector 的根底能力和实现,Table/SQL 层的 sink 能够反对各种格局(CSV、Json、Avro、Parquet、ORC),而且反对 Hive Table 的所有格局。
  • Partition 反对:数据导入 Hive 引入 Partition 提交机制来管制可见性,通过 sink.partition-commit.trigger 管制 Partition 提交的机会,通过 sink.partition-commit.policy.kind 抉择提交策略,反对 SUCCESS 文件和 Metastore 提交。
  • Hive 读取:实时化的流式读取 Hive,通过监控 Partition 生成增量读取新 Partition,或者监控文件夹内新文件生成来增量读取新文件。

在 Hive 可用性方面的晋升:

  • FLIP-123 通过 Hive Dialect 为用户提供语法兼容,这样用户无需在 Flink 和 Hive 的 CLI 之间切换,能够间接迁徙 Hive 脚本到 Flink 中执行。
  • 提供 Hive 相干依赖的内置反对,防止用户本人下载所需的相干依赖。当初只须要独自下载一个包,配置 HADOOP_CLASSPATH 就能够运行。

在 Hive 性能方面,1.10.0 中曾经反对了 ORC(Hive 2+)的向量化读取,1.11.0 中咱们补全了所有版本的 Parquet 和 ORC 向量化反对来晋升性能。

4  全新 Source API

后面也提到过,Source 和 Sink 是 Flink 对接内部零碎的一个桥梁,对于欠缺生态、可用性及端到端的用户体验是很重要的环节。社区早在一年前就曾经布局了 Source 端的彻底重构,从 FLIP-27 的 ID 就能够看出是很早的一个 Feature。然而因为波及到很多简单的外部机制和思考到各种 Source Connector 的实现,设计上须要思考的很全面。从 1.10.0 就开始做 POC 的实现,最终赶上了 1.11.0 版本的公布。

先简要回顾下 Source 之前的次要问题:

  • 对用户而言,在 Flink 中革新已有的 Source 或者从新实现一个生产级的 Source Connector 不是一件容易的事件,具体体现在没有公共的代码能够复用,而且须要了解很多 Flink 外部细节以及实现具体的 Event Time 调配、Watermark 产出、Idleness 监测、线程模型等。
  • 批和流的场景须要实现不同的 Source。
  • Partitions/Splits/Shards 概念在接口中没有显式表白,比方 Split 的发现逻辑和数据生产都耦合在 Source Sunction 的实现中,这样在实现 Kafka 或 Kinesis 类型的 Source 时减少了复杂性。
  • 在 Runtime 执行层,Checkpoint 锁被 Source Function 抢占会带来一系列问题,框架很难进行优化。

FLIP-27 在设计时充分考虑了上述的痛点:

  • 首先在 Job Manager 和 Task Manager 中别离引入两种不同的组件 Split Enumerator 和 Source Reader,解耦 Split 发现和对应的生产解决,同时不便随便组合不同的策略。比方现有的 Kafka Connector 中有多种不同的 Partition 发现策略和实现耦合在一起,在新的架构下,咱们只须要实现一种 Source Reader,就能够适配多种 Split Enumerator 的实现来对应不同的 Partition 发现策略。
  • 在新架构下实现的 Source Connector 能够做到批流对立,惟一的小区别是对批场景的无限输出,Split Enumerator 会产出固定数量的 Split 汇合并且每个 Split 都是无限数据集;对于流场景的有限输出,Split Enumerator 要么产出有限多的 Split 或者 Split 本身是有限数据集。
  • 简单的 Timestamp Assigner 以及 Watermark Generator 通明的内置在 Source Reader 模块内运行,对用户来说是无感知的。这样用户如果想实现新的 Source Connector,个别不再须要反复实现这部分性能。

目前 Flink 已有的 Source Connector 会在后续的版本中基于新架构来从新实现,Legacy Source 也会持续保护几个版本放弃兼容性,用户也能够依照 Release 文档中的阐明来尝试体验新 Source 的开发。

5  PyFlink 生态

家喻户晓,Python 语言在机器学习和数据分析畛域有着宽泛的应用。Flink 从 1.9.0 版本开始发力兼容 Python 生态,Python 和 Flink 合力为 PyFlink,把 Flink 的实时分布式解决能力输入给 Python 用户。前两个版本 PyFlink 曾经反对了 Python Table API 和 UDF,在 1.11.0 中扩充对 Python 生态库 Pandas 的反对以及和 SQL DDL/Client 的集成,同时 Python UDF 性能有了极大的晋升。

具体来说,之前一般的 Python UDF 每次调用只能解决一条数据,而且在 Java 端和 Python 端都须要序列化 / 反序列化,开销很大。1.11.0 中 Flink 反对在 Table & SQL 作业中自定义和应用向量化 Python UDF,用户只须要在 UDF 润饰中额定减少一个参数 udf_type=“pandas”即可。这样带来的益处是:

  • 每次调用能够解决 N 条数据。
  • 数据格式基于 Apache Arrow,大大降低了 Java、Python 过程之间的序列化 / 反序列化开销。
  • 不便 Python 用户基于 Numpy 和 Pandas 等数据分析畛域罕用的 Python 库,开发高性能的 Python UDF。

除此之外,1.11.0 中 PyFlink 还反对:

  • PyFlink table 和 Pandas DataFrame 之间无缝切换(FLIP-120),加强 Pandas 生态的易用性和兼容性。
  • Table & SQL 中能够定义和应用 Python UDTF(FLINK-14500),不再必须 Java/Scala UDTF。
  • Cython 优化 Python UDF 的性能(FLIP-121),比照 1.10.0 能够晋升 30 倍。
  • Python UDF 中用户自定义 Metric(FLIP-112),不便监控和调试 UDF 的执行。

上述解读的都是偏重 API 层面,用户开发作业能够间接感知到的易用性的晋升。上面咱们看看执行引擎层在 1.11.0 中都有哪些值得关注的变动。

三  生产可用性和稳定性晋升

1  反对 Application 模式和 Kubernetes 加强

1.11.0 版本前,Flink 次要反对如下两种模式运行:

  • Session 模式:提前启动一个集群,所有作业都共享这个集群的资源运行。劣势是防止每个作业独自启动集群带来的额定开销,毛病是隔离性稍差。如果一个作业把某个 Task Manager(TM)容器搞挂,会导致这个容器内的所有作业都跟着重启。尽管每个作业有本人独立的 Job Manager(JM)来治理,然而这些 JM 都运行在一个过程中,容易带来负载上的瓶颈。
  • Per-job 模式:为了解决 Session 模式隔离性差的问题,每个作业依据资源需要启动独立的集群,每个作业的 JM 也是运行在独立的过程中,负载绝对小很多。

以上两种模式的独特问题是须要在客户端执行用户代码,编译生成对应的 Job Graph 提交到集群运行。在这个过程须要下载相干 Jar 包并上传到集群,客户端和网络负载压力容易成为瓶颈,尤其当一个客户端被多个用户共享应用。

1.11.0 中引入了 Application 模式(FLIP-85)来解决上述问题,依照 Application 粒度来启动一个集群,属于这个 Application 的所有 Job 在这个集群中运行。外围是 Job Graph 的生成以及作业的提交不在客户端执行,而是转移到 JM 端执行,这样网络下载上传的负载也会扩散到集群中,不再有上述 Client 单点上的瓶颈。

用户能够通过 bin/flink run-application 来应用 Application 模式,目前 Yarn 和  Kubernetes(K8s)都曾经反对这种模式。Yarn application 会在客户端将运行作业须要的依赖都通过 Yarn Local Resource 传递到 JM。K8s Application 容许用户构建蕴含用户 Jar 与依赖的镜像,同时会依据作业主动创立 TM,并在完结后销毁整个集群,相比 Session 模式具备更好的隔离性。K8s 不再有严格意义上的 Per-Job 模式,Application 模式相当于 Per-Job 在集群进行提交作业的实现。

除了反对 Application 模式,Flink 原生 K8s 在 1.11.0 中还欠缺了很多根底的性能个性(FLINK-14460),以达到生产可用性的规范。例如 Node Selector、Label、Annotation、Toleration 等。为了更不便的与 Hadoop 集成,也反对依据环境变量主动挂载 Hadoop 配置的性能。

2  Checkpoint & Savepoint 优化

Checkpoint 和 Savepoint 机制始终是 Flink 放弃先进性的外围竞争力之一,社区在这个畛域的改变很审慎,最近的几个大版本中简直没有大的性能和架构上的调整。在用户邮件列表中,咱们常常能看到用户反馈和埋怨的相干问题:比方 Checkpoint 长时间做不进去失败,Savepoint 在作业重启后不可用等等。1.11.0 有抉择的解决了一些这方面的常见问题,进步生产可用性和稳定性。

1.11.0 之前,Savepoint 中 Meta 数据和 State 数据别离保留在两个不同的目录中,这样如果想迁徙 State 目录很难辨认这种映射关系,也可能导致目录被误删除,对于目录清理也同样有麻烦。1.11.0 把两局部数据整合到一个目录下,这样不便整体转移和复用。另外,之前 Meta 援用 State 采纳的是绝对路径,这样 State 目录迁徙后门路发生变化也不可用,1.11.0 把 State 援用改成了相对路径解决了这个问题(FLINK-5763),这样 Savepoint 的治理保护、复用更加灵便不便。

理论生产环境中,用户常常遭逢 Checkpoint 超时失败、长时间不能实现带来的困扰。一旦作业 failover 会造成回放大量的历史数据,作业长时间没有进度,端到端的提早减少。1.11.0 从不同维度对 Checkpoint 的优化和提速做了改良,指标实现分钟甚至秒级的轻量型 Checkpoint。

首先,减少了 Checkpoint Coordinator 告诉 Task 勾销 Checkpoint 的机制(FLINK-8871),这样防止 Task 端还在执行曾经勾销的 Checkpoint 而对系统带来不必要的压力。同时 Task 端放弃曾经勾销的 Checkpoint,能够更快的参加执行 Coordinator 新触发的 Checkpoint,某种程度上也能够防止新 Checkpoint 再次执行超时而失败。这个优化也对前面默认开启 Local Recovery 提供了便当,Task 端能够及时清理生效 Checkpoint 的资源。

其次,在反压场景下,整个数据链路沉积了大量 Buffer,导致 Checkpoint Barrier 排在数据 Buffer 前面,不能被 Task 及时处理对齐,也就导致了 Checkpoint 长时间不能执行。1.11.0 中从两个维度对这个问题进行解决:

1)尝试 缩小数据链路中的 Buffer 总量(FLINK-16428),这样 Checkpoint Barrier 能够尽快被解决对齐。

  • 上游输入端管制单个 Sub Partition 沉积 Buffer 的最大阈值(Backlog),防止负载不均场景下单个链路上沉积大量 Buffer。
  • 在不影响网络吞吐性能的状况下正当批改上下游默认的 Buffer 配置。
  • 上下游数据传输的根底协定进行了调整,容许单个数据链路能够配置 0 个独占 Buffer 而不死锁,这样总的 Buffer 数量和作业并发规模解耦。依据理论需要在吞吐性能和 Checkpoint 速度两者之间衡量,自定义 Buffer 配比。

这个优化有一部分工作曾经在 1.11.0 中实现,残余局部会在下个版本持续推动实现。

2)实现了 全新的 Unaligned Checkpoint 机制(FLIP-76)从根本上解决了反压场景下 Checkpoint Barrier 对齐的问题。实际上这个想法早在 1.10.0 版本之前就开始酝酿设计,因为波及到很多模块的大改变,实现机制和线程模型也很简单。咱们实现了两种不同计划的原型 POC 进行了测试、性能比照,确定了最终的计划,因而直到 1.11.0 才实现了 MVP 版本,这也是 1.11.0 中执行引擎层惟一的一个重量级 Feature。其根本思维能够概括为:

  • Checkpoint Barrier 跨数据 Buffer 传输,不在输入输出队列排队期待解决,这样就和算子的计算能力解耦,Barrier 在节点之间的传输只有网络延时,能够忽略不计。
  • 每个算子多个输出链路之间不须要期待 Barrier 对齐来执行 Checkpoint,第一个到的 Barrier 就能够提前触发 Checkpoint,这样能够进一步提速 Checkpoint,不会因为个别链路的提早而影响整体。
  • 为了和之前 Aligned Checkpoint 的语义保持一致,所有未被解决的输入输出数据 Buffer 都将作为 Channel State 在 Checkpoint 执行时进行快照长久化,在 Failover 时连同 Operator State 一起进行复原。换句话说,Aligned 机制保障的是 Barrier 后面所有数据必须被解决完,状态实时体现到 Operator State 中;而 Unaligned 机制把 Barrier 后面的未解决数据所反映的 Operator State 延后到 Failover Restart 时通过 Channel State 回放进行体现,从状态复原的角度来说最终都是统一的。留神这里尽管引入了额定的 In-Flight Buffer 的长久化,然而这个过程理论是在 Checkpoint 的异步阶段实现的,同步阶段只是进行了轻量级的 Buffer 援用,所以不会过多占用算子的计算工夫而影响吞吐性能。

 Unaligned Checkpoint 在反压重大的场景下能够显著减速 Checkpoint 的实现工夫,因为它不再依赖于整体的计算吞吐能力,而和零碎的存储性能更加相干,相当于计算和存储的解耦。然而它的应用也有肯定的局限性,它会减少整体 State 的大小,对存储 IO 带来额定的开销,因而在 IO 曾经是瓶颈的场景下就不太适宜应用 Unaligned Checkpoint 机制。

1.11.0 中 Unaligned Checkpoint 还没有作为默认模式,须要用户手动配置来开启,并且只在 Exactly-Once 模式下失效。但目前还不反对 Savepoint 模式,因为 Savepoint 波及到作业的 Rescale 场景,Channel State 目前还不反对 State 拆分,在前面的版本会进一步反对,所以 Savepoint 目前还是会应用之前的 Aligned 模式,在反压场景下有可能须要很长时间能力实现。

四  总结

Flink 1.11.0 版本的开发过程中,咱们看到越来越多来自中国的贡献者参加到外围性能的开发中,见证了 Flink 在中国的生态倒退越来越凋敝,比方来自腾讯公司的贡献者参加了 K8s、Checkpoint 等性能开发,来自字节跳动公司的贡献者参加了 Table & SQL 层以及引擎网络层的一些开发。心愿更多的公司可能参加到 Flink 开源社区中,分享在不同畛域的教训,使 Flink 开源技术始终放弃先进性,可能普惠到更多的受众。

通过 1.11.0“小版本”的短暂调整,Flink 正在酝酿下一个大版本的 Feature,置信肯定会有很多重量级的个性退场,让咱们刮目相待!

退出移动版