关于flink:Flink-Iceberg-对象存储构建数据湖方案

3次阅读

共计 7080 个字符,预计需要花费 18 分钟才能阅读完成。

简介:上海站 Flink Meetup 分享内容,如何基于 Flink、对象存储、Iceberg 来构建数据湖生态。
本文整顿自 Dell 科技团体高级软件研发经理孙伟在 4 月 17 日 上海站 Flink Meetup 分享的《Iceberg 和对象存储构建数据湖计划》,文章内容为:

数据湖和 Iceberg 简介
对象存储撑持 Iceberg 数据湖
演示计划
存储优化的一些思考

一、数据湖和 Iceberg 简介

1. 数据湖生态

如上图所示,对于一个成熟的数据湖生态而言:

  • 首先咱们认为它底下应具备海量存储的能力,常见的有对象存储,私有云存储以及 HDFS;
  • 在这之上,也须要反对丰盛的数据类型,包含非结构化的图像视频,半结构化的 CSV、XML、Log,以及结构化的数据库表;
  • 除此之外,须要高效对立的元数据管理,使得计算引擎能够不便地索引到各种类型数据来做剖析。
  • 最初,咱们须要反对丰盛的计算引擎,包含 Flink、Spark、Hive、Presto 等,从而不便对接企业中已有的一些利用架构。

2. 结构化数据在数据湖上的利用场景

上图为一个典型的数据湖上的利用场景。

数据源上可能会有各种数据,不同的数据源和不同格局。比如说事物数据,日志,埋点信息,IOT 等。这些数据通过一些流而后进入计算平台,这个时候它须要一个结构化的计划,把数据组织放到一个存储平台上,而后供后端的数据利用进行实时或者定时的查问。

这样的数据库计划它须要具备哪些特色呢?

  • 首先,能够看到数据源的类型很多,因而须要反对比拟丰盛的数据 Schema 的组织;
  • 其次,它在注入的过程中要撑持实时的数据查问,所以须要 ACID 的保障,确保不会读到一些还没写完的中间状态的脏数据;
  • 最初,例如日志这些有可能长期须要改个格局,或者加一列。相似这种状况,须要防止像传统的数仓一样,可能要把所有的数据从新提出来写一遍,从新注入到存储;而是须要一个轻量级的解决方案来达成需要。

Iceberg 数据库的定位就在于实现这样的性能,于上对接计算平台,于下对接存储平台。

3. 结构化数据在数据湖上的典型解决方案

对于数据结构化组织,典型的解决形式是用数据库传统的组织形式。

如上图所示,上方有命名空间,数据库表的隔离;两头有多个表,能够提供多种数据 Schema 的保留;底下会放数据,表格须要提供 ACID 的个性,也反对部分 Schema 的演进。

4. Iceberg 表数据组织架构

  • 快照 Metadata:表格 Schema、Partition、Partition spec、Manifest List 门路、以后快照等。
  • Manifest List:Manifest File 门路及其 Partition,数据文件统计信息。
  • Manifest File:Data File 门路及其每列数据高低边界。
  • Data File:理论表内容数据,以 Parque,ORC,Avro 等格局组织。

接下来具体看一下 Iceberg 是如何将数据组织起来的。如上图所示:

  • 能够看到左边从数据文件开始,数据文件寄存表内容数据,个别反对 Parquet、ORC、Avro 等格局;
  • 往上是 Manifest File,它会记录底下数据文件的门路以及每列数据的高低边界,不便过滤查问文件;
  • 再往上是 Manifest List,它来链接底下多个 Manifest File,同时记录 Manifest File 对应的分区范畴信息,也是为了不便后续做过滤查问;

Manifest List 其实曾经示意了快照的信息,它蕴含当下数据库表所有的数据链接,也是 Iceberg 可能反对 ACID 个性的要害保障。

有了快照,读数据的时候只能读到快照所能援用到的数据,还在写的数据不会被快照援用到,也就不会读到脏数据。多个快照会共享以前的数据文件,通过共享这些 Manifest File 来共享之前的数据。

  • 再往上是快照元数据,记录了以后或者历史上表格 Scheme 的变动、分区的配置、所有快照 Manifest File 门路、以及以后快照是哪一个。

同时,Iceberg 提供命名空间以及表格的形象,做残缺的数据组织治理。

5. Iceberg 写入流程

上方为 Iceberg 数据写入的流程图,这里用计算引擎 Flink 为例。

  • 首先,Data Workers 会从元数据上读出数据进行解析,而后把一条记录交给 Iceberg 存储;
  • 与常见的数据库一样,Iceberg 也会有预约义的分区,那些记录会写入到各个不同的分区,造成一些新的文件;
  • Flink 有个 CheckPoint 机制,文件达到当前,Flink 就会实现这一批文件的写入,而后生成这一批文件的清单,接着交给 Commit Worker;
  • Commit Worker 会读出以后快照的信息,而后与这一次生成的文件列表进行合并,生成一个新的 Manifest List 以及后续元数据的表文件的信息,之后进行提交,胜利当前就造成一个新的快照。

6. Iceberg 查问流程

上方为 Iceberg 数据查问流程。

  • 首先是 Flink Table scan worker 做一个 scan,scan 的时候能够像树一样,从根开始,找到以后的快照或者用户指定的一个历史快照,而后从快照中拿出以后快照的 Manifest List 文件,依据过后保留的一些信息,就能够过滤出满足这次查问条件的 Manifest File;
  • 再往下通过 Manifest File 里记录的信息,过滤出底下须要的 Data Files。这个文件拿进去当前,再交给 Recorder reader workers,它从文件中读出满足条件的 Recode,而后返回给下层调用。

这里能够看到一个特点,就是在整个数据的查问过程中没有用到任何 List,这是因为 Iceberg 残缺地把它记录好了,整个文件的树形构造不须要 List,都是间接单门路指向的,因而查问性能上没有耗时 List 操作,这点对于对象存储比拟敌对,因为对象存储在 List 下面是一个比拟耗资源的操作。

7. Iceberg Catalog 性能一览

Iceberg 提供 Catalog 用良好的形象来对接数据存储和元数据管理。任何一个存储,只有实现 Iceberg 的 Catalog 形象,就有机会跟 Iceberg 对接,用来组织接入下面的数据湖计划。

如上图所示,Catalog 次要提供几方面的形象。

  • 它能够对 Iceberg 定义一系列角色文件;
  • 它的 File IO 都是能够定制,包含读写和删除;
  • 它的命名空间和表的操作 (也可称为元数据操作),也能够定制;
  • 包含表的读取 / 扫描,表的提交,都能够用 Catalog 来定制。

这样能够提供灵便的操作空间,不便对接各种底下的存储。

二、对象存储撑持 Iceberg 数据湖

1. 以后 Iceberg Catalog 实现

目前社区外面曾经有的 Iceberg Catalog 实现可分为两个局部,一是数据 IO 局部,二是元数据管理局部。

如上图所示,其实短少面向公有对象存储的 Catalog 实现,S3A 实践上能够接对象存储,但它用的是文件系统语义,不是人造的对象存储语义,模仿这些文件操作会有额定的开销,而咱们想实现的是把数据和元数据管理全副都交给一个对象存储,而不是拆散的设计。

2. 对象存储和 HDFS 的比拟

这里存在一个问题,在有 HDFS 的状况下,为什么还要用对象存储?

如下所示,咱们从各个角度将对象存储和 HDFS 进行比照。

总结下来,咱们认为:

  • 对象存储在集群扩展性,小文件敌对,多站点部署和低存储开销上更加有劣势;
  • HDFS 的益处就是提供追加上传和原子性 rename,这两个劣势正是 Iceberg 须要的。

上面对两个存储各自的劣势进行简略论述。

1)比拟之:集群扩展性

  • HDFS 架构是用单个 Name Node 保留所有元数据,这就决定了它单节点的能力无限,所以在元数据方面没有横向扩大能力。
  • 对象存储个别采纳哈希形式,把元数据分隔成各个块,把这个块交给不同 Node 下面的服务来进行治理,人造地它元数据的下限会更高,甚至在极其状况下能够进行 rehash,把这个块切得更细,交给更多的 Node 来治理元数据,达到扩大能力。

2)比拟之:小文件敌对

现在在大数据利用中,小文件越来越常见,并逐步成为一个痛点。

  • HDFS 基于架构的限度,小文件存储受限于 Name Node 内存等资源,尽管 HDFS 提供了 Archive 的办法来合并小文件,缩小对 Name Node 的压力,但这须要额定减少复杂度,不是原生的。
  • 同样,小文件的 TPS 也是受限于 Name Node 的解决能力,因为它只有单个 Name Node。对象存储的元数据是分布式存储和治理,流量能够很好地散布到各个 Node 上,这样单节点就能够存储海量的小文件。
  • 目前,很多对象存储提供多介质,分层减速,能够晋升小文件的性能。

3)比拟之:多站点部署

对象存储反对多站点部署

  • 全局命名空间
  • 反对丰盛的规定配置
  • 对象存储的多站点部署能力实用于两地三核心多活的架构,而 HDFS 没有原生的多站点部署能力。尽管目前看到一些商业版本给 HDFS 减少了多站点负责数据的能力,但因为它的两个零碎可能是独立的,因而并不能撑持真正的全局命名空间下多活的能力。

4)比拟之:低存储开销

对于存储系统来说,为了适应随机的硬件故障,它个别会有正本机制来爱护数据。

  • 常见的如三正本,把数据存三份,而后离开保留到三个 Node 下面,存储开销是三倍,然而它能够同时容忍两个正本遇到故障,保证数据不会失落。
  • 另一种是 Erasure Coding,通常称为 EC。以 10+2 举例,它把数据切成 10 个数据块,而后用算法算出两个代码块,一共 12 个块。接着散布到四个节点上,存储开销是 1.2 倍。它同样能够容忍同时呈现两个块故障,这种状况能够用残余的 10 个块算出所有的数据,这样缩小存储开销,同时达到故障容忍水平。
  • HDFS 默认应用三正本机制,新的 HDFS 版本上曾经反对 EC 的能力。通过钻研,它是基于文件做 EC,所以它对小文件有人造的劣势。因为如果小文件的大小小于分块要求的大小时,它的开销就会比原定的开销更大,因为两个代码块这边是不能省的。在极其状况下,如果它的大小等同于单个代码块的大小,它就曾经等同于三正本了。
  • 同时,HDFS 一旦 EC,就不能再反对 append、hflush、hsync 等操作,这会极大地影响 EC 可能应用的场景。对象存储原生反对 EC,对于小文件的话,它外部会把小文件合并成一个大的块来做 EC,这样确保数据开销方面始终是恒定的,基于事后配置的策略。

3. 对象存储的挑战:数据的追加上传

在 S3 协定中,对象在上传时须要提供大小。

以 S3 规范为例,对象存储跟 Iceberg 对接时,S3 规范对象存储不反对数据追加上传的接口,协定要求上传文件时提供文件大小。所以在这种状况下,对于这种流式的 File IO 传入,其实不太敌对。

1)解决方案一:S3 Catalog 数据追加上传 – 小文件缓存本地 / 内存

对于一些小文件,流式传入的时候就写入到本地缓存 / 内存,等它齐全写完后,再把它上传到对象存储里。

2)解决办法二:S3 Catalog 数据追加上传 – MPU 分段上传大文件

对于大文件,会用到 S3 规范定义的 MPU 分段上传。

它个别分为几个步骤:

  • 第一步先创立初始化的 MPU,拿到一个 Upload ID,而后给每一个分段赋予一个 Upload ID 以及一个编号,这些分块就能够并行上传;
  • 在上传实现当前,还须要一步 Complete 操作,这样相当于告诉零碎,它会把基于同一个 Upload ID 以及所有的编号,从小到大排起来,组成一个大文件;
  • 把机制使用到数据追加上传场景,惯例实现就是写入一个文件,把文件缓存到本地,当达到分块要求大小时,就能够把它进行初始化 MPU,把它的一个分块开始上传。前面每一个分块也是一样的操作,直到最初一个分块上传完,最初再调用一个实现操作来实现上传。

MPU 有长处也有毛病:

  • 毛病是 MPU 的分片数量有下限,S3 规范里可能只有 1 万个分片。想反对大文件的话,这个分块就不能太小,所以对于小于分块的文件,仍然是要利用后面一种办法进行缓存上传;
  • MPU 的长处在于并行上传的能力。假如做一个异步的上传,文件在缓存达到当前,不必等上一个分块上传胜利,就能够持续缓存下一个,之后开始上传。当后面注入的速度足够快时,后端的异步提交就变成了并行操作。利用这个机制,它能够提供比单条流上传速度更快的上传能力。

4. 对象存储的挑战:原子提交

下一个问题是对象存储的原子提交问题。

后面提到在数据注入的过程中,最初的提交其实分为几步,是一个线性事务。首先它要读到以后的快照版本,而后把这一次的文件清单合并,接着提交本人新的版本。这个操作相似于咱们编程里常见的“i=i+1”,它不是一个原子操作,对象存储的规范里也没有提供这个能力。

上图是并发提交元信息的场景。

  • 这里 Commit Worker 1 拿到了 v006 版本,而后合并本人的文件,提交 v007 胜利。
  • 此时还有另一个 Commit Worker 2,它也拿到了 v006,而后合并进去,且也要提供 v007。此时咱们须要一个机制通知它 v007 曾经抵触,不能上传,而后让它本人去 Retry。Retry 当前取出新的 v007 合并,而后提交给 v008。

这是一个典型的抵触场景,这里须要一套机制,因为如果它不能检测到本人是一个抵触的状况的话,再提交 v007 会把下面 v007 笼罩,会导致上一次提交的所有数据都失落。

如上图所示,咱们能够应用一个分布式锁的机制来解决上述问题。

  • 首先,Commit Worker 1 拿到 v006,而后合并文件,在提交之前先要获取这一把锁,拿到锁当前判断以后快照版本。如果是 v006,则 v007 能提交胜利,提交胜利当前再解锁。
  • 同样,Commit Worker 2 拿到 v006 合并当前,它一开始拿不到锁,要等 Commit Worker 1 开释掉这个锁当前能力拿到。等拿到锁再去查看的时候,会发现以后版本曾经是 v007,与本人的 v007 有抵触,因而这个操作肯定会失败,而后它就会进行 Retry。

这是通过锁来解决并发提交的问题。

5. Dell EMC ECS 的数据追加上传

基于 S3 规范的对象存储和 Iceberg 问题的解决方案存在一些问题,例如性能损失,或者须要额定部署锁服务等。

Dell EMC ECS 也是个对象存储,基于这个问题有不一样的解答,它基于 S3 的标准协议有一些扩大,能够反对数据的追加上传。

它的追加上传与 MPU 不同的中央在于,它没有分块大小的限度。分块能够设置得比拟小一点,上传后外部就会串联起来,仍然是一个无效的文件。

追加上传和 MPU 这两者能够在肯定水平上适应不同的场景。

MPU 有减速上传能力,追加上传在速度在不是很快的状况下,性能也是足够用,而且它没有 MPU 的初始化和合并的操作,所以两者在性能上可能适应不同场景进行应用。

6. Dell EMC ECS 在并发提交下的解决方案

ECS 对象存储还提供了一个 If-Match 的语义,在微软的云存储以及谷歌的云存储上都有这样一个接口能力。

  • If-Match 就是说在 Commit Worker 1 提交拿到 v006 的时候,同时拿到了文件的 eTag。提交的时候会带上 eTag,零碎须要判断要覆盖文件的 eTag 跟以后这个文件实在 eTag 是否雷同,如果雷同就容许这次笼罩操作,那么 v007 就能提交胜利;
  • 另一种状况,是 Commit Worker 2 也拿到了 v006 的 eTag,而后上传的时候发现拿到 eTag 跟以后零碎里文件不同,则会返回失败,而后触发 Retry。

这个实现是和锁机制一样的成果,不须要内部再重新部署锁服务来保障原子提交的问题。

7. S3 Catalog – 对立存储的数据

回顾一下,上方咱们解决了文件 IO 中上传数据 IO 的问题,和解决了元数据表格的原子提交问题。

解决这些问题当前,就能够把数据以及元数据的治理全副都交到对象存储,不再须要额定部署元数据服务,做到真正对立数据存储的概念。

三、演示计划

如上所示,演示计划用到了 Pravega,能够简略了解为 Kafka 的一个代替,然而对它进行了性能优化。

在这个例子中,咱们会把数据注入 Pravega 的流里,而后 Flink 会从 Pravega 中读出数据进行解析,而后存入 Iceberg 组织。Iceberg 利用 ECS Catalog,间接对接对象存储,这外面没有任何其余部署,最初用 Flink 读出这个数据。

四、存储优化的一些思考

上图为以后 Iceberg 反对的数据组织构造,能够看到它间接 Parquet 文件存在存储外面。

咱们的想法是如果这个湖跟元数据的湖其实是一个湖,有没有可能生成的 Parquet 文件跟源文件存在很大的数据冗余度,是否能够缩小冗余信息的存储。

比方最极其的状况,源文件的一个信息记录在 Iceberg 中,就不存这个 Parquet 数据文件。当要查问的时候,通过定制 File IO,让它依据原文件在内存中实时生成一个相似于 Parquet 的格局,提交给下层利用查问,就能够达到一样的成果。

然而这种形式,局限于对存储的老本有很高的要求,然而对查问的性能要求却不高的状况。可能实现这个也要基于 Iceberg 好的形象,因为它的文件元数据和 File IO 都是形象进去的,能够把源文件拆进去,让它认为这是一个 Parquet 文件。

进一步思考,是否优化查问性能,同时节俭存储空间。

比方预计算一下,把源文件某些罕用的列拿进去,而后统计信息到 Iceberg 中,在读的时候利用源文件和云计算的文件,能够很快查问到信息,同时又节俭了不罕用的数据列存储空间。

这是比拟初步的想法,如果可能实现,则用 Iceberg 不仅能够索引结构化的 Parquet 文件格式,甚至能够索引一些半结构化、结构化的数据,通过长期的计算来解决下层的查问工作,变成一个更残缺的 Data Catalog。

原文链接
本文为阿里云原创内容,未经容许不得转载。

正文完
 0