关于kafka:技术干货|基于Apache-Hudi-的CDC数据入湖内附干货PPT下载渠道

278次阅读

共计 7651 个字符,预计需要花费 20 分钟才能阅读完成。

简介:阿里云技术专家李少锋 (风泽) 在 Apache Hudi 与 Apache Pulsar 联结 Meetup 杭州站上的演讲整顿稿件,本议题将介绍典型 CDC 入湖场景,以及如何应用 Pulsar/Hudi 来构建数据湖,同时将会分享 Hudi 内核设计、新愿景以及社区最新动静。

本文 PPT 下载链接:

李少锋(风泽) – 阿里云技术专家 -《基于 Apache Hudi 的 CDC 数据入湖》.pdf

一、CDC 背景介绍

首先咱们介绍什么是 CDC?CDC 的全称是 Change data Capture,即变更数据捕捉,它是数据库畛域十分常见的技术,次要用于捕捉数据库的一些变更,而后能够把变更数据发送到上游。它的利用比拟广,能够做一些数据同步、数据散发和数据采集,还能够做 ETL,明天次要分享的也是把 DB 数据通过 CDC 的形式 ETL 到数据湖。

对于 CDC,业界次要有两种类型:一是基于查问的,客户端会通过 SQL 形式查问源库表变更数据,而后对外发送。二是基于日志,这也是业界宽泛应用的一种形式,个别是通过 binlog 形式,变更的记录会写入 binlog,解析 binlog 后会写入音讯零碎,或间接基于 Flink CDC 进行解决。

它们两者是有区别的,基于查问比较简单,是入侵性的,而基于日志是非侵入性,对数据源没有影响,但 binlog 的解析比较复杂一些。

基于查问和基于日志,别离有四种实现技术,有基于工夫戳、基于触发器和快照,还有基于日志的,这是实现 CDC 的技术,上面是几种形式的比照。

通过这个表格比照能够发现基于日志的综合最优,但解析比较复杂,但业界有很多开源的 binlog 的解析器,比拟通用和风行的有 Debezium、Canal,以及 Maxwell。基于这些 binlog 解析器就能够构建 ETL 管道。

上面来看下业界比拟风行的一种 CDC 入仓架构。

整个数据入仓是分实时流是离线流,实时流解析 binlog,通过 Canal 解析 binlog,而后写入 Kafka,而后每个小时会把 Kafka 数据同步到 Hive 中;另外就是离线流,离线流须要对同步到 Hive 的贴源层的表进行拉取一次全量,如果只有后面的实时流是数据是不全的,必须通过离线流的 SQL Select 把全量导入一次数据,对每张 ODS 表会把存量数据和增量数据做一个 Merge。这里能够看到对于 ODS 层的实时性不够,存在小时、天级别的提早。而对 ODS 层这个延时能够通过引入 Apache Hudi 做到分钟级。

二、CDC 数据入湖办法

基于 CDC 数据的入湖,这个架构非常简单。上游各种各样的数据源,比方 DB 的变更数据、事件流,以及各种内部数据源,都能够通过变更流的形式写入表中,再进行内部的查问剖析,整个架构非常简单。

架构尽管简略,但还是面临很多挑战。以 Apache Hudi 数据湖为例,数据湖是通过文件存储各种各样的数据,对于 CDC 的数据处理须要对湖里某局部文件进行牢靠地、事务性变更,这样能够保障上游查问不会看到局部后果,另外对 CDC 数据须要高效的做更新、删除操作,这就须要疾速定位到更改的文件,另外是对于每小批量的数据写入,心愿可能主动解决小文件,防止繁冗的小文件解决,还有面向查问的布局优化,能够通过一些技术手段如 Clustering 革新文件布局,对外提供更好的查问性能。

而 Apache Hudi 是怎么应答这些挑战的呢?首先反对事务性写入,包含读写之间的 MVCC 机制保障写不影响读,也能够管制事务及并发保障,对于并发写采纳 OCC 乐观锁机制,对更新删除,内置一些索引及自定义保障更新、删除比拟高效。另外是面向查问优化,Hudi 外部会主动做小文件的治理,文件会主动长到用户指定的文件大小,如 128M,这对 Hudi 来说也是比拟外围的个性。另外 Hudi 提供了 Clustering 来优化文件布局的性能。

下图是典型 CDC 入湖的链路。下面的链路是大部分公司采取的链路,后面 CDC 的数据先通过 CDC 工具导入 Kafka 或者 Pulsar,再通过 Flink 或者是 Spark 流式生产写到 Hudi 里。第二个架构是通过 Flink CDC 直联到 MySQL 上游数据源,间接写到上游 Hudi 表。

其实,这两条链路各有优缺点。第一个链路对立数据总线,扩展性和容错性都很好。对于第二条链路,扩展性和容错性会略微差点,但因为组件较少,保护老本相应较低。

这是阿里云数据库 OLAP 团队的 CDC 入湖链路,因为咱们咱们做 Spark 的团队,所以咱们采纳的 Spark Streaming 链路入湖。整个入湖链路也分为两个局部:首先有一个全量同步作业,会通过 Spark 做一次全量数据拉取,这里如果有从库能够直连从库做一次全量同步,防止对主库的影响,而后写到 Hudi。而后会启动一个增量作业,增量作业通过 Spark 生产阿里云 DTS 里的 binlog 数据来将 binlog 准实时同步至 Hudi 表。全量和增量作业的编排借助了 Lakehouse 的作业主动编排能力,协调全量和增量作业,而对于全量和增量连接时利用 Hudi 的 Upsert 语义保障全增量数据的最终的一致性,不会呈现数据偏多和偏少的问题。

在 Lakehouse 的 CDC 入湖链路中,咱们团队也做了一些优化。

第一个是原库的 Schema 变更解决,咱们对接的客户某些列的减少、删除或者批改某些列的场景。在 Spark 写 Hudi 之前会做 Schema 的测验,看这个 Schema 是不是非法,如果非法就能够失常写入,如果不非法的话,则会写入失败,而删除字段会导致 Schema 校验不非法,导致作业失败,这样稳定性是没有保障的。因而咱们会捕获 Schema Validation 的异样,如果发现是缩小了字段,咱们会把之前的字段做主动补全,而后做重试,保障链路是稳固的。

第二个有些客户表没有主键或者主键不合理,比方采纳更新工夫字段作为主键,或者设置会变动的分区字段,这时候就会导致写入 Hudi 的数据和源库表数据对不上。因而咱们做了一些产品层面的优化,容许用户正当设置主键和分区映射,保障同步到 Hudi 里和源库是数据齐全对齐的。

还有一个常见需要是用户在上游库中减少一个表,如果应用表级别同步的话,新增表在整个链路是无奈感知的,也就无奈同步到 Hudi 中,而在 Lakehouse 中,咱们能够对整库进行同步,因而在库中新增表时,会主动感知新增表,将新增表数据主动同步到 Hudi,做到原库减少表主动感知的能力。

还有一个是对 CDC 写入时候性能优化,比方拉取蕴含 Insert、Update、Delete 等事件的一批数据,是否始终应用 Hudi 的 Upsert 形式写入呢?这样管制比较简单,并且 Upsert 有数据去重能力,但它带来的问题是找索引的效率低,而对于 Insert 形式而言,不须要找索引,效率比拟高。因而对于每一批次数据会判断是否都是 Insert 事件,如果都是 Insert 事件就间接 Insert 形式写入,防止查找文件是否更新的开销,数据显示大略能够晋升 30%~50% 的性能。当然这里也须要思考到 DTS 异样,从新生产数据时,复原期间不能间接应用 Insert 形式,否则可能会存在数据反复,对于这个问题咱们引入了表级别的 Watermark,保障即便在 DTS 异常情况下也不会呈现数据反复问题。

三、Hudi 外围设计

接着介绍下 Hudi 的定位,依据社区最新的愿景,Hudi 的定义是流式数据湖平台,它反对海量数据更新,内置表格局以及反对事务的贮存,一系列列表服务 Clean、Archive、

Compaction、Clustering 等,以及开箱即用的数据服务,以及自身自带的运维工具和指标监控,提供很好的运维能力。

这是 Hudi 官网的图,能够看到 Hudi 在整个生态里是做湖存储,底层能够对接 HDFS 以及各种云厂商的对象存储,只有兼容 Hadoop 协定接。上游是入湖的变动事件流,对上能够反对各种各样的数据引擎,比方 presto、Spark 以及云上产品;另外能够利用 Hudi 的增量拉取能力借助 Spark、Hive、Flink 构建派生表。

整个 Hudi 体系结构是十分齐备的,其定位为增量的解决栈。典型的流式是面向行,对数据逐行解决,解决十分高效。

但面向行的数据里没有方法做大规模剖析做扫描优化,而批处理可能须要每天全量解决一次,效率绝对比拟低。而 Hudi 引入增量解决的概念,解决的数据都是某一时间点之后的,和流解决类似,又比批处理高效很多,并且自身是面向数据湖中的列存数据,扫描优化十分高效。

而回顾 Hudi 的倒退历史。2015 年社区的主席发表了一篇增量解决的文章,16 年在 Uber 开始投入生产,为所有数据库要害业务提供了撑持;2017 年,在 Uber 撑持了 100PB 的数据湖,2018 年随着云计算遍及,吸引了国内外的使用者;19 年 Uber 把它捐献到 Apache 进行孵化;2020 年一年左右的工夫就成为了顶级我的项目,采用率增长了超过 10 倍;2021 年 Uber 最新材料显示 Hudi 反对了 500PB 数据湖,同时对 Hudi 做了很多加强,像 Spark SQL DML 和 Flink 的集成。最近字节跳动举荐部门分享的基于 Hudi 的数据湖实际单表超过了 400PB,总存储超过了 1EB,日增 PB 级别。

通过几年的倒退,国内外采纳 Hudi 的公司十分多,比方私有云的华为云、阿里云、腾讯云以及 AWS,都集成了 Hudi,阿里云也基于 Hudi 构建 Lakehouse。字节跳动的整个数仓体系往湖上迁徙也是基于 Hudi 构建的,前面也会有相应的文章分享他们基于 Flink+Hudi 的数据湖的日增 PB 数据量的实际。同时像百度、快手头部互联网大厂都有在应用。同时咱们理解银行、金融行业也有工商银行、农业银行、百度金融、百信银行也有落地。游戏畛域包含了三七互娱、米哈游、4399,能够看到 Hudi 在各行各业都有比拟宽泛的利用。

Hudi 的定位是一套残缺的数据湖平台,最上层面向用户能够写各种各样的 SQL,Hudi 作为平台提供的各种能力,上面一层是基于 SQL 以及编程的 API,再下一层是 Hudi 的内核,包含索引、并发管制、表服务,前面社区要构建的基于 Lake Cache 构建缓存,文件格式是应用的凋谢 Parquet、ORC、HFile 存储格局,整个数据湖能够构建在各种云上。

前面接着介绍 Hudi 的要害设计,这对咱们理解 Hudi 十分有帮忙。首先是文件格式,它最底层是基于 Fileslice 的设计,翻译过去就是文件片,文件片蕴含根本文件和增量日志文件。根本文件就是一个 Parquet 或者是 ORC 文件,增量文件是 log 文件,对于 log 文件的写入 Hudi 里编码了一些 block,一批 Update 能够编码成一个数据块,写到文件里。而根底文件是可插拔,能够基于 Parquet,最新的 9.0 版本曾经反对了 ORC。还有基于 HFile,HFile 可用作元数据表。

Log 文件里保留了一系列各种各样的数据块,它是有点相似于数据库的重做日志,每个数据版本都能够通过重做日志找到。对于根底文件和 Log 文件通过压缩做合并造成新的根底文件。Hudi 提供了同步和异步的两种形式,这为用户提供了很灵便的抉择,比方做能够抉择同步 Compaction,如果对提早不敏感,而不须要额定异步起一个作业做 Compaction,或者有些用户心愿保障写入链路的提早,能够异步做 Compaction 而不影响主链路。

Hudi 基于 File Slice 上有个 File Group 的概念,File Group 会蕴含有不同的 File Slice,也 File Slice 形成了不同的版本,Hudi 提供了机制来保留元数据个数,保障元数据大小可控。

对于数据更新写入,尽量应用 append,比方之前写了一个 Log 文件,在更新时,会持续尝试往 Log 文件写入,对于 HDFS 这种反对 append 语义的存储十分敌对,而很多云上对象存储不反对 append 语义,即数据写进去之后不可更改,只能新写 Log 文件。对于每个文件组也就是不同 FileGroup 之间是相互隔离的,能够针对不同的文件组做不同的逻辑,用户能够自定义算法实现,非常灵活。

基于 Hudi FileGroup 的设计能够带来不少收益。比方根底文件是 100M,前面对根底文件进行了更新 50M 数据,就是 4 个 FileGroup,做 Compaction 合并开销是 600M,50M 只须要和 100M 合,4 个 150M 开销就是 600M,这是有 FileGroup 设计。还是有 4 个 100M 的文件,也是做了更新,每一次合,比方 25M 要和 400M 合并,开销是 1200M,能够看到采纳 FileGroup 的设计,合并开销缩小一半。

还有表格局。表格局的内容是文件在 Hudi 内是怎么存的。首先定义了表的根门路,而后写一些分区,和 Hive 的文件分区组织是一样的。还有对表的 Schema 定义,表的 Schema 变更,有一种形式是元数据记录在文件里,也有的是借助内部 KV 存储元数据,两者各有优缺点。

Hudi 基于 Avro 格局示意 Schema,因而对 Schema 的 Evolution 能力齐全等同于 Avro Schema 的 Evolution 能力,即能够减少字段以及向上兼容的变更,如 int 变成 long 是兼容的,但 long 变成 int 是不兼容的。

以后当初社区曾经有计划反对 Full Schema Evolution,即能够减少一个字段,删去一个字段,重命名,也就是变更一个字段。

还有一个是 Hudi 的索引设计。每一条数据写入 Hudi 时,都会保护数据主键到一个文件组 ID 的映射,这样在做更新、删除时能够更快的定位到变更的文件。

左边的图里有个订单表,能够依据日期写到不同的分区里。上面就是用户表,就不须要做分区,因为它的数据量没有那么大,变更没那么频繁,能够应用非分区的表。

对于分区表及变更频繁的表,在应用 Flink 写入时,利用 Flink State 构建的全局索引效率比拟高。整个索引是可插拔的,包含 Bloomfilter、HBase 高性能索引。在字节场景中,Bloomfilter 过滤器齐全不能满足日增 PB 的索引查找,因而他们应用 HBase 高性能索引,因而用户可依据本人的业务状态灵便抉择不同索引的实现。在有不同类型索引状况下能够以较低代价反对早退的更新、随机更新的场景。

另外一个设计是并发管制。并发管制是在 0.8 之后才引入的。Hudi 提供乐观锁机制来解决并发写问题,在提交的时候查看两个变更是否抵触,如果抵触就会写入失败。对于表服务如 Compaction 或者是 Clustering 外部没有锁,Hudi 外部有一套协调机制来防止锁竞争问题。比方做 Compaction,能够先在 timeline 上先打一个点,前面齐全能够和写入链路解耦,异步做 Compaction。

例如右边是数据摄取链路,数据每半个小时摄取一次,左边是异步删除作业,也会变更表,并且很有可能和写入批改抵触,会导致这个链路始终失败,平台无端的耗费 CPU 资源,当初社区针对这种状况也有改良计划,心愿尽早检测并发写入的抵触,提前终止,缩小资源节约。

另外一个设计是元数据表。因为 Hudi 最开始是基于 HDFS 构建和设计,没有太多思考云上存储场景,导致在云上 FileList 十分慢。因而在 0.8 版本,社区引入了 Metadata Table,Metadata Table 自身也是一张 Hudi 表,它构建成一张 Hudi,能够复用 Hudi 表等各种表服务。Metadata Table 表文件里会存分区下有的所有文件名以及文件大小,每一列的统计信息做查问优化,以及当初社区正在做的,基于 Meta Table 表构建全局索引,每条记录对应每个文件 ID 都记录在 Meta table,缩小解决 Upsert 时查问待更新文件的开销,也是上云必备。

四、Hudi 将来布局

将来的布局,如基于 Pulsar、Hudi 构建 Lakehouse,这是 StreamNative CEO 提出的 Proposal,想基于 Hudi 去构建 Pulsar 分层的存储。在 Hudi 社区,咱们也做了一些工作,想把 Hudi 内置的工具包 DeltaStreamar 内置 Pulsar Source,当初曾经有 PR 了,心愿两个社区分割能够更严密。Pular 分层存储内核局部 StreamNative 有同学正在做。

最近几天曾经公布了 0.9.0 重要的优化和改良。首先集成了 Spark SQL,极大升高了数据分析人员应用 Hudi 的门槛。

Flink 集成 Hudi 的计划早在 Hudi 的 0.7.0 版本就有了,通过几个版本的迭代,Flink 集成 Hudi 曾经十分成熟了,在字节跳动等大公司曾经在生产应用。Blink 团队做的一个 CDC 的 Format 集成,间接把 Update、Deltete 事件间接存到 Hudi。还有就是做存量数据的一次性迁徙,增量了批量导入能力,缩小了序列化和反序列化的开销。

另外当初有一些用户会感觉 Hudi 存一些元数据字段,比方_hoodie_commit_time 等元信息,这些信息都是从数据信息里提取的,有局部存储开销,当初反对虚构键,元数据字段不会再存数据了,它带来的限度就是不能应用增量 ETL,无奈获取 Hudi 某一个工夫点之后的变更数据。

另外很多小伙伴也在心愿 Hudi 反对 ORC 格局,Hudi 最新版本反对了 ORC 格局,同时这部分格局的是可插拔的,后续能够很灵便接入更多的格局。还做了 Metadata Table 的写入和查问优化,通过 Spark SQL 查问的时候,防止 Filelist,间接通过 Metadata Table 获取整个文件列表信息。

从更远来看社区将来的布局包含对于 Spark 集成降级到 Data SourceV2,当初 Hudi 基于 V1,无奈用到 V2 的性能优化。还有 Catalog 集成,能够通过 Catalog 治理表,能够创立、删除、更新,表格元数据的治理通过 Spark Catalog 集成。

Flink 模块 Blink 团队有专职同学负责,后续会把流式数据里的 Watremark 推到 Hudi 表里。

另外是与 Kafka Connect Sink 的集成,后续间接通过 Java 客户把 Kafka 的数据写到 Hudi。

在内核侧的优化,包含了基于 Metadata Table 全局记录级别索引。还有字节跳动小伙伴做的写入反对 Bucket,这样的益处就是做数据更新的时候,能够通过主键找到对应 Bucket,只有把对应 Bucket 的 parquet 文件的 Bloomfilter 读取进去就能够了,缩小了查找更新时候的开销。

还有更智能地 Clustering 策略,在咱们外部也做了这部分工作,更智能的 Clustering 能够基于之前的负载状况,动静的开启 Clustering 优化,另外还包含基于 Metadata Table 构建二级索引,以及 Full Schema Evolution 和跨表事务。

当初 Hudi 社区倒退得比拟快,代码重构量十分大,但都是为了更好的社区倒退,从 0.7.0 到 0.9.0 版本 Flink 集成 Hudi 模块基本上齐全重构了,如果有趣味的同学能够参加到社区,独特建设更好的数据湖平台。

原文链接
本文为阿里云原创内容,未经容许不得转载。

正文完
 0