乐趣区

关于Flink:Flink-Iceberg-在去哪儿的实时数仓实践

作者:余东

本文介绍去哪儿数据平台在应用 Flink + Iceberg 0.11 的一些实际。内容包含:

  • 背景及痛点
  • Iceberg 架构
  • 痛点一:Kafka 数据失落
  • 痛点二:近实时 Hive 压力大
  • Iceberg 优化实际
  • 总结

一、背景及痛点

1. 背景

咱们在应用 Flink 做实时数仓以及数据传输过程中,遇到了一些问题:比方 Kafka 数据失落,Flink 联合 Hive 的近实时数仓性能等。Iceberg 0.11 的新个性解决了这些业务场景碰到的问题。比照 Kafka 来说,Iceberg 在某些特定场景有本人的劣势,在此咱们做了一些基于 Iceberg 的实际分享。

2. 原架构计划

原先的架构采纳 Kafka 存储实时数据,其中包含日志、订单、车票等数据。而后用 Flink SQL 或者 Flink datastream 生产数据进行流转。外部自研了提交 SQL 和 Datastream 的平台,通过该平台提交实时作业。

3. 痛点

  • Kafka 存储老本高且数据量大。Kafka 因为压力大将数据过期工夫设置的比拟短,当数据产生反压,积压等状况时,如果在肯定的工夫内没生产数据导致数据过期,会造成数据失落。
  • Flink 在 Hive 上做了近实时的读写反对。为了分担 Kafka 压力,将一些实时性不太高的数据放入 Hive,让 Hive 做分钟级的分区。然而随着元数据一直减少,Hive metadata 的压力日益显著,查问也变得更慢,且存储 Hive 元数据的数据库压力也变大。

二、Iceberg 架构

1. Iceberg 架构解析

术语解析

  • 数据文件(data files)

    Iceberg 表实在存储数据的文件,个别存储在 data 目录下,以“.parquet”结尾。

  • 清单文件(Manifest file)

    每行都是每个数据文件的详细描述,包含数据文件的状态、文件门路、分区信息、列级别的统计信息(比方每列的最大最小值、空值数等)。通过该文件,可过滤掉无关数据,进步检索速度。

  • 快照(Snapshot)

    快照代表一张表在某个时刻的状态。每个快照版本蕴含某个时刻的所有数据文件列表。Data files 存储在不同的 manifest files 外面,manifest files 存储在一个 Manifest list 文件外面,而一个 Manifest list 文件代表一个快照。

2. Iceberg 查问打算

查问打算是在表中查找“查问所需文件”的过程。

  • 元数据过滤

    清单文件包含分区数据元组和每个数据文件的列级统计信息。在打算期间,查问谓词会主动转换为分区数据上的谓词,并首先利用于过滤数据文件。接下来,应用列级值计数,空计数,上限和下限来打消与查问谓词不匹配的文件。

  • Snapshot ID

    每个 Snapshot ID 会关联到一组 manifest files,而每一组 manifest files 蕴含很多 manifest file。

  • manifest files 文件列表

    每个 manifest files 又记录了以后 data 数据块的元数据信息,其中就蕴含了文件列的最大值和最小值,而后依据这个元数据信息,索引到具体的文件块,从而更快的查问到数据。

三、痛点一:Kafka 数据失落

1. 痛点介绍

通常咱们会抉择 Kafka 做实时数仓,以及日志传输。Kafka 自身存储老本很高,且数据保留工夫有时效性,一旦生产积压,数据达到过期工夫后,就会造成数据失落且没有生产到。

2. 解决方案

将实时要求不高的业务数据入湖、比如说能承受 1-10 分钟的提早。因为 Iceberg 0.11 也反对 SQL 实时读取,而且还能保留历史数据。这样既能够加重线上 Kafka 的压力,还能确保数据不失落的同时也能实时读取。

3 . 为什么 Iceberg 只能做近实时入湖?

  1. Iceberg 提交 Transaction 时是以文件粒度来提交。这就没法以秒为单位提交 Transaction,否则会造成文件数量收缩;
  2. 没有在线服务节点。对于实时的高吞吐低提早写入,无奈失去纯实时的响应;
  3. Flink 写入以 checkpoint 为单位,物理数据写入 Iceberg 后并不能间接查问,当触发了 checkpoint 才会写 metadata 文件,这时数据由不可见变为可见。checkpoint 每次执行都会有肯定工夫。

4. Flink 入湖剖析

组件介绍

  • IcebergStreamWriter

    次要用来写入记录到对应的 avro、parquet、orc 文件,生成一个对应的 Iceberg DataFile,并发送给上游算子。

另外一个叫做 IcebergFilesCommitter,次要用来在 checkpoint 到来时把所有的 DataFile 文件收集起来,并提交 Transaction 到 Apache Iceberg,实现本次 checkpoint 的数据写入,生成 DataFile。

  • IcebergFilesCommitter

    为每个 checkpointId 保护了一个 DataFile 文件列表,即 map<Long, List<DataFile>>,这样即便两头有某个 checkpoint 的 transaction 提交失败了,它的 DataFile 文件依然保护在 State 中,仍然能够通过后续的 checkpoint 来提交数据到 Iceberg 表中。

5. Flink SQL Demo

Flink Iceberg 实时入湖流程,生产 Kafka 数据写入 Iceberg,并从 Iceberg 近实时读取数据。

5.1 前期工作

  • 开启实时读写性能

    set execution.type = streaming

  • 开启 table sql hint 性能来应用 OPTIONS 属性

    set table.dynamic-table-options.enabled=true

  • 注册 Iceberg catalog 用于操作 Iceberg 表

    CREATE CATALOG Iceberg_catalog WITH (\n"+"  'type'='Iceberg',\n"+"  'catalog-type'='Hive',"+"  'uri'='thrift://localhost:9083'" +
                ");
  • Kafka 实时数据入湖

    insert into Iceberg_catalog.Iceberg_db.tbl1 \n 
                select * from Kafka_tbl;
  • 数据湖之间实时流转 tbl1 -> tbl2

      insert into Iceberg_catalog.Iceberg_db.tbl2  
        select * from Iceberg_catalog.Iceberg_db.tbl1 
        /*+ OPTIONS('streaming'='true', 
    'monitor-interval'='10s',snapshot-id'='3821550127947089987')*/

5.2 参数解释

  • monitor-interval

    间断监督新提交的数据文件的工夫距离 (默认值:1s)。

  • start-snapshot-id

    从指定的快照 ID 开始读取数据、每个快照 ID 关联的是一组 manifest file 元数据文件,每个元数据文件映射着本人的实在数据文件,通过快照 ID,从而读取到某个版本的数据。

6. 踩坑记录

我之前在 SQL Client 写数据到 Iceberg,data 目录数据始终在更新,然而 metadata 没有数据,导致查问的时候没有数,因为 Iceberg 的查问是须要元数据来索引实在数据的。SQL Client 默认没有开启 checkpoint,须要通过配置文件来开启状态。所以会导致 data 目录写入数据而 metadata 目录不写入元数据。

PS:无论通过 SQL 还是 Datastream 入湖,都必须开启 Checkpoint。

7. 数据样例

上面两张图展现的是实时查问 Iceberg 的成果,一秒前和一秒后的数据变动状况。

  • 一秒前的数据

  • 一秒后刷新的数据

四、痛点二:Flink 联合 Hive 的近实时越来越慢

1. 痛点介绍

选用 Flink + Hive 的近实时架构尽管反对了实时读写,然而这种架构带来的问题是随着表和分区增多,将会面临以下问题:

  • 元数据过多

    Hive 将分区改为小时 / 分钟级,尽管进步了数据的准实时性,然而 metestore 的压力也是不言而喻的,元数据过多导致生成查问打算变慢,而且还会影响线上其余业务稳固。

  • 数据库压力变大

    随着元数据减少,存储 Hive 元数据的数据库压力也会减少,一段时间后,还须要对该库进行扩容,比方存储空间。

2. 解决方案

将原先的 Hive 近实时迁徙到 Iceberg。为什么 Iceberg 能够解决元数据量大的问题,而 Hive 在元数据大的时候却容易造成瓶颈?

  • Iceberg 是把 metadata 保护在可拓展的分布式文件系统上,不存在中心化的元数据系统;
  • Hive 则是把 partition 之上的元数据保护在 metastore 外面 (partition 过多则给 mysql 造成微小压力),而 partition 内的元数据其实是保护在文件内的(启动作业须要列举大量文件能力确定文件是否须要被扫描,整个过程十分耗时)。

五、优化实际

1. 小文件解决

  • Iceberg 0.11 以前,通过定时触发 batch api 进行小文件合并,这样尽管能合并,然而须要保护一套 Actions 代码,而且也不是实时合并的。

    Table table = findTable(options, conf);
    Actions.forTable(table).rewriteDataFiles()
            .targetSizeInBytes(10 * 1024) // 10KB
            .execute();
  • Iceberg 0.11 新个性,反对了流式小文件合并。

    通过分区 / 存储桶键应用哈希混洗形式写数据、从源头间接合并文件,这样的益处在于,一个 task 会解决某个分区的数据,提交本人的 Datafile 文件,比方一个 task 只解决对应分区的数据。这样防止了多个 task 解决提交很多小文件的问题,且不须要额定的保护代码,只需在建表的时候指定属性 write.distribution-mode,该参数与其它引擎是通用的,比方 Spark 等。

    CREATE TABLE city_table ( 
         province BIGINT,
         city STRING
      ) PARTITIONED BY (province, city) WITH ('write.distribution-mode'='hash');

2. Iceberg 0.11 排序

2.1 排序介绍

在 Iceberg 0.11 之前,Flink 是不反对 Iceberg 排序功能的,所以之前只能联合 Spark 以批模式来反对排序功能,0.11 新增了排序个性的反对,也意味着,咱们在实时也能够领会到这个益处。

排序的实质是为了扫描更快,因为依照 sort key 做了聚合之后,所有的数据都依照从小到大排列,max-min 能够过滤掉大量的有效数据。

2.2 排序 demo

insert into Iceberg_table select days from Kafka_tbl order by days, province_id;

3. Iceberg 排序后 manifest 详解

参数解释

  • file_path:物理文件地位。
  • partition:文件所对应的分区。
  • lower_bounds:该文件中,多个排序字段的最小值,下图是我的 days 和 province_id 最小值。
  • upper_bounds:该文件中,多个排序字段的最大值,下图是我的 days 和 province_id 最大值。

通过分区、列的上上限信息来确定是否读取 file_path 的文件,数据排序后,文件列的信息也会记录在元数据中,查问打算从 manifest 去定位文件,不须要把信息记录在 Hive metadata,从而加重 Hive metadata 压力,晋升查问效率。

利用 Iceberg 0.11 的排序个性,将天作为分区。按天、小时、分钟进行排序,那么 manifest 文件就会记录这个排序规定,从而在检索数据的时候,进步查问效率,既能实现 Hive 分区的检索长处,还能防止 Hive metadata 元数据过多带来的压力。

六、总结

相较于之前的版本来说,Iceberg 0.11 新增了许多实用的性能,比照了之前应用的旧版本,做以下总结:

  • Flink + Iceberg 排序功能

    在 Iceberg 0.11 以前,排序功能集成了 Spark,但没有集成 Flink,过后用 Spark + Iceberg 0.10 批量迁徙了一批 Hive 表。在 BI 上的收益是:原先 BI 为了晋升 Hive 查问速度建了多级分区,导致小文件和元数据过多,入湖过程中,利用 Spark 排序 BI 常常查问的条件,联合隐式分区,最终晋升 BI 检索速度的同时,也没有小文件的问题,Iceberg 有本身的元数据,也缩小了 Hive metadata 的压力。

    Icebeg 0.11 反对了 Flink 的排序,是一个很实用的性能点。咱们能够把原先 Flink + Hive 的分区转移到 Iceberg 排序中,既能达到 Hive 分区的成果,也能缩小小文件和晋升查问效率。

  • 实时读取数据

    通过 SQL 的编程形式,即可实现数据的实时读取。益处在于,能够把实时性要求不高的,比方业务能够承受 1-10 分钟提早的数据放入 Iceberg 中,在缩小 Kafka 压力的同时,也能实现数据的近实时读取,还能保留历史数据。

  • 实时合并小文件

    在 Iceberg 0.11 以前,须要用 Iceberg 的合并 API 来保护小文件合并,该 API 须要传入表信息,以及定时信息,且合并是按批次这样进行的,不是实时的。从代码上来说,减少了保护和开发成本;从时效性来说,不是实时的。0.11 用 Hash 的形式,从源头对数据进行实时合并,只需在 SQL 建表时指定 (‘write.distribution-mode’=’hash’) 属性即可,不须要手工保护。

退出移动版