关于flink:Flink-111-SQL-使用攻略

22次阅读

共计 10304 个字符,预计需要花费 26 分钟才能阅读完成。

7 月 6 日,Apache Flink 1.11 正式公布。从 3 月初进行性能布局到 7 月初正式发版,1.11 用将近 4 个月的工夫重点优化了 Flink 的易用性问题,晋升用户的生产应用体验。

SQL 作为 Flink 中公认的外围模块之一,对推动 Flink 流批一体性能的欠缺至关重要。在 1.11 中,Flink SQL 也进行了大量的加强与欠缺,开发大性能 10 余项,不仅扩充了利用场景,还简化了流程,上手操作更简略。其中,值得注意的改变包含:

  • 默认 Planner 曾经切到 Blink planner 上。
  • 引入了对 CDC(Change Data Capture,变动数据捕捉)的反对,用户仅用几句简略的 SQL 即可对接 Debezium 和 Canal 的数据源。
  • 离线数仓实时化,用户可不便地应用 SQL 将流式数据从 Kafka 写入 Hive 等。

Flink SQL 演变

随着流计算的倒退,挑战不再仅限于数据量和计算量,业务变得越来越简单,开发者可能是资深的大数据从业者、初学 Java 的爱好者,或是不懂代码的数据分析者。如何进步开发者的效率,升高流计算的门槛,对推广实时计算十分重要。SQL 是数据处理中应用最宽泛的语言,它容许用户简明扼要地展现其业务逻辑。Flink 作为流批一体的计算引擎,致力于提供一套 SQL 反对全副利用场景,Flink SQL 的实现也齐全遵循 ANSI SQL 规范。之前,用户可能须要编写上百行业务代码,应用 SQL 后,可能只须要几行 SQL 就能够轻松搞定。Flink SQL 的倒退大略经验了以下阶段:

  • Flink 1.1.0:第一次引入 SQL 模块,并且提供 TableAPI,当然,这时候的性能还十分无限。
  • Flink 1.3.0:在 Streaming SQL 上反对了 Retractions,显著进步了 Streaming SQL 的易用性,使得 Flink SQL 反对了简单的 Unbounded 聚合连贯。
  • Flink 1.5.0:SQL Client 的引入,标记着 Flink SQL 开始提供纯 SQL 文本。
  • Flink 1.9.0:形象了 Table 的 Planner 接口,引入了独自的 Blink Table 模块。Blink Table 模块是阿里巴巴外部的 SQL 层版本,不仅在结构上有重大变更,在性能个性上也更加弱小和欠缺。
  • Flink 1.10.0:作为第一个 Blink 根本实现 merge 的版本,修复了大量遗留的问题,并给 DDL 带来了 Watermark 的语法,也给 Batch SQL 带来了残缺的 TPC-DS 反对和高效的性能。

通过了多个版本的迭代反对,SQL 模块在 Flink 中变得越来越重要,Flink 的 SQL 用户也逐步扩充。基于 SQL 模块的 Python 接口和机器学习接口也在疾速倒退。毫无疑问,SQL 模块作为最罕用的 API 之一和生态的集成变得越来越重要。

SQL 1.11 重要变更

Flink SQL 在原有的根底上扩大了新场景的反对:

  • Flink SQL 引入了对 CDC(Change Data Capture,变动数据捕捉)的反对,它使 Flink 能够不便地通过像 Debezium 这类工具来翻译和生产数据库的变动日志。
  • Flink SQL 扩大了类 Filesystem connector 对实时化用户场景和格局的反对,从而能够反对将流式数据从 Kafka 写入 Hive 等场景。

除此之外,Flink SQL 也从多个方面进步 SQL 的易用性,系统性的解决了之前的 Bug、欠缺了用户 API。

CDC 反对

CDC 格局是数据库中一种罕用的模式,业务上典型的利用是通过工具(比方 Debezium 或 Canal)将 CDC 数据通过特定的格局从数据库中导出到 Kafka 中。在以前,业务上须要定义非凡的逻辑来解析 CDC 数据,并把它转换成个别的 Insert-only 数据,后续的解决逻辑须要思考到这种特殊性,这种 work-around 的形式无疑给业务上带来了不必要的复杂性。如果 Flink SQL 引擎能原生反对 CDC 数据的输出,将 CDC 对接到 Flink SQL 的 Changelog Stream 概念上,将会大大降低用户业务的复杂度。

流计算的实质是就是不断更新、一直扭转后果的计算。思考一个简略的聚合 SQL,流计算中,每次计算产生的聚合值其实都是一个部分值,所以会产生 Changelog Stream。在以前想要把聚合的数据输入到 Kafka 中,如上图所示,简直是不可能的,因为 Kafka 只能接管 Insert-only 的数据。Flink 之前次要是因为 Source&Sink 接口的限度,导致不能反对 CDC 数据的输出。Flink SQL 1.11 通过了大量的接口重构,在新的 Source&Sink 接口上,反对了 CDC 数据的输出和输入,并且反对了 Debezium 与 Canal 格局(FLIP-105)。这一改变使动静 Table Source 不再只反对 append-only 的操作,而且能够导入内部的批改日志(插入事件)将它们翻译为对应的批改操作(插入、批改和删除)并将这些操作与操作的类型发送到后续的流中。

如上图所示,实践上,CDC 同步到 Kafka 的数据就是 Append 的一个流,只是在格局中含有 Changelog 的标识:

  • 一种形式是把 Changlog 标识看做一个一般字段,这也是目前广泛的应用形式。
  • 在 Flink 1.11 后,能够将它申明成 Changelog 的格局,Flink 外部机制反对 Interpret Changelog,能够原生辨认出这个非凡的流,将其转换为 Flink 的 Changlog Stream,并依照 SQL 的语义解决;同理,Flink SQL 也具备输入 Change Stream 的能力(Flink 1.11 暂无内置实现),这就意味着,你能够将任意类型的 SQL 写入到 Kafka 中,只有有 Changelog 反对的 Format。

为了生产 CDC 数据,用户须要在应用 SQL DDL 创立表时指指定“format=debezium-json”或者“format=canal-json”:

CREATE TABLE my_table (...) WITH (
    'connector'='...', -- e.g. 'kafka'
    'format'='debezium-json'
);

Flink 1.11 的接口都已 Ready,然而在实现上:

  • 只反对 Kafka 的 Debezium-json 和 Canal-json 读。
  • 欢送大家扩大实现本人的 Format 和 Connector。

Source & Sink 重构

Source & Sink 重构的一个重要目标是反对上节所说的 Changelog,然而除了 Changelog 以外,它也解决了诸多之前的遗留问题。新 Source & Sink 应用规范姿态(详见官网文档):

CREATE TABLE kafka_table (...) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'test-topic',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false'
);

Flink 1.11 为了向前兼容性,仍然保留了老 Source & Sink,应用“connector.type”的 Key,即可 Fallback 到老 Source & Sink 上。

■ Factory 发现机制

Flink 1.11 前,用户可能常常遇到一个异样,叫做 NoMatchingFactory 异样:

指的是,定义了一个 DDL,在用的时候,DDL 属性找不到对应的 TableFactory 实现,可能的起因是:

  • Classpath 下没有实现类,Flink SQL 是通过  Java SPI 的机制来发现 Factory;
  • 参数写错了。

然而报的异样让人十分纳闷,依据异样的提醒音讯,很难找到到底哪里的代码错了,更难明确晓得哪个 Key 写错了。

public interface Factory {String factoryIdentifier();
    ……
}

所以在 Flink 1.11 中,社区重构了 TableFactory 接口,提出了一个新的 Factory 接口,它有一个办法,叫做 FactoryIdentifier。当前所有的 Factory 的 look up 都通过 identifier。这样的话就十分清晰明了,找不到是因为 Classpath 下没 Factory 的类,找失去那就能够定位到 Factory 的实现中,进行确定性的校验。

■ 类型与数据结构

之前的 Source&Sink 接口反对用户自定义数据结构,即框架晓得如何把自定义的数据结构转换为 Flink SQL 意识的外部数据结构,如:

public interface TableSource<T> {TypeInformation<T> getReturnType();
    ...
}

用户能够自定义泛型 T,通过 getReturnType 来通知框架怎么转换。

不过问题来了,当 getReturnType 和 DDL 中申明的类型不统一时怎么办?特地是两套类型零碎的状况下,如:Runtime 的 TypeInformation,SQL 层的 DataType。因为精度等问题,可能导致经常出现类型不匹配的异样。

Flink 1.11 系统性地解决了这个问题。当初 Connector 开发者不能自定义数据结构,只能应用 Flink SQL 外部的数据结构:RowData。所以保障了默认 Type 与 DDL 的对应,不必再返回类型让框架去确认了。

RowData 数据结构在 SQL 外部设计进去为了:

  • 抽象类接口,在不同场景有适宜的高性能实现。
  • 蕴含 RowKind,符合流计算中的 CDC 数据格式。
  • 遵循 SQL 标准,比方蕴含精度信息。
  • 对应 SQL 类型的可枚举的数据结构。

■ Upsert 与 Primary Key

流计算的一个典型场景是把聚合的数据写入到 Upsert Sink 中,比方 JDBC、HBase,当遇到简单的 SQL 时,时常会呈现:

UpsertStreamTableSink 须要上游的 Query 有残缺的 Primary Key 信息,不然就间接抛异样。这个景象波及到 Flink 的 UpsertStreamTableSink 机制。顾名思义,它是一个更新的 Sink,须要按 Key 来更新,所以必须要有 Key 信息。

如何发现 Primary Key?一个办法是让优化器从 Query 中推断,如下图发现 Primary Key 的例子。

这种状况下在简略 Query 当中很好,也满足语义,也十分天然。然而如果是一个简单的 Query,比方聚合又 Join 再聚合,那就只有报错了。不能期待优化器有多智能,很多状况它都不能推断出 PK,而且,可能业务的 SQL 自身就不能推断出 PK,所以导致了这样的异样。

怎么解决问题?Flink 1.11 彻底的摈弃了这个机制,不再从 Query 来推断 PK 了,而是齐全依赖 Create table 语法。比方 Create 一个 jdbc_table,须要在定义中显式地写好 Primary Key(前面 NOT ENFORCED 的意思是不强校验,因为 Connector 兴许没有具备 PK 的强校验的能力)。当指定了 PK,就相当于就通知框架这个 Jdbc Sink 会依照对应的 Key 来进行更新。如此,就跟 Query 齐全没有关系了,这样的设计能够定义得十分清晰,如何更新齐全依照设置的定义来。

CREATE TABLE jdbc_table (
    id BIGINT,
    ...
    PRIMARY KEY (id) NOT ENFORCED
)

Hive 流批一体

首先看传统的 Hive 数仓。一个典型的 Hive 数仓如下图所示。一般来说,ETL 应用调度工具来调度作业,比方作业每天调度一次或者每小时调度一次。这里的调度,其实也是一个叠加的提早。调度产生 Table1,再产生 Table2,再调度产生 Table3,计算延时须要叠加起来。

问题是慢,提早大,并且 Ad-hoc 剖析提早也比拟大,因为后面的数据入库,或者后面的调度的 ETL 会有很大的提早。Ad-hoc 剖析再快返回,看到的也是历史数据。

所以当初风行构建实时数仓,从 Kafka 读计算写入 Kafka,最初再输入到 BI DB,BI DB 提供实时的数据服务,能够实时查问。Kafka 的 ETL 为实时作业,它的延时甚至可能达到毫秒级。实时数仓依赖 Queue,它的所有数据存储都是基于 Queue 或者实时数据库,这样实时性很好,延时低。然而:

  • 第一,基于 Queue,一般来说就是行存加 Queue,存储效率其实不高。
  • 第二,基于预计算,最终会落到 BI DB,曾经是聚合好的数据了,没有历史数据。而且 Kafka 存的一般来说都是 15 天以内的数据,没有历史数据,意味着无奈进行 Ad-hoc 剖析。所有的剖析全是预约义好的,必须要起对应的实时作业,且写到 DB 中,这样才可用。比照来说,Hive 数仓的益处在于它能够进行 Ad-hoc 剖析,想要什么后果,就能够随时失去什么后果。

是否联合离线数仓和实时数仓两者的劣势,而后构建一个 Lambda 的架构?

外围问题在于老本过高。无论是保护老本、计算成本还是存储老本等都很高。并且两边的数据还要放弃一致性,离线数仓写完 Hive 数仓、SQL,而后实时数仓也要写完相应 SQL,将造成大量的反复开发。还可能存在团队上分为离线团队和实时团队,两个团队之间的沟通、迁徙、对数据等将带来大量人力老本。现在,实时剖析会越来越多,一直的产生迁徙,导致反复开发的老本也越来越高。少部分重要的作业尚可承受,如果是大量的作业,保护老本其实是十分大的。

如何既享受 Ad-hoc 的益处,又能实现实时化的劣势?一种思路是将 Hive 的离线数仓进行实时化,就算不能毫秒级的实时,准实时也好。所以,Flink 1.11 在 Hive 流批一体上做了一些摸索和尝试,如下图所示。它能实时地按 Streaming 的形式来导出数据,写到 BI DB 中,并且这套零碎也能够用剖析计算框架来进行 Ad-hoc 的剖析。这个图当中,最重要的就是 Flink Streaming 的导入。

■ Streaming Sink

晚期 Flink 版本在 DataStreaming 层,曾经有一个弱小的 StreamingFileSink 将流数据写到文件系统。它是一个准实时的、Exactly-once 的零碎,能实现一条数据不多,一条数据不少的 Sink。

具体原理是基于两阶段提交:

  • 第一阶段:SnapshotPerTask,敞开须要 Commit 的文件,或者记录正在写的文件的 Offset。
  • 第二阶段:NotifyCheckpointComplete,Rename 须要 Commit 的文件。留神,Rename 是一个原子且幂等的操作,所以只有保障 Rename 的 At-least-once,即可保证数据的 Exactly-once。

这样一个 File system 的 Writer 看似比拟完满了。然而在 Hive 数仓中,数据的可见性是依赖 Hive Metastore 的,那在这个流程中,谁来告诉 Hive Metastore 呢?

SQL 层在 StreamingFileSink,扩大了 Partition 的 Committer。

相当于不仅要进行 File 的 Commit,还要进行 Partition 的 Commit。如图所示,FileWriter 对应之前的 StreamingFileSink,它提供的是 Exactly-once 的 FileWriter。而前面再接了一个节点 PartitionCommitter。反对的 Commit Policy 有:

  • 内置反对 Add partition 到 Hive metastore;
  • 反对写 SuccessFile 到文件系统当中;
  • 并且也能够自定义 Committer,比方能够 analysis partition、合并 partition 外面的小文件。

Committer 挂在 Writer 后,由 Commit Trigger 决定什么机会来 commit:

  • 默认的 commit 机会是,有文件就立刻 commit。因为所有 commit 都是可重入的,所以这一点是可容许的。
  • 另外,也反对通过 partition 工夫和 Watermark 来独特决定的。比方小时分区,如果当初工夫到 11 点,10 点的分区就能够 commit 了。Watermark 保障了作业以后的准确性。

■ Streaming Source

Hive 数仓中存在大量的 ETL 工作,这些工作往往是通过调度工具来周期性的运行,这样做次要有两个问题:

  • 实时性不强,往往调度最小也是小时级。
  • 流程简单,组件多,容易呈现问题。

针对这些离线的 ETL 作业,Flink 1.11 为此开发了实时化的 Hive 流读,反对:

  • Partition 表,监控 Partition 的生成,增量读取新的 Partition。
  • 非 Partition 表,监控文件夹内新文件的生成,增量读取新的文件。

甚至能够应用 10 分钟级别的分区策略,应用 Flink 的 Hive streaming source 和 Hive streaming sink,能够大大提高 Hive 数仓的实时性到准实时分钟级,在实时化的同时,也反对针对 Table 全量的 Ad-hoc 查问,进步灵活性。

SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'=’true’, 'streaming-source.consume-start-offset'='2020-05-20') */;

另外除了 Scan 的读取形式,Flink 1.11 也反对了 Temporal Join 的形式,也就是以前常说的 Streaming Dim Join。

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

目前反对的形式是 Cache All,并且是不感知分区的,比拟适宜小表的状况。

■ Hive Dialect

Flink SQL 遵循的是 ANSI-SQL 的规范,而 Hive SQL 有它本人的 HQL 语法,它们之间的语法、语义都有些许不同。

如何让 Hive 用户迁徙到 Flink 生态中,同时防止用户太大的学习老本?为此,Flink SQL 1.11 提供了 Hive Dialect,能够使得用户在 Flink 生态中应用 HQL 语言来计算。目前只反对 DDL,后续版本会逐渐攻坚 Qeuries。

■ Filesystem Connector**

Hive Integration 提供了一个重量级的集成,功能丰富,然而环境比较复杂。如果只是想要一个轻量级的 Filesystem 读写呢?

Flink table 在长久以来只反对一个 CSV 的 Filesystem Table,并且还不反对 Partition,行为上在某些方面也有些不合乎大数据计算的直觉。

Flink 1.11 重构了整个 Filesystem connector 的实现:

  • 联合 Partition,当初,Filesystem connector 反对 SQL 中 Partition 的所有语义,反对 Partition 的 DDL,反对 Partition Pruning,反对动态 / 动静 Partition 的插入,反对 Overwrite 的插入。
  • 反对各种 Formats:

■ CSV
■ JSON
■ Aparch AVRO
■ Apache Parquet
■ Apache ORC

  • 反对 Batch 的读写。
  • 反对 Streaming sink,也反对 Partition commit,反对写 Success 文件。

用几句简略的 SQL,不必搭建 Hive 集成环境即可:

  • 启动一个流作业写入 Filesystem 中,而后在 Hive 端即可查问到 Filesystem 上的数据,相比之前 Datastream 的作业,简略 SQL 即可搞定离线数据的入库。
  • 通过 Filesystem Connector 来查问 Hive 数仓中的数据,性能没有 Hive 集成那么全,然而定义简略。

Table 易用性

■ DDL Hints 和 Like

在 Flink 1.10 当前,Hive MetaStore 逐步成为 Flink streaming SQL 中 Table 相干的 Meta 信息的存储。比方,能够通过 Hive Catalog 保留 Kafka Tables。这样能够在启动的时候间接应用 Tables。

通过 DDL 这种形式,把 SQL 提交到 Cluster,就能够写入 Kafka,或者写入 MySQL、DFS。应用 Hive Catalog 后,是不是说只用写一次 DDL,之后的流计算作业都是间接应用 Kafka 的 Table 呢?

不齐全是,因为还是有一些缺点。比方,一个典型的 Kafka Table 有一些 Execution 相干的参数。因为 kafka 一般来说都是存 15 天以内的数据,须要指定每次生产的工夫偏移,工夫偏移是在一直变动的。每次提交作业,应用 Kafka Table 的参数是不一样的。而这些参数又存储在 Catalog 外面,这种状况下只能创立另外一张表,所以字段和参数要重写一遍,十分繁琐。

  • Flink 1.11,社区就开发了 Table Hints,它在 1.11 中目前只专一一个性能,即 Dynamic Table Options。用起来很简略,在 SQL 中 Select From 时,在 Table 前面写 Table Hints 的形式来指定其动静 Options,在不同的应用场景,指定不同的动静参数。
  • Flink 1.11,引入了 Like 语法。LIKE 是规范的 SQL 定义。相当于 Clone 一张表进去复用它的 schema。LIKE 反对多种 Constraints。能够抉择继承,也能够抉择齐全笼罩。

Table Hints:

SELECT id, name FROM kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

LIKE:

CREATE TABLE kafka_table2 WITH ('scan.startup.mode'='earliest-offset') LIKE kafka_table1;

这两个伎俩在对接 Hive Catalog 的根底上,是十分好的补充,可能尽可能的防止在每次作业的时候都写一大堆 Schema。

■ 内置 Connectors

Flink SQL 1.11 引入了新的三个内置 Connectors,次要是为了大家更不便的进行调试、测试,以及进行压测和线上的察看:

  • Datagen Source:一个无中生有产生数据的 Source,能够定义生成的策略,比方 Sequence,比方 Random 的生成。不便线下进行功能性的测试,也能够拿来性能测试。
  • Print Sink:间接在 Task 节点 Runtime 的打印出数据,比方线上作业某个 Sink 少数据了,不晓得是上游发来数据有问题,还是 Sink 逻辑有问题,这时能够额定接一个 Print Sink,排查上游数据到底有没有问题。
  • Blackhole Sink:默默把数据给吃掉,不便功能性的调试。

这三个 Connectors 的目标是为了在调试、测试中排除 Connectors 的影响,一般来说,Connectors 在流计算中是不可控的存在,很多问题把 Connectors 糅杂在一起,变得比较复杂难以排查。

SQL-API

■ TableEnvironment

TableEnvironment 作为 SQL 层的编程入口,无疑是十分重要的,之前的 API 次要是:

  • Table sqlQuery:从一段 Select 的 Query 中返回 Table 接口,把用户的 SQL 翻译成 Flink 的 Table。
  • void sqlUpdate:实质上是执行一段 DDL/DML。然而行为上,当是 DDL 时,间接执行;当是 DML 时,默默 Cache 到 TableEnvironment,等到后续的 execute 调用,才会真正的执行。
  • execute:真正的执行,提交作业到集群。

TableEnvironment 默默的 Cache 执行打算,而且多个 API 感觉上会很凌乱,所以,1.11 社区重构了编程接口,目标是想要提供一个洁净、并且不易出 bug 的清晰接口。

  • 单 SQL 执行:TableResult executeSql(String sql)
  • 多 SQL 执行:StatementSet createStatementSet()
  • TableResult:反对 collect、print、getJobClient

当初 executeSql 就是一个大一统的接口,不论是什么 SQL,是 Query 还是 DDL 还是 DML,间接丢给它都能够很不便地应用起来。并且,和 Datastream 也有了很清晰的界线:

  • 调用过 toDataStream:肯定要应用 StreamExecutionEnvironment.execute
  • 没调用过 toDataStream:肯定要应用 TableEnvironment.executeSql

■ SQL-Client

SQL-Client 在 1.11 对齐了很多 Flink 外部原本就反对的 DDL,除此之外值得注意的是,社区还开发了 Tableau 的后果展现模式,展现更天然一些,间接在命令行展现后果,而不是切换页面:

总结和瞻望

上述解读次要偏重在用户接口方面,社区曾经有比拟丰盛的文档,大家能够去官网查看这些性能的具体文档,以便更深刻的理解和应用。

Flink SQL 1.11 在 CDC 方面开了个头,外部机制和 API 上打下了夯实的根底,将来会内置更多的 CDC 反对,间接对接数据库 Binlog,反对更多的 Flink SQL 语法。后续版本也会从底层提供更多的流批一体反对,给 SQL 层带来更多的流批一体的可能性。

正文完
 0