简介:阿里云技术专家李少锋(风泽)在Apache Hudi 与 Apache Pulsar 联结 Meetup 杭州站上的演讲整顿稿件,本议题将介绍典型 CDC 入湖场景,以及如何应用 Pulsar/Hudi 来构建数据湖,同时将会分享 Hudi 内核设计、新愿景以及社区最新动静。
本文PPT下载链接:
李少锋(风泽) - 阿里云技术专家-《基于Apache Hudi的CDC数据入湖》.pdf
其余干货:
王烨(萌豆)-阿里云高级技术专家 -《阿里云基于Hudi构建Lakehouse实际》.pdf
翟佳-StreamNative 联结创始人、Apache Pulsar PMC 成员-《Pulsar 2.8.0 性能个性概述及布局》.pdf
盛宇帆-StreamNative 软件工程师-《基于 Flink 的全新 Pulsar Connector 的设计、开发和应用》.pdf
一、CDC背景介绍
首先咱们介绍什么是CDC?CDC的全称是Change data Capture,即变更数据捕捉,它是数据库畛域十分常见的技术,次要用于捕捉数据库的一些变更,而后能够把变更数据发送到上游。它的利用比拟广,能够做一些数据同步、数据散发和数据采集,还能够做ETL,明天次要分享的也是把DB数据通过CDC的形式ETL到数据湖。
对于CDC,业界次要有两种类型:一是基于查问的,客户端会通过SQL形式查问源库表变更数据,而后对外发送。二是基于日志,这也是业界宽泛应用的一种形式,个别是通过binlog形式,变更的记录会写入binlog,解析binlog后会写入音讯零碎,或间接基于Flink CDC进行解决。
它们两者是有区别的,基于查问比较简单,是入侵性的,而基于日志是非侵入性,对数据源没有影响,但binlog的解析比较复杂一些。
基于查问和基于日志,别离有四种实现技术,有基于工夫戳、基于触发器和快照,还有基于日志的,这是实现CDC的技术,上面是几种形式的比照。
通过这个表格比照能够发现基于日志的综合最优,但解析比较复杂,但业界有很多开源的binlog的解析器,比拟通用和风行的有Debezium、Canal,以及Maxwell。基于这些binlog解析器就能够构建ETL管道。
上面来看下业界比拟风行的一种CDC入仓架构。
整个数据入仓是分实时流是离线流,实时流解析binlog,通过Canal解析binlog,而后写入Kafka,而后每个小时会把Kafka数据同步到Hive中;另外就是离线流,离线流须要对同步到Hive的贴源层的表进行拉取一次全量,如果只有后面的实时流是数据是不全的,必须通过离线流的SQL Select把全量导入一次数据,对每张ODS表会把存量数据和增量数据做一个Merge。这里能够看到对于ODS层的实时性不够,存在小时、天级别的提早。而对ODS层这个延时能够通过引入Apache Hudi做到分钟级。
二、CDC数据入湖办法
基于CDC数据的入湖,这个架构非常简单。上游各种各样的数据源,比方DB的变更数据、事件流,以及各种内部数据源,都能够通过变更流的形式写入表中,再进行内部的查问剖析,整个架构非常简单。
架构尽管简略,但还是面临很多挑战。以Apache Hudi数据湖为例,数据湖是通过文件存储各种各样的数据, 对于CDC的数据处理须要对湖里某局部文件进行牢靠地、事务性变更,这样能够保障上游查问不会看到局部后果,另外对CDC数据须要高效的做更新、删除操作,这就须要疾速定位到更改的文件,另外是对于每小批量的数据写入,心愿可能主动解决小文件,防止繁冗的小文件解决,还有面向查问的布局优化,能够通过一些技术手段如Clustering革新文件布局,对外提供更好的查问性能。
而Apache Hudi是怎么应答这些挑战的呢?首先反对事务性写入,包含读写之间的MVCC机制保障写不影响读,也能够管制事务及并发保障,对于并发写采纳OCC乐观锁机制,对更新删除,内置一些索引及自定义保障更新、删除比拟高效。另外是面向查问优化,Hudi外部会主动做小文件的治理,文件会主动长到用户指定的文件大小,如128M,这对Hudi来说也是比拟外围的个性。另外Hudi提供了Clustering来优化文件布局的性能。
下图是典型CDC入湖的链路。下面的链路是大部分公司采取的链路,后面CDC的数据先通过CDC工具导入Kafka或者Pulsar,再通过Flink或者是Spark流式生产写到Hudi里。第二个架构是通过Flink CDC直联到MySQL上游数据源,间接写到上游Hudi表。
其实,这两条链路各有优缺点。第一个链路对立数据总线,扩展性和容错性都很好。对于第二条链路,扩展性和容错性会略微差点,但因为组件较少,保护老本相应较低。
这是阿里云数据库OLAP团队的CDC入湖链路,因为咱们咱们做Spark的团队,所以咱们采纳的Spark Streaming链路入湖。整个入湖链路也分为两个局部:首先有一个全量同步作业,会通过Spark做一次全量数据拉取,这里如果有从库能够直连从库做一次全量同步,防止对主库的影响,而后写到Hudi。而后会启动一个增量作业,增量作业通过Spark生产阿里云DTS里的binlog数据来将binlog准实时同步至Hudi表。全量和增量作业的编排借助了Lakehouse的作业主动编排能力,协调全量和增量作业,而对于全量和增量连接时利用Hudi的Upsert语义保障全增量数据的最终的一致性,不会呈现数据偏多和偏少的问题。
在Lakehouse的CDC入湖链路中,咱们团队也做了一些优化。
第一个是原库的Schema变更解决,咱们对接的客户某些列的减少、删除或者批改某些列的场景。在Spark写Hudi之前会做Schema的测验,看这个Schema是不是非法,如果非法就能够失常写入,如果不非法的话,则会写入失败,而删除字段会导致Schema校验不非法,导致作业失败,这样稳定性是没有保障的。因而咱们会捕获Schema Validation的异样,如果发现是缩小了字段,咱们会把之前的字段做主动补全,而后做重试,保障链路是稳固的。
第二个有些客户表没有主键或者主键不合理,比方采纳更新工夫字段作为主键,或者设置会变动的分区字段,这时候就会导致写入Hudi的数据和源库表数据对不上。因而咱们做了一些产品层面的优化,容许用户正当设置主键和分区映射,保障同步到Hudi里和源库是数据齐全对齐的。
还有一个常见需要是用户在上游库中减少一个表,如果应用表级别同步的话,新增表在整个链路是无奈感知的,也就无奈同步到Hudi中,而在Lakehouse中,咱们能够对整库进行同步,因而在库中新增表时,会主动感知新增表,将新增表数据主动同步到Hudi,做到原库减少表主动感知的能力。
还有一个是对CDC写入时候性能优化,比方拉取蕴含Insert、Update、Delete等事件的一批数据,是否始终应用Hudi的Upsert形式写入呢?这样管制比较简单,并且Upsert有数据去重能力,但它带来的问题是找索引的效率低,而对于Insert形式而言,不须要找索引,效率比拟高。因而对于每一批次数据会判断是否都是Insert事件,如果都是Insert事件就间接Insert形式写入,防止查找文件是否更新的开销,数据显示大略能够晋升30%~50%的性能。当然这里也须要思考到DTS异样,从新生产数据时,复原期间不能间接应用Insert形式,否则可能会存在数据反复,对于这个问题咱们引入了表级别的Watermark,保障即便在DTS异常情况下也不会呈现数据反复问题。
三、Hudi外围设计
接着介绍下Hudi 的定位,依据社区最新的愿景,Hudi的定义是流式数据湖平台,它反对海量数据更新,内置表格局以及反对事务的贮存,一系列列表服务Clean、Archive、
Compaction、Clustering等,以及开箱即用的数据服务,以及自身自带的运维工具和指标监控,提供很好的运维能力。
这是Hudi官网的图,能够看到Hudi在整个生态里是做湖存储,底层能够对接HDFS以及各种云厂商的对象存储,只有兼容Hadoop协定接。上游是入湖的变动事件流,对上能够反对各种各样的数据引擎,比方presto、Spark以及云上产品;另外能够利用Hudi的增量拉取能力借助Spark、Hive、Flink构建派生表。
整个Hudi体系结构是十分齐备的,其定位为增量的解决栈。典型的流式是面向行,对数据逐行解决,解决十分高效。
但面向行的数据里没有方法做大规模剖析做扫描优化,而批处理可能须要每天全量解决一次,效率绝对比拟低。而Hudi引入增量解决的概念,解决的数据都是某一时间点之后的,和流解决类似,又比批处理高效很多,并且自身是面向数据湖中的列存数据,扫描优化十分高效。
而回顾Hudi的倒退历史。2015年社区的主席发表了一篇增量解决的文章,16年在Uber开始投入生产,为所有数据库要害业务提供了撑持;2017年,在Uber撑持了100PB的数据湖,2018年随着云计算遍及,吸引了国内外的使用者;19年Uber把它捐献到Apache进行孵化;2020年一年左右的工夫就成为了顶级我的项目,采用率增长了超过10倍;2021年Uber最新材料显示Hudi反对了500PB数据湖,同时对Hudi做了很多加强,像Spark SQL DML和Flink的集成。最近字节跳动举荐部门分享的基于Hudi的数据湖实际单表超过了400PB,总存储超过了1EB,日增PB级别。
通过几年的倒退,国内外采纳Hudi的公司十分多,比方私有云的华为云、阿里云、腾讯云以及AWS,都集成了Hudi,阿里云也基于Hudi构建Lakehouse。字节跳动的整个数仓体系往湖上迁徙也是基于Hudi构建的,前面也会有相应的文章分享他们基于Flink+Hudi的数据湖的日增PB数据量的实际。同时像百度、快手头部互联网大厂都有在应用。同时咱们理解银行、金融行业也有工商银行、农业银行、百度金融、百信银行也有落地。游戏畛域包含了三七互娱、米哈游、4399,能够看到Hudi在各行各业都有比拟宽泛的利用。
Hudi的定位是一套残缺的数据湖平台,最上层面向用户能够写各种各样的SQL,Hudi作为平台提供的各种能力,上面一层是基于SQL以及编程的API,再下一层是Hudi的内核,包含索引、并发管制、表服务,前面社区要构建的基于Lake Cache构建缓存,文件格式是应用的凋谢Parquet、ORC、HFile存储格局,整个数据湖能够构建在各种云上。
前面接着介绍Hudi的要害设计,这对咱们理解Hudi十分有帮忙。首先是文件格式,它最底层是基于Fileslice的设计,翻译过去就是文件片,文件片蕴含根本文件和增量日志文件。根本文件就是一个Parquet或者是ORC文件,增量文件是log文件,对于log文件的写入Hudi里编码了一些block,一批Update能够编码成一个数据块,写到文件里。而根底文件是可插拔,能够基于Parquet,最新的9.0版本曾经反对了ORC。还有基于HFile,HFile可用作元数据表。
Log文件里保留了一系列各种各样的数据块,它是有点相似于数据库的重做日志,每个数据版本都能够通过重做日志找到。对于根底文件和Log文件通过压缩做合并造成新的根底文件。Hudi提供了同步和异步的两种形式,这为用户提供了很灵便的抉择,比方做能够抉择同步Compaction,如果对提早不敏感,而不须要额定异步起一个作业做Compaction,或者有些用户心愿保障写入链路的提早,能够异步做Compaction而不影响主链路。
Hudi基于File Slice上有个File Group的概念,File Group会蕴含有不同的File Slice,也File Slice形成了不同的版本,Hudi提供了机制来保留元数据个数,保障元数据大小可控。
对于数据更新写入,尽量应用append,比方之前写了一个Log文件,在更新时,会持续尝试往Log文件写入,对于HDFS这种反对append语义的存储十分敌对,而很多云上对象存储不反对append语义,即数据写进去之后不可更改,只能新写Log文件。对于每个文件组也就是不同FileGroup之间是相互隔离的,能够针对不同的文件组做不同的逻辑,用户能够自定义算法实现,非常灵活。
基于Hudi FileGroup的设计能够带来不少收益。比方根底文件是100M,前面对根底文件进行了更新50M数据,就是4个FileGroup,做Compaction合并开销是600M,50M只须要和100M合,4个150M开销就是600M,这是有FileGroup设计。还是有4个100M的文件,也是做了更新,每一次合,比方25M要和400M合并,开销是1200M,能够看到采纳FileGroup的设计,合并开销缩小一半。
还有表格局。表格局的内容是文件在Hudi内是怎么存的。首先定义了表的根门路,而后写一些分区,和Hive的文件分区组织是一样的。还有对表的Schema定义,表的Schema变更,有一种形式是元数据记录在文件里,也有的是借助内部KV存储元数据,两者各有优缺点。
Hudi基于Avro格局示意Schema,因而对Schema的Evolution能力齐全等同于Avro Schema的Evolution能力,即能够减少字段以及向上兼容的变更,如int变成long是兼容的,但long变成int是不兼容的。
以后当初社区曾经有计划反对Full Schema Evolution,即能够减少一个字段,删去一个字段,重命名,也就是变更一个字段。
还有一个是Hudi的索引设计。每一条数据写入Hudi时,都会保护数据主键到一个文件组ID的映射,这样在做更新、删除时能够更快的定位到变更的文件。
左边的图里有个订单表,能够依据日期写到不同的分区里。上面就是用户表,就不须要做分区,因为它的数据量没有那么大,变更没那么频繁,能够应用非分区的表。
对于分区表及变更频繁的表,在应用Flink写入时,利用Flink State构建的全局索引效率比拟高。整个索引是可插拔的,包含Bloomfilter、 HBase高性能索引。在字节场景中, Bloomfilter过滤器齐全不能满足日增PB的索引查找,因而他们应用HBase高性能索引,因而用户可依据本人的业务状态灵便抉择不同索引的实现。在有不同类型索引状况下能够以较低代价反对早退的更新、随机更新的场景。
另外一个设计是并发管制。并发管制是在0.8之后才引入的。Hudi提供乐观锁机制来解决并发写问题,在提交的时候查看两个变更是否抵触,如果抵触就会写入失败。对于表服务如Compaction或者是Clustering外部没有锁,Hudi外部有一套协调机制来防止锁竞争问题。比方做Compaction,能够先在timeline上先打一个点,前面齐全能够和写入链路解耦,异步做Compaction。
例如右边是数据摄取链路,数据每半个小时摄取一次,左边是异步删除作业,也会变更表,并且很有可能和写入批改抵触,会导致这个链路始终失败,平台无端的耗费CPU资源,当初社区针对这种状况也有改良计划,心愿尽早检测并发写入的抵触,提前终止,缩小资源节约。
另外一个设计是元数据表。因为Hudi最开始是基于HDFS构建和设计,没有太多思考云上存储场景,导致在云上FileList十分慢。因而在0.8版本,社区引入了Metadata Table,Metadata Table自身也是一张Hudi表,它构建成一张Hudi,能够复用Hudi表等各种表服务。Metadata Table表文件里会存分区下有的所有文件名以及文件大小,每一列的统计信息做查问优化,以及当初社区正在做的,基于Meta Table表构建全局索引,每条记录对应每个文件ID都记录在Meta table,缩小解决Upsert时查问待更新文件的开销,也是上云必备。
四、Hudi将来布局
将来的布局,如基于Pulsar、Hudi构建Lakehouse,这是StreamNative CEO提出的Proposal,想基于Hudi去构建Pulsar分层的存储。在Hudi社区,咱们也做了一些工作,想把Hudi内置的工具包DeltaStreamar内置Pulsar Source,当初曾经有PR了,心愿两个社区分割能够更严密。Pular分层存储内核局部StreamNative有同学正在做。
最近几天曾经公布了0.9.0重要的优化和改良。首先集成了Spark SQL,极大升高了数据分析人员应用Hudi的门槛。
Flink集成Hudi的计划早在Hudi的0.7.0版本就有了,通过几个版本的迭代,Flink集成Hudi曾经十分成熟了,在字节跳动等大公司曾经在生产应用。Blink团队做的一个CDC的Format集成,间接把Update、Deltete事件间接存到Hudi。还有就是做存量数据的一次性迁徙,增量了批量导入能力,缩小了序列化和反序列化的开销。
另外当初有一些用户会感觉Hudi存一些元数据字段,比方\_hoodie\_commit\_time等元信息,这些信息都是从数据信息里提取的,有局部存储开销,当初反对虚构键,元数据字段不会再存数据了,它带来的限度就是不能应用增量ETL,无奈获取Hudi某一个工夫点之后的变更数据。
另外很多小伙伴也在心愿Hudi反对ORC格局,Hudi最新版本反对了ORC格局,同时这部分格局的是可插拔的,后续能够很灵便接入更多的格局。还做了Metadata Table的写入和查问优化,通过Spark SQL查问的时候,防止Filelist,间接通过Metadata Table获取整个文件列表信息。
从更远来看社区将来的布局包含对于Spark集成降级到Data SourceV2,当初Hudi基于V1,无奈用到V2的性能优化。还有Catalog集成,能够通过Catalog治理表,能够创立、删除、更新,表格元数据的治理通过Spark Catalog集成。
Flink模块Blink团队有专职同学负责,后续会把流式数据里的Watremark推到Hudi表里。
另外是与Kafka Connect Sink的集成,后续间接通过Java客户把Kafka的数据写到Hudi。
在内核侧的优化,包含了基于Metadata Table全局记录级别索引。还有字节跳动小伙伴做的写入反对Bucket,这样的益处就是做数据更新的时候,能够通过主键找到对应Bucket,只有把对应Bucket的parquet文件的Bloomfilter读取进去就能够了,缩小了查找更新时候的开销。
还有更智能地Clustering策略,在咱们外部也做了这部分工作,更智能的Clustering能够基于之前的负载状况,动静的开启Clustering优化,另外还包含基于Metadata Table构建二级索引,以及Full Schema Evolution和跨表事务。
当初Hudi社区倒退得比拟快,代码重构量十分大,但都是为了更好的社区倒退,从0.7.0到0.9.0版本Flink集成Hudi模块基本上齐全重构了,如果有趣味的同学能够参加到社区,独特建设更好的数据湖平台。
版权申明:本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。