关于flink:使用-Flink-Hudi-构建流式数据湖平台

18次阅读

共计 4531 个字符,预计需要花费 12 分钟才能阅读完成。

摘要:本文整顿自阿里巴巴技术专家陈玉兆 (玉兆)、阿里巴巴开发工程师刘大龙 (风离) 在 Flink Forward Asia 2021 的分享。次要内容包含:

  1. Apache Hudi 101
  2. Flink Hudi Integration
  3. Flink Hudi Use Case
  4. Apache Hudi Roadmap

FFA 2021 直播回放 & 演讲 PDF 下载

一、Apache Hudi 101

提到数据湖,大家都会有这样的疑难,什么是数据湖?为什么数据湖近两年热度很高?数据湖其实不是一个新的概念,最早的数据湖概念在 80 年代就曾经提出,过后对数据湖的定义是原始数据层,能够寄存各种结构化、半结构化甚至非结构化的数据。像机器学习、实时剖析等很多场景都是在查问的时候确定数据的 Schema。

湖存储成本低、灵活性高的个性,十分实用于做查问场景的中心化存储。随同着近年来云服务的衰亡,尤其是对象存储的成熟,越来越多的企业抉择在云上构建存储服务。数据湖的存算拆散架构非常适合以后的云服务架构,通过快照隔离的形式,提供根底的 acid 事务,同时反对对接多种剖析引擎适配不同的查问场景,能够说湖存储在老本和开放性上占了极大劣势。

以后的湖存储曾经开始承当数仓的性能,通过和计算引擎对接实现湖仓一体的架构。湖存储是一种 table format,在原有的 data format 根底上封装了 table 的高级语义。Hudi 从 2016 年开始将数据湖投入实际,过后是为了解决大数据场景下文件系统上的数据更新问题,Hudi 类 LSM 的 table format 以后在湖格局中是自成一家的,对近实时更新比拟敌对,语义也绝对欠缺。

Table format 是以后风行的三种数据湖格局的根底属性,而 Hudi 从我的项目之初就始终朝着平台方向去演变,领有比较完善的数据治理和 table service,比方用户在写入的时候能够并发地优化文件的布局,metadata table 能够大幅优化写入时查问端的文件查找效率。

上面介绍一些 Hudi 的根底概念。

Timeline service 是 Hudi 事务层的外围形象,Hudi 所有数据操作都是围绕着 timeline service 来开展的,每次操作通过 instant 形象绑定一个特定的工夫戳,一连串的 instant 形成了 timeline service,每一个 instance 记录了对应的 action 和状态。通过 timeline service,Hudi 能够晓得以后表操作的状态,通过一套文件系统视图的形象联合 timeline service,能够对 table 以后的 reader 和 writer 裸露特定工夫戳下的文件布局视图。

file group 是 Hudi 在文件布局层的外围形象,每一个 file group 相当于一个 bucket,通过文件大小来来划分,它的每次写入行为都会产生一个新的版本,一个版本被形象为一个 file slice,file slice 外部保护了相应版本的数据文件。当一个 file group 写入到规定的文件大小的时候,就会切换一个新的 file group。

Hudi 在 file slice 的写入行为能够形象成两种语义,copy on write 和 merge on read。

copy on write 每次都会写全量数据,新数据会和上一个 file slice 的数据 merge,而后再写一个新的 file slice,产生一个新的 bucket 的文件。

而 merge on read 则比较复杂一些,它的语义是追加写入,即每次只写增量数据,所以不会写新的 file slice。它首先会尝试追加之前的 file slice,只有当该写入的 file slice 被纳入压缩打算之后,才会切新的 file slice。

二、Flink Hudi Integration

Flink Hudi 的写入 pipeline 由几个算子形成。第一个算子负责将 table 层的 rowdata 转换成 Hudi 的音讯格局 HudiRecord。接着通过一个 Bucket Assigner,它次要负责将曾经转好的 HudiRecord 调配到特定的 file group 中,接着分好 file group 的 record 会流入 Writer 算子执行真正的文件写入。最初还有一个 coordinator,负责 Hudi table 层的 table service 调度以及新事务的发动和提交。此外,还有一些后盾的清理角色负责清理老版本的数据。

以后的设计中,每一个 bucket assign task 都会持有一个 bucket assigner,它独立保护本人的一组 file group。在写入新数据或非更新 insert 数据的时候,bucket assign task 会扫描文件视图,优先将这一批新的数据写入到被断定为小 bucket 的 file group 里。

比方上图,file group 默认大小是 120M,那么左图的 task1 会优先写到 file group1 和 file group2,留神这里不会写到 file group3,这是因为 file group3 曾经有 100M 数据,对于比拟靠近指标阈值的 bucket 不再写入能够防止适度写放大。而右图中的 task2 会间接写一个新的 file group,不会去追加那些曾经写的比拟大的 file group 了。

接下来介绍 Flink Hudi 写流程的状态切换机制。作业刚启动时,coordinator 会先尝试去文件系统上新建这张表,如果以后表不存在,它就会去文件目录上写一些 meta 信息,也就是构建一个表。收到所有 task 的初始化 meta 信息后,coordinator 会开启一个新的 transaction,write task 看到 transaction 的发动后,就会解锁以后数据的 flush 行为。

Write Task 会先积攒一批数据,这里有两种 flush 策略,一种是以后的数据 buffer 达到了指定的大小,就会把内存中的数据 flush 进来;另一种是当上游的 checkpoint barrier 达到须要做快照的时候,会把所有内存中的数据 flush 到磁盘。每次 flush 数据之后都会把 meta 信息发送给 coordinator。coordinator 收到 checkpoint 的 success 事件后,会提交对应的事务,并且发动下一个新的事务。writer task 看到新事务后,又会解锁下一轮事务的写入。这样,整个写入流程就串起来了。

Flink Hudi Write 提供了十分丰盛的写入场景。以后反对对 log 数据类型的写入,即非更新的数据类型,同时反对小文件合并。另外对于 Hudi 的外围写入场景比方更新流、CDC 数据也都是 Hudi 重点反对的。同时,Flink Hudi 还反对历史数据的高效率批量导入,bucket insert 模式能够一次性将比方 Hive 中的离线数据或者数据库中的离线数据,通过批量查问的形式,高效导入 Hudi 格局中。另外,Flink Hudi 还提供了全量和增量的索引加载,用户能够一次性将批量数据高效导入湖格局,再通过对接流的写入程序,实现全量接增量的数据导入。

Flink Hudi read 端也反对了十分丰盛的查问视图,目前次要反对的有全量读取、历史工夫 range 的增量读取以及流式读取。

上图是一段通过 Flink sql 写 Hudi 的例子,Hudi 反对的 use case 十分丰盛,也尽量简化了用户须要配置的参数。通过简略配置表 path、并发以及 operation type,用户能够十分不便地将上游的数据写入到 Hudi 格局中。

三、Flink Hudi Use Case

上面介绍 Flink Hudi 的经典利用场景。

第一个经典场景是 DB 导入数据湖。目前 DB 数据导入数据湖有两种形式:能够通过 CDC connector 一次性将全量和增量数据导入到 Hudi 格局中;也能够通过生产 Kafka 上的 CDC changelog,通过 Flink 的 CDC format 将数据导入到 Hudi 格局。

第二个经典场景是流计算的 ETL (近实时的 olap 剖析)。通过对接上游流计算简略的一些 ETL,比方双流 join 或双流 join 接一个 agg,间接将变更流写入到 Hudi 格局中,而后上游的 read 端能够对接传统经典的 olap 引擎比方 presto、spark 来做端到端的近实时查问。

第三个经典场景和第二个有些相似,Hudi 反对原生的 changelog,也就是反对保留 Flink 计算中行级别的变更。基于这个能力,通过流读生产变更的形式,能够实现端到端的近实时的 ETL 生产。

将来,社区两个大版本次要的精力还是放在流读和流写方向,并且会增强流读的语义;另外在 catalog 和 metadata 方面会做自治理;咱们还会在近期推出一个 trino 原生的 connector 反对,取代以后读 Hive 的形式,提高效率。

四、Apache Hudi Roadmap

上面是一个 MySql 到 Hudi 千表入湖的演示。

首先数据源这里咱们筹备了两个库,benchmark1 和 benchmark2,benchmark1 上面有 100 张表,benchmark2 上面有 1000 张表。因为千表入湖强依赖于 catalog,所以咱们首先要创立 catalog,对于数据源咱们要创立 MySql catalog,对于指标咱们要创立 Hudi catalog。MySql catalog 用于获取所有源表相干的信息,包含表构造、表的数据等。Hudi catalog 用于创立指标。

执行两条 sql 语句当前,两条 catalog 就创立胜利了。

接下来到作业开发页面创立一个千表入湖的作业。只须要简略的 9 行 SQL,第一种语法是 create database as database,它的作用是把 MySql benchmark1 库下所有的表构造和表数据一键同步到 Hudi CDS demo 库,表的关系是一对一映射。第二条语法是 create table as table,它的作用是把 MySql benchmark2 库下所有匹配 sbtest. 正则表达式的表同步到 Hudi 的 DB1 下的 ctas_dema 表外面,是多对一的映射关系,会做分库分表的合并。

接着咱们运行并上线,而后到作业运维的页面去启动作业,能够看到配置信息曾经更新了,阐明曾经从新上线过。接着点击启动按钮,启动作业。而后就能够到作业总览页面查看作业相干的状态信息。

上图是作业的拓扑,非常复杂,有 1100 张源表和 101 张指标表。这里咱们做了一些优化 —— source merge,把所有的表合并到一个节点里,能够在增量 binlog 拉取阶段只拉取一次,加重对 MySql 的压力。

接下来刷新 oss 页面,能够看到曾经多了一个 cdas_demo 门路,进入 subtest1 门路,能够看到曾经有元数据在写入,表明数据其实在写入过程中。

再到作业开发页面写一个简略的 SQL 查问某张表,来验证一下数据是否真的在写入。执行上图 SQL 语句,能够看到数据曾经能够查问到,这些数据与插入的数据是统一的。

咱们利用 catalog 提供的元数据能力,联合 CDS 和 CTS 语法,通过几行简略的 SQL,就能轻松实现几千张表的数据入湖,极大简化了数据入湖的流程,升高了开发运维的工作量。


FFA 2021 直播回放 & 演讲 PDF 下载

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

正文完
 0