共计 5480 个字符,预计需要花费 14 分钟才能阅读完成。
作者:余东
本文介绍去哪儿数据平台在应用 Flink + Iceberg 0.11 的一些实际。内容包含:
- 背景及痛点
- Iceberg 架构
- 痛点一:Kafka 数据失落
- 痛点二:近实时 Hive 压力大
- Iceberg 优化实际
- 总结
一、背景及痛点
1. 背景
咱们在应用 Flink 做实时数仓以及数据传输过程中,遇到了一些问题:比方 Kafka 数据失落,Flink 联合 Hive 的近实时数仓性能等。Iceberg 0.11 的新个性解决了这些业务场景碰到的问题。比照 Kafka 来说,Iceberg 在某些特定场景有本人的劣势,在此咱们做了一些基于 Iceberg 的实际分享。
2. 原架构计划
原先的架构采纳 Kafka 存储实时数据,其中包含日志、订单、车票等数据。而后用 Flink SQL 或者 Flink datastream 生产数据进行流转。外部自研了提交 SQL 和 Datastream 的平台,通过该平台提交实时作业。
3. 痛点
- Kafka 存储老本高且数据量大。Kafka 因为压力大将数据过期工夫设置的比拟短,当数据产生反压,积压等状况时,如果在肯定的工夫内没生产数据导致数据过期,会造成数据失落。
- Flink 在 Hive 上做了近实时的读写反对。为了分担 Kafka 压力,将一些实时性不太高的数据放入 Hive,让 Hive 做分钟级的分区。然而随着元数据一直减少,Hive metadata 的压力日益显著,查问也变得更慢,且存储 Hive 元数据的数据库压力也变大。
二、Iceberg 架构
1. Iceberg 架构解析
术语解析
-
数据文件(data files)
Iceberg 表实在存储数据的文件,个别存储在 data 目录下,以“.parquet”结尾。
-
清单文件(Manifest file)
每行都是每个数据文件的详细描述,包含数据文件的状态、文件门路、分区信息、列级别的统计信息(比方每列的最大最小值、空值数等)。通过该文件,可过滤掉无关数据,进步检索速度。
-
快照(Snapshot)
快照代表一张表在某个时刻的状态。每个快照版本蕴含某个时刻的所有数据文件列表。Data files 存储在不同的 manifest files 外面,manifest files 存储在一个 Manifest list 文件外面,而一个 Manifest list 文件代表一个快照。
2. Iceberg 查问打算
查问打算是在表中查找“查问所需文件”的过程。
-
元数据过滤
清单文件包含分区数据元组和每个数据文件的列级统计信息。在打算期间,查问谓词会主动转换为分区数据上的谓词,并首先利用于过滤数据文件。接下来,应用列级值计数,空计数,上限和下限来打消与查问谓词不匹配的文件。
-
Snapshot ID
每个 Snapshot ID 会关联到一组 manifest files,而每一组 manifest files 蕴含很多 manifest file。
-
manifest files 文件列表
每个 manifest files 又记录了以后 data 数据块的元数据信息,其中就蕴含了文件列的最大值和最小值,而后依据这个元数据信息,索引到具体的文件块,从而更快的查问到数据。
三、痛点一:Kafka 数据失落
1. 痛点介绍
通常咱们会抉择 Kafka 做实时数仓,以及日志传输。Kafka 自身存储老本很高,且数据保留工夫有时效性,一旦生产积压,数据达到过期工夫后,就会造成数据失落且没有生产到。
2. 解决方案
将实时要求不高的业务数据入湖、比如说能承受 1-10 分钟的提早。因为 Iceberg 0.11 也反对 SQL 实时读取,而且还能保留历史数据。这样既能够加重线上 Kafka 的压力,还能确保数据不失落的同时也能实时读取。
3 . 为什么 Iceberg 只能做近实时入湖?
- Iceberg 提交 Transaction 时是以文件粒度来提交。这就没法以秒为单位提交 Transaction,否则会造成文件数量收缩;
- 没有在线服务节点。对于实时的高吞吐低提早写入,无奈失去纯实时的响应;
- Flink 写入以 checkpoint 为单位,物理数据写入 Iceberg 后并不能间接查问,当触发了 checkpoint 才会写 metadata 文件,这时数据由不可见变为可见。checkpoint 每次执行都会有肯定工夫。
4. Flink 入湖剖析
组件介绍
-
IcebergStreamWriter
次要用来写入记录到对应的 avro、parquet、orc 文件,生成一个对应的 Iceberg DataFile,并发送给上游算子。
另外一个叫做 IcebergFilesCommitter,次要用来在 checkpoint 到来时把所有的 DataFile 文件收集起来,并提交 Transaction 到 Apache Iceberg,实现本次 checkpoint 的数据写入,生成 DataFile。
-
IcebergFilesCommitter
为每个 checkpointId 保护了一个 DataFile 文件列表,即 map<Long, List<DataFile>>,这样即便两头有某个 checkpoint 的 transaction 提交失败了,它的 DataFile 文件依然保护在 State 中,仍然能够通过后续的 checkpoint 来提交数据到 Iceberg 表中。
5. Flink SQL Demo
Flink Iceberg 实时入湖流程,生产 Kafka 数据写入 Iceberg,并从 Iceberg 近实时读取数据。
5.1 前期工作
-
开启实时读写性能
set execution.type = streaming
-
开启 table sql hint 性能来应用 OPTIONS 属性
set table.dynamic-table-options.enabled=true
-
注册 Iceberg catalog 用于操作 Iceberg 表
CREATE CATALOG Iceberg_catalog WITH (\n"+" 'type'='Iceberg',\n"+" 'catalog-type'='Hive',"+" 'uri'='thrift://localhost:9083'" + ");
-
Kafka 实时数据入湖
insert into Iceberg_catalog.Iceberg_db.tbl1 \n select * from Kafka_tbl;
-
数据湖之间实时流转 tbl1 -> tbl2
insert into Iceberg_catalog.Iceberg_db.tbl2 select * from Iceberg_catalog.Iceberg_db.tbl1 /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s',snapshot-id'='3821550127947089987')*/
5.2 参数解释
-
monitor-interval
间断监督新提交的数据文件的工夫距离 (默认值:1s)。
-
start-snapshot-id
从指定的快照 ID 开始读取数据、每个快照 ID 关联的是一组 manifest file 元数据文件,每个元数据文件映射着本人的实在数据文件,通过快照 ID,从而读取到某个版本的数据。
6. 踩坑记录
我之前在 SQL Client 写数据到 Iceberg,data 目录数据始终在更新,然而 metadata 没有数据,导致查问的时候没有数,因为 Iceberg 的查问是须要元数据来索引实在数据的。SQL Client 默认没有开启 checkpoint,须要通过配置文件来开启状态。所以会导致 data 目录写入数据而 metadata 目录不写入元数据。
PS:无论通过 SQL 还是 Datastream 入湖,都必须开启 Checkpoint。
7. 数据样例
上面两张图展现的是实时查问 Iceberg 的成果,一秒前和一秒后的数据变动状况。
- 一秒前的数据
- 一秒后刷新的数据
四、痛点二:Flink 联合 Hive 的近实时越来越慢
1. 痛点介绍
选用 Flink + Hive 的近实时架构尽管反对了实时读写,然而这种架构带来的问题是随着表和分区增多,将会面临以下问题:
-
元数据过多
Hive 将分区改为小时 / 分钟级,尽管进步了数据的准实时性,然而 metestore 的压力也是不言而喻的,元数据过多导致生成查问打算变慢,而且还会影响线上其余业务稳固。
-
数据库压力变大
随着元数据减少,存储 Hive 元数据的数据库压力也会减少,一段时间后,还须要对该库进行扩容,比方存储空间。
2. 解决方案
将原先的 Hive 近实时迁徙到 Iceberg。为什么 Iceberg 能够解决元数据量大的问题,而 Hive 在元数据大的时候却容易造成瓶颈?
- Iceberg 是把 metadata 保护在可拓展的分布式文件系统上,不存在中心化的元数据系统;
- Hive 则是把 partition 之上的元数据保护在 metastore 外面 (partition 过多则给 mysql 造成微小压力),而 partition 内的元数据其实是保护在文件内的(启动作业须要列举大量文件能力确定文件是否须要被扫描,整个过程十分耗时)。
五、优化实际
1. 小文件解决
-
Iceberg 0.11 以前,通过定时触发 batch api 进行小文件合并,这样尽管能合并,然而须要保护一套 Actions 代码,而且也不是实时合并的。
Table table = findTable(options, conf); Actions.forTable(table).rewriteDataFiles() .targetSizeInBytes(10 * 1024) // 10KB .execute();
-
Iceberg 0.11 新个性,反对了流式小文件合并。
通过分区 / 存储桶键应用哈希混洗形式写数据、从源头间接合并文件,这样的益处在于,一个 task 会解决某个分区的数据,提交本人的 Datafile 文件,比方一个 task 只解决对应分区的数据。这样防止了多个 task 解决提交很多小文件的问题,且不须要额定的保护代码,只需在建表的时候指定属性 write.distribution-mode,该参数与其它引擎是通用的,比方 Spark 等。
CREATE TABLE city_table ( province BIGINT, city STRING ) PARTITIONED BY (province, city) WITH ('write.distribution-mode'='hash');
2. Iceberg 0.11 排序
2.1 排序介绍
在 Iceberg 0.11 之前,Flink 是不反对 Iceberg 排序功能的,所以之前只能联合 Spark 以批模式来反对排序功能,0.11 新增了排序个性的反对,也意味着,咱们在实时也能够领会到这个益处。
排序的实质是为了扫描更快,因为依照 sort key 做了聚合之后,所有的数据都依照从小到大排列,max-min 能够过滤掉大量的有效数据。
2.2 排序 demo
insert into Iceberg_table select days from Kafka_tbl order by days, province_id;
3. Iceberg 排序后 manifest 详解
参数解释
- file_path:物理文件地位。
- partition:文件所对应的分区。
- lower_bounds:该文件中,多个排序字段的最小值,下图是我的 days 和 province_id 最小值。
- upper_bounds:该文件中,多个排序字段的最大值,下图是我的 days 和 province_id 最大值。
通过分区、列的上上限信息来确定是否读取 file_path 的文件,数据排序后,文件列的信息也会记录在元数据中,查问打算从 manifest 去定位文件,不须要把信息记录在 Hive metadata,从而加重 Hive metadata 压力,晋升查问效率。
利用 Iceberg 0.11 的排序个性,将天作为分区。按天、小时、分钟进行排序,那么 manifest 文件就会记录这个排序规定,从而在检索数据的时候,进步查问效率,既能实现 Hive 分区的检索长处,还能防止 Hive metadata 元数据过多带来的压力。
六、总结
相较于之前的版本来说,Iceberg 0.11 新增了许多实用的性能,比照了之前应用的旧版本,做以下总结:
-
Flink + Iceberg 排序功能
在 Iceberg 0.11 以前,排序功能集成了 Spark,但没有集成 Flink,过后用 Spark + Iceberg 0.10 批量迁徙了一批 Hive 表。在 BI 上的收益是:原先 BI 为了晋升 Hive 查问速度建了多级分区,导致小文件和元数据过多,入湖过程中,利用 Spark 排序 BI 常常查问的条件,联合隐式分区,最终晋升 BI 检索速度的同时,也没有小文件的问题,Iceberg 有本身的元数据,也缩小了 Hive metadata 的压力。
Icebeg 0.11 反对了 Flink 的排序,是一个很实用的性能点。咱们能够把原先 Flink + Hive 的分区转移到 Iceberg 排序中,既能达到 Hive 分区的成果,也能缩小小文件和晋升查问效率。
-
实时读取数据
通过 SQL 的编程形式,即可实现数据的实时读取。益处在于,能够把实时性要求不高的,比方业务能够承受 1-10 分钟提早的数据放入 Iceberg 中,在缩小 Kafka 压力的同时,也能实现数据的近实时读取,还能保留历史数据。
-
实时合并小文件
在 Iceberg 0.11 以前,须要用 Iceberg 的合并 API 来保护小文件合并,该 API 须要传入表信息,以及定时信息,且合并是按批次这样进行的,不是实时的。从代码上来说,减少了保护和开发成本;从时效性来说,不是实时的。0.11 用 Hash 的形式,从源头对数据进行实时合并,只需在 SQL 建表时指定 (‘write.distribution-mode’=’hash’) 属性即可,不须要手工保护。