乐趣区

关于flink:深度集成-Flink-Apache-Iceberg-0110-最新功能解读

在 2021 年 1 月 27 日,Apache Iceberg 公布了 0.11.0 版本[1]。在这个版本中,实现了以下外围性能:

1、Apache Iceberg 在 Core API 层面反对了 partition 的变更;同时还在 Iceberg Format v2 之上新增了 SortOrder 标准,次要用于将那些散列度较高的 column 汇集在少数几个文件内,这样能够大量缩小小文件的数量。同时进步读取的效率,因为数据通过 sort 写入后,文件级别和 Page 级别的 min-max 范畴将更小,有助于高效的数据过滤。

2、在 Flink 和 Iceberg 的集成方面,社区实现了以下指标:

  • 实现了 Flink Streaming Reader,意味着咱们能够通过 Flink 流作业增量地去拉取 Apache Iceberg 中新增数据。对 Apache Iceberg 这样流批对立的存储层来说,Apache Flink 是真正意义上第一个实现了流批读写 Iceberg 的计算引擎,这也标记着 Apache Flink 和 Apache Iceberg 在独特打造流批对立的数据湖架构上开启了新的篇章。
  • 实现了 Flink Streaming/Batch Reader 的 limit pushdown 和 filter pushdown。
  • 实现了 CDC 和 Upsert 事件通过 flink 计算引擎写入 Apache Iceberg,并在中等数据规模上实现了正确性验证。
  • 在 Flink Iceberg Sink 中反对 write.distribution-mode=hash 的形式写入数据,这能够从生产源头上大量缩小小文件。

3、在 Spark3 和 Iceberg 的集成方面,社区反对了大量高阶 SQL:

  • MERGE INTO
  • DELETE FROM
  • ALTER TABLE … ADD/DROP PARTITION
  • ALTER TABLE … WRITE ORDERED BY
  • 通过 Call <procedure> 形式来执行更多的数据管理操作,例如合并小文件、清理过期文件等。

4、在周边生态集成方面,社区实现了以下指标:

  • 引入 AWS module,实现和 AWS S3[2] 以及 Glue Catalog[3] 等云服务的集成;
  • 集成风行的开源 catalog 服务 nessie[4]。

在接下来的内容里,我将阐明 Apache Iceberg 0.11.0 在 Apache Flink 集成方面做的一些具体工作。

Apache Flink 流式读取

在 Apache Iceberg 0.10.0 版本中,咱们曾经在 Flink SQL 层面反对了:

  1. 流作业写入 Apache Iceberg 表;
  2. 批作业写入 Apache Iceberg 表;
  3. 批作业读取 Apache Iceberg 表;

在最新的 Apache Iceberg 0.11.0 版本中,咱们又胜利集成了 Flink 流作业读取 Apache Iceberg 表。有了这个性能,能够很不便地实现不同 Iceberg 表之间的数据流转和 ETL。例如咱们有一个原始表 A,须要把表 A 通过一些数据处理或者打宽,解决成一个表 B,那么这个场景是很适宜用 Apache Iceberg 的 Streaming Reader 来实现的。

除此之外,Netflix 也提出他们在采纳 Flink Streaming Reader 来实现历史数据的 backfill 和 boostrap。当然,这须要将来 iceberg 集成到 FLIP-27,目前 Netflix 提供了他们对这块工作的一些实践经验 [5] 和设计工作[6],大家感兴趣能够参考一下。

目前,对这个性能咱们提供了 Flink SQL 和 DataStream API 两种应用形式(举荐采纳 Flink SQL)。您能够通过浏览文档 [7] 来启动 Flink SQL 客户端,而后通过如下形式来启动流作业拜访 Apache Iceberg 的增量数据:

-- Submit the flink job in streaming mode for current session.
SET execution.type = streaming ;

-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;

-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;

Flink Source 的 Limit Pushdown 和 Filter Pushdown

在 Flink 的 Batch Source 和 Streaming Source 中,咱们对接了 Limit 操作和 Filter 操作跟 Iceberg 表的下推实现。这意味着,在读取 Apache Iceberg 表时,碰到这样的 SQL:

SELECT * FROM sample LIMIT 10;

咱们能够在存储层面就实现数据过滤,而不须要把数据从存储层面读取进去,再丢给计算引擎。从而大大提高数据的拜访效率。

Filter 的下推也是相似,目前咱们反对了如下 Filter 的下推,简直蕴含了所有常见 filter 的下推操作:

SELECT * FROM sample WHERE data = 'a';
SELECT * FROM sample WHERE data != 'a';
SELECT * FROM sample WHERE data >= 'a';
SELECT * FROM sample WHERE data <= 'a';
SELECT * FROM sample WHERE data < 'a';
SELECT * FROM sample WHERE data > 'a';
SELECT * FROM sample WHERE data = 'a' AND id = 1;
SELECT * FROM sample WHERE data = 'a' OR id = 1;
SELECT * FROM sample WHERE data IS NULL;
SELECT * FROM sample WHERE NOT (id = 1);
SELECT * FROM sample WHERE data LIKE 'aaa%';

对 CDC(例如 MySQL Binlog)和 Upsert 事件的反对

这个性能是 Apache Flink 社区用户呼声特地高的一个性能,次要来自两个外围场景的需要:

  1. 用户心愿把来自关系型数据库的 binlog 导入到 Apache Iceberg 数据湖中,提供近实时的数据分析能力。
  2. 心愿把 Flink 流作业 AGG 产生的 upsert stream 导入到 Apache Iceberg 数据湖中,从而借助 Apache Iceberg 的存储能力和 Apache Flink 的剖析能力,提供近实时的数据报表。

通常来说,咱们能选的开源计划各有有余:抉择采纳 Hive MR 则只能提供 T+1 的数据时效性;采纳 Apache Kudu 则必须面临跟 HDFS 和云端对象存储脱节的难堪;抉择 HBase 则面临行存导致剖析能力有余的问题;抉择 Spark+delta 则无奈充分利用 Apache Flink 在流计算畛域的劣势。那么,在 Apache Iceberg 的实现中,这些问题将无望解决。

咱们把 flink+iceberg 对 CDC/Upsert 工作的集成大抵分成了两个阶段:

  • 第一阶段,是指 Flink 能够顺利地把 CDC 和 Upsert 的数据胜利写入到 Apache Iceberg,并能读取到一个正确的后果;
  • 第二阶段,是指 Flink+Iceberg 能顺利通过较大数据量的稳定性测试和性能测试,保障整条链路的稳定性和性能,从而达到能够上生产的水准。

那么,目前咱们在 0.11.0 版本中,曾经实现了第一阶段的指标,流作业曾经可能胜利地将 CDC/Upsert 数据写入到 Apache Iceberg 中,国内的小伙伴例如汽车之家和 B 站曾经帮忙实现中等数据量的正确性验证。

在将来的 Apache Iceberg 0.12.0 版本中,咱们布局了一系列的性能和稳定性相干事件,0.12.0 版本将会是 Iceberg CDC/Upsert 性能达到 Production Ready 的一个标志性版本。

反对 write.distribution-mode=hash 形式写入 Apache Iceberg

在 Flink 流作业写文件系统的数据文件时,非常容易碰到小文件的问题。这是因为如果 source 端的数据,不通过任何 shuffle 或者 cluster,就写入到 partition,很容易导致每个 Task 写了大量的 Partition 和 Bucket。这样对一个 Partition 来说,就存在多个 Task 写入,每个 Task 至多产生一个文件。而在 Apache Iceberg 这种数据湖架构中,Flink 的每一次 checkpoint,都将 Roll over file writer 以便提交 txn,那么随着分钟级别的 checkpoint 提交,肯定会产生大量的小文件。

目前在 Apache Iceberg 中,将提供 3 中形式来解决小文件问题:

1、在 Iceberg 表中设置 write.distribution-mode=hash 属性,例如:

CREATE TABLE sample (
    id BIGINT,
    data STRING
) PARTITIONED BY (data) WITH ('write.distribution-mode'='hash');

这样能够保障每一条记录依照 partition key 做 shuffle 之后再写入,每一个 Partition 最多由一个 Task 来负责写入,大大地缩小了小文件的产生。然而,这很容易产生另外一个问题,就是数据歪斜的问题。很多业务表都是依照工夫字段来做分区的,而产生的新数据都是依照工夫写入的,容易导致新数据都写入同一个 partition,造成写入数据热点。目前咱们举荐的做法是,在 partition 上面采纳 hash 的形式设置 bucket,那么每一个 partition 的数据将平均地落到每个 bucket 内,每一个 bucket 最多只会由一个 task 来写,既解决了小文件问题,又解决了数据热点问题。

在 Flink 1.11 版本临时不反对通过 SQL 的形式创立 bucket,但咱们能够通过 Java API 的形式将上述依照 data 字段 partition 之后的表增加 bucket。调用形式如下:

table.updateSpec()
       .addField(Expressions.bucket("id", 32))
       .commit();

2、定期对 Apache Iceberg 表执行 Major Compaction 来合并 Apache iceberg 表中的小文件。这个作业目前是一个 Flink 的批作业,提供 Java API 的形式来提交作业,应用姿态能够参考文档[8]。

3、在每个 Flink Sink 流作业之后,外挂算子用来实现小文件的主动合并。这个性能目前暂未 merge 到社区版本,因为波及到 format v2 的 compaction 的一些探讨,咱们会在 0.12.0 版本中公布该性能。

总结

自 Apache Flink 接入 Apache Iceberg 以来,社区曾经胜利地公布了两个版本。在这两个版本中,咱们曾经胜利地实现 Flink+Iceberg 的流批读写能力。

到目前为止,Flink+Iceberg 在国内外曾经有不少胜利的上线案例:

  • 腾讯外部每天都有大量的日志数据通过 Flink 荡涤解决后导入到 Iceberg,最大的表日新增几十 TB;
  • Netflix 则将公司内简直所有的用户行为数据通过 Flink 流计算导入到 Iceberg,最终存储在 AWS S3 之上,相比 HDFS 的形式,Flink+Iceberg 帮忙他们公司节俭大量的存储老本;
  • 同程艺龙也在 Flink+Iceberg 之上做了大量摸索,之前简直所有的剖析数据都存储在 Hive 上,鉴于 Hive 在 ACID 和历史回溯等方面能力有余,他们调研了 Iceberg,发现 Iceberg 非常适合替换他们的 Hive 存储格局,又因为下层计算生态的良好对接,简直所有的历史计算作业都不须要做改变,就能不便地切换 Hive 表到 Iceberg 之上。到目前为止同程艺龙曾经实现了几十张 Hive 表到 Iceberg 表的迁徙;
  • 汽车之家也是胜利在生产环境大量替换 Hive 表为 Iceberg 表的公司之一,同时他们也是最早采纳社区版 Iceberg 做 CDC 和 Upsert 数据分析 PoC 的公司,也十分期待将来 0.12.0 对 CDC 和 Upsert 场景的更多优化。

在将来的 Apache Iceberg 0.12.0 版本中,咱们布局了上图的外围性能。实质上咱们将实现 Flink+Iceberg 对 CDC 及 Upsert 场景的更好反对,将在稳定性、性能、易用性三个方面做更多的优化工作。

最初,我想聊一下 Apache Iceberg 在计算生态方面的现状。

随着 Apache Iceberg 0.11.0 新版的公布,Apache Iceberg 作为一个对立通用的数据湖 Table Format,在生态集成方面的劣势愈发显著。因为在 Table Format 层面对计算引擎无偏袒,计算引擎的集成呈现出百花齐放的姿势,大数据生态内简直所有支流计算引擎都跟 Iceberg 有着不同水平的对接:

  1. Netflix、腾讯和 Apple 几家公司的贡献者主力推动 Spark+Iceberg 的集成,腾讯、Netflix 和 Apple 在 Apache Spark 社区有着多位 Spark PMC 和 Spark Committer,在 Spark 社区和 Iceberg 社区的影响力引人注目。我集体乐观地判断,Apache Iceberg 和 Spark 的集成体验,将来无望比肩 Databricks delta 的商业版体验,大家能够期待下。
  2. 阿里巴巴 Flink 团队、Netflix 以及国内外宏大的 Flink 用户群在一直地推动 Flink+Iceberg 的集成,不再赘述;
  3. AWS Presto 团队以及 Trino 团队则在一直推动着 Presto 和 Iceberg 的集成,AWS Presto 团队曾经明确将 Iceberg 选型为他们的数据湖 table format。同时,也能够非常明显地看到,AWS 团队在 Iceberg 和 S3 以及 Glue 生态买通方面做的大量工作,Apache Iceberg 曾经成为 AWS 数据湖生态中相当重要的一环。
  4. Cloudera 曾经明确地选型 Apache Iceberg 来构建他们的商业版数据湖。应用过 Hadoop 的同学肯定不会对这家公司生疏,没错,这家公司就是 Hadoop 商业发行版做的最为杰出的公司之一。将来,他们将基于 Apache Iceberg 推出私有云服务,将给用户带来欠缺的 Flink、Spark、Hive、Impala 数据湖集成体验。这里重点说一下 Apache Impala,Cloudera 在交互式剖析场景下十分倚重自家开源的 Apache Impala(事实上,在大数据基准测试下 Impala 的性能体现确实要比 Presto 更好),Apache Iceberg 对存储层较为完满的形象和对多样化计算引擎的容纳,是胜利感动 Cloudera 选型 Apache Iceberg 最外围的理由之一。

更多对于 Flink 数据湖的探讨,请扫描下方钉群二维码,退出数据湖技术交换钉钉群。咱们会定期在群里公布 Apache Iceberg/Hudi 和 Flink 集成的最新进展,咱们也十分欢送大家踊跃探讨相干话题。

另外阿里云 Flink 团队也始终在寻求大数据计算和数据湖存储方向的人才,这里既有丰盛的利用场景等你来挑战,又有绝对灵便的空间参加开源社区晋升集体影响力。感兴趣的同学能够间接分割:kete.yangkt@alibaba-inc.com。

参考链接:

[1]https://lists.apache.org/x/th…
[2]https://aws.amazon.com/cn/s3/
[3]https://aws.amazon.com/cn/glue/
[4]https://projectnessie.org/
[5]https://www.youtube.com/watch…
[6]https://docs.google.com/docum…
[7]https://github.com/apache/ice…
[8]https://github.com/apache/ice…

作者简介:

胡争(子毅),Apache Iceberg Committer,Apache HBase PMC 成员,阿里巴巴技术专家。目前次要负责 Flink 数据湖计划的设计和开发工作,Apache Iceberg 及 Apache Flink 我的项目的长期沉闷贡献者,《HBase 原理与实际》作者。

退出移动版