关于大数据:实时数据湖-Flink-Hudi-实践探索

59次阅读

共计 8402 个字符,预计需要花费 22 分钟才能阅读完成。

导读: 首先做个自我介绍,我目前在阿里云云计算平台,从事钻研 Flink 和 Hudi 联合方向的相干工作。

目前,Flink + Hudi 的计划推广大略曾经有了一年半的工夫,在国内风行度也已比拟高,支流的公司也会尝试去迭代他们的数仓计划。所以,明天我介绍的主题是 Flink 和 Hudi 在数据湖 Streaming 方向的一些摸索和实际,将会围绕以下四点开展:

  • Apache Hudi 背景介绍
  • Flink Hudi 设计
  • Hudi 利用场景
  • Hudi RoadMap

点击查看直播回放

Apache Hudi 背景介绍

首先和大家分享下数据湖倒退的历史背景,以及 Hudi 的根本个性。

1.  数据湖倒退的历史背景

在我个人观点看来,传统的数仓计划(如 Hive)其实自身也是数据湖,而且我会把 Hudi、Iceberg、Delta Lake 都看成是数仓下一代新的解决方案,而不仅仅只是一种湖格局。那为什么近一年来会有数据湖这一新的数仓状态的诞生?

随同着目前云存储(尤其是对象存储)逐渐成熟的大背景,数据湖的解决方案也会逐渐往云原生凑近。如图一所示,湖格局会适配云厂商的对象存储,做云厂商多云和云厂商用例,同时适配比拟风行的大数据计算框架(如 Spark、Flink),以及查问端的 Presto、trino 以及传统 Hive 引擎,因而诞生了这样一套新的数仓解决方案。

2. Hudi 的四大外围个性

由上可知,Hudi 作为下一代的数仓解决方案,借助上下游的计算和查问引擎,实现代替传统 Hive 离线数仓的一套新计划,其外围特色整体能够总结为以下四点:

  • 开放性

开放性体现在两个方面:**

第一方面,上游反对多种数据源格局。比方传统数据库的 change log 日志、音讯队列 log 等传输方式,都会在 source 端会有十分丰盛的反对。**

第二方面,上游查问端也同样反对多种查问引擎。像支流的 OLAP 引擎 Presto、国内比拟火的 Starrocks、云厂商的 amazon redshift、数据分析产品 impala,都会对接到这样一套数仓架构外面。

所以开放性是 Hudi 的第一个特点。

  • 丰盛的事务反对

Hudi 对事务的反对水平,会比原来 Hive 数仓的要求更高,更丰盛。其中外围特点是反对在文件存储布局上做更新。在传统基于 Hive 的 T + 1 更新计划中,数据反复度会比拟高,只能实现天级别的数据新鲜度。并且随同着业务需要越来越简单,实时性要求越来越高,对数仓存储体系提出了更高的要求,对端到端的数据新鲜度要求做到分钟级或者是秒级。

其次是更新效率要求进步,不要每次都去 overwrite 整张表或者整个 partition 去更新,而是可能准确到文件粒度的部分更新来晋升存储和计算效率。Hudi 很好地满足了这些需要,因而,对 ACID 语义的加强是这套数仓架构的第二大特点。

  • 基于 ACID 语义的增量解决

在我看来,第三个亮点是在 ACID 语义根底上衍生进去的增量解决,尤其 Hudi 提出的 TimeTravel 概念,或者间接对接 Flink,Spark Streaming 等流式解决引擎的形式,不论是近实时还是常驻的 Streaming 服务,实质都是一种流式生产,都能够了解为一种增量 ETL 解决。绝对于传统 batch 调度,在计算上会更加高效,尤其像 Flink 这种有状态的计算框架,会复用之前的计算结果,间接实现端到端的全链路增量解决。其次,在数据新鲜度上有一个数量级的晋升,从“天级别”晋升到“分钟级别”。

比方说,国内目前有些实际用户会尝试应用 Flink 计算框架做湖表的 Streaming 生产,间接通过一套增量 ETL 链路去剖析从源端注入过去的数据,构建传统数仓的分层。还有一点,很多小伙伴会好奇 TimeTravel 这种 incremental 的查问设计,查问两个快照之间的增量数据,有什么用处?如果你是批模式调度的查问,支流场景是 ADS 端到上游的同步,比如说将数仓的生产后果同步到其余库表(如 ES,Mysql)可通过这种 TimeTravel 定期做这种批量同步,当你对 ADS 的同步工夫度要求没这么高,就能够用这种幂等 TimeTravel 的查问形式比拟高效地同步到其余上游端。

以上三点,是绝对于支流 Hive 架构地三个外围区别,也是目前国内外湖仓我的项目正在致力的方向。

  • 智能化调度

再补充一点,在 Hudi 外面,会尽量优化文件布局,将小文件治理这种数据治理的计划做到框架外部,实现智能化调度。这是 Hudi 区别于其余数仓计划如 Delta Lake,IceBerg 的外围特点。

Flink + Hudi 设计

1. Hudi 写入 pipeline(多算子组成的微服务架构)

从图二中能够看出,Hudi 写入 pipeline 是一个 Serverless 的微服务架构,外围是在整个 pipeline 的服务起来之后,不论是 Flink,还是 Spark Streaming,整套服务能够对表自身能达到自治理的状态。所以,不光光思考数据高效写入,同时还须要思考写入过程中的文件治理,尽量避免产生太多小文件从而优化查问端的效率。通过定期的文件合并,文件清理,避免出现小文件数量爆炸式增长的状况呈现。

另一方面,是 ACID 事务性, 尤其是实现一个待更新的 ACID 事务,需思考多方面因素。当单 job 或者单节点要 fail over 时,Hudi 能够保障疾速找到之前写入的谬误数据,并且实现 rollback 回滚。所以,Hudi 的事务层反对是目前三个湖存储外面做的最欠缺,最高效的。

以 copy on write 的具体实现为例,会将上游的 SQL 原生数据结构转换成 Hudi 的数据结构,为了反对并发写入,咱们会对每个 shuffle 后的数据分 bucket。

次要有两点:

第一个是新增数据,会尽量写入到以后已存在的比拟小的 bucket 外面。同时,为了防止生成小文件,也会尽量保障每个 bucket 的大小和预期大小雷同。

第二点是更新数据,Hudi 设计了 key 主键,每个 key 的音讯都保护在一个 bucket 外部,每次更新都会写入到之前的 bucket 外面。而 IceBerg,Delta Lake 就只管写入,不会去管文件布局,因而他们会把查问端的一些合并和清理做得很重,所以查问效率会比拟低。相比之下,Hudi 简单的写入过程和 bucket 策略就是在衡量和思考读写效率。这里所说的 bucket 概念,有点相似 Snowflake 里的 micro partition 概念,会在传统的 partition 分区上面再细化,以文件粒度来保护某个 range 下音讯的生命周期。以更细粒度去保护生命周期,可无效晋升数据更新和查问效率。

第二个算子之后,数据依据每个 bucket 做好分区,咱们会依照 bucket ID 做一遍 shuffle,交给 write task 去写入。为何要依照 bucket ID 从新 shuffle?次要是为了保护两个 write task 不能同时并发批改一个 bucket 的更新语义,否则容易造成更新抵触。

所以,从整体上看,这三个算子能够高效保障并发写入、更新。能够比拟显著看到,第二个算子的并发度其实决定了整个更新的并发度,决定以后可能同时更新和写入的 bucket 数量,而前面的算子能够自在独立地扩大。从实践经验举荐第二个和第三个算子的并发设置一样,当吞吐量不是很高的时候,一个 bucket 交给一个 write task 去写,吞吐量比拟高的时候,可能一个 bucket 的 write task 可能会分多个,能够调整到 1:2 的比例。

后盾还会启动 clean commits 的清理工作。数据 commit 操作产生在 coordinator 组件内,保障每个 write task 的 commit 大略对齐了 checkpoint 之后,数据才会 flush 进来,并且有一部分元数据信息,会对立提交给 coordinator,coordinator 收集到统计信息之后,会去联合 checkpoint 实现的事件做一次提交,实在的提交是在 coordinator 内。当 coordinator 实现提交之后,Hudi 表会发动一个新的事务,只有当 write task 看到这个新的事务,才可能发动新事务的写入动作。所以两头存在一个异步期待的过程,相似于一个小型的状态机。

而 Flink 的快照所保障的语义其实是一个 best effort 语义,一旦收到某个 checkpoint 的胜利事件,就标记后面的状态都是胜利的,但两头可能存在 checkpoint 被 abort 状况。

因为 Hudi 须要保障每个写入的完整性和 Exactly once 语义,就须要考量两头的写入不能越界,比如说 checkpoint 的事件数据不能写入下个 checkpoint,这样 Exactly once 语义就没方法保障。

在 0.11 版本会尝试做一些优化,比方说 checkpoint 被 abort 之后的状态是否复用。外面波及一个状态的切换,绝对会比较复杂。不像 Spark Streaming 每次都是微批的形象,每次先发动一个工作,人造保障了 exactly once,容错语义交给框架。Flink 怎么把这个异步算法和很强的 exactly once 语义联合在一起,是这套架构的一个难点所在。

2. 小文件策略

接下来,咱们认真看看文件写入的第二个算子 bucket assign 的具体决策。即 新音讯如何去抉择放到哪个 bucket,如图三所示,分两种状况介绍。

首先,左侧框图中有三个 bucket,蓝色代表以后曾经存储文件的大小,如果是 insert 数据,策略是每次抉择以后残余空间最多的 bucket 写入。为何不思考抉择残余空间起码的 bucket 呢?因为须要思考到 COW 的写放大问题,效率比拟低。更新数据时,先找到保护以后 key 的 bucket,而后写入。这样并不会造成文件大小的有限增长,因为每个 record 记录更新前后的大小根本近似,文件大小不会有显著的变动。影响文件大小的次要是 insert 数据,文件大小会设置阈值,维持在 120M 左右。

图中右侧框图是一个比拟极其的状况,两个 bucket 只剩下很小的写入空间,思考到写放大影响,会从新创立一个新的 bucket 从新写入。

为了进步并发写的吞吐量,会给每个 bucket assign task 调配一套独立的 bucket 管理策略,并利用 Hash 算法把 bucket ID 以固定的规定 hash 到每个 bucket assign task 上面,做到了并发决策。因而,管制 bucket assign task 并发度就绝对管制了写入小文件数量,在写入吞吐量和小文件之间的衡量。

3. 全量 + 增量 读取

介绍完数据写入过程,再看下数据读取的流读局部。流读的全量读和增量读是如何实现的? 如图四所示,Hudi 中 TimeLine 保留每个事务提交的毫秒工夫戳,每个工夫戳会对应一个快照版本,会记录在元数据外面。全量读时会扫全表的文件,会把整个全表的文件扫描进去,当你没有配置内置的 Metadata 索引表时,会间接扫全表,把文件系统中所有的文件都找进去。如果启用了 Metadata 表,就会在 Metadata 表(KV 存储)里扫描这个文件信息,以绝对比拟高的效率扫描全表文件,而后发给上游,并且增量的局部会定期(默认 60s)监听扫描 TimeLine 察看有没有新的 commits,同步发给上游读写,每次增量的局部会基于上一次下发的工夫线点位,而后始终查找到以后最新的 commit time。

Split mornitor 算子负责保护这样一套监听增量文件信息的规定,下发给真正执行读取的 task。

最近在 master 版本也反对了批模式的 TimeTravel 查问(某个时间段的点查),以前的版本尽管反对然而会有些问题,比方增量局部 meta 文件如果被 archive、或者被清理,数据完整性就没有保障。新版本在保障在读取效率前提下,通过实现两个快照、commit 之间的批模式增量读取形式应答这两个问题,保证数据残缺度。

Hudi 利用场景

目前 Flink + Hudi 在国内曾经是十分风行的技术架构,这边总结三个利用场景向大家介绍一下。

1. 近实时 DB 数据入仓 / 湖

这套架构的 DB 数据入湖入仓外围特色是把原来 T + 1 的数据新鲜度晋升到分钟级别。 数据新鲜度通过目前比拟火的以 Debezium、Maxwell 为代表的 CDC(change Data Capture)技术实现。以 Streaming 近实时的形式同步到数仓外面。在传统的 Hive 数仓中想保障实时是十分艰难的,尤其是文件更新,湖表实时写入更新,根本不可能实现。CDC 技术对数仓自身存储是有要求的,首先是更新效率得足够高,可能反对以 Streaming 形式写入,并且可能十分高效的更新。尤其是 CDC log 在更新过程还可能会乱序,如何保障这种乱序更新的 ACID 语义,是有很高要求的,以后能满足乱序更新的湖格局只有 Hudi 能做到,而且 Hudi 还思考到了更新的效率问题,是目前来说比拟先进的架构。

图五下方的计划相比下面的计划,比拟适宜目前体量比拟大(每天增量能达到亿级别地)、数据平台比拟健全的公司,两头有一套对立的数据同步计划(汇总不同源表数据同步至音讯队列),音讯队列承当了数据的容错、容灾、缓存性能。同时,这套计划的扩展性也更加好。通过 kafka 的 topic subscribe 形式,能够比拟灵便地散发数据。

2. 近实时 OLAP

第二个场景是近实时的 OLAP 场景,分钟级别的端到端数据新鲜度,同时又十分凋谢的 OLAP 查问引擎能够适配。其实是对 kappa 架构或者是原先 Streaming 数仓架构的一套新解法。在没有这套架构之前,实时剖析会跳过 Hudi 间接把数据双写到 OLAP 零碎中,比方 ClickHouse、ES、MongoDB 等。当仓存储曾经能够反对高效率分级别更新,可能对接 OLAP 引擎,那么这套架构就被大大简化,首先不必双写,一份数据就能够保障 only one truth 语义,防止双写带来数据完整性的问题。其次因为湖格局自身是十分凋谢的,在查问端引擎能够有更多抉择,比方 Hudi 就反对 Presto、trino、Spark、Starrocks、以及云厂商的 redshift 引擎,会有十分高的灵便度。、

所以,这种近实时的 OLAP 架构,总结就是以下两点: ①对立上游存储端;②凋谢上游查问端。

但这套架构的数据新鲜度大略是 5 分钟级别,如果要做到像 kappa 秒级别的架构的话,目前 Hudi 还是不太适宜的,因为自身比拟依赖 Flink 的 CheckPoint 机制(反对端到端的 exactly once 语义),所以不能做到高频次的提交。

3. 近实时 ETL

第三个场景是目前比拟前沿的架构,在国内也缓缓开始尝试这套架构。当数据源数据体量自身不大的时候,比方说源头过去的并不是 kafka,可能源头只是一个 Mysql 的 binlog,QPS 每秒可能也就几百。那么这套架构是一个十分稳固且省事的架构,不光光是实现了这种端到端的增量解决,同时还解决两头数据入仓的需要。其实就是提供了两套形象,首先承当了一个数仓两头存储的一个存储形象,把数据间接以湖格局入仓;第二个是提供 Queue 能力,相似于 Kafka 这种音讯队列的能力,可用 Streaming 形式增量生产,并且能够在其上做一些增量计算。 就这一套架构间接对立原来的 Lambda 和 kappa 架构,就是 kafka 的存储形象加数仓文件的存储形象合并在一个存储形象外面,同时没有减少过多的存储老本。大家可能前面用的都是对象存储或者是以便宜存储的模式存在的 HDFS。

整套架构解决了两个问题,第一是双写问题。 在 Lambda 架构下,数据先写 kafka,而后入仓,保障这两份数据的一致性语义比拟难。而且 kafka 开启 exactly once 写入后吞吐量会降落很多,Kafka 和 HDFS 之间的数据如何保障一致性呢?有人会了解去流读 kafka,把 kafka 数据再起一个 job 同步到 HDFS,这样计算资源、保护作业、同步老本都是原来的两倍。

第二个解决的问题是中间层查问需要。中间层数据间接入仓,并且不是以高效率形式更新,当须要对中间层 DWD 表做一些 join 操作时,能够间接和引擎对接,而不须要去思考说 Lambda 架构 T + 1 更新效率的问题。湖格局分钟级时效性很大水平缓解了这个问题。

除此之外,这套架构还有个益处,能够依据不同地利用场景抉择丰盛的 OLAP 查问引擎,间接以表面的形式接入库存储,很不便地进行 OLAP 剖析。

4. 阿里云 VVP 实时入湖

接下来,简略讲一下目前阿里云 VVP 产品实时入湖的集成,次要还是入湖状态,阿里云内置 Flink 版本会有一个内置 Hudi connector,大家能够通过 FlinkSQL 形式疾速构建入湖工作,间接写湖表,对接 Hudi CDC connector 或者对接 kafka CDC format,实现数据疾速入湖。

并且在入湖过程中,提供了商业版个性,如 schema evolution。CE,CTAS 语法反对 schema evolution 而后同时咱们会主推 DLF catalog 元数据管理组件,DLF catalog 会和 EMR 的 DLF 无缝集成,若是 EMR 通过 spark 写入,这边也能够看到,Flink 入湖工作写完之后也能够治理,通过 DLF 组件,能够间接通过 EMR 查问端引擎剖析 Hudi 格局数据。

这是目前的一套推给商业化用户的技术计划,入湖通过 VVP 服务,剖析通过 EMR,前期 VVP 可能会集成更多能力,如流批对立,满足用户的流读需要。

近期 Hudi RoadMap

如图九所示,我将简略介绍下近期 Hudi 0.12 版本以及 1.0 版本将会做的一些 feature。

首先,咱们会推出相似于 Delta2.0 的 CDC Feed 性能,因为目前咱们反对的 CDC 要求输出必须是一个 CDC,Hudi 会用 CDC 格局把它存下来。CDC Feed 的区别就是不必保障整体的输出是 CDC 格局,即便呈现 absert 语义,或者 CDC 的两头数据有失落,能够残缺还原出 CDC 给主端。这个个性在读写吞吐量和资源上做些衡量,不会像目前这套架构解决 CDC 那么高效。

第二点是 Meta Service 服务。把元数据的治理插件化,通过对立的 Meta Service plugable 模式,对立治理 Hudi 上的表和工作。

第三点,咱们目前还在布局做 Secondary Index 二级索引。因为在目前的 master 版本中,Flink Spark 都已实现 data skipping 能力(在写入时,如果用户开启 Meta Data 表,同时开启 data skipping,会额定记录每个 column 的统计信息),最典型的是每个 column 会建一个 Max, Min,开启一个元数据的减速,晋升文件级别的查问效率。后续还会反对相似于数据库的二级索引,为某个专门的 column 实现相似于 LSM 的形象索引,构建实用于点查场景的高效索引计划。

最初,后续咱们还会做相似于特色工程的按列更新性能开发,相似于 Clickhouse 的 Merge Tree 形象,独立存储某个 column。 因为在机器学习的特色工程中大量特色须要成千上万个字段,每次生成一个特色都须要更新一个 column,所以要求单个 column 要具备高效的更新能力。为了适配这样的场景,Hudi 也会继续去摸索。

答疑

Q1:间接应用 Hudi 存储更新,相比间接 CDC 到 Starrocks,这两种计划哪一个更好?感觉 Starrocks 的 QPS 应该会高于 Hudi 的更新速度。

A1:确实是这样,因为 Starrocks service 会应用相似于 LSM 的高效主键索引,并且内存外面做 partition 策略,保护比拟多的二级索引,元数据信息。而且,最重要的一点就是在写入更新 main table 时会应用攒批的操作,先把屡次写入汇入 buffer 而后进行一次 flash,并且在数据的 flush 上也能够攒批,这也是为什么 Starrocks 更新效率上更高的起因。

但同时,因为有了 server 集群,会带来两个问题,首先,会带来较高的运维老本,其次,内存模式相比于 Hudi 的 serverless 格局的开销会更大。

Hudi 格局在开放性上,对于 Starrocks 会有肯定劣势,不光能够对接 Starrocks、还能够对接 Presto、Spark 等支流 OLAP 引擎。

这就是他们的区别,两种计划的侧重点不同,还须要依据理论利用场景进行抉择。如果只是做 OLAP 利用,Starrocks 更适宜。但如果想构建数仓,应用 Starrocks 代替 Hudi 的老本应该太高了。因为 Hudi 面向的场景次要时数仓,迭代的次要时 Hive 传统数仓,更有劣势。

Q2:流量数据入湖场景下,应用 MOR(Merge on Read)表,还是 COW(Copy on Write)表更适合?

A2:如果流量数据体量比拟大,倡议应用 MOR 表。以目前的实测计划,QPS 不超过两万,COW 表还是能够撑持的。超过两万之后,比拟举荐 MOR online compaction 形式。如果 QPS 更高,那可能须要把压缩工作再独立进去,这是目前能给到的一个计划。

正文完
 0