关于flink:Flink-SQL-111-新功能与最佳实践

11次阅读

共计 9002 个字符,预计需要花费 23 分钟才能阅读完成。

本文整顿自 Flink PMC member 云邪在 Apache Flink Meetup 2020 · 上海站的 talk,旨在帮忙用户疾速理解新版本 Table & SQL 在 Connectivity 和 Simplicity 等方面的优化及理论开发应用的最佳实际,次要分为以下四个局部:

  1. 简要回顾 Flink 1.8 ~ Flink 1.11 版本在 Apache 社区的发展趋势,其中国内开发者的积极参与和中文社区的蓬勃发展对 Flink 在社区和 GitHub 的活跃度做出了重要奉献。
  2. 具体解读 Flink SQL 1.11 新性能,E.g. connectors 参数简化 + 动静 Table 参数缩小代码冗余,内置 connectors + LIKE 语法帮忙疾速测试,重构的 TableEnvironment、TableSource / TableSink 接口晋升易用性,Hive Dialect + CDC 进一步反对流批一体。
  3. 重点展现新版本对 Hive 数仓实时化的反对和 Flink SQL 引入 CDC 的数据同步最佳实际。
  4. 简要解读 Flink SQL 1.12 将来布局。

作者:伍翀(云邪),Apache Flink PMC member,阿里巴巴技术专家

整理者:陈婧敏(清樾)

校对:闵阁

1 Flink 1.8 ~ 1.11 社区发展趋势回顾

自 2019 年初阿里巴巴发表向 Flink 社区奉献 Blink 源码并在同年 4 月公布 Flink 1.8 版本后,Flink 在社区的沉闷水平犹如坐上小火箭般回升,每个版本蕴含的 git commits 数量以 50% 的增速继续上涨,吸引了一大批国内开发者和用户参加到社区的生态倒退中来,中文用户邮件列表(user-zh@)更是在往年 6 月首次超出英文用户邮件列表(user@),在 7 月超出比例达到了 50%。比照其它 Apache 开源社区如 Spark,Kafka 的用户邮件列表数(每月约 200 封左右)能够看出,整个 Flink 社区的倒退仍然十分衰弱和沉闷。

2 Flink SQL 新性能解读

在理解 Flink 整体发展趋势后,咱们来看下最近公布的 Flink 1.11 版本在 connectivity 和 simplicity 方面都带来了哪些令人耳目一新的性能。

#### FLIP-122:简化 connector 参数

整个 Flink SQL 1.11 在围绕易用性方面做了很多优化,比方 FLIP-122,优化了 connector 的 property 参数名称简短的问题。以 Kafka 为例,在 1.11 版本之前用户的 DDL 须要申明成如下形式

 CREATE TABLE user_behavior (...) WITH (
  'connector.type'='kafka',
  'connector.version'='universal',
  'connector.topic'='user_behavior',
  'connector.startup-mode'='earliest-offset',
  'connector.properties.zookeeper.connect'='localhost:2181',
  'connector.properties.bootstrap.servers'='localhost:9092',
  'format.type'='json'
);

而在 Flink SQL 1.11 中则简化为

CREATE TABLE user_behavior (...) WITH (
  'connector'='kafka',
  'topic'='user_behavior',
  'scan.startup.mode'='earliest-offset',
  'properties.zookeeper.connect'='localhost:2181',
  'properties.bootstrap.servers'='localhost:9092',
  'format'='json'
);

DDL 表白的信息量丝毫未少,然而看起来清新许多 :) Flink 的开发者们为这个优化做了很多探讨,有趣味能够围观 FLIP-122 Discussion Thread。

#### FLINK-16743:内置 connectors

Flink SQL 1.11 新退出了三种内置的 connectors,如下表所示

connector 形容 应用场景
‘connector’=’datagen’ 用于生成随机数据的 source 罕用于测试
‘connector’=’blackhole’ 不做任何解决的 sink 罕用于性能测试
‘connector’=’print’ 打印到规范输入流 (.out 文件) 的 sink 罕用于调试

在内部 connector 环境还没有 ready 时,用户能够抉择 datagen source 和 print sink 疾速构建 pipeline 相熟 Flink SQL;对于想要测试 Flink SQL 性能的用户,能够应用 blackhole 作为 sink;对于调试排错场景,print sink 会将计算结果打到规范输入(比方集群环境下就会打到 taskmanager.out 文件),使得定位问题的老本大大降低。

#### FLIP-110:LIKE 语法

Flink SQL 1.11 反对用户从已定义好的 table DDL 中疾速“fork”本人的版本并进一步批改 watermark 或者 connector 等属性。比方上面这张 base_table 上想加一个 watermark,在 Flink 1.11 版本之前,用户只能从新将表申明一遍,并退出本人的批改,堪称“牵一发而动全身”。

-- before Flink SQL 1.11
CREATE TABLE base_table (
 id BIGINT,
 name STRING,
 ts TIMESTAMP
) WITH (
 'connector.type'='kafka',
 ...
);

CREATE TABLE derived_table (
 id BIGINT,
 name STRING,
 ts TIMESTAMP,
 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
 'connector.type'='kafka',
 ...
);

从 Flink 1.11 开始,用户只须要应用 CREATE TABLE LIKE 语法就能够实现之前的操作

-- Flink SQL 1.11
CREATE TABLE base_table (
 id BIGINT,
 name STRING,
 ts TIMESTAMP
) WITH (
 'connector'='kafka',
 ...
);

CREATE TABLE derived_table (WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) LIKE base_table;

而内置 connector 与 CREATE TABLE LIKE 语法搭配应用则会如下图个别产生“天雷勾地火”的成果,极大晋升开发效率。

#### FLIP-113:动静 Table 参数

对于像 Kafka 这种音讯队列,在申明 DDL 时通常会有一个启动点位去指定开始生产数据的工夫,如果须要更改启动点位,在老版本上就须要从新申明一遍新点位的 DDL,十分不不便。

CREATE TABLE user_behavior (
  user_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector'='kafka',
  'topic'='user_behavior',
  'scan.startup.mode'='timestamp',
  'scan.startup.timestamp-millis'='123456',
  'properties.bootstrap.servers'='localhost:9092',
  'format'='json'
);

从 Flink 1.11 开始,用户能够在 SQL client 中按如下形式设置开启 SQL 动静参数(默认是敞开的),如此即可在 DML 里指定具体的启动点位。

SET 'table.dynamic-table-options.enabled' = 'true';

SELECT user_id, COUNT(DISTINCT behaviro)
FROM user_behavior /*+ OPTIONS('scan.startup.timestamp-millis'='1596282223') */
GROUP BY user_id;

除启动点位外,动静参数还反对像 sink.partitionscan.startup.mode 等更多运行时参数,感兴趣可移步 FLIP-113 取得更多信息。

#### FLIP-84:重构优化 TableEnvironment 接口

Flink SQL 1.11 以前的 TableEnvironment 接口定义和行为有一些不够清晰,比方

  • TableEnvironment#sqlUpdate() 办法对于 DDL 会立刻执行,但对于 INSERT INTO DML 语句却是 buffer 住的,直到调用 TableEnvironment#execute() 才会被执行,所以在用户看起来程序执行的语句,理论产生的成果可能会不一样。
  • 触发作业提交有两个入口,一个是 TableEnvironment#execute(), 另一个是 StreamExecutionEnvironment#execute(),于用户而言很难了解应该应用哪个办法触发作业提交。
  • 单次执行不承受多个 INSERT INTO 语句。

针对这些问题,Flink SQL 1.11 提供了新 API,即 TableEnvironment#executeSql(),它对立了执行 sql 的行为,无论接管 DDL、查问 query 还是 INSERT INTO 都会立刻执行。针对多 sink 场景提供了 StatementSetTableEnvironment#createStatementSet() 办法,容许用户增加多条 INSERT 语句一起执行。

除此之外,新的 execute 办法都有返回值,用户能够在返回值上执行 print, collect 等办法。

新旧 API 比照如下表所示

Current Interface New Interface
tEnv.sqlUpdate("CREATE TABLE...”); TableResult result = tEnv.executeSql("CREATE TABLE...”);
tEnv.sqlUpdate("INSERT INTO...SELECT...”);<br/>tEnv.execute(); TableResult result = <br/> tEnv.executeSql("INSERT INTO ... SELECT...”);
tEnv.sqlUpdate("insert into xx ...”); <br/>tEnv.sqlUpdate("insert into yy ...”); <br/>tEnv.execute(); StatementSet ss =tEnv.createStatementSet(); <br/>ss.addInsertSql("insert into xx ...”); <br/>ss.addInsertSql("insert into yy ...”); <br/>TableResult result = ss.execute();

对于在 Flink 1.11 上应用新接口遇到的一些常见问题,云邪做了对立解答,可在 Appendix 局部查看。

#### FLIP-95:TableSource & TableSink 重构

开发者们在 Flink SQL 1.11 版本花了大量经验对 TableSource 和 TableSink API 进行了重构,外围优化点如下

  • 移除类型相干接口,简化开发,解决蛊惑的类型问题,反对全类型
  • 寻找 Factory 时,更清晰的报错信息
  • 解决找不到 primary key 的问题
  • 对立了流批 source,对立了流批 sink
  • 反对读取 CDC 和输入 CDC
  • 间接高效地生成 Flink SQL 外部数据结构 RowData

老 TableSink API 如下所示,其中有 6 个办法是类型相干并且还充斥着 deprecated 办法,导致 connector 常常出 bug。新 DynamicTableSink API 去掉了所有类型相干接口,因为所有的类型都是从 DDL 来的,不须要 TableSink 通知框架是什么类型。而对于用户来说,最直观的体验就是在老版本上遇到各种奇奇怪怪报错的概率升高了很多,比方不反对的精度类型和找不到 primary key / table factory 的诡异报错在新版本上都不复存在了。对于 Flink 1.11 是如何解决这些问题的具体能够在 Appendix 局部浏览。

#### FLIP-123:Hive Dialect

Flink 1.10 版本对 Hive connector 的反对达到了生产可用,然而老版本的 Flink SQL 不反对 Hive DDL 及应用 Hive syntax,这无疑限度了 Flink connectivity。在新版本中,开发者们为反对 HiveQL 引入了新 parser,用户能够在 SQL client 的 yaml 文件中指定是否应用 Hive 语法,也能够在 SQL client 中通过 set table.sql-dialect=hive/default 动静切换。更多信息能够参考 FLIP-123。

以上简要介绍了 Flink 1.11 在 缩小用户不必要的输出和操作 方面对 connectivity 和 simplicity 方面做出的优化。上面会重点介绍在 内部零碎和数据生态 方面对 connectivity 和 simplicity 的两个外围优化,并附上最佳实际介绍。

3 Hive 数仓实时化 & Flink SQL + CDC 最佳实际

FLINK-17433:Hive 数仓实时化

下图是一张十分经典的 Lambda 数仓架构,在整个大数据行业从批处理逐渐拥抱流计算的许多年里代表“最先进的生产力”。然而随着业务倒退和规模扩充,两套独自的架构所带来的开发、运维、计算成本问题曾经日益凸显。

而 Flink 作为一个流批一体的计算引擎,在最后的设计上就认为“万物实质皆是流”,批处理是流计算的特例,如果可能在本身提供高效批处理能力的同时与现有的大数据生态联合,则能以最小侵入的形式革新现有的数仓架构使其反对流批一体。在新版本中,Flink SQL 提供了开箱即用的“Hive 数仓同步”性能,即所有的数据加工逻辑由 Flink SQL 以流计算模式执行,在数据写入端,主动将 ODS,DWD 和 DWS 层的曾经加工好的数据 实时回流 到 Hive table。One size (sql) fits for all suites (tables) 的设计,使得在 batch 层不再须要保护任何计算 pipeline。

比照传统架构,它带来的益处和解决的问题有哪些呢?

  • 计算口径与解决逻辑对立,升高开发和运维老本

    传统架构保护两套数据 pipeline 最大的问题在于须要放弃它们 解决逻辑的等价性 ,但因为应用了不同的计算引擎(比方离线应用 Hive,实时应用 Flink 或 Spark Streaming),SQL 往往不能间接套用,存在 代码上的差异性,经久不息下来,离线和实时处理逻辑很可能会齐全 diverge,有些大的公司甚至会存在两个团队别离去保护实时和离线数仓,人力物力老本十分高。Flink 反对 Hive Streaming Sink 后,实时处理后果能够实时回流到 Hive 表,离线的计算层能够齐全去掉,解决逻辑由 Flink SQL 对立保护,离线层只须要应用回流好的 ODS、DWD、DWS 表做进一步 ad-hoc 查问即可。

  • 离线对于“数据漂移”的解决更天然,离线数仓“实时化”

    离线数仓 pipeline 非 data-driven 的调度执行形式,在跨分区的数据边界解决上往往须要很多 trick 来保障分区数据的完整性,而在两套数仓架构并行的状况下,有时会存在对 late event 解决差别导致数据比照不统一的问题。而实时 data-driven 的解决形式和 Flink 对于 event time 的敌对反对自身就意味着以业务工夫为分区(window),通过 event time + watermark 能够对立定义实时和离线数据的完整性和时效性,Hive Streaming Sink 更是解决了离线数仓同步的“最初一公里问题”。

    FLIP-105:反对 Change Data Capture (CDC)

除了对 Hive Streaming Sink 的反对,Flink SQL 1.11 的另一大亮点就是引入了 CDC 机制。CDC 的全称是 Change Data Capture,用于 tracking 数据库表的增删改查操作,是目前十分成熟的同步数据库变更的一种计划。在国内常见的 CDC 工具就是阿里开源的 Canal,在国外比拟风行的有 Debezium。Flink SQL 在设计之初就提出了 Dynamic Table 和“流表二象性”的概念,并且在 Flink SQL 外部残缺反对了 Changelog 性能,绝对于其余开源流计算零碎是一个重要劣势。实质上 Changelog 就等价于一张始终在变动的数据库的表。Dynamic Table 这个概念是 Flink SQL 的基石,Flink SQL 的各个算子之间传递的就是 Changelog,残缺地反对了 Insert、Delete、Update 这几种音讯类型。

得益于 Flink SQL 运行时的弱小,Flink 与 CDC 对接只须要将内部的数据流转为 Flink 零碎外部的 Insert、Delete、Update 音讯即可。进入到 Flink 外部后,就能够灵便地利用 Flink 各种 query 语法了。

在理论利用中,把 Debezium Kafka Connect Service 注册到 Kafka 集群并带上想同步的数据库表信息,Kafka 则会主动创立 topic 并监听 Binlog,把变更同步到 topic 中。在 Flink 端想要生产带 CDC 的数据也很简略,只须要在 DDL 中申明 format = debezium-json 即可。

在 Flink 1.11 上开发者们还做了一些乏味的摸索,既然 Flink SQL 运行时可能残缺反对 Changelog,那是否有可能不须要 Debezium 或者 Canal 的服务,间接通过 Flink 获取 MySQL 的变更呢?答案当然是能够,Debezium 类库的良好设计使得它的 API 能够被封装为 Flink 的 Source Function,不须要再起额定的 Service,目前这个我的项目曾经开源,反对了 MySQL 和 Postgres 的 CDC 读取,后续也会反对更多类型的数据库,可移步 `https://github.com/ververica/…
` 解锁更多应用姿态。

上面的 Demo 会介绍如何应用 flink-cdc-connectors 捕捉 mysql 和 postgres 的数据变更,并利用 Flink SQL 做多流 join 后实时同步到 elasticsearch 中。

假如你在一个电商公司,订单和物流是你最外围的数据,你想要实时剖析订单的发货状况。因为公司曾经很大了,所以商品的信息、订单的信息、物流的信息,都扩散在不同的数据库和表中。咱们须要创立一个流式 ETL,去实时生产所有数据库全量和增量的数据,并将他们关联在一起,打成一个大宽表。从而不便数据分析师后续的剖析。

4 Flink SQL 1.12 将来布局

以上介绍了 Flink SQL 1.11 的外围性能与最佳实际,对于下个版本,云邪也给出了一些 ongoing 的打算,并欢送大家在社区踊跃提出意见 & 倡议。

  • FLIP-132:Temporal Table DDL(Binlog 模式的维表关联)
  • FLIP-129:重构 Descriptor API(Table API 的 DDL)
  • 反对 Schema Registry Avro 格局
  • CDC 更欠缺的反对(批处理,upsert 输入到 Kafka 或 Hive)
  • 优化 Streaming File Sink 小文件问题
  • N-ary input operator(Batch 性能晋升)

5 Appendix

应用新版本 TableEnvironment 遇到的常见报错及起因

第一个常见报错是 No operators defined in streaming topology. 遇到这个问题的起因是在老版本中执行 INSERT INTO 语句的上面两个办法

TableEnvironment#sqlUpdate()
TableEnvironment#execute() 

在新版本中没有齐全向前兼容(办法还在,执行逻辑变了),如果没有将 Table 转换为 AppendedStream/RetractStream 时(通过StreamExecutionEnvironment#toAppendStream/toRetractStream),下面的代码执行就会呈现上述谬误;与此同时,一旦做了上述转换,就必须应用 StreamExecutionEnvironment#execute() 来触发作业执行。所以倡议用户还是迁徙到新版本的 API 下面,语义上也会更清晰一些。

第二个问题是调用新的 TableEnvironemnt#executeSql()print 没有看到返回值,起因是因为目前 print 依赖了 checkpoint 机制,开启 exactly-onece 后就能够了,新版本会优化此问题。

老版本的 StreamTableSourceStreamTableSink 常见报错及新版本优化

第一个常见报错是不反对精度类型,经常出现在 JDBC 或者 HBase 数据源上,在新版本上这个问题就不会再呈现了。

第二个常见报错是 Sink 时找不到 PK,因为老的 StreamSink 须要通过 query 去推导出 PK,当 query 变得复杂时有可能会失落 PK 信息,但实际上 PK 信息在 DDL 里就能够获取,没有必要通过 query 去推导,所以新版本的 Sink 就不会再呈现这个谬误啦。

第三个常见报错是在解析 Source 和 Sink 时,如果用户少填或者填错了参数,框架返回的报错信息很含糊,“找不到 table factory”,用户也不晓得该怎么批改。这是因为老版本 SPI 设计得比拟通用,没有对 Source 和 Sink 解析的逻辑做独自解决,当匹配不到残缺参数列表的时候框架曾经默认以后的 table factory 不是要找的,而后遍历所有的 table factories 发现一个也不匹配,就报了这个错。在新版的加载逻辑里,Flink 会先判断 connector 类型,再匹配残余的参数列表,这个时候如果必填的参数缺失或填错了,框架就能够精准报错给用户。

正文完
 0