乐趣区

关于前端:Flink-如何实时分析-Iceberg-数据湖的-CDC-数据

简介:数据湖的架构中,CDC 数据实时读写的计划和原理

本文由李劲松、胡争分享,社区志愿者杨伟海、李培殿整顿。次要介绍在数据湖的架构中,CDC 数据实时读写的计划和原理。文章次要分为 4 个局部内容:

  1. 常见的 CDC 剖析计划
  2. 为何抉择 Flink + Iceberg
  3. 如何实时写入读取
  4. 将来布局

一、常见的 CDC 剖析计划

咱们先看一下明天的 topic 须要设计的是什么?输出是一个 CDC 或者 upsert 的数据,输入是 Database 或者是用于大数据 OLAP 剖析的存储。

咱们常见的输出次要有两种数据,第一种数据是数据库的 CDC 数据,一直的产生 changeLog;另一种场景是流计算产生的 upsert 数据,在最新的 Flink 1.12 版本曾经反对了 upsert 数据。

1.1 离线 HBase 集群剖析 CDC 数据

咱们通常想到的第一个计划,就是把 CDC upsert 的数据通过 Flink 进行一些解决之后,实时的写到 HBase 当中。HBase 是一个在线的、能提供在线点查能力的一种数据库,具备十分高的实时性,对写入操作是十分敌对的,也能够反对一些小范畴的查问,而且集群可扩大。

这种计划其实跟一般的点查实时链路是同一套,那么用 HBase 来做大数据的 OLAP 的查问剖析有什么问题呢?

首先,HBase 是一个面向点查设计的一种数据库,是一种在线服务,它的行存的索引不适宜剖析工作。典型的数仓设计必定是要列存的,这样压缩效率和查问效率才会高。第二,HBase 的集群保护老本比拟高。最初,HBase 的数据是 HFile,不不便与大数据里数仓当中典型的 Parquet、Avro、Orc 等联合。

1.2 Apache Kudu 保护 CDC 数据集

针对 HBase 剖析能力比拟弱的状况,社区前几年呈现了一个新的我的项目,这就是 Apache Kudu 我的项目。Kudu 我的项目领有 HBase 的点查能力的同时,采纳列存,这样列存减速非常适合 OLAP 剖析。

这种计划会有什么问题呢?

首先 Kudu 是比拟小众的、独立的集群,保护老本也比拟高,跟 HDFS、S3、OSS 比拟割裂。其次因为 Kudu 在设计上保留了点查能力,所以它的批量扫描性能不如 parquet,另外 Kudu 对于 delete 的反对也比拟弱,最初它也不反对增量拉取。

1.3 间接导入 CDC 到 Hive 剖析

第三种计划,也是大家在数仓中比拟罕用的计划,就是把 MySQL 的数据写到 Hive,流程是:保护一个全量的分区,而后每天做一个增量的分区,最初把增量分区写好之后进行一次 Merge,写入一个新的分区,流程上这样是走得通的。Hive 之前的全量分区是不受增量的影响的,只有当增量 Merge 胜利之后,分区才可查,才是一个全新的数据。这种纯列存的 append 的数据对于剖析是十分敌对的。

这种计划会有什么问题呢?

增量数据和全量数据的 Merge 是有延时的,数据不是实时写入的,典型的是一天进行一次 Merge,这就是 T+1 的数据了。所以,时效性很差,不反对实时 upsert。每次 Merge 都须要把所有数据全副重读重写一遍,效率比拟差、比拟浪费资源。

1.4 Spark + Delta 剖析 CDC 数据

针对这个问题,Spark + Delta 在剖析 CDC 数据的时候提供了 MERGE INTO 的语法。这并不仅仅是对 Hive 数仓的语法简化,Spark + Delta 作为新型数据湖的架构(例如 Iceberg、Hudi),它对数据的治理不是分区,而是文件,因而 Delta 优化 MERGE INTO 语法,仅扫描和重写发生变化的文件即可,因而高效很多。

咱们评估一下这个计划,他的长处是仅依赖 Spark + Delta 架构简洁、没有在线服务、列存,剖析速度十分快。优化之后的 MERGE INTO 语法速度也够快。

这个计划,业务上是一个 Copy On Write 的一个计划,它只须要 copy 大量的文件,能够让提早做的绝对低。实践上,在更新的数据跟现有的存量没有很大重叠的话,能够把天级别的提早做到小时级别的提早,性能也是能够跟得上的。

这个计划在 Hive 仓库解决 upsert 数据的路上曾经后退了一小步了。但小时级别的提早毕竟不如实时更无效,因而这个计划最大的毛病在 Copy On Write 的 Merge 有肯定的开销,提早不能做的太低。

第一局部大略现有的计划就是这么多,同时还须要再强调一下,upsert 之所以如此重要,是因为在数据湖的计划中,upsert 是实现数据库准实时、实时入湖的一个关键技术点。

二、为何抉择 Flink + Iceberg

2.1 Flink 对 CDC 数据生产的反对

第一,Flink 原生反对 CDC 数据生产。在前文 Spark + Delta 的计划中,MARGE INTO 的语法,用户须要感知 CDC 的属性概念,而后写到 merge 的语法上来。然而 Flink 是原生反对 CDC 数据的。用户只有申明一个 Debezium 或者其余 CDC 的 format,Flink 下面的 SQL 是不须要感知任何 CDC 或者 upsert 的属性的。Flink 中内置了 hidden column 来标识它 CDC 的类型数据,所以对用户而言比拟简洁。

如下图示例,在 CDC 的解决当中,Flink 在只用申明一个 MySQL Binlog 的 DDL 语句,前面的 select 都不必感知 CDC 属性。

2.2 Flink 对 Change Log Stream 的反对

下图介绍的是 Flink 原生反对 Change Log Stream,Flink 在接入一个 Change Log Stream 之后,拓扑是不必关怀 Change Log flag 的 SQL。拓扑齐全是依照本人业务逻辑来定义,并且始终到最初写入 Iceberg,两头不必感知 Change Log 的 flag。

2.3 Flink + Iceberg CDC 导入计划评估

最初,Flink + Iceberg 的 CDC 导入计划的长处是什么?

比照之前的计划,Copy On Write 跟 Merge On Read 都有实用的场景,侧重点不同。Copy On Write 在更新局部文件的场景中,当只须要重写其中的一部分文件时是很高效的,产生的数据是纯 append 的全量数据集,在用于数据分析的时候也是最快的,这是 Copy On Write 的劣势。

另外一个是 Merge On Read,行将数据连同 CDC flag 间接 append 到 Iceberg 当中,在 merge 的时候,把这些增量的数据依照肯定的组织格局、肯定高效的计算形式与全量的上一次数据进行一次 merge。这样的益处是反对近实时的导入和实时数据读取;这套计算计划的 Flink SQL 原生反对 CDC 的摄入,不须要额定的业务字段设计。

Iceberg 是对立的数据湖存储,反对多样化的计算模型,也反对各种引擎(包含 Spark、Presto、hive)来进行剖析;产生的 file 都是纯列存的,对于前面的剖析是十分快的;Iceberg 作为数据湖基于 snapshot 的设计,反对增量读取;Iceberg 架构足够简洁,没有在线服务节点,纯 table format 的,这给了上游平台方足够的能力来定制本人的逻辑和服务化。

三、如何实时写入读取

3.1 批量更新场景和 CDC 写入场景

首先咱们来理解一下在整个数据湖外面批量更新的两个场景。

  • 第一批量更新的这种场景,在这个场景中咱们应用一个 SQL 更新了成千上万行的数据,比方欧洲的 GDPR 策略,当一个用户登记掉本人的账户之后,后盾的零碎是必须将这个用户所有相干的数据全副物理删除。
  • 第二个场景是咱们须要将 date lake 中一些领有独特个性的数据删除掉,这个场景也是属于批量更新的一个场景,在这个场景中删除的条件可能是任意的条件,跟主键(Primary key)没有任何关系,同时这个待更新的数据集是十分大,这种作业是一个长耗时低频次的作业。

另外是 CDC 写入的场景,对于对 Flink 来说,个别罕用的有两种场景,第一种场景是上游的 Binlog 可能很疾速的写到 data lake 中,而后供不同的剖析引擎做剖析应用;第二种场景是应用 Flink 做一些聚合操作,输入的流是 upsert 类型的数据流,也须要可能实时的写到数据湖或者是上游零碎中去做剖析。如下图示例中 CDC 写入场景中的 SQL 语句,咱们应用单条 SQL 更新一行数据,这种计算模式是一种流式增量的导入,而且属于高频的更新。

3.2 Apache Iceberg 设计 CDC 写入计划须要思考的问题

接下来咱们看下 iceberg 对于 CDC 写入这种场景在方案设计时须要思考哪些问题。

  • 第一是正确性,即须要保障语义及数据的正确性,如上游数据 upsert 到 iceberg 中,当上游 upsert 进行后,iceberg 中的数据须要和上游零碎中的数据保持一致。
  • 第二是高效写入,因为 upsert 的写入频率十分高,咱们须要放弃高吞吐、高并发的写入。
  • 第三是疾速读取,当数据写入后咱们须要对数据进行剖析,这其中波及到两个问题,第一个问题是须要反对细粒度的并发,当作业应用多个 task 来读取时能够保障为各个 task 进行平衡的调配以此来减速数据的计算;第二个问题是咱们要充分发挥列式存储的劣势来减速读取。
  • 第四是反对增量读,例如一些传统数仓中的 ETL,通过增量读取来进行进一步数据转换。

3.3 Apache Iceberg Basic

在介绍具体的计划细节之前,咱们先理解一下 Iceberg 在文件系统中的布局,总体来讲 Iceberg 分为两局部数据,第一局部是数据文件,如下图中的 parquet 文件,每个数据文件对应一个校验文件(.crc 文件)。第二局部是表元数据文件(Metadata 文件),蕴含 Snapshot 文件(snap-_.avro)、Manifest 文件 (_.avro)、TableMetadata 文件(*.json) 等。

下图展现了在 iceberg 中 snapshot、manifest 及 partition 中的文件的对应关系。下图中蕴含了三个 partition,第一个 partition 中有两个文件 f1、f3,第二个 partition 有两个文件 f4、f5,第三个 partition 有一个文件 f2。对于每一次写入都会生成一个 manifest 文件,该文件记录本次写入的文件与 partition 的对应关系。再向下层有 snapshot 的概念,snapshot 可能帮忙快速访问到整张表的全量数据,snapshot 记录多个 manifest,如第二个 snapshot 蕴含 manifest2 和 manifest3。

3.4 INSERT、UPDATE、DELETE 写入

在理解了根本的概念,上面介绍 iceberg 中 insert、update、delete 操作的设计。

下图示例的 SQL 中展现的表蕴含两个字段即 id、data,两个字段都是 int 类型。在一个 transaction 中咱们进行了图示中的数据流操作,首先插入了(1,2)一条记录,接下来将这条记录更新为(1,3),在 iceberg 中 update 操作将会拆为 delete 和 insert 两个操作。

这么做的起因是思考到 iceberg 作为流批对立的存储层,将 update 操作拆解为 delete 和 insert 操作能够保障流批场景做更新时读取门路的对立,如在批量删除的场景下以 Hive 为例,Hive 会将待删除的行的文件 offset 写入到 delta 文件中,而后做一次 merge on read,因为这样会比拟快,在 merge 时通过 position 将原文件和 delta 进行映射,将会很快失去所有未删除的记录。

接下来又插入记录(3,5),删除了记录(1,3),插入记录(2,5),最终查问是咱们失去记录(3,5)(2,5)。

下面操作看上去非常简单,但在实现中是存在一些语义上的问题。如下图中,在一个 transaction 中首先执行插入记录(1,2)的操作,该操作会在 data file1 文件中写入 INSERT(1,2),而后执行删除记录(1,2)操作,该操作会在 equalify delete file1 中写入 DELETE(1,2),接着又执行插入记录(1,2)操作,该操作会在 data file1 文件中再写入 INSERT(1,2),而后执行查问操作。

在失常状况下查问后果应该返回记录 INSERT(1,2),但在实现中,DELETE(1,2)操作无奈得悉删除的是 data file1 文件中的哪一行,因而两行 INSERT(1,2)记录都将被删除。

那么如何来解决这个问题呢,社区以后的形式是采纳了 Mixed position-delete and equality-delete。Equality-delete 即通过指定一列或多列来进行删除操作,position-delete 是依据文件门路和行号来进行删除操作,通过将这两种办法联合起来以保障删除操作的正确性。

如下图咱们在第一个 transaction 中插入了三行记录,即 INSERT(1,2)、INSERT(1,3)、INSERT(1,4),而后执行 commit 操作进行提交。接下来咱们开启一个新的 transaction 并执行插入一行数据(1,5),因为是新的 transaction,因而新建了一个 data file2 并写入 INSERT(1,5)记录,接下来执行删除记录(1,5),理论写入 delete 时是:

在 position delete file1 文件写入(file2, 0),示意删除 data file2 中第 0 行的记录,这是为了解决同一个 transaction 内同一行数据重复插入删除的语义的问题。
在 equality delete file1 文件中写入 DELETE (1,5),之所以写入这个 delete 是为了确保本次 txn 之前写入的 (1,5) 能被正确删除。

而后执行删除(1,4)操作,因为(1,4)在以后 transaction 中未曾插入过,因而该操作会应用 equality-delete 操作,即在 equality delete file1 中写入(1,4)记录。在上述流程中能够看出在以后计划中存在 data file、position delete file、equality delete file 三类文件。

在理解了写入流程后,如何来读取呢。如下图所示,对于 position delete file 中的记录(file2, 0)只需和以后 transaction 的 data file 进行 join 操作,对于 equality delete file 记录(1,4)和之前的 transaction 中的 data file 进行 join 操作。最终失去记录 INSERT(1,3)、INSERT(1,2)保障了流程的正确性。

3.5 Manifest 文件的设计

下面介绍了 insert、update 及 delete,但在设计 task 的执行打算时咱们对 manifest 进行了一些设计,目标是通过 manifest 可能疾速到找到 data file,并依照数据大小进行宰割,保障每个 task 解决的数据尽可能的均匀分布。

如下图示例,蕴含四个 transaction,前两个 transaction 是 INSERT 操作,对应 M1、M2,第三个 transaction 是 DELETE 操作,对应 M3,第四个 transaction 是 UPDATE 操作,蕴含两个 manifest 文件即 data manifest 和 delete manifest。

对于为什么要对 manifest 文件拆分为 data manifest 和 delete manifest 呢,实质上是为了疾速为每个 data file 找到对应的 delete file 列表。能够看下图示例,当咱们在 partition-2 做读取时,须要将 deletefile-4 与 datafile-2、datafile-3 做一个 join 操作,同样也须要将 deletefile-5 与 datafile-2、datafile-3 做一个 join 操作。

以 datafile-3 为例,deletefile 列表蕴含 deletefile-4 和 deletefile-5 两个文件,如何疾速找到对应的 deletefIle 列表呢,咱们能够依据下层的 manifest 来进行查问,当咱们将 manifest 文件拆分为 data manifest 和 delete manifest 后,能够将 M2(data manifest)与 M3、M4(delete manifest)先进行一次 join 操作,这样便能够疾速的失去 data file 所对应的 delete file 列表。

3.6 文件级别的并发

另一个问题是咱们须要保障足够高的并发读取,在 iceberg 中这点做得十分杰出。在 iceberg 中能够做到文件级别的并发读取,甚至文件中更细粒度的分段的并发读取,比方文件有 256MB,能够分为两个 128MB 进行并发读取。这里举例说明,假如 insert 文件跟 delete 文件在两个 Bucket 中的布局形式如下图所示。

咱们通过 manifest 比照发现,datafile-2 的 delete file 列表只有 deletefile-4,这样能够将这两个文件作为一个独自的 task(图示中 Task-2)进行执行,其余的文件也是相似,这样能够保障每个 task 数据较为平衡的进行 merge 操作。

对于这个计划咱们做了简略的总结,如下图所示。首先这个计划的长处能够满足正确性,并且能够实现高吞吐写入和并发高效的读取,另外能够实现 snapshot 级别的增量的拉取。

以后该计划还是比拟毛糙,上面也有一些能够优化的点。

  • 第一点,如果同一个 task 内的 delete file 有反复能够做缓存解决,这样能够进步 join 的效率。
  • 第二点,当 delete file 比拟大须要溢写到磁盘时能够应用 kv lib 来做优化,但这不依赖内部服务或其余沉重的索引。
  • 第三点,能够设计 Bloom filter(布隆过滤器)来过滤有效的 IO,因为对于 Flink 中罕用的 upsert 操作会产生一个 delete 操作和一个 insert 操作,这会导致在 iceberg 中 data file 和 delete file 大小相差不大,这样 join 的效率不会很高。如果采纳 Bloom Filter,当 upsert 数据到来时,拆分为 insert 和 delete 操作,如果通过 bloom filter 过滤掉那些之前没有 insert 过数据的 delete 操作(即如果这条数据之前没有插入过,则不须要将 delete 记录写入到 delete file 中),这将极大的进步 upsert 的效率。
  • 第四点,是须要一些后盾的 compaction 策略来管制 delete file 文件大小,当 delete file 越少,剖析的效率越高,当然这些策略并不会影响失常的读写。

3.7 增量文件集的 Transaction 提交

后面介绍了文件的写入,下图咱们介绍如何依照 iceberg 的语义进行写入并且供用户读取。次要分为数据和 metastore 两局部,首先会有 IcebergStreamWriter 进行数据的写入,但此时写入数据的元数据信息并没有写入到 metastore,因而对外不可见。第二个算子是 IcebergFileCommitter,该算子会将数据文件进行收集, 最终通过 commit transaction 来实现写入。

在 Iceberg 中并没有其余任何其余第三方服务的依赖,而 Hudi 在某些方面做了一些 service 的形象,如将 metastore 形象为独立的 Timeline,这可能会依赖一些独立的索引甚至是其余的内部服务来实现。

四、将来布局

上面是咱们将来的一些布局,首先是 Iceberg 内核的一些优化,包含计划中波及到的全链路稳定性测试及性能的优化,并提供一些 CDC 增量拉取的相干 Table API 接口。

在 Flink 集成上,会实现 CDC 数据的主动和手动合并数据文件的能力,并提供 Flink 增量拉取 CDC 数据的能力。

在其余生态集成上,咱们会对 Spark、Presto 等引擎进行集成,并借助 Alluxio 减速数据查问。

作者:阿里云实时计算 Flink
原文链接
本文为阿里云原创内容,未经容许不得转载

退出移动版