乐趣区

关于Flink:Flink集成Iceberg在同程艺龙的实践

本文由同城艺龙大数据开发工程师张军分享,次要介绍同城艺龙 Flink 集成 Iiceberg 的生产实践。内容包含:

  1. 背景及痛点
  2. Flink + Iceberg 的落地
  3. Iceberg 优化实际
  4. 后续工作
  5. 收益及总结

一、背景及痛点

业务背景

同程艺龙是一个提供机票、住宿、交通等服务的在线游览服务平台,目前我所在的部门属于公司的研发部门,主要职责是为公司内其余业务部门提供一些根底服务,咱们的大数据系统次要承接的业务是部门内的一些大数据相干的数据统计、剖析工作等。数据起源有网关日志数据、服务器监控数据、K8s 容器的相干日志数据,App 的打点日志, MySQL 的 binlog 日志等。咱们次要的大数据工作是基于上述日志构建实时报表,提供基于 Presto 的报表展现和即时查问服务,同时也会基于 Flink 开发一些实时、批处理工作,为业务方提供精确及时的数据撑持。

原架构计划

因为咱们所有的原始数据都是存储在 Kafka 的,所以原来的技术架构就是首先是 Flink 工作生产 Kafka 的数据,通过 Flink SQL 或者 Flink jar 的各种解决之后实时写入 Hive,其中绝大部分工作都是 Flink SQL 工作,因为我认为 SQL 开发绝对代码要简略的多,并且保护不便、好了解,所以能用 SQL 写的都尽量用 SQL 来写。
提交 Flink 的平台应用的是 Zeppelin,其中提交 Flink SQL 工作是 Zeppelin 自带的性能,提交 jar 包工作是我本人基于 Application 模式开发的 Zeppelin 插件。
对于落地到 Hive 的数据,应用开源的报表零碎 metabase (底层应用 Presto) 提供实时报表展现、定时发送邮件报表,以及自定义 SQL 查问服务。因为业务对数据的实时性要求比拟高,心愿数据能尽快的展现进去,所以咱们很多的 Flink 流式工作的 checkpoint 设置为 1 分钟,数据格式采纳的是 orc 格局。

痛点

因为采纳的是列式存储格局 ORC,无奈像行式存储格局那样进行追加操作,所以不可避免的产生了一个大数据畛域十分常见且十分辣手的问题,即 HDFS 小文件问题。

开始的时候咱们的小文件解决方案是本人写的一个小文件压缩工具,定期去合并,咱们的 Hive 分区个别都是天级别的,所以这个工具的原理就是每天凌晨启动一个定时工作去压缩昨天的数据,首先把昨天的数据写入一个长期文件夹,压缩完,和原来的数据进行记录数的比对测验,数据条数统一之后,用压缩后的数据笼罩原来的数据,然而因为无奈保障事务,所以呈现了很多问题:

  • 压缩的同时因为提早数据的到来导致昨天的 Hive 分区又有数据写入了,测验就会失败,导致合并小文件失败。
  • 替换旧数据的操作是没有事务保障的,如果替换的过程中旧分区有新的数据写入,就会笼罩新写入的数据,造成数据失落。
  • 没有事务的反对,无奈实时合并以后分区的数据,只能合并压缩前一个分区的,最新的分区数据依然有小文件的问题,导致最新数据查问性能进步不了。

二、Flink+Iceberg 的落地

Iceberg 技术调研

所以基于以上的 HDFS 小文件、查问慢等问题,联合咱们的现状,我调研了目前市面上的数据湖技术:Delta、Apache Iceberg 和 Apache Hudi,思考了目前数据湖框架反对的性能和当前的社区规划,最终咱们是抉择了 Iceberg,其中思考的起因有以下几方面:

■ Iceberg 深度集成 Flink

后面讲到,咱们的绝大部分工作都是 Flink 工作,包含批处理工作和流解决工作,目前这三个数据湖框架,Iceberg 是集成 Flink 做的最欠缺的,如果采纳 Iceberg 代替 Hive 之后,迁徙的老本十分小,对用户简直是无感知的,
比方咱们原来的 SQL 是这样的:

INSERT INTO hive_catalog.db.hive_table SELECT * FROM kafka_table

迁徙到 Iceberg 当前,只须要批改 catalog 就行。

INSERT INTO iceberg_catalog.db.iIcebergceberg_table SELECT * FROM kafka_table

Presto 查问也是和这个相似,只须要批改 catalog 就行了。

■ Iceberg 的设计架构使得查问更快

在 Iceberg 的设计架构中,manifest 文件存储了分区相干信息、data files 的相干统计信息(max/min)等,去查问一些大的分区的数据,就能够间接定位到所要的数据,而不是像 Hive 一样去 list 整个 HDFS 文件夹,工夫复杂度从 O(n) 降到了 O(1),使得一些大的查问速度有了显著的晋升,在 Iceberg PMC Chair Ryan Blue 的演讲中,咱们看到命中 filter 的工作执行工夫从 61.5 小时降到了 22 分钟。

■ 应用 Flink SQL 将 CDC 数据写入 Iceberg

Flink CDC 提供了间接读取 MySQL binlog 的形式,绝对以前须要应用 canal 读取 binlog 写入 Iceberg,而后再去生产 Iceberg 数据。少了两个组件的保护,链路缩小了,节俭了保护的老本和出错的概率。并且能够实现导入全量数据和增量数据的完满对接,所以应用 Flink SQL 将 MySQL binlog 数据导入 Iceberg 来做 MySQL->Iceberg 的导入将会是一件十分有意义的事件。

此外对于咱们最后的压缩小文件的需要,尽管 Iceberg 目前还无奈实现主动压缩,然而它提供了一个批处理工作,曾经能满足咱们的需要。

■ Hive 表迁徙 Iceberg 表

迁徙筹备工作

目前咱们的所有数据都是存储在 Hive 表的,在验证完 Iceberg 之后,咱们决定将 Hive 的数据迁徙到 Iceberg,所以我写了一个工具,能够应用 Hive 的数据,而后新建一个 Iceberg 表,为其建设相应的元数据,然而测试的时候发现,如果采纳这种形式,须要把写入 Hive 的程序进行,因为如果 Iceberg 和 Hive 应用同一个数据文件,而压缩程序会一直地压缩 Iceberg 表的小文件,压缩完之后,不会马上删除旧数据,所以 Hive 表就会查到双份的数据,故咱们采纳双写的策略,原来写入 Hive 的程序不动,新启动一套程序写入 Iceberg,这样能对 Iceberg 表察看一段时间。还能和原来 Hive 中的数据进行比对,来验证程序的正确性。

通过一段时间察看,每天将近几十亿条数据、压缩后几个 T 大小的 Hive 表和 Iceberg 表,一条数据也不差。所以在最终比照数据没有问题之后,把 Hive 表进行写入,应用新的 Iceberg 表。

迁徙工具

我将这个 Hive 表迁徙 Iceberg 表的工具做成了一个基于 Flink batch job 的 Iceberg Action,提交了社区,不过目前还没合并:https://github.com/apache/ice…。这个性能的思路是应用 Hive 原始的数据不动,而后新建一个 Iceberg table,再为这个新的 Iceberg table 生成对应的元数据,大家有需要的话能够先看看。

此外,Iceberg 社区,还有一个把现有的数据迁徙到已存在的 Iceberg table 的工具,相似 Hive 的 LOAD DATA INPATH … INTO TABLE,是用 Spark 的存储过程做的,大家也能够关注下:https://github.com/apache/ice…

三、Iceberg 优化实际

压缩小文件

目前压缩小文件是采纳的一个额定批工作来进行的,Iceberg 提供了一个 Spark 版本的 action,我在做功能测试的时候发现了一些问题,此外我对 Spark 也不是十分相熟,放心出了问题不好排查,所以参照 Spark 版本的本人实现了一个 Flink 版本,并修复了一些 bug,进行了一些性能的优化。

因为咱们的 Iceberg 的元数据都是存储在 Hive 中的,也就是咱们应用了 HiveCatalog,所以压缩程序的逻辑是把 Hive 中所有的 Iceberg 表全副都查出来,顺次压缩。压缩没有过滤条件,不论是分区表还是非分区表,都进行全表的压缩,这样做是为了解决某些应用 eventtime 的 Flink 工作。如果有提早的数据的到来,就会把数据写入以前的分区,如果不是全表压缩只压缩当天分区的话,新写入的其余天的数据就不会被压缩。

之所以没有开启定时工作来压缩,是因为比方定时五分钟压缩一个表,如果五分钟之内这个压缩工作没实现,没有提交新的 snapshot,下一个定时工作又开启了,就会把上一个没有实现的压缩工作中的数据从新压缩一次,所以每个表顺次压缩的策略能够保障某一时刻一个表只有一个工作在压缩。

代码示例参考:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles() //.maxParallelism(parallelism) //.filter(Expressions.equal("day", day)) //.targetSizeInBytes(targetSizeInBytes) .execute();

目前零碎运行稳固,曾经实现了几万次工作的压缩。

留神:
不过目前对于新公布的 Iceberg 0.11 来说,还有一个已知的 bug,即当压缩前的文件大小大于要压缩的大小(targetSizeInBytes)时,会造成数据失落,其实这个问题我在最开始测试小文件压缩的时候就发现了,并且提了一个 pr,我的策略是大于指标文件的数据文件不参加压缩,不过这个 pr 没有合并到 0.11 版本中,起初社区另外一个兄弟也发现了雷同的问题,提交了一个 pr(https://github.com/apache/ice…),策略是将这个大文件拆分到指标文件大小,目前曾经合并到 master,会在下一个 bug fix 版本 0.11.1 中公布。

查问优化

■ 批处理定时工作

目前对于定时调度中的批处理工作,Flink 的 SQL 客户端还没 Hive 那样做的很欠缺,比方执行 hive-f 来执行一个文件。而且不同的工作须要不同的资源,并行度等。

所以我本人封装了一个 Flink 程序,通过调用这个程序来进行解决,读取一个指定文件外面的 SQL,来提交批工作。在命令行管制工作的资源和并行度等。

/home/flink/bin/fFlinklinklink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql

■ 优化

批工作的查问这块,我做了一些优化工作,比方 limit 下推,filter 下推,查问并行度推断等,能够大大提高查问的速度,这些优化都曾经推回给社区,并且在 Iceberg 0.11 版本中公布。

运维治理

■ 清理 orphan 文件

  1. 定时工作删除

在应用 Iceberg 的过程中,有时候会有这样的状况,我提交了一个 Flink 工作,因为各种起因,把它停了,这个时候 Iceberg 还没提交相应的快照。此外因为一些异样导致程序失败,会产生一些不在 Iceberg 元数据外面的孤立的数据文件,这些文件对 Iceberg 来说是不可达的,也是没用的。所以咱们须要像 jvm 的垃圾回收一样来清理这些文件。

目前 Iceberg 提供了一个 Spark 版本的 action 来解决这些没用的文件,咱们采取的策略和压缩小文件一样,获取 Hive 中的所有的 Iceberg 表。每隔一个小时执行一次定时工作来删除这些没用的文件。

SparkSession spark = ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();
  1. 踩坑

咱们在程序运行过程中呈现了失常的数据文件被删除的问题,通过调研,因为快照保留设置是一小时,这个清理程序清理工夫也是设置一个小时,通过日志发现是这个清理程序删除了失常的数据。查了查代码,应该是设置了一样的工夫,在清理孤立文件的时候,有其余程序正在读取要 expired 的 snapshot,导致删除了失常的数据。最初把这个清理程序的清理工夫改成默认的三天,没有再呈现删除数据文件的问题。
当然,为了保险起见,咱们能够笼罩原来的删除文件的办法,改成将文件到一个备份文件夹,查看没有问题之后,手工删除。

■ 快照过期解决

咱们的快照过期策略,是和压缩小文件的批处理工作写在一起的,压缩完小文件之后,进行表的快照过期解决,目前保留的工夫是一个小时。这是因为对于有一些比拟大的表,分区比拟多,而且 checkpoint 比拟短,如果保留的快照过长的话,还是会保留过多小文件,咱们临时没有查问历史快照的需要,所以我将快照的保留工夫设置了一个小时。

long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);table.expireSnapshots()// .retainLast(20).expireOlderThan(olderThanTimestamp).commit();

■ 数据管理

写入了数据之后,当想查看相应的快照有多少数据文件时,间接查问 Spark 无奈晓得哪个是有用的,哪个是没用的。所以须要有对应的管理工具。目前 Flink 这块还不太成熟,咱们能够应用 Spark3 提供的工具来查看。

  1. DDL

目前 create table 这些操作咱们是通过 Flink SQL Client 来做的。其余相干的 DDL 的操作能够应用 Spark 来做:https://iceberg.apache.org/sp…

  1. DML

一些相干的数据的操作,比方删除数据等能够通过 MySQL 来实现,Presto 目前只反对分区级别的删除性能。

  1. show partitions & show create table

在咱们操作 Hive 的时候,有一些很罕用的操作,比方 show partitions、show create table 等,这些目前 Flink 还没有反对,所以在操作 Iceberg 的时候就很不不便,咱们本人基于 Flink 1.12 做 了批改,不过目前还没有齐全提交到社区,后续有工夫会提交到 Flink 和 Iceberg 社区。

四、后续工作

  • Flink SQL 接入 CDC 数据到 Iceberg

目前在咱们外部的版本中,我曾经测试通过能够应用 Flink SQL 将 CDC 数据(比方 MySQL binlog)写入 Iceberg,社区的版本中实现该性能还须要做一些工作,我也提交了一些相干的 PR 来推动这个工作。

  • 应用 SQL 进行删除和更新

对于 copy-on-write 表,咱们能够应用 Spark SQL 来进行行级的删除和更新。具体的反对的语法能够参考源码中的测试类:

org.apache.iceberg.spark.extensions.TestDelete & org.apache.iceberg.spark.extensions.TestUpdate,这些性能我在测试环境测试是能够的,然而还没有来得及更新到生产。

  • 应用 Flink SQL 进行 streaming read

在工作中会有一些这样的场景,因为数据比拟大,Iceberg 的数据只存了较短的工夫,如果很可怜因为程序写错了等起因,想从更早的工夫来生产就无能为力了。
当引入了 Iceberg 的 streaming read 之后,这些问题就能够解决了,因为 Iceberg 存储了所有的数据,当然这里有一个前提就是对于数据没有要求特地准确,比方达到秒级别,因为目前 Flink 写入 Iceberg 的事务提交是基于 Flink Checkpoint 距离的。

五、收益及总结

通过对 Iceberg 大略一个季度的调研,测试,优化和 bug 修复,咱们将现有的 Hive 表都迁徙到了 Iceberg,完满解决了原来的所有的痛点问题,目前零碎稳固运行,而且绝对 Hive 失去了很多的收益:

  • Flink 写入的资源缩小

举一个例子,默认配置下,原来一个 flink 读取 kafka 写入 hive 的工作,须要 60 个并行度才不会让 Kafka 产生积压。改成写入 iceberg 之后,只须要 20 个并行度就够了。

  • 查问速度变快

后面咱们讲到 Iceberg 查问的时候不会像 Hive 一样去 list 整个文件夹来获取分区数据,而是先从 manifest 文件中获取相干数据,查问的性能失去了显著的晋升,一些大的报表的查问速度从 50 秒进步到 30 秒。

  • 并发读写

因为 Iceberg 的事务反对,咱们能够实现对一个表进行并发读写,Flink 流式数据实时入湖,压缩程序同时压缩小文件,清理过期文件和快照的程序同时清理无用的文件,这样就能更及时的提供数据,做到分钟级的提早,查问最新分区数据的速度大大放慢了,并且因为 Iceberg 的 ACID 个性能够保证数据的准确性。

  • time travel

能够回溯查问以前某一时刻的数据。

总结一下,咱们目前能够实现应用 Flink SQL 对 Iceberg 进行批、流的读写,并能够对小文件进行实时的压缩,应用 Spark SQL 做一些 delete 和 update 工作以及一些 DDL 操作,后续能够应用 Flink SQL 将 CDC 的数据写入 Iceberg。目前对 Iceberg 的所有的优化和 bug fix,我曾经奉献给社区。因为笔者程度无限,有时候也不免有谬误,还请大家不吝赐教。

作者介绍:
张军,同程艺龙大数据开发工程师

退出移动版