乐趣区

关于前端:基于-Flink-SQL-构建流批一体的-ETL-数据集成

简介:如何利用 Flink SQL 构建流批一体的 ETL 数据集成。

本文整顿自云邪、雪尽在 Flink Forward Asia 2020 的分享,该分享以 4 个章节来具体介绍如何利用 Flink SQL 构建流批一体的 ETL 数据集成, 文章的次要内容如下:

  1. 数据仓库与数据集成
  2. 数据接入(E)
  3. 数据入仓 / 湖(L)
  4. 数据打宽(T)

数据仓库与数据集成

数据仓库是一个集成的(Integrated),面向主题的(Subject-Oriented),随工夫变动的(Time-Variant),不可批改的(Nonvolatile)数据汇合,用于反对管理决策。这是数据仓库之父 Bill Inmon 在 1990 年提出的数据仓库概念。该概念里最重要的一点就是“集成的”,其余个性都是一些方法论的货色。因为数据仓库首先要解决的问题,就是数据集成,就是将多个扩散的、异构的数据源整合在一起,打消数据孤岛,便于后续的剖析。这个不仅实用于传统的离线数仓,也同样实用于实时数仓,或者是当初炽热的数据湖。首先要解决的就是数据集成的问题。如果说业务的数据都在一个数据库中,并且这个数据库还能提供十分高效的查问剖析能力,那其实也用不着数据仓库和数据湖上场了。

数据集成就是咱们常称作 ETL 的过程,别离是数据接入、数据荡涤转换打宽、以及数据的入仓入湖,别离对应三个英文单词的首字母,所以叫 ETL。ETL 的过程也是数仓搭建中最具工作量的环节。那么 Flink 是如何改善这个 ETL 的过程的呢?咱们先来看看传统的数据仓库的架构。

传统的数据仓库,实时和离线数仓是比拟割裂的两套链路,比方实时链路通过 Flume 和 Canal 实时同步日志和数据库数据到 Kafka 中,而后在 Kafka 中做数据清理和打宽。离线链路通过 Flume 和 Sqoop 定期同步日志和数据库数据到 HDFS 和 Hive。而后在 Hive 里做数据清理和打宽。

这里咱们次要关注的是数仓的前半段的构建,也就是到 ODS、DWD 层,咱们把这一块看成是狭义的 ETL 数据集成的范畴。那么在这一块,传统的架构次要存在的问题就是这种割裂的数仓搭建这会造成很多反复工作,反复的资源耗费,并且实时、离线底层数据模型不统一,会导致数据一致性和品质难以保障。同时两个链路的数据是孤立的,数据没有实现买通和共享。

那么 Flink 能给这个架构带来什么扭转呢?

基于 Flink SQL 咱们当初能够不便地构建流批一体的 ETL 数据集成,与传统数仓架构的外围区别次要是这几点:

  1. Flink SQL 原生反对了 CDC 所以当初能够不便地同步数据库数据,不论是直连数据库,还是对接常见的 CDC 工具。
  2. Flink SQL 在最近的版本中继续强化了维表 join 的能力,不仅能够实时关联数据库中的维表数据,当初还能关联 Hive 和 Kafka 中的维表数据,能灵便满足不同工作负载和时效性的需要。
  3. 基于 Flink 弱小的流式 ETL 的能力,咱们能够对立在实时层做数据接入和数据转换,而后将明细层的数据回流到离线数仓中。
  4. 当初 Flink 流式写入 Hive,曾经反对了主动合并小文件的性能,解决了小文件的苦楚。

所以基于流批一体的架构,咱们能取得的收益:

  1. 对立了根底公共数据
  2. 保障了流批后果的一致性
  3. 晋升了离线数仓的时效性
  4. 缩小了组件和链路的保护老本

接下来咱们会针对这个架构中的各个局部, 联合场景案例开展进行介绍,包含数据接入,数据入仓入湖,数据打宽。

数据接入

当初数据仓库典型的数据起源次要来自日志和数据库,日志接入现阶段曾经十分成熟了,也有十分丰盛的开源产品可供选择,包含 Flume,Filebeat,Logstash 等等都能很不便地采集日志到 Kafka。这里咱们就不作过多开展。

数据库接入会简单很多,常见的几种 CDC 同步工具包含 Canal,Debezium,Maxwell。Flink 通过 CDC format 与这些同步工具做了很好的集成,能够间接生产这些同步工具产生的数据。同时 Flink 还推出了原生的 CDC connector,直连数据库,升高接入门槛,简化数据同步流程。

咱们先来看一个应用 CDC format 的例子。当初常见的计划是通过 Debezium 或者 Canal 去实时采集 MySQL 数据库的 binlog,并将行级的变更事件同步到 Kafka 中供 Flink 剖析解决。在 Flink 推出 CDC format 之前,用户要去生产这种数据会十分麻烦,用户须要理解 CDC 工具的数据格式,将 before,after 等字段都申明进去,而后用 ROW_NUMBER 做个去重,来保障实时保留最初一行的语义。但这样应用老本很高,而且也不反对 DELETE 事件。

当初 Flink 反对了 CDC format,比方这里咱们在 with 参数中能够间接指定 format =‘debezium-json’,而后 schema 局部只须要填数据库中表的 schema 即可。Flink 能自动识别 Debezium 的 INSERT/UPDATE/DELETE 事件,并转成 Flink 外部的 INSERT/UPDATE/DELETE 音讯。之后用户能够在该表上间接做聚合、join 等操作,就跟操作一个 MySQL 实时物化视图一样,十分不便。

在 Flink 1.12 版本中,Flink 曾经原生反对了大部分常见的 CDC format,比方 Canal json、Debezium json、Debezium avro、Maxwell 等等。同时 Flink 也凋谢了 CDC format 的接口,用户能够实现本人的 CDC format 插件来对接本人公司的同步工具。

除此之外,Flink 外部原生反对了 CDC 的语义,所以能够很天然地间接去读取 MySQL 的 binlog 数据并转成 Flink 外部的变更音讯。所以咱们推出了 MySQL CDC connector,你只须要在 with 参数中指定 connector=mysql-cdc,而后 select 这张表就能实时读取 MySQL 中的全量 +CDC 增量数据,无需部署其余组件和服务。你能够把 Flink 中定义的这张表了解成是 MySQL 的实时物化视图,所以在这张表上的聚合、join 等后果,跟实时在 MySQL 中运行进去的后果是统一的。相比于刚刚介绍的 Debezium,Canal 的架构,CDC connector 在应用上更加简略易用了,不必再去学习和保护额定组件,数据不须要通过 Kafka 落地,缩小了端到端提早。而且反对先读取全量数据,并无缝切换到 CDC 增量读取上,也就是咱们说的是流批一体,流批交融的架构。

咱们发现 MySQL CDC connector 十分受用户的欢送,尤其是联合 OLAP 引擎,能够疾速构建实时 OLAP 架构。实时 OLAP 架构的一个特点就是将数据库数据同步到 OLAP 中做即席查问,这样就无需离线数仓了。

以前是怎么做的呢?

之前用户个别先用 datax 做个全量同步,而后用 canal 同步实时增量到 Kafka,而后从 Kafka 同步到 OLAP,这种架构比较复杂,链路也很长。当初很多公司都在用 Flink+ClickHouse 来疾速构建实时 OLAP 架构。咱们只须要在 Flink 中定义一个 mysql-cdc source,一个 ClickHouse sink,而后提交一个 insert into query 就实现了从 MySQL 到 ClickHouse 的实时同步工作,十分不便。而且,ClickHouse 有一个痛点就是 join 比较慢,所以个别咱们会把 MySQL 数据打成一张大的明细宽表数据,再写入 ClickHouse。这个在 Flink 中一个 join 操作就实现了。而在 Flink 提供 MySQL CDC connector 之前,要在全量 + 增量的实时同步过程中做 join 是十分麻烦的。

当然,这里咱们也能够把 ClickHouse 替换成其余常见的 OLAP 引擎,比方阿里云的 Hologres。咱们发现在阿里云上有很多的用户都采纳了这套链路和架构,因为它能够省掉数据同步服务和消息中间件的老本,对于很多中小公司来说,在现在的疫情时代,管制老本是十分重要的。

当然,这里也能够应用其余 OLAP 引擎,比方 TiDB。TiDB 官网也在最近发过一篇文章介绍这种 Flink+TiDB 的实时 OLAP 架构。

数据入仓湖

刚刚咱们介绍了基于 Flink SQL 能够十分不便地做数据接入,也就是 ETL 的 Extract 的局部。接下来,咱们介绍一下 Flink SQL 在数据入仓入湖方面的能力,也就是 Load 的局部。

咱们回顾下刚刚的流批一体的架构图,其中最外围的局部就是 Kafka 数据的流式入仓,正是这一流程买通了实时和离线数仓,对立了数仓的根底公共数据,晋升了离线数仓的时效性,所以咱们针对这一块开展讲一讲。

应用 Flink SQL 做流式数据入仓,十分的不便,而且 1.12 版本曾经反对了小文件的主动合并,解决了小文件的痛点。能够看下左边这段代码,先在 Flink SQL 中应用 Hive dialect 创立一张 Hive 的后果表,而后通过 select from kafka 表 insert into Hive 表这样一个简略 query,就能够提交工作实时将 Kafka 数据流式写入 Hive。

如果要开启小文件合并,只须要在 Hive 表参数中加上 auto-compaction = true,那么在流式写入这张 Hive 表的时候就会主动做小文件的 compaction。小文件合并的原理,是 Flink 的 streaming sink 会起一个小拓扑,外面 temp writer 节点负责一直将收到的数据写入临时文件中,当收到 checkpoint 时,告诉 compact coordinator 开始做小文件合并,compact coordinator 会将 compaction 工作分发给多个 compact operator 并发地去做小文件合并。当 compaction 实现的时候,再告诉 partition committer 提交整个分区文件可见。整个过程利用了 Flink 本身的 checkpoint 机制实现 compaction 的自动化,无需起另外的 compaction 服务。这也是 Flink 流式入仓比照于其余入仓工具的一个外围劣势。

除了流式入仓,Flink 当初也反对流式入湖。以 Iceberg 举例,基于 Iceberg 0.10,当初能够在 Flink SQL 外面间接 create 一个 Iceberg catalog,在 Iceberg catalog 下能够 create table 间接创立 Iceberg 表。而后提交 insert into query 就能够将流式数据导入到 Iceberg 中。而后在 Flink 中能够用 batch 模式读取这张 Iceberg 表,做离线剖析。不过 Iceberg 的小文件主动合并性能目前还没有公布,还在反对中。

刚刚介绍的是纯 append 数据流式入仓入湖的能力,接下来介绍 CDC 数据流式入仓入湖的能力。咱们先介绍 CDC 数据入 Kafka 实时数仓。其实这个需要在实时数仓的搭建中是十分常见的,比方同步数据库 binlog 数据到 Kafka 中,又比方 join,聚合的后果是个更新流,用户想把这个更新流写到 Kafka 作为两头数据供上游生产。

这在以前做起来会十分的麻烦,在 Flink 1.12 版本中,Flink 引入了一个新的 connector,叫做 upsert-kafka,原生地反对了 Kafka 作为一个高效的 CDC 流式存储。

为什么说是高效的,因为存储的模式是与 Kafka log compaction 机制高度集成的,Kafka 会对 compacted topic 数据做主动清理,且 Flink 读取清理后的数据,仍能保障语义的一致性。而且像 Canal, Debezium 会存储 before,op_type 等很多无用的元数据信息,upsert-kafka 只会存储数据自身的内容,节俭大量的存储老本。应用上的话,只须要在 DDL 中申明 connector = upsert-kafka,并定义 PK 即可。

比方咱们这里定义了 MySQL CDC 的直播间表,以及一个 upsert-kafka 的后果表,将直播间的数据库同步到 Kafka 中。那么写入 Kafka 的 INSERT 和 UPDATE 都是一个带 key 的一般数据,DELETE 是一个带 key 的 NULL 数据。Flink 读取这个 upsert-kafka 中的数据时,能自动识别出 INSERT/UPDATE/DELETE 音讯,生产这张 upsert-kafka 表与生产 MySQL CDC 表的语义统一。并且当 Kafka 对 topic 数据做了 compaction 清理后,Flink 读取清理后的数据,仍能保障语义的一致性。

CDC 数据入 Hive 数仓会麻烦一些,因为 Hive 自身不反对 CDC 的语义,当初的一种常见形式是先将 CDC 数据以 changelog-json 格局流式写入到 HDFS。而后起个 batch 工作周期性地将 HDFS 上的 CDC 数据依照 op 类型分为 INSERT, UPDATE, DELETE 三张表,而后做个 batch merge。

数据打宽

后面介绍了基于 Flink SQL 的 ETL 流程的 Extract 和 Load,接下来介绍 Transformation 中最常见的数据打宽操作。

数据打宽是数据集成中最为常见的业务加工场景,数据打宽最次要的伎俩就是 Join,Flink SQL 提供了丰盛的 Join 反对,包含 Regular Join、Interval Join、Temporal Join。

Regular Join 就是大家熟知的双流 Join,语法上就是一般的 JOIN 语法。图中案例是通过广告曝光流关联广告点击流将广告数据打宽,打宽后能够进一步计算广告费用。从图中能够看出,曝光流和点击流都会存入 join 节点的 state,join 算子通过关联曝光流和点击流的 state 实现数据打宽。Regular Join 的特点是,任意一侧流都会触发后果的更新,比方案例中的曝光流和点击流。同时 Regular Join 的语法与传统批 SQL 统一,用户学习门槛低。但须要留神的是,Regular join 通过 state 来存储双流曾经达到的数据,state 默认永恒保留,所以 Regular join 的一个问题是默认状况下 state 会持续增长,个别咱们会联合 state TTL 应用。

Interval Join 是一条流上须要有工夫区间的 join,比方刚刚的广告计费案例中,它有一个十分典型的业务特点在外面,就是点击个别产生在曝光之后的 10 分钟内。因而绝对于 Regular Join,咱们其实只须要关联这 10 分钟内的曝光数据,所以 state 不必存储全量的曝光数据,它是在 Regular Join 之上的一种优化。要转成一个 Interval Join,须要在两个流上都定义工夫属性字段(如图中的 click_time 和 show_time)。并在 join 条件中定义左右流的工夫区间,比方这里咱们减少了一个条件:点击工夫须要大于等于曝光工夫,同时小于等于曝光后 10 分钟。与 Regular Join 雷同,Interval Join 任意一条流都会触发后果更新,但相比 Regular Join,Interval Join 最大的长处是 state 能够主动清理,依据工夫区间保留数据,state 占用大幅缩小。Interval Join 实用于业务有明确的工夫区间,比方曝光流关联点击流,点击流关联下单流,下单流关联成交换。

Temporal join (时态表关联) 是最罕用的数据打宽形式,它罕用来做咱们熟知的维表 Join。在语法上,它须要一个显式的 FOR SYSTEM_TIME AS OF 语句。它与 Regular Join 以及 Interval Join 最大的区别就是,维度数据的变动不会触发后果更新,所以支流关联上的维度数据不会再扭转。Flink 反对十分丰盛的 Temporal join 性能,包含关联 lookup DB,关联 changelog,关联 Hive 表。在以前,大家熟知的维表 join 个别都是关联一个能够查问的数据库,因为维度数据在数据库外面,但实际上维度数据可能有多种物理状态,比方 binlog 模式,或者定期同步到 Hive 中变成了 Hive 分区表的模式。在 Flink 1.12 中,当初曾经反对关联这两种新的维表状态。

Temporal Join Lookup DB 是最常见的维表 Join 形式,比方在用户点击流关联用户画像的案例中,用户点击流在 Kafka 中,用户实时画像寄存在 HBase 数据库中,每个点击事件通过查问并关联 HBase 中的用户实时画像实现数据打宽。Temporal Join Lookup DB 的特点是,维表的更新不会触发后果的更新,维度数据寄存在数据库中,实用于实时性要求较高的场景,应用时咱们个别会开启 Async IO 和内存 cache 晋升查问效率。

在介绍 Temporal Join Changelog 前,咱们再看一个 Lookup DB 的例子,这是一个直播互动数据关联直播间维度的案例。这个案例中直播互动数据(比方点赞、评论)寄存在 Kafka 中,直播间实时的维度数据(比方主播、直播间题目)寄存在 MySQL 中,直播互动的数据量是十分大的,为了减速拜访,罕用的计划是加个高速缓存,比方把直播间的维度数据通过 CDC 同步,再存入 Redis 中,再做维表关联。这种计划的问题是,直播的业务数据比拟非凡,直播间的创立和直播互动数据根本是同时产生的,因而互动数据可能早早地达到了 Kafka 被 Flink 生产,然而直播间的创立音讯通过了 Canal, Kafka,Redis, 这个链路比拟长,数据提早比拟大,可能导致互动数据查问 Redis 时,直播间数据还未同步实现,导致关联不上直播间数据,造成上游统计分析的偏差。

针对这类场景,Flink 1.12 反对了 Temporal Join Changelog,通过从 changelog 在 Flink state 中物化出维表来实现维表关联。刚刚的场景有了更简洁的解决方案,咱们能够通过 Flink CDC connector 把直播间数据库表的 changelog 同步到 Kafka 中,留神咱们看下左边这段 SQL,咱们用了 upsert-kafka connector 来将 MySQL binlog 写入了 Kafka,也就是 Kafka 中寄存了直播间变更数据的 upsert 流。而后咱们将互动数据 temporal join 这个直播间 upsert 流,便实现了直播数据打宽的性能。

留神咱们这里 FOR SYSTEM_TIME AS OF 不是跟一个 processing time,而是左流的 event time,它的含意是去关联这个 event time 时刻的直播间数据,同时咱们在直播间 upsert 流上也定义了 watermark,所以 temporal join changelog 在执行上会做 watermark 期待和对齐,保障关联上准确版本的后果,从而解决先前计划中关联不上的问题。

咱们具体解释下 temporal join changelog 的过程,左流是互动流数据,右流是直播间 changelog。直播间 changelog 会物化到右流的维表 state 中,state 相当于一个多版本的数据库镜像,支流互动数据会临时缓存在左流的 state 中,等到 watermark 达到对齐后再去查维表 state 中的数据。比方当初互动流和直播流的 watermark 都到了 10:01 分,互动流的这条 10:01 分评论数据就会去查问维表 state,并关联上 103 房间的信息。当 10:05 这条评论数据到来时,它不会马上输入,不然就会关联上空的房间信息。它会始终期待,等到左右两流的 watermark 都到 10:05 后,才会去关联维表 state 中的数据并输入。这个时候,它能关联上精确的 104 房间信息。

总结下,Temporal Join Changelog 的特点是实时性高,因为是依照 event time 做的版本关联,所以能关联上准确版本的信息,且维表会做 watermark 对齐期待,使得用户能够通过 watermark 管制早退的维表数。Temporal Join Changelog 中的维表数据都是寄存在 temporal join 节点的 state 中,读取十分高效,就像是一个本地的 Redis 一样,用户不再须要保护额定的 Redis 组件。

在数仓场景中,Hive 的应用是十分宽泛的,Flink 与 Hive 的集成十分敌对,当初曾经反对 Temporal Join Hive 分区表和非分区表。咱们举个典型的关联 Hive 分区表的案例:订单流关联店铺数据。店铺数据个别是变动比拟迟缓的,所以业务方个别会按天全量同步店铺表到 Hive 分区中,每天会产生一个新分区,每个分区是当天全量的店铺数据。

为了关联这种 Hive 数据,只需咱们在创立 Hive 分区表时指定右侧这两个红圈中的参数,便能实现主动关联 Hive 最新分区性能,partition.include = latestb 示意只读取 Hive 最新分区,partition-name 示意抉择最新分区时按分区名的字母序排序。到 10 月 3 号的时候,Hive 中曾经产生了 10 月 2 号的新分区, Flink 监控到新分区后,就会从新加载 10 月 2 号的数据到 cache 中并替换掉 10 月 1 号的数据作为最新的维表。之后的订单流数据关联上的都是 cache 10 月 2 号分区的数据。Temporal join Hive 的特点是能够主动关联 Hive 最新分区,实用于维表迟缓更新,高吞吐的业务场景。

总结一下咱们刚刚介绍的几种在数据打宽中应用的 join:

  1. Regular Join 的实效性十分高,吞吐个别,因为 state 会保留所有达到的数据,实用于双流关联场景;
  2. Interval Jon 的时效性十分好,吞吐较好,因为 state 只保留工夫区间内的数据,实用于有业务工夫区间的双流关联场景;
  3. Temporal Join Lookup DB 的时效性比拟好,吞吐较差,因为每条数据都须要查问内部零碎,会有 IO 开销,实用于维表在数据库中的场景;
  4. Temporal Join Changelog 的时效性很好,吞吐也比拟好,因为它没有 IO 开销,实用于须要维表期待,或者关联精确版本的场景;
  5. Temporal Join Hive 的时效性个别,但吞吐十分好,因为维表的数据寄存在 cache 中,实用于维表迟缓更新的场景,高吞吐的场景。

总结

最初咱们来总结下 Flink 在 ETL 数据集成上的能力。这是目前 Flink 数据集成的能力矩阵,咱们将现有的内部存储系统分为了关系型数据库、KV 数据库、音讯队列、数据湖、数据仓库 5 种类型,能够从图中看出 Flink 有十分丰盛的生态,并且对每种存储引擎都有十分弱小的集成能力。

横向上咱们定义了 6 种能力,别离是 3 种数据接入能力:

  • 全量读取
  • 流式读取
  • CDC 流式读取

一种数据打宽能力:

  • 维度关联;

以及两种入仓 / 入湖能力:

  • 流式写入
  • CDC 写入

能够看到 Flink 对各个系统的数据接入能力、维度打宽能力、入仓 / 入湖能力都曾经十分欠缺了。在 CDC 流式读取上,Flink 曾经反对了支流的数据库和 Kafka 音讯队列。在数据湖方向,Flink 对 Iceberg 的流式读取和 CDC 写入的性能也行将在接下来的 Iceberg 版本中公布。从这个能力矩阵能够看出,Flink 的数据集成能力是十分全面的。

作者:阿里云实时计算 Flink
原文链接
本文为阿里云原创内容,未经容许不得转载

退出移动版