关于flink:网易Flink-Iceberg-数据湖探索与实践

8次阅读

共计 5120 个字符,预计需要花费 13 分钟才能阅读完成。

01 数据仓库平台建设的痛点

痛点一:

咱们凌晨一些大的离线工作常常会因为一些起因呈现提早,这种提早会导致外围报表的产出工夫不稳固,有些时候会产出比拟早,然而有时候就可能会产出比拟晚,业务很难承受。

为什么会呈现这种景象的产生呢?目前来看大抵有这么几点因素:

  • 工作自身要申请的数据量会特地大。通常来说一天原始的数据量可能在几十 TB。几百个分区,甚至上千个分区,五万 + 的文件数这样子。如果说全量读取这些文件的话,几百个分区就会向 NameNode 发送几百次申请,咱们晓得离线工作在凌晨运行的时候,NameNode 的压力是十分大的。所以就很有可能呈现 Namenode 响应很慢的状况,如果申请响应很慢就会导致工作初始化工夫很长。
  • 工作自身的 ETL 效率是绝对低效的,这个低效并不是说 Spark 引擎低效,而是说咱们的存储在这块反对的不是特地的好。比方目前咱们查一个分区的话是须要将所有文件都扫描一遍而后进行剖析,而实际上我可能只对某些文件感兴趣。所以相对而言这个计划自身来说就是绝对低效的。
  • 这种大的离线工作一旦遇到磁盘坏盘或者机器宕机,就须要重试,重试一次须要消耗很长的工夫比方几十分钟。如果说重试一两次的话这个提早就会比拟大了。

痛点二:

针对一些细琐的一些问题而言的。这里简略列举了三个场景来剖析:

  • 不牢靠的更新操作。咱们常常在 ETL 过程中执行一些 insert overwrite 之类的操作,这类操作会先把相应分区的数据删除,再把生成的文件加载到分区中去。在咱们移除文件的时候,很多正在读取这些文件的工作就会产生异样,这就是不牢靠的更新操作。
  • 表 Schema 变更低效。目前咱们在对表做一些加字段、更改分区的操作其实是十分低效的操作,咱们须要把所有的原始数据读出来,而后在从新写回去。这样就会十分耗时,并且低效。
  • 数据可靠性不足保障。次要是咱们对于分区的操作,咱们会把分区的信息分为两个中央,HDFS 和 Metastore,别离存储一份。在这种状况下,如果进行更新操作,就可能会呈现一个更新胜利而另一个更新失败,会导致数据不牢靠。

痛点三:

基于 Lambda 架构建设的实时数仓存在较多的问题。如上图的这个架构图,第一条链路是基于 kafka 直达的一条实时链路(提早要求小于 5 分钟),另一条是离线链路(提早大于 1 小时),甚至有些公司会有第三条准实时链路(提早要求 5 分钟~一小时),甚至更简单的场景。

  • 两条链路对应两份数据,很多时候实时链路的处理结果和离线链路的处理结果对不上。
  • Kafka 无奈存储海量数据,无奈基于以后的 OLAP 剖析引擎高效查问 Kafka 中的数据。
  • Lambda 保护老本高。代码、数据血统、Schema 等都须要两套。运维、监控等老本都十分高。

痛点四:

不能敌对地反对高效更新场景。大数据的更新场景个别有两种,一种是 CDC (Change Data Capture) 的更新,尤其在电商的场景下,将 binlog 中的更新删除同步到 HDFS 上。另一种是提早数据带来的聚合后后果的更新。目前 HDFS 只反对追加写,不反对更新。因而业界很多公司引入了 Kudu。然而 Kudu 自身是有一些局限的,比方计算存储没有做到拆散。这样整个数仓零碎中引入了 HDFS、Kafka 以及 Kudu,运维老本不堪称不大。

下面就是针对目前数仓所波及到的四个痛点的大抵介绍,因而咱们也是通过对数据湖的调研和实际,心愿能在这四个方面对数仓建设有所帮忙。接下来重点解说下对数据湖的一些思考。

02 数据湖 Iceberg 外围原理

1. 数据湖开源产品调研

数据湖大抵是从 19 年开始缓缓火起来的,目前市面上外围的数据湖开源产品大抵有这么几个:

  • DELTA LAKE,在 17 年的时候 DataBricks 就做了 DELTA LAKE 的商业版。次要想解决的也是基于 Lambda 架构带来的存储问题,它的初衷是心愿通过一种存储来把 Lambda 架构做成 kappa 架构。
  • Hudi (Uber 开源) 能够反对疾速的更新以及增量的拉取操作。这是它最大的卖点之一。
  • Iceberg 的初衷是想做规范的 Table Format 以及高效的 ETL。

上图是来自 Flink 个人针对数据湖计划的一些调研比照,总体来看这些计划的根底性能绝对都还是比较完善的。我说的根底性能次要包含:

  • 高效 Table Schema 的变更,比方针对增减分区,增减字段等性能
  • ACID 语义保障
  • 同时反对流批读写,不会呈现脏读等景象
  • 反对 OSS 这类便宜存储

2. 当然还有一些不同点:

Hudi 的个性次要是反对疾速的更新删除和增量拉取。
Iceberg 的个性次要是代码形象水平高,不绑定任何的 Engine。它裸露进去了十分外围的表层面的接口,能够十分不便的与 Spark/Flink 对接。然而 Delta 和 Hudi 基本上和 Spark 的耦合很重。如果想接入 Flink,绝对比拟难。

3. 咱们抉择 Iceberg 的起因:

当初国内的实时数仓建设围绕 Flink 的状况会多一点。所以可能基于 Flink 扩大生态,是咱们抉择 Iceberg 一个比拟重要的点。
国内也有很多基于 Iceberg 开发的重要力量,比方腾讯团队、Flink 官网团队,他们的数据湖选型也是 Iceberg。目前他们在社区别离主导 update 以及 Flink 的生态对接。

4. 接下来咱们重点介绍一下 Iceberg:

这是来自官网对于 Iceberg 的一段介绍,大抵就是 Iceberg 是一个开源的基于表格局的数据湖。对于 table format 再给大家具体介绍下:

左侧图是一个形象的数据处理系统,别离由 SQL 引擎、table format、文件汇合以及分布式文件系统形成。右侧是对应的事实中的组件,SQL 引擎比方 HiveServer、Impala、Spark 等等,table format 比方 Metastore 或者 Iceberg,文件汇合次要有 Parquet 文件等,而分布式文件系统就是 HDFS。

对于 table format,我认为次要蕴含 4 个层面的含意,别离是表 schema 定义(是否反对简单数据类型),表中文件的组织模式,表相干统计信息、表索引信息以及表的读写 API 实现。详述如下:

  • 表 schema 定义了一个表反对字段类型,比方 int、string、long 以及简单数据类型等。
  • 表中文件组织模式最典型的是 Partition 模式,是 Range Partition 还是 Hash Partition。
  • Metadata 数据统计信息。
  • 封装了表的读写 API。下层引擎通过对应的 API 读取或者写入表中的数据。

和 Iceberg 差不多相当的一个组件是 Metastore。不过 Metastore 是一个服务,而 Iceberg 就是一个 jar 包。这里就 Metastore 和 Iceberg 在表格局的 4 个方面别离进行一下比照介绍:

① 在 schema 层面上没有任何区别:

都反对 int、string、bigint 等类型。

② partition 实现齐全不同:

两者在 partition 上有很大的不同:

metastore 中 partition 字段不能是表字段,因为 partition 字段实质上是一个目录构造,不是用户表中的一列数据。基于 metastore,用户想定位到一个 partition 下的所有数据,首先须要在 metastore 中定位出该 partition 对应的所在目录地位信息,而后再到 HDFS 上执行 list 命令获取到这个分区下的所有文件,对这些文件进行扫描失去这个 partition 下的所有数据。

Iceberg 中 partition 字段就是表中的一个字段。Iceberg 中每一张表都有一个对应的文件元数据表,文件元数据表中每条记录示意一个文件的相干信息,这些信息中有一个字段是 partition 字段,示意这个文件所在的 partition。

很显著,Iceberg 表依据 partition 定位文件相比 metastore 少了一个步骤,就是依据目录信息去 HDFS 上执行 list 命令获取分区下的文件。

试想,对于一个二级分区的大表来说,一级分区是小时工夫分区,二级分区是一个枚举字段分区,如果每个一级分区下有 30 个二级分区,那么这个表每天就会有 24 * 30 = 720 个分区。基于 Metastore 的 partition 计划,如果一个 SQL 想基于这个表扫描昨天一天的数据的话,就须要向 Namenode 下发 720 次 list 申请,如果扫描一周数据或者一个月数据,申请数就更是相当夸大。这样,一方面会导致 Namenode 压力很大,一方面也会导致 SQL 申请响应提早很大。而基于 Iceberg 的 partition 计划,就齐全没有这个问题。

③ 表统计信息实现粒度不同:

Metastore 中一张表的统计信息是表 / 分区级别粒度的统计信息,比方记录一张表中某一列的记录数量、均匀长度、为 null 的记录数量、最大值最小值等。

Iceberg 中统计信息准确到文件粒度,即每个数据文件都会记录所有列的记录数量、均匀长度、最大值最小值等。

很显著,文件粒度的统计信息对于查问中谓词(即 where 条件)的过滤会更有成果。

④ 读写 API 实现不同:

metastore 模式下下层引擎写好一批文件,调用 metastore 的 add partition 接口将这些文件增加到某个分区下。

Iceberg 模式下下层业务写好一批文件,调用 iceberg 的 commit 接口提交本次写入造成一个新的 snapshot 快照。这种提交形式保障了表的 ACID 语义。同时基于 snapshot 快照提交能够实现增量拉取实现。

总结下 Iceberg 绝对于 Metastore 的劣势:

  • 新 partition 模式:防止了查问时 n 次调用 namenode 的 list 办法,升高 namenode 压力,晋升查问性能
  • 新 metadata 模式:文件级别列统计信息能够用来依据 where 字段进行文件过滤,很多场景下能够大大减少扫描文件数,晋升查问性能
  • 新 API 模式:存储批流一体
  • 流式写入 - 增量拉取(基于 Iceberg 对立存储模式能够同时满足业务批量读取以及增量订阅需要)
  • 反对批流同时读写同一张表,对立表 schema,工作执行过程中不会呈现 FileNotFoundException

Iceberg 的晋升体现在:

03 数据湖 Iceberg 社区现状

目前 Iceberg 次要反对的计算引擎包含 Spark 2.4.5、Spark 3.x、Flink 1.11 以及 Presto。同时,一些运维工作比方 snapshot 过期、小文件合并、增量订阅生产等性能都能够实现。

对于 Apache Flink 来说,Apache Iceberg 是 delta、iceberg、hudi 三个开源我的项目中最先实现 Flink 接入的开源我的项目。通过 Flink 来实现实时导入数据到 Iceberg 数据湖、通过 Flink batch 作业来读取 Iceberg 数据,这两个外围性能将在 Apache Iceberg 0.10.0 版本公布(预计将在 10 月底公布)。对 Flink+iceberg 集成工作感兴趣的同学,能够参考 Apache Iceberg 社区的应用文档。

https://github.com/apache/iceberg/blob/master/site/docs/flink.md

依照目前的研发进度,咱们预计实时写入和读取 CDC 数据这个性能,将在 Iceberg 的 0.11.0 版本公布。

04 网易数据湖 Iceberg 实际之路

Iceberg 针对目前的大数量的状况下,能够大大晋升 ETL 工作执行的效率,这次要得益于新 Partition 模式下不再须要申请 NameNode 分区信息,同时得益于文件级别统计信息模式下能够过滤很多不满足条件的数据文件。

以后 Iceberg 社区仅反对 Spark 2.4.5,咱们在这个根底上做了更多计算引擎的适配工作。次要包含如下:

  • 集成 Hive。能够通过 Hive 创立和删除 iceberg 表,通过 HiveSQL 查问 Iceberg 表中的数据。
  • 集成 Impala。用户能够通过 Impala 新建 iceberg 内表表面,并通过 Impala 查问 Iceberg 表中的数据。目前该性能曾经奉献给 Impala 社区。
  • 集成 Flink。曾经实现了 Flink 到 Iceberg 的 sink 实现,业务能够生产 kafka 中的数据将后果写入到 Iceberg 中。同时咱们基于 Flink 引擎实现了小文件异步合并的性能,这样能够实现 Flink 一边写数据文件,一边执行小文件的合并。基于 Iceberg 的小文件合并通过 commit 的形式提交,不须要删除合并前的小文件,也就不会引起读取工作的任何异样。

作者介绍:

范欣欣,网易大数据技术专家。他与 Apache HBase PMC 成员、阿里巴巴技术专家胡争合著的新书《HBase 原理与实际》,这也是业界第一本专门论述 HBase 原理的书。

正文完
 0