导读: 首先做个自我介绍,我目前在阿里云云计算平台,从事钻研 Flink 和 Hudi 联合方向的相干工作。

目前,Flink + Hudi 的计划推广大略曾经有了一年半的工夫,在国内风行度也已比拟高,支流的公司也会尝试去迭代他们的数仓计划。所以,明天我介绍的主题是 Flink 和 Hudi 在数据湖 Streaming 方向的一些摸索和实际,将会围绕以下四点开展:

  • Apache Hudi 背景介绍
  • Flink Hudi 设计
  • Hudi 利用场景
  • Hudi RoadMap

点击查看直播回放

Apache Hudi背景介绍

首先和大家分享下数据湖倒退的历史背景,以及Hudi的根本个性。

1.  数据湖倒退的历史背景

在我个人观点看来,传统的数仓计划(如 Hive)其实自身也是数据湖,而且我会把Hudi、Iceberg、Delta Lake 都看成是数仓下一代新的解决方案,而不仅仅只是一种湖格局。那为什么近一年来会有数据湖这一新的数仓状态的诞生?

随同着目前云存储(尤其是对象存储)逐渐成熟的大背景,数据湖的解决方案也会逐渐往云原生凑近。如图一所示,湖格局会适配云厂商的对象存储,做云厂商多云和云厂商用例,同时适配比拟风行的大数据计算框架(如Spark、Flink),以及查问端的 Presto、trino 以及传统 Hive 引擎,因而诞生了这样一套新的数仓解决方案。

2. Hudi 的四大外围个性

由上可知,Hudi 作为下一代的数仓解决方案,借助上下游的计算和查问引擎,实现代替传统 Hive 离线数仓的一套新计划,其外围特色整体能够总结为以下四点:

  • 开放性

开放性体现在两个方面:**

第一方面,上游反对多种数据源格局。比方传统数据库的 change log 日志、音讯队列 log 等传输方式,都会在 source 端会有十分丰盛的反对。**

第二方面,上游查问端也同样反对多种查问引擎。像支流的 OLAP 引擎 Presto、国内比拟火的 Starrocks、云厂商的 amazon redshift、数据分析产品 impala,都会对接到这样一套数仓架构外面。

所以开放性是 Hudi 的第一个特点。

  • 丰盛的事务反对

Hudi 对事务的反对水平,会比原来 Hive 数仓的要求更高,更丰盛。其中外围特点是反对在文件存储布局上做更新。在传统基于 Hive 的 T + 1 更新计划中,数据反复度会比拟高,只能实现天级别的数据新鲜度。并且随同着业务需要越来越简单,实时性要求越来越高,对数仓存储体系提出了更高的要求,对端到端的数据新鲜度要求做到分钟级或者是秒级。

其次是更新效率要求进步,不要每次都去 overwrite 整张表或者整个 partition 去更新,而是可能准确到文件粒度的部分更新来晋升存储和计算效率。Hudi很好地满足了这些需要,因而,对ACID语义的加强是这套数仓架构的第二大特点。

  • 基于ACID语义的增量解决

在我看来,第三个亮点是在ACID语义根底上衍生进去的增量解决,尤其Hudi提出的TimeTravel概念,或者间接对接Flink,Spark Streaming等流式解决引擎的形式,不论是近实时还是常驻的Streaming服务,实质都是一种流式生产,都能够了解为一种增量ETL解决。绝对于传统batch调度,在计算上会更加高效,尤其像Flink这种有状态的计算框架,会复用之前的计算结果,间接实现端到端的全链路增量解决。其次,在数据新鲜度上有一个数量级的晋升,从“天级别”晋升到“分钟级别”。    

比方说,国内目前有些实际用户会尝试应用Flink计算框架做湖表的Streaming生产,间接通过一套增量ETL链路去剖析从源端注入过去的数据,构建传统数仓的分层。还有一点,很多小伙伴会好奇TimeTravel这种incremental的查问设计,查问两个快照之间的增量数据,有什么用处?如果你是批模式调度的查问,支流场景是ADS端到上游的同步,比如说将数仓的生产后果同步到其余库表(如ES,Mysql)可通过这种TimeTravel定期做这种批量同步,当你对ADS的同步工夫度要求没这么高,就能够用这种幂等TimeTravel的查问形式比拟高效地同步到其余上游端。

以上三点,是绝对于支流Hive架构地三个外围区别,也是目前国内外湖仓我的项目正在致力的方向。

  • 智能化调度

再补充一点,在Hudi外面,会尽量优化文件布局,将小文件治理这种数据治理的计划做到框架外部,实现智能化调度。这是Hudi区别于其余数仓计划如Delta Lake,IceBerg的外围特点。

Flink + Hudi设计

1. Hudi写入pipeline(多算子组成的微服务架构)

从图二中能够看出,Hudi写入pipeline是一个Serverless的微服务架构,外围是在整个pipeline的服务起来之后,不论是Flink,还是Spark Streaming,整套服务能够对表自身能达到自治理的状态。所以,不光光思考数据高效写入,同时还须要思考写入过程中的文件治理,尽量避免产生太多小文件从而优化查问端的效率。通过定期的文件合并,文件清理,避免出现小文件数量爆炸式增长的状况呈现。

另一方面,是ACID事务性, 尤其是实现一个待更新的ACID事务,需思考多方面因素。当单job或者单节点要fail over时,Hudi能够保障疾速找到之前写入的谬误数据,并且实现rollback回滚。所以,Hudi的事务层反对是目前三个湖存储外面做的最欠缺,最高效的。

以copy on write的具体实现为例,会将上游的SQL原生数据结构转换成Hudi的数据结构,为了反对并发写入,咱们会对每个shuffle后的数据分bucket。

次要有两点:

第一个是新增数据,会尽量写入到以后已存在的比拟小的bucket外面。同时,为了防止生成小文件,也会尽量保障每个bucket的大小和预期大小雷同。

第二点是更新数据,Hudi设计了key主键,每个key的音讯都保护在一个bucket外部,每次更新都会写入到之前的bucket外面。而IceBerg,Delta Lake就只管写入,不会去管文件布局,因而他们会把查问端的一些合并和清理做得很重,所以查问效率会比拟低。相比之下,Hudi简单的写入过程和bucket策略就是在衡量和思考读写效率。这里所说的bucket概念,有点相似Snowflake里的micro partition概念,会在传统的partition分区上面再细化,以文件粒度来保护某个range下音讯的生命周期。以更细粒度去保护生命周期,可无效晋升数据更新和查问效率。

第二个算子之后,数据依据每个bucket做好分区,咱们会依照bucket ID做一遍shuffle,交给write task去写入。为何要依照bucket ID从新shuffle?次要是为了保护两个write task不能同时并发批改一个bucket的更新语义,否则容易造成更新抵触。

所以,从整体上看,这三个算子能够高效保障并发写入、更新。能够比拟显著看到,第二个算子的并发度其实决定了整个更新的并发度,决定以后可能同时更新和写入的bucket数量,而前面的算子能够自在独立地扩大。从实践经验举荐第二个和第三个算子的并发设置一样,当吞吐量不是很高的时候,一个bucket交给一个write task去写,吞吐量比拟高的时候,可能一个bucket的write task可能会分多个,能够调整到1:2的比例。

后盾还会启动clean commits的清理工作。数据commit操作产生在coordinator组件内,保障每个write task的commit大略对齐了checkpoint之后,数据才会flush进来,并且有一部分元数据信息,会对立提交给coordinator,coordinator收集到统计信息之后,会去联合checkpoint实现的事件做一次提交,实在的提交是在coordinator内。当coordinator实现提交之后,Hudi表会发动一个新的事务,只有当write task看到这个新的事务,才可能发动新事务的写入动作。所以两头存在一个异步期待的过程,相似于一个小型的状态机。

而Flink的快照所保障的语义其实是一个best effort语义,一旦收到某个checkpoint的胜利事件,就标记后面的状态都是胜利的,但两头可能存在checkpoint被abort状况。

因为Hudi须要保障每个写入的完整性和Exactly once语义,就须要考量两头的写入不能越界,比如说checkpoint的事件数据不能写入下个checkpoint,这样Exactly once语义就没方法保障。

在0.11版本会尝试做一些优化,比方说checkpoint被abort之后的状态是否复用。外面波及一个状态的切换,绝对会比较复杂。不像Spark Streaming每次都是微批的形象,每次先发动一个工作,人造保障了exactly once,容错语义交给框架。Flink怎么把这个异步算法和很强的exactly once语义联合在一起,是这套架构的一个难点所在。

2. 小文件策略

接下来,咱们认真看看文件写入的第二个算子bucket assign的具体决策。即新音讯如何去抉择放到哪个bucket,如图三所示,分两种状况介绍。

首先,左侧框图中有三个bucket,蓝色代表以后曾经存储文件的大小,如果是insert数据,策略是每次抉择以后残余空间最多的bucket写入。为何不思考抉择残余空间起码的bucket呢?因为须要思考到COW的写放大问题,效率比拟低。更新数据时,先找到保护以后key的bucket,而后写入。这样并不会造成文件大小的有限增长,因为每个record记录更新前后的大小根本近似,文件大小不会有显著的变动。影响文件大小的次要是insert数据,文件大小会设置阈值,维持在120M左右。

图中右侧框图是一个比拟极其的状况,两个bucket只剩下很小的写入空间,思考到写放大影响,会从新创立一个新的bucket从新写入。

为了进步并发写的吞吐量,会给每个bucket assign task调配一套独立的bucket管理策略,并利用Hash算法把bucket ID以固定的规定hash到每个bucket assign task 上面,做到了并发决策。因而,管制bucket assign task并发度就绝对管制了写入小文件数量,在写入吞吐量和小文件之间的衡量。

3. 全量 + 增量 读取

介绍完数据写入过程,再看下数据读取的流读局部。流读的全量读和增量读是如何实现的? 如图四所示,Hudi中TimeLine保留每个事务提交的毫秒工夫戳,每个工夫戳会对应一个快照版本,会记录在元数据外面。全量读时会扫全表的文件,会把整个全表的文件扫描进去,当你没有配置内置的Metadata索引表时,会间接扫全表,把文件系统中所有的文件都找进去。如果启用了Metadata表,就会在Metadata表(KV存储)里扫描这个文件信息,以绝对比拟高的效率扫描全表文件,而后发给上游,并且增量的局部会定期(默认60s)监听扫描TimeLine察看有没有新的commits,同步发给上游读写,每次增量的局部会基于上一次下发的工夫线点位,而后始终查找到以后最新的commit time。

Split mornitor算子负责保护这样一套监听增量文件信息的规定,下发给真正执行读取的task。

最近在master版本也反对了批模式的TimeTravel查问(某个时间段的点查),以前的版本尽管反对然而会有些问题,比方增量局部meta文件如果被archive、或者被清理,数据完整性就没有保障。新版本在保障在读取效率前提下,通过实现两个快照、commit之间的批模式增量读取形式应答这两个问题,保证数据残缺度。

Hudi利用场景

目前Flink + Hudi在国内曾经是十分风行的技术架构,这边总结三个利用场景向大家介绍一下。

1. 近实时DB数据入仓/湖

这套架构的DB数据入湖入仓外围特色是把原来T + 1的数据新鲜度晋升到分钟级别。 数据新鲜度通过目前比拟火的以Debezium、Maxwell为代表的CDC(change Data Capture)技术实现。以Streaming近实时的形式同步到数仓外面。在传统的Hive数仓中想保障实时是十分艰难的,尤其是文件更新,湖表实时写入更新,根本不可能实现。CDC技术对数仓自身存储是有要求的,首先是更新效率得足够高,可能反对以Streaming形式写入,并且可能十分高效的更新。尤其是CDC log在更新过程还可能会乱序,如何保障这种乱序更新的ACID语义,是有很高要求的,以后能满足乱序更新的湖格局只有Hudi能做到,而且Hudi还思考到了更新的效率问题,是目前来说比拟先进的架构。

图五下方的计划相比下面的计划,比拟适宜目前体量比拟大(每天增量能达到亿级别地)、数据平台比拟健全的公司,两头有一套对立的数据同步计划(汇总不同源表数据同步至音讯队列),音讯队列承当了数据的容错、容灾、缓存性能。同时,这套计划的扩展性也更加好。通过kafka的topic subscribe形式,能够比拟灵便地散发数据。

2. 近实时OLAP

第二个场景是近实时的OLAP场景,分钟级别的端到端数据新鲜度,同时又十分凋谢的OLAP查问引擎能够适配。其实是对kappa架构或者是原先Streaming数仓架构的一套新解法。在没有这套架构之前,实时剖析会跳过Hudi间接把数据双写到OLAP零碎中,比方ClickHouse、ES、MongoDB等。当仓存储曾经能够反对高效率分级别更新,可能对接OLAP引擎,那么这套架构就被大大简化,首先不必双写,一份数据就能够保障only one truth语义,防止双写带来数据完整性的问题。其次因为湖格局自身是十分凋谢的,在查问端引擎能够有更多抉择,比方Hudi就反对Presto、trino、Spark、Starrocks、以及云厂商的redshift引擎,会有十分高的灵便度。、

所以,这种近实时的OLAP架构,总结就是以下两点: ①对立上游存储端;②凋谢上游查问端。

但这套架构的数据新鲜度大略是5分钟级别,如果要做到像kappa秒级别的架构的话,目前Hudi还是不太适宜的,因为自身比拟依赖Flink的CheckPoint机制(反对端到端的exactly once语义),所以不能做到高频次的提交。

3. 近实时ETL

第三个场景是目前比拟前沿的架构,在国内也缓缓开始尝试这套架构。当数据源数据体量自身不大的时候,比方说源头过去的并不是kafka,可能源头只是一个Mysql的binlog,QPS每秒可能也就几百。那么这套架构是一个十分稳固且省事的架构,不光光是实现了这种端到端的增量解决,同时还解决两头数据入仓的需要。其实就是提供了两套形象,首先承当了一个数仓两头存储的一个存储形象,把数据间接以湖格局入仓;第二个是提供Queue能力,相似于Kafka这种音讯队列的能力,可用Streaming形式增量生产,并且能够在其上做一些增量计算。 就这一套架构间接对立原来的Lambda和kappa架构,就是kafka的存储形象加数仓文件的存储形象合并在一个存储形象外面,同时没有减少过多的存储老本。大家可能前面用的都是对象存储或者是以便宜存储的模式存在的HDFS。

整套架构解决了两个问题,第一是双写问题。 在Lambda架构下,数据先写kafka,而后入仓,保障这两份数据的一致性语义比拟难。而且kafka开启exactly once写入后吞吐量会降落很多,Kafka和HDFS之间的数据如何保障一致性呢?有人会了解去流读kafka,把kafka数据再起一个job同步到HDFS,这样计算资源、保护作业、同步老本都是原来的两倍。

第二个解决的问题是中间层查问需要。中间层数据间接入仓,并且不是以高效率形式更新,当须要对中间层DWD表做一些join操作时,能够间接和引擎对接,而不须要去思考说Lambda架构T+1更新效率的问题。湖格局分钟级时效性很大水平缓解了这个问题。

除此之外,这套架构还有个益处,能够依据不同地利用场景抉择丰盛的OLAP查问引擎,间接以表面的形式接入库存储,很不便地进行OLAP剖析。

4. 阿里云VVP实时入湖

接下来,简略讲一下目前阿里云VVP产品实时入湖的集成,次要还是入湖状态,阿里云内置Flink版本会有一个内置Hudi connector,大家能够通过FlinkSQL形式疾速构建入湖工作,间接写湖表,对接Hudi CDC connector或者对接kafka CDC format,实现数据疾速入湖。

并且在入湖过程中,提供了商业版个性,如schema evolution。CE,CTAS语法反对schema evolution而后同时咱们会主推DLF catalog元数据管理组件,DLF catalog会和EMR的DLF无缝集成,若是EMR通过spark写入,这边也能够看到,Flink入湖工作写完之后也能够治理,通过DLF组件,能够间接通过EMR查问端引擎剖析Hudi格局数据。

这是目前的一套推给商业化用户的技术计划,入湖通过VVP服务,剖析通过EMR,前期VVP可能会集成更多能力,如流批对立,满足用户的流读需要。

近期Hudi RoadMap

如图九所示,我将简略介绍下近期Hudi 0.12版本以及1.0版本将会做的一些feature。

首先,咱们会推出相似于Delta2.0的CDC Feed性能,因为目前咱们反对的CDC要求输出必须是一个CDC,Hudi会用CDC格局把它存下来。CDC Feed的区别就是不必保障整体的输出是CDC格局,即便呈现absert语义,或者CDC的两头数据有失落,能够残缺还原出CDC给主端。这个个性在读写吞吐量和资源上做些衡量,不会像目前这套架构解决CDC那么高效。

第二点是Meta Service服务。把元数据的治理插件化,通过对立的Meta Service plugable模式,对立治理Hudi上的表和工作。

第三点,咱们目前还在布局做Secondary Index二级索引。因为在目前的master版本中,Flink Spark都已实现data skipping能力(在写入时,如果用户开启Meta Data表,同时开启data skipping,会额定记录每个column的统计信息),最典型的是每个column会建一个Max, Min,开启一个元数据的减速,晋升文件级别的查问效率。后续还会反对相似于数据库的二级索引,为某个专门的column实现相似于LSM的形象索引,构建实用于点查场景的高效索引计划。

最初,后续咱们还会做相似于特色工程的按列更新性能开发,相似于Clickhouse的Merge Tree形象,独立存储某个column。 因为在机器学习的特色工程中大量特色须要成千上万个字段,每次生成一个特色都须要更新一个column,所以要求单个column要具备高效的更新能力。为了适配这样的场景,Hudi也会继续去摸索。

答疑

Q1:间接应用Hudi存储更新,相比间接CDC到Starrocks,这两种计划哪一个更好?感觉Starrocks的QPS应该会高于Hudi的更新速度。

A1:确实是这样,因为Starrocks service 会应用相似于LSM的高效主键索引,并且内存外面做partition策略,保护比拟多的二级索引,元数据信息。而且,最重要的一点就是在写入更新main table时会应用攒批的操作,先把屡次写入汇入buffer而后进行一次flash,并且在数据的flush上也能够攒批,这也是为什么Starrocks更新效率上更高的起因。

但同时,因为有了server集群,会带来两个问题,首先,会带来较高的运维老本,其次,内存模式相比于Hudi的serverless格局的开销会更大。

Hudi格局在开放性上,对于Starrocks会有肯定劣势,不光能够对接Starrocks、还能够对接Presto、Spark等支流OLAP引擎。

这就是他们的区别,两种计划的侧重点不同,还须要依据理论利用场景进行抉择。如果只是做OLAP利用,Starrocks更适宜。但如果想构建数仓,应用Starrocks代替Hudi的老本应该太高了。因为Hudi面向的场景次要时数仓,迭代的次要时Hive传统数仓,更有劣势。

Q2:流量数据入湖场景下,应用MOR(Merge on Read)表,还是COW(Copy on Write)表更适合?

A2:如果流量数据体量比拟大,倡议应用MOR表。以目前的实测计划,QPS不超过两万,COW表还是能够撑持的。超过两万之后,比拟举荐MOR online compaction形式。如果QPS更高,那可能须要把压缩工作再独立进去,这是目前能给到的一个计划。