整顿|路培杰(Flink 社区志愿者)

摘要:Apache Flink 是目前大数据畛域十分风行的流批对立的计算引擎,数据湖是适应云时代倒退潮流的新型技术架构,以 Iceberg、Hudi、Delta 为代表的解决方案应运而生,Iceberg 目前反对 Flink 通过 DataStream API /Table API 将数据写入 Iceberg 的表,并提供对 Apache Flink 1.11.x 的集成反对。

本文由腾讯数据平台部高级工程师苏舒分享,次要介绍腾讯大数据部门基于 Apache Flink 和 Apache Iceberg 构建实时数仓的利用实际,介绍次要包含如下几个方面:

  1. 背景及痛点
  2. 数据湖 Apache Iceberg 的介绍
  3. Flink+Iceberg 构建实时数仓
  4. 将来布局

一.背景及痛点

如图 1 所示,这是以后曾经助力的一些外部利用的用户,其中小程序和视频号这两款利用每天或者每个月产生的数据量都在 PB 级或者 EB 级以上。

图1

这些利用的用户在构建他们本人的数据分析平台过程中,他们往往会采纳图 2 这样的一个架构,置信大家对这个架构也十分的相熟了。

1.数据平台架构

业务方比方腾讯看点或者视频号的用户,他们通常会采集利用前端的业务打点数据以及应用服务日志之类的数据,这些数据会通过消息中间件(Kafka/RocketMQ)或者数据同步服务 (flume/nifi/dataX) 接入数仓或者实时计算引擎。

在数仓体系中会有各种各样的大数据组件,譬如 Hive/HBase/HDFS/S3,计算引擎如 MapReduce、Spark、Flink,依据不同的需要,用户会构建大数据存储和解决平台,数据在平台通过解决和剖析,后果数据会保留到 MySQL、Elasticsearch 等反对疾速查问的关系型、非关系型数据库中,接下来应用层就能够基于这些数据进行 BI 报表开发、用户画像,或基于 Presto 这种 OLAP 工具进行交互式查问等。

图2

2.Lambda 架构的痛点

在整个过程中咱们经常会用一些离线的调度零碎,定期的(T+1 或者每隔几小时)去执行一些 Spark 剖析工作,做一些数据的输出、输入或是 ETL 工作。离线数据处理的整个过程中必然存在数据提早的景象,不论是数据接入还是两头的剖析,数据的提早都是比拟大的,可能是小时级也有可能是天级别的。另外一些场景中咱们也经常会为了一些实时性的需要去构建一个实时处理过程,比方借助 Flink+Kafka 去构建实时的流解决零碎。

整体上,数仓架构中有十分多的组件,大大增加了整个架构的复杂性和运维的老本。

如下图,这是很多公司之前或者当初正在采纳的 Lambda 架构,Lambda 架构将数仓分为离线层和实时层,相应的就有批处理和流解决两个互相独立的数据处理流程,同一份数据会被解决两次以上,同一套业务逻辑代码须要适配性的开发两次。Lambda 架构大家应该曾经十分相熟了,上面我就着重介绍一下咱们采纳 Lambda 架构在数仓建设过程中遇到的一些痛点问题。

图3

例如在实时计算一些用户相干指标的实时场景下,咱们想看到以后 pv、uv 时,咱们会将这些数据放到实时层去做一些计算,这些指标的值就会实时出现进去,但同时想理解用户的一个增长趋势,须要把过来一天的数据计算出来。这样就须要通过批处理的调度工作来实现,比方凌晨两三点的时候在调度零碎上起一个 Spark 调度工作把当天所有的数据从新跑一遍。

很显然在这个过程中,因为两个过程运行的工夫是不一样的,跑的数据却雷同,因而可能造成数据的不统一。因为某一条或几条数据的更新,须要从新跑一遍整个离线剖析的链路,数据更新老本很大,同时须要保护离线和实时剖析两套计算平台,整个高低两层的开发流程和运维老本其实都是十分高的。

为了解决 Lambda 架构带来的各种问题,就诞生了 Kappa 架构,这个架构大家应该也十分的相熟。

3.Kappa 架构的痛点

咱们来讲一下 Kappa 架构,如图 4,它两头其实用的是音讯队列,通过用 Flink 将整个链路串联起来。Kappa 架构解决了 Lambda 架构中离线解决层和实时处理层之间因为引擎不一样,导致的运维老本和开发成本昂扬的问题,但 Kappa 架构也有其痛点。

  • 首先,在构建实时业务场景时,会用到 Kappa 去构建一个近实时的场景,但如果想对数仓中间层例如 ODS 层做一些简略的 OLAP 剖析或者进一步的数据处理时,如将数据写到 DWD 层的 Kafka,则须要另外接入 Flink。同时,当须要从 DWD 层的 Kafka 把数据再导入到 Clickhouse,Elasticsearch,MySQL 或者是 Hive 外面做进一步的剖析时,显然就减少了整个架构的复杂性。
  • 其次,Kappa 架构是强烈依赖音讯队列的,咱们晓得音讯队列自身在整个链路上数据计算的准确性是严格依赖它上游数据的程序,音讯队列接的越多,产生乱序的可能性就越大。ODS 层数据个别是相对精确的,把 ODS 层的数据发送到下一个 kafka 的时候就有可能产生乱序,DWD 层再发到 DWS 的时候可能又乱序了,这样数据不一致性就会变得很重大。
  • 第三,Kafka 因为它是一个顺序存储的零碎,顺序存储零碎是没有方法间接在其下面利用 OLAP 剖析的一些优化策略,例如谓词下推这类的优化策略,在顺序存储的 Kafka 上来实现是比拟艰难的事件。

那么有没有这样一个架构,既可能满足实时性的需要,又可能满足离线计算的要求,而且还可能加重运维开发的老本,解决通过音讯队列构建 Kappa 架构过程中遇到的一些痛点?答案是必定的,前面的篇幅会具体阐述。

图4

4.痛点总结

■ 传统 T+1 工作

  • 海量的TB级 T+ 1 工作提早导致上游数据产出工夫不稳固。
  • 工作遇到故障重试复原代价低廉
  • 数据架构在解决去重和 exactly-once语义能力方面比拟吃力
  • 架构简单,波及多个零碎协调,靠调度零碎来构建工作依赖关系

■ Lambda 架构痛点

  • 同时保护实时平台和离线平台两套引擎,运维老本高
  • 实时离线两个平台须要保护两套框架不同但业务逻辑雷同代码,开发成本高
  • 数据有两条不同链路,容易造成数据的不一致性
  • 数据更新老本大,须要重跑链路

■ Kappa 架构痛点

  • 对音讯队列存储要求高,音讯队列的回溯能力不迭离线存储
  • 音讯队列自身对数据存储有时效性,且以后无奈应用 OLAP 引擎间接剖析音讯队列中的数据
  • 全链路依赖音讯队列的实时计算可能因为数据的时序性导致后果不正确

图5

5.实时数仓建设需要

是否存在一种存储技术,既可能反对数据高效的回溯能力,反对数据的更新,又可能实现数据的批流读写,并且还可能实现分钟级到秒级的数据接入?

这也是实时数仓建设的迫切需要(图 6)。实际上是能够通过对 Kappa 架构进行降级,以解决 Kappa 架构中遇到的一些问题,接下来次要分享以后比拟火的数据湖技术--Iceberg。

图 6

二、数据湖 Apache Iceberg 的介绍

1.Iceberg 是什么

首先介绍一下什么是 Iceberg。官网形容如下:

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

Iceberg 的官网定义是一种表格局,能够简略了解为是基于计算层(Flink , Spark)和存储层(ORC,Parqurt,Avro)的一个中间层,用 Flink 或者 Spark 将数据写入 Iceberg,而后再通过其余形式来读取这个表,比方 Spark,Flink,Presto 等。

图 7

2.Iceberg 的 table format 介绍

Iceberg 是为剖析海量数据筹备的,被定义为 table format,table format 介于计算层和存储层之间。

table format 次要用于向下治理在存储系统上的文件,向上为计算层提供一些接口。存储系统上的文件存储都会采纳肯定的组织模式,譬如读一张 Hive 表的时候,HDFS 文件系统会带一些 partition,数据存储格局、数据压缩格局、数据存储 HDFS 目录的信息等,这些信息都存在 Metastore 上,Metastore 就能够称之为一种文件组织格局。

一个优良的文件组织格局,如 Iceberg,能够更高效的反对下层的计算层拜访磁盘上的文件,做一些 list、rename 或者查找等操作。

3.Iceberg 的能力总结

Iceberg 目前反对三种文件格式 parquet,Avro,ORC,如图 7,无论是 HDFS 或者 S3 上的文件,能够看到有行存也有列存,前面会具体的去介绍其作用。Iceberg 自身具备的能力总结如下(如图 8),这些能力对于前面咱们利用 Iceberg 来构建实时数仓是十分重要的。

图8

  • 基于快照的读写拆散和回溯
  • 流批对立的写入和读取
  • 不强绑定计算存储引擎
  • ACID 语义及数据多版本
  • 表, 模式及分区的变更

4.Iceberg 的文件组织格局介绍

下图展现的是 Iceberg 的整个文件组织格局。从上往下看:

  • 首先最上层是 snapshot 模块。Iceberg 外面的 snapshot 是一个用户可读取的根本的数据单位,也就是说用户每次读取一张表外面的所有数据,都是一个snapshot 下的数据。
  • 其次,manifest。一个 snapshot 上面会有多个 manifest,如图 snapshot-0 有两个 manifest,而 snapshot-1 有- 三个 manifest,每个 manifest 上面会治理一个至多个 DataFiles 文件。
  • 第三,DataFiles。manifest 文件外面寄存的就是数据的元信息,咱们能够关上 manifest 文件,能够看到外面其实是一行行的 datafiles 文件门路。

从图上看到,snapshot-1 蕴含了 snapshop-0 的数据,而 snapshot-1 这个时刻写入的数据只有 manifest2,这个能力其实就为咱们前面去做增量读取提供了一个很好的反对。

图 9

5.Iceberg 读写过程介绍

■ Apache Iceberg 读写

首先,如果有一个 write 操作,在写 snapsho-1 的时候,snapshot-1 是虚线框,也就是说此时还没有产生 commit 操作。这时候对 snapshot-1 的读其实是不可读的,因为用户的读只能读到曾经 commit 之后的 snapshot。产生 commit 之后才能够读。同理,会有 snapshot-2,snapshot-3。

Iceberg 提供的一个重要能力,就是读写拆散能力。在对 snapshot-4 进行写的时候,其实是齐全不影响对 snapshot-2 和 snapshot-3 的读。Iceberg 的这个能力对于构建实时数仓是十分重要的能力之一。

图 10

同理,读也是能够并发的,能够同时读 s1、s2、s3 的快照数据,这就提供了回溯读到 snapshot-2 或者 snapshot-3 数据的能力。Snapshot-4 写实现之后,会产生一次 commit 操作,这个时候 snapshot-4 变成了实心,此时就能够读了。另外,能够看到 current Snapshot 的指针移到 s4,也就是说默认状况下,用户对一张表的读操作,都是读 current Snapshot 指针所指向的 Snapshot,但不会影响后面的 snapshot 的读操作。

■ Apache Iceberg 增量读

接下来讲一下 Iceberg 的增量读。首先咱们晓得 Iceberg 的读操作只能基于曾经提交实现的 snapshot-1,此时会有一个 snapshot-2,能够看到每个 snapshot 都蕴含后面 snapshot 的所有数据,如果每次都读全量的数据,整个链路上对计算引擎来说,读取的代价十分高。

如果只心愿读到以后时刻新增的数据,这个时候其实就能够依据 Iceberg 的 snapshot 的回溯机制,仅读取 snapshot1 到 snapshot2 的增量数据,也就是紫色这块的数据能够读的。

图 11

同理 s3 也是能够只读黄色的这块区域的数据,同时也能够读 s3 到 s1 这块的增量数据,基于 Flink source 的 streaming reader 性能在外部咱们曾经实现这种增量读取的性能,并且曾经在线上运行了。方才讲到了一个十分重要的问题,既然 Iceberg 曾经有了读写拆散,并发读,增量读的性能,Iceberg 要跟 Flink 实现对接,那么就必须实现 Iceberg 的 sink。

■ 实时小文件问题

社区当初曾经重构了 Flink 外面的 FlinkIcebergSink,提供了 global committee 的性能,咱们的架构其实跟社区的架构是保持一致的,曲线框中的这块内容是 FlinkIcebergSink。

在有多个 IcebergStreamWriter 和一个 IcebergFileCommitter 的状况下,上游的数据写到 IcebergStreamWriter 的时候,每个 writer 外面做的事件都是去写 datafiles 文件。

图 12

当每个 writer 写完本人以后这一批 datafiles 小文件的时候,就会发送音讯给 IcebergFileCommitter,通知它能够提交了。而 IcebergFileCommitter 收到信息的时,就一次性将 datafiles 的文件提交,进行一次 commit 操作。

commit 操作自身只是对一些原始信息的批改,当数据都曾经写到磁盘了,只是让其从不可见变成可见。在这个状况下,Iceberg 只须要用一个 commit 即可实现数据从不可见变成可见的过程。

■ 实时小文件合并

Flink 实时作业个别会长期在集群中运行,为了要保证数据的时效性,个别会把 Iceberg commit 操作的工夫周期设成 30 秒或者是一分钟。当 Flink 作业跑一天时,如果是一分钟一次 commit,一天须要 1440 个 commit,如果 Flink 作业跑一个月commit 操作会更多。甚至 snapshot commit 的工夫距离越短,生成的 snapshot 的数量会越多。当流式作业运行后,就会生成大量的小文件。

这个问题如果不解决的话,Iceberg 在 Flink 解决引擎上的 sink 操作就不可用了。咱们在外部实现了一个叫做 data compaction operator 的性能,这个 operator 是跟着 Flink sink 一起走的。当 Iceberg 的 FlinkIcebergSink 每实现一次 commit 操作的时候,它都会向上游 FileScanTaskGen 发送音讯,通知 FileScanTaskGen 曾经实现了一次 commit。

图 13

FileScanTaskGen 外面会有相干的逻辑,可能依据用户的配置或者以后磁盘的个性来进行文件合并工作的生成操作。FileScanTaskGen 发送到 DataFileRewitre 的内容其实就是在 FileScanTaskGen 外面生成的须要合并的文件的列表。同理,因为合并文件是须要肯定的耗时操作,所以须要将其进行异步的操作散发到不同的 task rewrite operator 中。

下面讲过的 Iceberg 是有 commit 操作,对于 rewrite 之后的文件须要有一个新的 snapshot 。这里对 Iceberg 来说,也是一个 commit 操作,所以采纳一个单并发的像 commit 操作一样的事件。

整条链路下来,小文件的合并目前采纳的是 commit 操作,如果 commit 操作前面阻塞了,会影响后面的写入操作,这块咱们前面会继续优化。当初咱们也在 Iceberg 社区开了一个 design doc 文档在推动,跟社区探讨进行合并的相干工作。

三、Flink+Iceberg 构建实时数仓

1.近实时的数据接入

后面介绍了 Iceberg 既反对读写拆散,又反对并发读、增量读、小文件合并,还能够反对秒级到分钟级的提早,基于这些劣势咱们尝试采纳 Iceberg 这些性能来构建基于 Flink 的实时全链路批流一体化的实时数仓架构。

如下图所示,Iceberg 每次的 commit 操作,都是对数据的可见性的扭转,比如说让数据从不可见变成可见,在这个过程中,就能够实现近实时的数据记录。

图 14

2.实时数仓 - 数据湖剖析零碎

此前须要先进行数据接入,比方用 Spark 的离线调度工作去跑一些数据,拉取,抽取最初再写入到 Hive 表外面,这个过程的延时比拟大。有了 Iceberg 的表构造,能够两头应用 Flink,或者 spark streaming,实现近实时的数据接入。

基于以上性能,咱们再来回顾一下后面探讨的 Kappa 架构,Kappa 架构的痛点下面曾经形容过,Iceberg 既然可能作为一个优良的表格局,既反对 Streaming reader,又能够反对 Streaming sink,是否能够思考将 Kafka 替换成 Iceberg?

Iceberg 底层依赖的存储是像 HDFS 或 S3 这样的便宜存储,而且 Iceberg 是反对 parquet、orc、Avro 这样的列式存储。有列式存储的反对,就能够对 OLAP 剖析进行根本的优化,在中间层间接进行计算。例如谓词下推最根本的 OLAP 优化策略,基于 Iceberg snapshot 的 Streaming reader 性能,能够把离线工作天级别到小时级别的提早大大的升高,革新成一个近实时的数据湖剖析零碎。

图 15

在两头解决层,能够用 presto 进行一些简略的查问,因为 Iceberg 反对 Streaming read,所以在零碎的中间层也能够间接接入 Flink,间接在中间层用 Flink 做一些批处理或者流式计算的工作,把两头后果做进一步计算后输入到上游。

■ 替换 Kafka 的优劣势

总的来说,Iceberg 替换 Kafka 的劣势次要包含:

  • 实现存储层的流批对立
  • 中间层反对 OLAP 剖析
  • 完满反对高效回溯
  • 存储老本升高

当然,也存在肯定的缺点,如:

  • 数据提早从实时变成近实时
  • 对接其余数据系统须要额定开发工作

图 16

■ 秒级剖析 - 数据湖减速

因为 Iceberg 自身是将数据文件全副存储在 HDFS 上的,HDFS 读写这块对于秒级剖析的场景,还是不可能齐全满足咱们的需要,所以接下去咱们会在 Iceberg 底层反对 Alluxio 这样一个缓存,借助于缓存的能力能够实现数据湖的减速。这块的架构也在咱们将来的一个布局和建设中。

图 17

3.最佳实际

■ 实时小文件合并

如图 18 所示,腾讯外部曾经实现了 Iceberg 的齐全 SQL 化,其实咱们在 table properties 外面能够设置一些小文件合并的参数,例如 snapshot 达到多少进行一次合并,一共有多少个 snapshot 时进行合并等,这样底层就能够间接通过一条 insert 语句启动 Flink 入湖工作,整个工作就能够继续运行,后盾数据的 datafiles 文件也会在后盾主动实现合并的操作。

图 18

上面这张图就是 Iceberg 中数据文件和数据文件对应的 meta 文件的信息,因为当初社区开源的 IceberFlinkSink 还没有文件合并的性能,能够尝试关上一个比拟小的流解决工作,而后在本人电脑上跑一下,能够看到 Flink 工作运行之后,一段时间后,对应目录的文件数就会暴涨。

图 19

利用了 Iceberg 的实时合并小文件性能之后,能够看到文件数其实是能够管制在一个比较稳定的数量。

■ Flink 实时增量读取

实现实时数据的增量读取,能够将其配置到 Iceberg 的 table properties 参数外面,并且能够指定从哪个 snapshot 开始生产。如果指定了从哪个 snapshot 生产之后,每次 Flink 工作启动,就只会读取以后最新 snapshot 外面新增的数据。

图 20

在本实例中,开启了小文件合并的性能,最初用 SQL 启动了一个 Flink sink 的入湖工作。

■ SQL Extension 管理文件

以后用户十分心愿所有的工作都用 SQL 来解决,小文件合并的性能其实只实用于在线上跑的一些 Flink 工作,相较于离线工作来说,每一次 commit 周期内它所生成的文件数量或者文件大小都不会特地大。

但当用户的工作跑了比拟长的工夫,底层的文件可能曾经成千上万个了,这个时候间接在线上用实时的工作去做合并显然是不适合的,并可能会影响到线上实时工作的时效性,咱们能够通过应用 SQL extension 来解决小文件合并,或者是删除遗留的文件,或者是过期 snapshot。

咱们外部其实曾经实现了通过用 SQL extension 的形式来治理 Iceberg 在磁盘上的数据和数据元信息的文件,前面咱们会继续的往 SQL extension 减少更多的性能,来欠缺 Iceberg 的可用性,晋升用户体验。

图 21

四、将来布局

图 22

1.Iceberg 内核能力晋升

  • Row-level delete 性能。在用 Iceberg 构建整个数据链路的过程中,如果有数据的更新怎么办?Iceberg 以后只反对 copy on write 的 update 的能力,copy on write 对写是有一个放大的作用,如果要真正的在整个链路上构建一个实时数据处理过程,还是须要一个高效的 merge on read 的 update 能力。这是十分重要的,前面咱们也会再持续跟社区单干,腾讯外部也会去做一些实际,去欠缺 Row-level delete 的性能。
  • SQL Extension 能力欠缺。咱们会更加欠缺 SQL Extension 的能力。
  • 建设对立索引减速数据检索。Iceberg 当初并没有对立的索引来减速数据检索,当初咱们也在跟社区单干,社区也提出了一个 Bloom Filter 的索引能力,通过构建对立的索引,能够减速 iceberg 检索文件的能力。

在 Iceberg 的内核晋升方面,咱们次要是心愿先可能把这些性能给欠缺。

2.平台建设

在平台建设方面,咱们将尝试:

  • 首先,主动 Schema 辨认抽取建表。心愿可能主动的依据前端的数据 Schema 信息,可能主动的将这个表给创立进去,更不便用户去应用整个数据入湖的一个流程。
  • 其次,更便捷的数据元信息管理。Iceberg 当初的元信息其实都是裸的,都是间接放在 hive metastore 上的,如果用户须要查看数据元信息,其实还须要去跑 SQL,咱们心愿在平台化的建设中把它给持续的欠缺。
  • 第三,基于 Alluxio 打造数据减速层。心愿用 Alluxio 打造一个数据湖减速层性能,以不便下层更加好的去实现一个秒级剖析的能力。
  • 第四,与外部各零碎买通。其实咱们外部还有很多像实时离线剖析的各个系统,咱们也是须要将咱们整个平台跟外部的各个系统之间进行一个买通串联的工作。