乐趣区

关于sql:汽车之家基于-Flink-Iceberg-的湖仓一体架构实践

简介:由汽车之家实时计算平台负责人邸星星在 4 月 17 日上海站 Meetup 分享的,基于 Flink + Iceberg 的湖仓一体架构实际。

内容简要:

一、数据仓库架构降级的背景

二、基于 Iceberg 的湖仓一体架构实际

三、总结与收益

四、后续布局

GitHub 地址
https://github.com/apache/flink
欢送大家给 Flink 点赞送 star~

一、数据仓库架构降级的背景

1. 基于 Hive 的数据仓库的痛点

原有的数据仓库齐全基于 Hive 建造而成,次要存在三大痛点:

痛点一:不反对 ACID

1)不反对 Upsert 场景;

2)不反对 Row-level delete,数据修改老本高。

痛点二:时效性难以晋升

1)数据难以做到准实时可见;

2)无奈增量读取,无奈实现存储层面的流批对立;

3)无奈反对分钟级提早的数据分析场景。

痛点三:Table Evolution

1)写入型 Schema,对 Schema 变更反对不好;

2)Partition Spec 变更反对不敌对。

2. Iceberg 要害个性

Iceberg 次要有四大要害个性:反对 ACID 语义、增量快照机制、凋谢的表格局和流批接口反对。

  • 反对 ACID 语义

    • 不会读到不残缺的 Commit;
    • 基于乐观锁反对并发 Commit;
    • Row-level delete,反对 Upsert。
  • 增量快照机制

    • Commit 后数据即可见(分钟级);
    • 可回溯历史快照。
  • 凋谢的表格局

    • 数据格式:parquet、orc、avro
    • 计算引擎:Spark、Flink、Hive、Trino/Presto
  • 流批接口反对

    • 反对流、批写入;
    • 反对流、批读取。

二、基于 Iceberg 的湖仓一体架构实际

湖仓一体的意义就是说我不须要看见湖和仓,数据有着买通的元数据的格局,它能够自在的流动,也能够对接下层多样化的计算生态。

——贾扬清(阿里云计算平台高级研究员)

1. Append 流入湖的链路

上图为日志类数据入湖的链路,日志类数据蕴含客户端日志、用户端日志以及服务端日志。这些日志数据会实时录入到 Kafka,而后通过 Flink 工作写到 Iceberg 外面,最终存储到 HDFS。

2. Flink SQL 入湖链路买通

咱们的 Flink SQL 入湖链路买通是基于“Flink 1.11 + Iceberg 0.11”实现的,对接 Iceberg Catalog 咱们次要做了以下内容:

1)Meta Server 减少对 Iceberg Catalog 的反对;

2)SQL SDK 减少 Iceberg Catalog 反对。

而后在这根底上,平台凋谢 Iceberg 表的治理性能,使得用户能够本人在平台上建 SQL 的表。

3. 入湖 – 反对代理用户

第二步是外部的实际,对接现有估算体系、权限体系。

因为之前平台做实时作业的时候,平台都是默认为 Flink 用户去运行的,之前存储不波及 HDFS 存储,因而可能没有什么问题,也就没有思考估算划分方面的问题。

然而当初写 Iceberg 的话,可能就会波及一些问题。比方数仓团队有本人的集市,数据就应该写到他们的目录上面,估算也是划到他们的估算下,同时权限和离线团队账号的体系买通。

如上所示,这块次要是在平台上做了代理用户的性能,用户能够去指定用哪个账号去把这个数据写到 Iceberg 外面,实现过程次要有以下三个。

  • 减少 Table 级别配置:’iceberg.user.proxy’ = ‘targetUser’

    1)启用 Superuser

    2)团队账号鉴权

  • 拜访 HDFS 时启用代理用户:

  • 拜访 Hive Metastore 时指定代理用户

    1)参考 Spark 的相干实现:

    org.apache.spark.deploy.security.HiveDelegationTokenProvider

    2)动静代理 HiveMetaStoreClient,应用代理用户拜访 Hive metastore

4. Flink SQL 入湖示例

DDL + DML

5. CDC 数据入湖链路

如上所示,咱们有一个 AutoDTS 平台,负责业务库数据的实时接入。咱们会把这些业务库的数据接入到 Kafka 外面,同时它还反对在平台上配置散发工作,相当于把进 Kafka 的数据散发到不同的存储引擎里,在这个场景下是散发到 Iceberg 里。

6. Flink SQL CDC 入湖链路买通

上面是咱们基于“Flink1.11 + Iceberg 0.11”反对 CDC 入湖所做的改变:

  • 改良 Iceberg Sink:

    Flink 1.11 版本为 AppendStreamTableSink,无奈解决 CDC 流,批改并适配。

  • 表治理

    1)反对 Primary key(PR1978)

    2)开启 V2 版本:’iceberg.format.version’ = ‘2’

7. CDC 数据入湖

1. 反对 Bucket

Upsert 场景下,须要确保同一条数据写入到同一 Bucket 下,这又如何实现?

目前 Flink SQL 语法不反对申明 bucket 分区,通过配置的形式申明 Bucket:

‘partition.bucket.source’=’id’, // 指定 bucket 字段

‘partition.bucket.num’=’10’, // 指定 bucket 数量

2. Copy-on-write sink

做 Copy-on-Write 的起因是本来社区的 Merge-on-Read 不反对合并小文件,所以咱们长期去做了 Copy-on-write sink 的实现。目前业务始终在测试应用,成果良好。

上方为 Copy-on-Write 的实现,其实跟原来的 Merge-on-Read 比拟相似,也是有 StreamWriter 多并行度写入 FileCommitter 单并行度程序提交

在 Copy-on-Write 外面,须要依据表的数据量正当设置 Bucket 数,无需额定做小文件合并。

  • StreamWriter 在 snapshotState 阶段多并行度写入

    1)减少 Buffer;

    2)写入前须要判断上次 checkpoint 曾经 commit 胜利;

    3)按 bucket 分组、合并,一一 Bucket 写入。

  • FileCommitter 单并行度程序提交

    1)table.newOverwrite()

    2)Flink.last.committed.checkpoint.id

8. 示例 – CDC 数据配置入湖

如上图所示,在理论应用中,业务方能够在 DTS 平台上创立或配置散发工作即可。

实例类型抉择 Iceberg 表,而后抉择指标库,表明要把哪个表的数据同步到 Iceberg 里,而后能够选原表和指标表的字段的映射关系是什么样的,配置之后就能够启动散发工作。启动之后,会在实时计算平台 Flink 外面提交一个实时工作,接着用 Copy-on-write sink 去实时地把数据写到 Iceberg 表外面。

9. 入湖其余实际

实际一:缩小 empty commit

  • 问题形容:

    在上游 Kafka 长期没有数据的状况下,每次 Checkpoint 依旧会生成新的 Snapshot,导致大量的空文件和不必要的 Snapshot。

  • 解决方案(PR – 2042):

    减少配置 Flink.max-continuousempty-commits,在间断指定次数 Checkpoint 都没有数据后才真正触发 Commit,生成 Snapshot。

实际二:记录 watermark

  • 问题形容:

    目前 Iceberg 表自身无奈间接反映数据写入的进度,离线调度难以精准触发上游工作。

  • 解决方案(PR – 2109):

    在 Commit 阶段将 Flink 的 Watermark 记录到 Iceberg 表的 Properties 中,可直观的反映端到端的提早状况,同时能够用来判断分区数据完整性,用于调度触发上游工作。

实际三:删表优化

  • 问题形容:

    删除 Iceberg 可能会很慢,导致平台接口相应超时。因为 Iceberg 是面向对象存储来形象 IO 层的,没有疾速革除目录的办法。

  • 解决方案:

    扩大 FileIO,减少 deleteDir 办法,在 HDFS 上疾速删除表数据。

10. 小文件合并及数据清理

定期为每个表执行批处理工作(spark 3),分为以下三个步骤:

1. 定期合并新增分区的小文件:

​ rewriteDataFilesAction.execute(); 仅合并小文件,不会删除旧文件。

2. 删除过期的 snapshot,清理元数据及数据文件:

​ table.expireSnapshots().expireOld erThan(timestamp).commit();

3. 清理 orphan 文件,默认清理 3 天前,且无奈涉及的文件:

​ removeOrphanFilesAction.older Than(timestamp).execute();

11. 计算引擎 – Flink

Flink 是实时平台的外围计算引擎,目前次要反对数据入湖场景,次要有以下几个方面的特点。

  • 数据准实时入湖:

    Flink 和 Iceberg 在数据入湖方面集成度最高,Flink 社区被动拥抱数据湖技术。

  • 平台集成:

    AutoStream 引入 IcebergCatalog,反对通过 SQL 建表、入湖 AutoDTS 反对将 MySQL、SQLServer、TiDB 表配置入湖。

  • 流批一体:

    在流批一体的理念下,Flink 的劣势会逐步体现进去。

12. 计算引擎 – Hive

Hive 在 SQL 批处理层面 Iceberg 和 Spark 3 集成度更高,次要提供以下三个方面的性能。

  • 定期小文件合并及 meta 信息查问:

    SELECT * FROM prod.db.table.history 还可查看 snapshots, files, manifests。

  • 离线数据写入:

    1)Insert into 2)Insert overwrite 3)Merge into

  • 剖析查问:

    次要反对日常的准实时剖析查问场景。

13. 计算引擎 – Trino/Presto

AutoBI 曾经和 Presto 集成,用于报表、剖析型查问场景。

  • Trino

    1)间接将 Iceberg 作为报表数据源

    2)须要减少元数据缓存机制:https://github.com/trinodb/trino/issues/7551

  • Presto

    社区集成中:https://github.com/prestodb/presto/pull/15836

14. 踩过的坑

1. 拜访 Hive Metastore 异样

问题形容:HiveConf 的构造方法的误用,导致 Hive 客户端中申明的配置被笼罩,导致拜访 Hive metastore 时异样

解决方案(PR-2075):修复 HiveConf 的结构,显示调用 addResource 办法,确保配置不会被笼罩:hiveConf.addResource(conf);

2.Hive metastore 锁未开释

问题形容:“CommitFailedException: Timed out after 181138 ms waiting for lock xxx.”起因是 hiveMetastoreClient.lock 办法,在未取得锁的状况下,也须要显示 unlock,否则会导致下面异样。

解决方案(PR-2263):优化 HiveTableOperations#acquireLock 办法,在获取锁失败的状况下显示调用 unlock 来开释锁。

3. 元数据文件失落

问题形容:Iceberg 表无法访问,报“NotFoundException Failed to open input stream for file : xxx.metadata.json”

解决方案(PR-2328):当调用 Hive metastore 更新 iceberg 表的 metadata\_location 超时后,减少查看机制,确认元数据未保留胜利后再删除元数据文件。

三、收益与总结

1. 总结

​ 通过对湖仓一体、流批交融的摸索,咱们别离做了总结。

  • 湖仓一体

    1)Iceberg 反对 Hive Metastore;

    2)总体应用上与 Hive 表相似:雷同数据格式、雷同的计算引擎。

  • 流批交融

    准实时场景下实现流批对立:同源、同计算、同存储。

2. 业务收益

  • 数据时效性晋升:

    入仓提早从 2 小时以上升高到 10 分钟以内;算法外围工作 SLA 提前 2 小时实现。

  • 准实时的剖析查问:

    联合 Spark 3 和 Trino,反对准实时的多维分析查问。

  • 特色工程提效:

    提供准实时的样本数据,进步模型训练时效性。

  • CDC 数据准实时入仓:

    能够在数仓针对业务表做准实时剖析查问。

3. 架构收益 – 准实时数仓

上方也提到了,咱们反对准实时的入仓和剖析,相当于是为后续的准实时数仓建设提供了根底的架构验证。准实时数仓的劣势是一次开发、口径对立、对立存储,是真正的批流一体。劣势是实时性较差,原来可能是秒级、毫秒级的提早,当初是分钟级的数据可见性。

然而在架构层面上,这个意义还是很大的,后续咱们能看到一些心愿,能够把整个原来“T + 1”的数仓,做成准实时的数仓,晋升数仓整体的数据时效性,而后更好地反对上下游的业务。

四、后续布局

1. 跟进 Iceberg 版本

全面凋谢 V2 格局,反对 CDC 数据的 MOR 入湖。

2. 建设准实时数仓

基于 Flink 通过 Data pipeline 模式对数仓各层表全面提速。

3. 流批一体

随着 upsert 性能的逐步完善,继续摸索存储层面流批一体。

4. 多维分析

基于 Presto/Spark3 输入准实时多维分析。

版权申明:本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

退出移动版