乐趣区

官宣-千呼万唤Apache-Flink-1110-正式发布啦

Apache Flink 社区很荣幸的发表 Flink 1.11.0 版本正式公布!超过 200 名贡献者参加了 Flink 1.11.0 的开发,提交了超过 1300 个修复或优化。这些批改极大的进步了 Flink 的可用性,并且加强了各个 API 栈的性能。其中一些比拟重要的批改包含:

  1. 外围引擎局部 引入了非对齐的 Checkpoint 机制。这一机制是对 Flink 容错机制的一个重要改良,它能够进步重大反压作业的 Checkpoint 速度。
  2. 实现了一套新的 Source 接口。通过对立流和批作业 Source 的运行机制,提供罕用的外部实现如事件工夫解决,watermark 生成和闲暇并发检测,这套新的 Source 接口能够极大的升高实现新的 Source 时的开发复杂度。
  3. Flink SQL 引入了对 CDC(Change Data Capture,变动数据捕捉)的反对,它使 Flink 能够不便的通过像 Debezium 这类工具来翻译和生产数据库的变动日志。Table API 和 SQL 也扩大了文件系统连接器对更多用户场景和格局的反对,从而能够反对将流式数据从 Kafka 写入 Hive 等场景。
  4. PyFlink 优化了多个局部的性能,包含对向量化的用户自定义函数(Python UDF)的反对。这些改变使 Flink Python 接口能够与罕用的 Python 库(如 Pandas 和 NumPy)进行互操作,从而使 Flink 更适宜数据处理与机器学习的场景。

Flink 1.11.0 的二进制公布包和源代码能够在 Flink 官网的下载页面取得,对应的 PyFlink 公布包能够在 PyPI 网站下载。详情能够参阅公布阐明,公布性能更新与更新后的文档。

咱们心愿您下载试用这一版本后,能够通过 Flink 邮件列表和 JIRA 网站和咱们分享您的反馈意见。

GitHub 下载地址 ▼

https://flink.apache.org/down…

新的性能和优化

非对齐的 Checkpoints(Beta 版本)

当 Flink 发动一次 Checkpoint 时,Checkpoint Barrier 会从整个拓扑的 Source 登程始终流动到 Sink。对于超过一个输出的算子,来自各个输出的 Barrier 首先须要对齐,而后这个算子能力进行 state 的快照操作以及将 Barrier 公布给后续的算子。个别状况下对齐能够在几毫秒内实现,然而当反压时,对齐可能成为一个瓶颈:

  1. Checkpoint Barrier 在有反压的输出通道中流传的速度十分慢(须要期待后面的数据处理实现),这将会阻塞对其它输出通道的数据处理并最终进一步反压上游的算子。
  2. Checkpoint Barrier 流传慢还会导致 Checkpoint 工夫过长甚至超时,在最坏的状况下,这可能导致整个作业进度无奈更新。

为了进步 Checkpoint 在反压状况下的性能,Flink 社区在 1.11.0 版本中初步实现了非对齐的 Checkpoint 机制(FLIP-76)。与对齐的 Checkpoint(图 1)相比,这种形式下算子不须要期待来自各个输出通道的 Barrier 对齐,相同,这种形式容许 Barrier 越过后面的待处理的数据(即在输入和输出 Buffer 中的数据)并且间接触发 Checkpoint 的同步阶段。这一过程如图 2 所示。


图 1. 对齐的 Checkpoint


图 2. 非对齐的 Checkpoint

因为被越过的流传中的数据必须作为快照的一部分被长久化,非对齐的 Checkpoint 机制会减少 Checkpoint 的大小。然而,好的方面是它能够极大的缩小 Checkpoint 须要的工夫,因而即便在非稳固的环境中,用户也能够看到更多的作业进度。这是因为非对齐的 Checkpoint 能够缩小 Recovery 的负载。对于非对齐的 Checkpoint 更具体的信息以及将来的开发计划,能够别离参考相干文档和 FLINK-14551。

和其它 Beta 版本的个性一样,咱们十分期待和感谢您试用之后和社区分享您的感触。

留神:开启这一特色须要通过 Chekpoint 选项配置 enableUnalignedCheckpoints 参数。须要留神的是,非对齐的 Checkpoint 只有在 CheckpointMode 被设置为 CheckpointMode.EXACTLY_ONCE 的时候才无效。

对立的 Watermark 生成器

目前 Flink 的 Watermark 生成(也叫做调配)依赖于两个接口:AssignerWithPunctuatedWatermarks 与 AssignerWithPeriodicWatermarks,这两个接口与记录时间戳提取的关系也比拟凌乱,从而使 Flink 难以实现一些用户急需的性能,如反对闲暇检测;此外,这还会导致代码反复且难以保护。通过 FLIP-126,现有的 watermark 生成接口被对立为一个独自的接口,即 WatermarkGenerator,并且它和 TimestampAssigner 独立。

这一批改使用户能够更好的管制 watermark 的发送逻辑,并且简化实现反对 watermark 生成和工夫戳提取的 Source 的难度(能够参考新的 Source 接口)。基于这一接口,Flink 1.11 中还提供了许多内置的 Watermark 生成策略(例如 forBoundedOutOfOrderness, forMonotonousTimestamps),并且用户能够应用本人的实现。

■ 反对 Watermark 闲暇检测

WatermarkStrategy.withIdleness()办法容许用户在配置的工夫内(即超时工夫内)没有记录达到时将一个流标记为闲暇,从而进一步反对 Flink 正确处理多个并发之间的事件工夫歪斜的问题,并且防止了闲暇的并发提早整个零碎的事件工夫。通过将 Kafka 连接器迁徙至新的接口(FLINK-17669),用户能够受害于针对单个并发的闲暇检测。

留神:这一 FLIP 的批改目前不会影响现有程序,然而咱们举荐用户后续尽量应用新的 Watermark 生成接口,防止后续版本禁用之前的 Watermark 生成器带来的影响。

新的 Source 接口(Beta)

1.11 以编写一个生产可用的 Flink Source 连接器并不是一个简略的工作,它须要用户对 Flink 外部实现有肯定的理解,并且须要在连接器中自行实现事件工夫提取、Watermark 生成和闲暇检测等性能。针对这一问题,Flink 1.11 引入了一套新的 Source 接口 FLIP-27 来解决上述问题,并且同时解决了须要为批作业和流作业编写两套 Source 实现的问题。

通过将分区发现和实现生产每一个分区的数据分成不同的组件(即 SplitEnumerator 和 SourceReader),新的 Source 接口容许将不同的分区发现策略和分区生产的具体实现任意组合。

例如,现有的 Kafka 连接器提供了多种不同的分区发现策略,这些策略的实现和其实代码的实现耦合在一起。如果迁徙到新的接口,Kafka Source 将能够应用雷同的分区生产的实现(即 SourceReader),并且针对不同的分区发现策略编写独自的 SplitEnumerator 的实现。

■ 流批对立

应用新版 Source 接口的 Source 连接器将能够同时用于无限数据(批)作业和有限数据(流)作业。这两种场景仅有一个很小的区别:在无限数据的状况下,分区发现策略将返回一个固定大小的分区并且每一个分区的数据都是无限的;在有限数据的状况下,要么每个分区的数据量是有限的,要么分区发现策略能够一直的产生新的分区。

■ 内置的 Watermark 和事务工夫解决

在新版 Source 接口中,TimestampAssigner 和 WatermarkGenerator 将通明的作为分区生产具体实现(SourceReader)的一部分,因而用户不须要实现任何工夫戳提取和 Watermark 生成的代码。

留神:现有的 Source 连接器尚未基于新的 Source 接口从新实现,这将在后续版本中逐步实现。如果想要基于新的 Source 接口实现本人的 Source,能够参考 Data Source 文档和 Source 开发的一些倡议。

Application 部署模式

在 1.11 之前,Flink 的作业有两种部署模式,其中 Session 模式是将作业提交到一个长期运行的 Flink Session 集群,Job 模式是为每个作业启动一个专门的 Flink 作业集群。这两种模式下用户作业的 main 办法都是客户端执行的,然而这种形式存在肯定的问题:如果客户端是更大程序的一部分的话,生成 JobGraph 容易成为零碎的瓶颈;其次,这种形式也不能很好的适应像 Docker 和 K8s 这样的容器环境。

Flink 1.11 引入了一种新的部署模式,即 Application 模式(FLIP-85)。这种模式下用户程序的 main 办法将在集群中而不是客户端运行。这样,作业提交就会变得非常简单:用户将程序逻辑和依赖打包进一人可执行的 jar 包里,集群的入口程序(ApplicationClusterEntryPoint)负责调用其中的 main 办法来生成 JobGraph。

Flink 1.11 曾经能够反对基于 K8s 的 Application 模式(FLINK-10934)。

其它性能批改

■ 对立 JM 的内存配置(FLIP-116)

在 1.10 中,Flink 对立了 TM 端的内存治理和配置,相应的在 1.11 中,Flink 进一步对 JM 端的内存配置进行了批改,使它的选项和配置形式与 FLIP-49 中引入的 TM 端的配置形式保持一致。这一批改影响所有的部署类型,包含 standalone,Yarn,Mesos 和新引入的 K8s。

留神:复用之前的 Flink 配置将会失去不同的 JVM 参数,从而可能影响性能甚至导致异样。如果想要更新到 1.11 的话,请肯定要参考迁徙文档。

Web UI 性能加强

在 1.11 中,社区对 Flink Web UI 进行了一系列的优化。首要的批改是优化了 TM 和 JM 的日志展现(FLIP-103),其次,Flink Web UI 还引入了打印所有线程列表的工具(FLINK-14816)。在后续的版本中,Web UI 还将进一步优化,包含更好的反压检测,更灵便和可配置的异样展现以及对 Task 出错历史的展现。

对立 Docker 镜像

1.11 将所有 Docker 相干的资源都对立整顿到了 apache/flink-docker 我的项目中,并且扩大了入口脚本从而容许用户在不同模式下应用默认的 docker 镜像,防止了许多状况下用户本人创立镜像的麻烦。对于如何在不同环境和部署模式下应用和定制 Flink 官网 Docker 镜像,请参考具体文档。

Table API/SQL:反对 CDC(Change Data Capture)

CDC 是数据库中一种罕用的模式,它捕捉数据库提交的批改并且将这些批改播送给其它的上游消费者。CDC 能够用于像同步多个数据存储和防止双写导致的问题等场景。长期以来 Flink 的用户都心愿可能将 CDC 数据通过 Table API/SQL 导入到作业中,而 Flink 1.11 实现了这一点。

为了可能在 Table API / SQL 中应用 CDC,Flink 1.11 更新了 Table Source 与 Sink 的接口来反对 changelog 模式(参考新的 Table Source 与 Sink 接口)并且反对了 Debezium 与 Canal 格局(FLIP-105)。这一改变使动静 Table Source 不再只反对 append-only 的操作,而且能够导入内部的批改日志(插入事件)将它们翻译为对应的批改操作(插入,批改和删除)并将这些操作与操作的类型发送到后续的流中。

为了生产 CDC 数据,用户须要在应用 SQL DDL 创立表时指指定“format=debezium-json“或者“format=canal-json”:

  CREATE TABLE my_table (...) WITH (
  'connector'='...', -- e.g. 'kafka'
  'format'='debezium-json',
  'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)
  'debezium-json.ignore-parse-errors'='true' -- default: false
);

Flink 1.11 仅反对 Kafka 作为批改日志的数据源以及 JSON 编码格局的批改日志;后续 Flink 将进一步反对 Avro(Debezium)和 Protobuf(Canal)格局。Flink 还打算在将来反对 UDF MySQL 的 Binlog 以及 Kafka 的 Compact Topic 作为数据源,并且将对批改日志的反对扩大到批作业。

留神:目前有一个已知的 BUG(FLINK-18461)会导致应用批改日志的 Source 无奈写入到 Upsert Sink 中(例如,MySQL,HBase,ElasticSearch)。这个问题会在下一个版本(即 1.11.1)中修复。

Table API/SQL:反对 JDBC Catalog 和 Postgres Catalog

Flink 1.11 反对了一种通用的 JDBC Catalog 接口(FLIP-93),这一接口容许 Table API/SQL 的用户主动的从通过 JDBC 连贯的关系数据库中导出表构造。这一性能防止了之前用户须要手动复制表构造以及进行类型映射的麻烦,并且容许 Flink 在编译时而不是运行时对表构造进行查看。

首先在 1.11 中实现的是 Postgres Catalog。

Table API/SQL:反对 Avro,ORC 和 Parquet 格局的文件系统连接器

为了进步用户应用 Flink 进行端到端的流式 ETL 的体验,Flink 1.11 在 Table API/SQL 中引入了新的文件系统连接器。它基于 Flink 本人的文件系统形象和 StreamingFileSink 来实现,从而保障和 DataStream API 有雷同的能力和统一的行为。

这也意味着 Table API/SQL 的用户能够应用 StreamingFileSink 当初曾经反对的文件格式,例如(Avro)Parquet,以及在这 1.11 中新减少的文件格式,例如 Avro 和 ORC。

CREATE TABLE my_table (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',         
  'path' = 'file:///path/to/file,'format'='...',  -- supported formats: Avro, ORC, Parquet, CSV, JSON         
  ...
);

新的全能的文件系统连接器能够通明的反对流作业和批作业,提供 Exactly-once 语义并且提供了残缺的分区的反对,从而绝对于之前的 Connector 极大的扩大了能够反对的场景。例如,用户能够容易的实现将流式数据从 Kafka 写入 Hive 的场景。

后续的文件系统连接器的优化能够参考 FLINK-17778。

Table API/SQL:反对 Python UDF

在 1.11 之前 Table API/SQL 的用户只能通过 Java 或 Scala 来实现 UDF。在 1.11 中,Flink 扩大了 Python 语言的利用范畴,除了 PyFlink 外,Flink 1.11 还在 SQL DDL 语法(FLIP-106)和 SQL Client(FLIP-114)中反对了 Python UDF。用户还能够在零碎 Catalog 中通过 SQL DDL 或者 Java Catalog API 来注册 Python UDF,这样这些 UDF 能够在作业中共享。

其它的 Table API/SQL 优化

■ Hive Connect 兼容 Hive DDL 和 DML(FLIP-123)

从 1.11 开始,用户能够在 Table API/SQL 和 SQL Client 中应用 Hive 语法(HiveQL)来编写 SQL 语句。为了反对这一个性,Flink 引入了一种新的 SQL 方言,用户能够动静的为每一条语句抉择应用 Flink(default)或 Hive(hive)办法。对于所反对的 DDL 和 DML 的残缺列表,请参考 Hive 方言的文档。

Flink SQL 语法的扩大和优化

  • Flink 1.11 引入了主键束缚的概念,从而能够在 Flink SQL DDL 的运行时优化中应用(FLIP-87)。
  • 视图对象曾经在 SQL DDL 中残缺反对,能够通过 CREATE/ALTER/DROP VIEW 等语句应用(FLIP-71)。
  • 用户能够在 DQL 和 DML 中应用动静表属性动静指定或笼罩 Table 的选项(FLIP-113)。
  • 为了简化 connector 参数的配置,进步异样解决的能力,Table API/SQL 批改了一些配置项的名称(FLIP-122)。这一改变不会毁坏兼容性,用户依然能够应用老的名称。

■ 新的 Table Source 和 Sink 接口(FLIP-95)

Flink 1.11 引入了新的 Table Source 和 Sink 接口(即 DynamicTableSource 和 DynamicTableSink),这一接口能够对立批作业和流作业,在应用 Blink Planner 时提供更高效的数据处理并且能够反对批改日志的解决(参考反对批改日志)。新的接口简化了用户实现新的自定义的连接器和批改现有连接器的复杂度。一个基于反对批改日志语义的数据解析格局来实现定制表扫描的 Source 的案例请参考这一文档。

留神:只管这一批改不会毁坏兼容性,然而咱们举荐 Table API/SQL 的用户尽快将现有的 Source 和 Sink 降级到新的接口上。

重构 Table Env 接口(FLIP-84)

1.11 之前 TableEnvironment 和 Table 上类似的接口的行为并不完全相同,这导致了接口的不统一并使用户感到困惑。为了解决这一问题并使基于 Table API/SQL 的编写程序更加晦涩,Flink 1.11 引入了新的办法来对立这些不统一的行为,例如执行触发的机会(即 executeSql()),后果展现(即 print(),collecto())并且为后续版本的重要性能(如多语句执行)打下了根底。

留神:在 FLIP-84 中被标记为过期的办法不会被立即删掉,然而咱们倡议用户采纳新的办法。对于新的办法和过期办法的残缺列表,能够查看 FLIP-84 的总结局部。

新的类型推断和 Table API UDF(FLIP-65)

在 Flink 1.9 中,社区开始在 Table API 中反对一种新的类型零碎来进步与规范 SQL 的一致性(FLIP-37)。在 1.11 中这一工作靠近实现,通过反对在 Table API UDF 中应用新的类型零碎(目前反对 scalar 函数与 table 函数,打算下一版本也反对 aggregate 函数)。

PyFlink:反对 Pandas UDF

在 1.11 之前,PyFlink 中的 Python UDF 仅反对规范的 Python 标量类型。这带来了一些限度:

  1. 在 JVM 和 Python 过程之间传递数据会导致较大序列化、反序列化开销。
  2. 难以集成罕用的高性能 Python 数值计算框架,例如 Pandas 和 NumPy。

为了克服这些限度,社区引入了对基于 Pandas 的(标量)向量 Python UDF 的反对(FLIP-97)。因为能够通过利用 Apache Arrow 来最小化序列化 / 反序列化的开销,向量 UDF 的性能个别会十分好;此外,将 pandas.Series 作为输入输出的类型能够充沛复用 Pandas 和 NumPy 库。这些特点使 Pandas UDF 特地适宜并行机器学习和其它大规模、分布式的数据迷信的计算作业(例如特征提取或分布式模式服务)。

@udf(input_types=[DataTypes.BIGINT(),DataTypes.BIGINT()],result_type=DataTypes.BIGINT(),udf_type="pandas")
defadd(i,j):
  returni+j

为了使 UDF 变为 Pandas UDF,须要在 udf 的装璜器中增加额定的参数 udf_type=”pandas”,如文档所示。

PyFlink 的其它优化

反对转换器 fromPandas/toPandas(FLIP-120)

Arrow 还被用来优化 PyFlink Table 和 pandas.DataFrame 之间的转换,从而使用户能够在不同的解决引擎之间无缝切换,而不须要编写非凡的连接器进行直达。应用 fromPandas()和 toPandas() 办法的安例,能够参考相干文档。

反对用户自定义的 Table Function(User-defined Table Function,UDTF)(FLINK-14500)

从 1.11 开始,用户能够在 PyFlink 定义和注册自定义的 UDTF。与 Python UDF 相似,UDTF 能够承受 0 个,一个或多个标量值作为参数,然而能够返回任意多行数据作为输入而不是只能返回单个值。

基于 Cython 对 UDF 的性能进行优化(FLIP-121)

Cython 是一个 Python 语言预编译的超集,它常常被用来进步大规模数据计算函数的性能,因为它能够将代码执行速度优化到机器指令级别,并且能够很好的与罕用的基于 C 语言实现的库配合,例如 NumPy。从 Flink 1.11 开始,用户能够结构包含 Cython 反对的 PyFlink[60]并且能够通过 Cython 来优化 Python UDF。这种优化能够极大的晋升代码的性能(与 1.10 的 Python UDF 相比最高能有 30 倍的晋升)。

Python UDF 反对用户自定义的 Metrics(FLIP-112)

为了使用户能够更容易的监控和调试 Python UDF 的执行,PyFlink 当初反对收集和输入 Metric 的值到内部零碎中,并且反对自定义域和变量。用户能够在 UDF 的 open 办法中通过调用 function_context.get_metric_group() 来拜访一个 Metric 零碎,如文档所示。

其它重要优化

  • [FLINK-17339] 从 1.11 开始,Blink Planner 将变为 Table API/SQL 的默认 Planner。实际上,在 1.10 中 SQL Client 的默认 Planner 曾经变为 Blink Planner。老的 Planner 依然将会反对,然而后续不会再有大的变更。
  • [FLINK-5763] Savepoints 将所有的状态写入到单个目录下(包含元数据和程序状态)。这使得用户能够容易的看出每个 Savepoint 的 State 蕴含哪些文件,并且容许用户间接通过挪动目录来实现 Savepoint 的重定位。
  • [FLINK-16408] 为了缩小 JVM 元数据空间的压力,Flink 1.11 中对于单个 TaskExecutor 只有下面还有某个作业的 Slot,该作业的 ClassLoader 就会被复用。这一改变会扭转 Flink 谬误复原的行为,因为 static 字段不会被从新初始化。
  • [FLINK-11086] Flink 当初能够反对 Hadoop 3.0.0 以上的版本。留神 Flink 我的项目并未提供任何更新的“flink-shaded-hadoop-*”的 jar 包,而是须要用户本人将相应的 Hadoop 依赖退出 HADOOP_CLASSPATH 环境变量(举荐的形式)或者将 Hadoop 依赖退出到 lib/ 目录下。
  • [FLINK-16963] 所有 Flink 内置的 Metric Report 当初被批改为 Flink 的插件。如果要应用它们,不应该搁置到 lib/ 目录下(会导致类抵触),而是要搁置到 plugins/ 目录下。
  • [FLINK-12639] 社区正在对 Flink 文档进行重构,从 1.11 开始,您可能会留神到文档的导航和内容组织产生了一些变动。

具体公布阐明

如果你想要降级到 1.11 的话,请具体浏览具体公布阐明。与之前所有 1.x 版本相比,1.11 能够保障所有标记为 @Public 的接口的兼容。

退出移动版