关于sql:官宣|Apache-Flink-1130-正式发布流处理应用更加简单高效

50次阅读

共计 11160 个字符,预计需要花费 28 分钟才能阅读完成。

简介:Flink 1.13.0 版本让流解决利用的应用像一般利用一样简略和天然,并且让用户能够更好地了解流作业的性能。

​翻译 | 高赟
Review | 朱翥、马国维

GitHub 地址
https://github.com/apache/flink
欢送大家给 Flink 点赞送 star~

Flink 1.13 公布了!Flink 1.13 包含了超过 200 名贡献者所提交的 1000 多项修复和优化。

这一版本中,Flink 的一个次要指标获得了重要停顿,即 让流解决利用的应用和一般利用一样简略和天然。Flink 1.13 新引入的被动扩缩容使得流作业的扩缩容和其它利用一样简略,用户仅须要批改并发度即可。

这个版本还包含一系列重要改变使 用户能够更好的了解流作业的性能。当流作业的性能不迭预期的时候,这些改变能够使用户能够更好的剖析起因。这些改变包含用于辨认瓶颈节点的负载和反压可视化、剖析算子热点代码的 CPU 火焰图和剖析 State Backend 状态的 State 拜访性能指标。

除了这些个性外,Flink 社区还增加了大量的其它优化,咱们会在本文后续探讨其中的一些。咱们心愿用户能够享受新的版本和个性带来的便当,在本文最初,咱们还会介绍降级 Flink 版本须要留神的一些变动。

咱们激励用户下载试用新版 Flink 并且通过邮件列表和 JIRA 来反馈遇到的问题。

重要个性

被动扩缩容

Flink 我的项目的一个初始指标,就是心愿流解决利用能够像一般利用一样简略和天然,被动扩缩容是 Flink 针对这一指标上的最新进展。

当思考资源管理和局部的时候,Flink 有两种可能的模式。用户能够将 Flink 利用部署到 k8s、yarn 等资源管理零碎之上,并且由 Flink 被动的来治理资源并按需分配和开释资源。这一模式对于常常扭转资源需要的作业和利用十分有用,比方批作业和实时 SQL 查问。在这种模式下,Flink 所启动的 Worker 数量是由利用设置的并发度决定的。在 Flink 中咱们将这一模式叫做被动扩缩容。

对于长时间运行的流解决利用,一种更适宜的模型是用户只须要将作业像其它的长期运行的服务一样启动起来,而不须要思考是部署在 k8s、yarn 还是其它的资源管理平台上,并且不须要思考须要申请的资源的数量。相同,它的规模是由所调配的 worker 数量来决定的。当 worker 数量发生变化时,Flink 主动的改变利用的并发度。在 Flink 中咱们将这一模式叫做被动扩缩容。

Flink 的 Application 部署模式开启了使 Flink 作业更靠近一般利用(即启动 Flink 作业不须要执行两个独立的步骤来启动集群和提交利用)的致力,而被动扩缩容实现了这一指标:用户不再须要应用额定的工具(如脚本、K8s 算子)来让 worker 的数量与利用并发度设置保持一致。

用户当初能够将主动扩缩容的工具利用到 Flink 利用之上,就像一般的应用程序一样,只有用户理解扩缩容的代价:有状态的流利用在扩缩容的时候须要将状态从新散发。

如果想要尝试被动扩缩容,用户能够减少 scheduler-mode: reactive 这一配置项,而后启动一个利用集群(Standalone 或者 K8s)。更多细节见被动扩缩容的文档。

剖析利用的性能

对所有应用程序来说,可能简略的剖析和了解利用的性能是十分要害的性能。这一性能对 Flink 更加重要,因为 Flink 利用个别是数据密集的(即须要解决大量的数据)并且须要在(近)实时的提早内给出后果。

当 Flink 利用解决的速度跟不上数据输出的速度时,或者当一个利用占用的资源超过预期,下文介绍的这些工具能够帮你剖析起因。

瓶颈检测与反压监控

Flink 性能剖析首先要解决的问题常常是:哪个算子是瓶颈?

为了答复这一问题,Flink 引入了形容作业忙碌(即在解决数据)与反压(因为上游算子不能及时处理后果而无奈持续输入)水平的指标。利用中可能的瓶颈是那些忙碌并且上游被反压的算子。

Flink 1.13 优化了反压检测的逻辑(应用基于工作 Mailbox 计时,而不在再于堆栈采样),并且从新实现了作业图的 UI 展现:Flink 当初在 UI 上通过色彩和数值来展现忙碌和反压的水平。

Web UI 中的 CPU 火焰图

Flink 对于性能另一个常常须要答复的问题:瓶颈算子中的哪局部计算逻辑耗费微小?

针对这一问题,一个无效的可视化工具是火焰图。它能够帮忙答复以下问题:

  • 哪个办法调当初在占用 CPU?
  • 不同办法占用 CPU 的比例如何?
  • 一个办法被调用的栈是什么样子的?

火焰图是通过反复采样线程的堆栈来构建的。在火焰图中,每个办法调用被示意为一个矩形,矩形的长度与这个办法呈现在采样中的次数成正比。火焰图在 UI 上的一个例子如下图所示。

火焰图的文档包含启用这一性能的更多细节和指令。

State 拜访提早指标

另一个可能的性能瓶颈是 state backend,尤其是当作业的 state 超过内存容量而必须应用 RocksDB state backend 时。

这里并不是想说 RocksDB 性能不够好(咱们十分喜爱 RocksDB!),然而它须要满足一些条件能力达到最好的性能。例如,用户可能很容易遇到非故意的在云上因为应用了谬误的磁盘资源类型而不能满足 RockDB 的 IO 性能需求的问题。

基于 CPU 火焰图,新的 State Backend 的提早指标能够帮忙用户更好的判断性能不合乎预期是否是由 State Backend 导致的。例如,如果用户发现 RocksDB 的单次访问须要几毫秒的工夫,那么就须要查看内存和 I/O 的配置。这些指标能够通过设置 state.backend.rocksdb.latency-track-enabled 这一选项来启用。这些指标是通过采样的形式来监控性能的,所以它们对 RocksDB State Backend 的性能影响是微不足道的。

通过 Savepoint 来切换 State Backend

用户当初能够在从一个 Savepoint 重启时切换一个 Flink 利用的 State Backend。这使得 Flink 利用不再被限度只能应用利用首次运行时抉择的 State Backend。

基于这一性能,用户当初能够首先应用一个 HashMap State Backend(纯内存的 State Backend),如果后续状态变得过大的话,就切换到 RocksDB State Backend 中。

在实现层,Flink 当初对立了所有 State Backend 的 Savepoint 格局来实现这一性能。

K8s 部署时应用用户指定的 Pod 模式

原生 kubernetes 部署(Flink 被动要求 K8s 来启动 Pod)中,当初能够应用自定义的 Pod 模板。

应用这些模板,用户能够应用一种更合乎 K8s 的形式来设置 JM 和 TM 的 Pod,这种形式比 Flink K8s 集成内置的配置项更加灵便。

生产可用的 Unaligned Checkpoint

Unaligned Checkpoint 目前已达到了生产可用的状态,咱们激励用户在存在反压的状况下试用这一性能。

具体来说,Flink 1.13 中引入的这些性能使 Unaligned Checkpoint 更容易应用:

  • 用户当初应用 Unaligned Checkpoint 时也能够扩缩容利用。如果用户须要因为性能起因不能应用 Savepoint 而必须应用 Retained checkpoint 时,这一性能会十分不便。
  • 对于没有反压的利用,启用 Unaligned Checkpoint 当初代价更小。Unaligned Checkpoint 当初能够通过超时来主动触发,即一个利用默认会应用 Aligned Checkpoint(不存储传输中的数据),而只在对齐超过肯定工夫范畴时主动切换到 Unaligned Checkpoint(存储传输中的数据)。

对于如何启用 Unaligned Checkpoint 能够参考相干文档。

机器学习迁徙到独自的仓库

为了减速 Flink 机器学习的停顿(流批对立的机器学习),当初 Flink 机器学习开启了新的 flink-ml 仓库。咱们采纳相似于 Stateful Function 我的项目的治理形式,通过应用一个独自的仓库从而简化代码合并的流程并且能够进行独自的版本公布,从而进步开发的效率。

用户能够关注 Flink 在机器学习方面的停顿,比方与 Alink(Flink 罕用机器学习算法套件)的互操作以及 Flink 与 Tensorflow 的集成。

SQL / Table API 停顿

与之前的版本相似,SQL 和 Table API 依然在所有开发中占用很大的比例。

通过 Table-valued 函数来定义工夫窗口

在流式 SQL 查问中,一个最常常应用的是定义工夫窗口。Flink 1.13 中引入了一种新的定义窗口的形式:通过 Table-valued 函数。这一形式不仅有更强的表达能力(容许用户定义新的窗口类型),并且与 SQL 规范更加统一。

Flink 1.13 在新的语法中反对 TUMBLE 和 HOP 窗口,在后续版本中也会反对 SESSION 窗口。咱们通过以下两个例子来展现这一办法的表达能力:

  • 例 1:一个新引入的 CUMULATE 窗口函数,它能够反对按特定步长扩大的窗口,直到达到最大窗口大小:
SELECT window_time, window_start, window_end, SUM(price) AS total_price 
  FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time;
  • 例 2:用户在 table-valued 窗口函数中能够拜访窗口的起始和终止工夫,从而使用户能够实现新的性能。例如,除了惯例的基于窗口的聚合和 Join 之外,用户当初也能够实现基于窗口的 Top-K 聚合:
SELECT window_time, ...
  FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC) 
      as rank 
    FROM t
  ) WHERE rank <= 100; 

进步 DataStream API 与 Table API / SQL 的互操作能力

这一版本极大的简化了 DataStream API 与 Table API 混合的程序。

Table API 是一种十分不便的利用开发接口,因为这经反对表达式的程序编写并提供了大量的内置函数。然而有时候用户也须要切换回 DataStream,例如当用户存在表达能力、灵活性或者 State 拜访的需要时。

Flink 新引入的 StreamTableEnvironment.toDataStream()/.fromDataStream() 能够将一个 DataStream API 申明的 Source 或者 Sink 当作 Table 的 Source 或者 Sink 来应用。次要的优化包含:

  • DataStream 与 Table API 类型零碎的主动转换。
  • Event Time 配置的无缝集成,Watermark 行为的高度一致性。
  • Row 类型(即 Table API 中数据的示意)有了极大的加强,包含 toString() / hashCode() 和 equals() 办法的优化,按名称拜访字段值的反对与稠密示意的反对。
Table table = tableEnv.fromDataStream(
  dataStream,
  Schema.newBuilder()
    .columnByMetadata("rowtime", "TIMESTAMP(3)")
    .watermark("rowtime", "SOURCE_WATERMARK()")
    .build());

DataStream<Row> dataStream = tableEnv.toDataStream(table)
  .keyBy(r -> r.getField("user"))
  .window(...);

SQL Client: 初始化脚本和语句汇合(Statement Sets)

SQL Client 是一种间接运行和部署 SQL 流或批作业的简便形式,用户不须要编写代码就能够从命令行调用 SQL,或者作为 CI / CD 流程的一部分。

这个版本极大的进步了 SQL Client 的性能。当初基于所有通过 Java 编程(即通过编程的形式调用 TableEnvironment 来发动查问)能够反对的语法,当初 SQL Client 和 SQL 脚本都能够反对。这意味着 SQL 用户不再须要增加胶水代码来部署他们的 SQL 作业。

配置简化和代码共享

Flink 后续将不再反对通过 Yaml 的形式来配置 SQL Client(注:目前还在反对,然而曾经被标记为废除)。作为代替,SQL Client 当初反对应用一个初始化脚本在主 SQL 脚本执行前来配置环境。

这些初始化脚本通常能够在不同团队 / 部署之间共享。它能够用来加载罕用的 catalog,利用通用的配置或者定义规范的视图。

./sql-client.sh -i init1.sql init2.sql -f sqljob.sql

更多的配置项

通过减少配置项,优化 SET / RESET 命令,用户能够更不便的在 SQL Client 和 SQL 脚本外部来管制执行的流程。

通过语句汇合来反对多查问

多查问容许用户在一个 Flink 作业中执行多个 SQL 查问(或者语句)。这对于长期运行的流式 SQL 查问十分有用。

语句集能够用来将一组查问合并为一组同时执行。

以下是一个能够通过 SQL Client 来执行的 SQL 脚本的例子。它初始化和配置了执行多查问的环境。这一脚本包含了所有的查问和所有的环境初始化和配置的工作,从而使它能够作为一个自蕴含的部署组件。

-- set up a catalog
CREATE CATALOG hive_catalog WITH ('type' = 'hive');
USE CATALOG hive_catalog;

-- or use temporary objects
CREATE TEMPORARY TABLE clicks (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

-- set the execution mode for jobs
SET execution.runtime-mode=streaming;

-- set the sync/async mode for INSERT INTOs
SET table.dml-sync=false;

-- set the job's parallelism
SET parallism.default=10;

-- set the job name
SET pipeline.name = my_flink_job;

-- restore state from the specific savepoint path
SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-bb0dab;

BEGIN STATEMENT SET;

INSERT INTO pageview_pv_sink
SELECT page_id, count(1) FROM clicks GROUP BY page_id;

INSERT INTO pageview_uv_sink
SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;

END;

Hive 查问语法兼容性

用户当初在 Flink 上也能够应用 Hive SQL 语法。除了 Hive DDL 方言之外,Flink 当初也反对罕用的 Hive DML 和 DQL 方言。

为了应用 Hive SQL 方言,须要设置 table.sql-dialect 为 hive 并且加载 HiveModule。后者十分重要,因为必须要加载 Hive 的内置函数后能力正确实现对 Hive 语法和语义的兼容性。例子如下:

CREATE CATALOG myhive WITH ('type' = 'hive'); -- setup HiveCatalog
USE CATALOG myhive;
LOAD MODULE hive; -- setup HiveModule
USE MODULES hive,core;
SET table.sql-dialect = hive; -- enable Hive dialect
SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries 

须要留神的是,Hive 方言中不再反对 Flink 语法的 DML 和 DQL 语句。如果要应用 Flink 语法,须要切换回 default 的方言配置。

优化的 SQL 工夫函数

在数据处理中工夫解决是一个重要的工作。然而与此同时,解决不同的时区、日期和工夫是一个日益简单的工作。

在 Flink 1.13 中,咱们投入了大量的精力来简化工夫函数的应用。咱们调整了工夫相干函数的返回类型使其更加准确,例如 PROCTIME(),CURRENT\_TIMESTAMP() 和 NOW()。

其次,用户当初还能够基于一个 TIMESTAMP\_LTZ 类型的列来定义 Event Time 属性,从而能够优雅的在窗口解决中反对夏令时。

用户能够参考 Release Note 来查看该局部的残缺变更。

PyFlink 外围优化

这个版本对 PyFlink 的改良次要是使基于 Python 的 DataStream API 与 Table API 与 Java/scala 版本的对应性能更加统一。

Python DataStream API 中的有状态算子

在 Flink 1.13 中,Python 程序员能够享受到 Flink 状态解决 API 的所有能力。在 Flink 1.12 版本重构过的 Python DataStream API 当初曾经领有残缺的状态拜访能力,从而使用户能够将数据的信息记录到 state 中并且在后续拜访。

带状态的解决能力是许多依赖跨记录状态共享(例如 Window Operator)的简单数据处理场景的根底。

以下例子展现了一个自定义的计算窗口的实现:

class CountWindowAverage(FlatMapFunction):
    def __init__(self, window_size):
        self.window_size = window_size

    def open(self, runtime_context: RuntimeContext):
        descriptor = ValueStateDescriptor("average", Types.TUPLE([Types.LONG(), Types.LONG()]))
        self.sum = runtime_context.get_state(descriptor)

    def flat_map(self, value):
        current_sum = self.sum.value()
        if current_sum is None:
            current_sum = (0, 0)
        # update the count
        current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
        # if the count reaches window_size, emit the average and clear the state
        if current_sum[0] >= self.window_size:
            self.sum.clear()
            yield value[0], current_sum[1] // current_sum[0]
        else:
            self.sum.update(current_sum)

ds = ...  # type: DataStream
ds.key_by(lambda row: row[0]) \
  .flat_map(CountWindowAverage(5))

PyFlink DataStream API 中的用户自定义窗口

Flink 1.13 中 PyFlink DataStream 接口减少了对用户自定义窗口的反对,当初用户能够应用规范窗口之外的窗口定义。

因为窗口是解决有限数据流的外围机制(通过将流切分为多个无限的『桶』),这一性能极大的进步的 API 的表达能力。

PyFlink Table API 中基于行的操作

Python Table API 当初反对基于行的操作,例如用户对行数据的自定义函数。这一性能使得用户能够应用非内置的数据处理函数。

一个应用 map() 操作的 Python Table API 示例如下:

@udf(result_type=DataTypes.ROW([DataTypes.FIELD("c1", DataTypes.BIGINT()),
   DataTypes.FIELD("c2", DataTypes.STRING())]))
def increment_column(r: Row) -> Row:
  return Row(r[0] + 1, r[1])

table = ...  # type: Table
mapped_result = table.map(increment_column)

除了 map(),这一 API 还反对 flat\_map(),aggregate(),flat\_aggregate() 和其它基于行的操作。这使 Python Table API 的性能与 Java Table API 的性能更加靠近。

PyFlink DataStream API 反对 Batch 执行模式

对于无限流,PyFlink DataStream API 当初曾经反对 Flink 1.12 DataStream API 中引入的 Batch 执行模式。

通过复用数据有限性来跳过 State backend 和 Checkpoint 的解决,Batch 执行模式能够简化运维,并且进步无限流解决的性能。

其它优化

基于 Hugo 的 Flink 文档

Flink 文档从 JekyII 迁徙到了 Hugo。如果您发现有问题,请务必告诉咱们,咱们十分期待用户对新的界面的感触。

Web UI 反对历史异样

Flink Web UI 当初能够展现导致作业失败的 n 次历史异样,从而晋升在一个异样导致多个后续异样的场景下的调试体验。用户能够在异样历史中找到根异样。

优化失败 Checkpoint 的异样和失败起因的汇报

Flink 当初提供了失败或被勾销的 Checkpoint 的统计,从而使用户能够更简略的判断 Checkpoint 失败的起因,而不须要去查看日志。

Flink 之前的版本只有在 Checkpoint 胜利的时候才会汇报指标(例如长久化数据的大小、触发工夫等)。

提供『恰好一次』一致性的 JDBC Sink

从 1.13 开始,通过应用事务提交数据,JDBC Sink 能够对反对 XA 事务的数据库提供『恰好一次』的一致性反对。这一个性要求指标数据库必须有(或链接到)一个 XA 事务处理器。

这一 Sink 当初只能在 DataStream API 中应用。用户能够通过 JdbcSink.exactlyOnceSink(…) 来创立这一 Sink(或者通过显式初始化一个 JdbcXaSinkFunction)。

PyFlink Table API 在 Group 窗口上反对用户自定义的聚合函数

PyFlink Table API 当初对 Group 窗口同时反对基于 Python 的用户自定义聚合函数(User-defined Aggregate Functions, UDAFs)以及 Pandas UDAFs。这些函数对许多数据分析或机器学习训练的程序十分重要。

在 Flink 1.13 之前,这些函数仅能在有限的 Group-by 聚合场景下应用。Flink 1.13 优化了这一限度。

Batch 执行模式下 Sort-merge Shuffle 优化

Flink 1.13 优化了针对批处理程序的 Sort-merge Blocking Shuffle 的性能和内存占用状况。这一 Shuffle 模式是在 Flink 1.12 的 FLIP-148 中引入的。

这一优化防止了大规模作业下一直呈现 OutOfMemoryError: Direct Memory 的问题,并且通过 I/O 调度和 broadcast 优化进步了性能(尤其是在机械硬盘上)。

HBase 连接器反对异步维表查问和查问缓存

HBase Lookup Table Source 当初能够反对异步查问模式和查问缓存。这极大的进步了应用这一 Source 的 Table / SQL 维表 Join 的性能,并且在一些典型状况下能够缩小对 HBase 的 I/O 申请数量。

在之前的版本中,HBase Lookup Source 仅反对同步通信,从而导致作业吞吐以及资源利用率升高。

降级 Flink 1.13 须要留神的改变

  • FLINK-21709 – 老的 Table & SQL API 打算器曾经被标记为废除,并且将在 Flink 1.14 中被删除。Blink 打算器在若干版本之前曾经被设置为默认打算器,并且将成为将来版本中的惟一打算器。这意味着 BatchTableEnvironment 和 DataSet API 互操作后续也将不再反对。用户须要切换到对立的 TableEnvironment 来编写流或者批的作业。
  • FLINK-22352 – Flink 社区决定废除对 Apache mesos 的反对,将来有可能会进一步删除这部分性能。用户最好可能切换到其它的资源管理零碎上。
  • FLINK-21935 – state.backend.async 这一配置曾经被禁用了,因为当初 Flink 总是会异步的来保留快照(即之前的配置默认值),并且当初没有实现能够反对同步的快照保留操作。
  • FLINK-17012 – Task 的 RUNNING 状态被细分为两步:INITIALIZING 和 RUNNING。Task 的 INITIALIZING 阶段包含加载 state 和在启用 unaligned checkpoint 时复原 In-flight 数据的过程。通过显式辨别这两种状态,监控零碎能够更好的辨别工作是否曾经在理论工作。
  • FLINK-21698 – NUMERIC 和 TIMESTAMP 类型之间的间接转换存在问题,当初曾经被禁用,例如 CAST(numeric AS TIMESTAMP(3))。用户应该应用 TO\_TIMESTAMP(FROM\_UNIXTIME(numeric)) 来代替。
  • FLINK-22133 – 新的 Source 接口有一个小的不兼容的批改,即 SplitEnumerator.snapshotState() 办法当初多承受一个 checkpoint id 参数来示意正在进行的 snapshot 操作所属的 checkpoint 的 id。
  • FLINK-19463 – 因为老的 Statebackend 接口承载了过多的语义并且容易引起困惑,这一接口被标记为废除。这是一个纯 API 层的改变,而并不会影响利用运行时。对于如何降级现有作业,请参考作业迁徙指引。

其它资源

二进制和代码能够从 Flink 官网的下载页面取得,最新的 PyFlink 公布能够从 PyPI 取得。

如果想要降级到 Flink 1.13,请参考公布阐明。这一版本与之前 1.x 的版本在标记为 @Public 的接口上是兼容的。

用户也能够查看新版本批改列表与更新后的文档来取得批改和新性能的具体列表。

原文链接:
https://flink.apache.org/news/2021/05/03/release-1.13.0.html

版权申明:本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

正文完
 0