关于后端:SmartNews-基于-Flink-的-Iceberg-实时数据湖实践

5次阅读

共计 6166 个字符,预计需要花费 16 分钟才能阅读完成。

摘要:本文整顿自 SmartNews 数据平台架构师 Apache Iceberg Contributor 戢清雨,在 Flink Forward Asia 2022 实时湖仓专场的分享。本篇内容次要分为五个局部:

  1. SmartNews 数据湖介绍
  2. 基于 Icebergv1 格局的数据湖实际
  3. 基于 Flink 实时更新的数据湖(Iceberg v2)解决方案
  4. 实时更新小文件问题的优化
  5. 总结与瞻望

点击查看原文视频 & 演讲 PPT

一、SmartNews 数据湖介绍

2012 年,SmartNews 公司在日本东京成立。始终专一于 PGC 新闻,是一款在日本处于领先地位的新闻 APP。目前,服务的客户次要集中在日本、欧美等国家。SmartNews 公司在日本、美国和中国均设有办公室,在 2019 年入驻北京和上海。

SmartNews 数据湖次要存储所有广告数据,包含从服务器端收集到的点击 / 转化等事件信息,维表信息。其次要的广告信息都存储在 Kafka 上,服务器端在收集到事件后,会间接实时写入 Kafka。其余的维表信息,比方广告信息、统计信息等等,次要存储在 MySQL 或 Hive 中。这些信息个别以实时或小时级别更新。

数据湖的上游是业务端的 ETL 或实时报表数据,是上游数据的对立入口。因而,咱们尽量把所有维度都放进来,做成一个大宽表,供上游实时查问应用。

接下来介绍下数据湖须要解决的技术挑战。

  • 第一,依照广告主键去重。上游数据依照每条广告的事件,进行收集。比方一条广告的点击或者转化会生成多条记录,因而咱们须要将这些事件打平。其次是上游的 Kafka 数据,可能蕴含了肯定水平的反复数据。
  • 第二,须要更新点击 / 转化工夫戳字段。比方事件的工夫戳,须要计算最新一次的工夫,须要对数据湖执行更新操作。
  • 第三,上游近实时读取。要求数据湖具备同时写入 / 读取的操作。而 Hive 在重写数据的过程中是会影响到上游正在产生的查问,这就要求咱们须要一个新的解决方案。

二、基于 Icebergv1 格局的数据湖实际

上图是咱们第一个解决方案的整体架构。在这个解决方案中,咱们采纳了 Spark 计算引擎,把所有的广告事件依照主键进行打平并去重。而后,所有的维表进行查问 join。

除此之外,咱们将数据源切换成 S3 文件,没有用流式数据源。其次要起因如下:

  • 第一,这个计划是一个小时级别的解决方案,并不需要实时读取流式数据。
  • 第二,咱们在设计 Spark 工作时,会定义一个最小的执行单元。将指标数据源限度在某一天的某一时间。通过 S3 文件的分区信息,就能够间接进行读取。
  • 第三,为了升高肯定的容错概率。目前,咱们的业务须要回滚过来四天的数据。比方有一个比拟大的 Spark 工作须要重写,如果 Spark 工作失败,会导致整个工作失败。如果设计为最小执行单元,每个 Spark 工作只解决某个小时的数据,容错几率会大幅晋升。

为了防止一些反复计算,咱们也会去检测以后小时是否比上次 Spark 工作启动的时候有新增加文件,通过 airflow 来管制 Spark 工作的启动与重试。

每一个独立的 Spark 工作都会去尝试 overwrite 某一个小时的数据到 Iceberg 中。滚动刷新过来 4 天 /96 个小时的数据 – 这也是这个解决方案的一个限度,其理论场景中这个刷新的窗口理论值是 30 天,然而思考到老本等因素,这个计划只回刷过来 96 个小时。与此同时,上游也会通过一些查问引擎来对这个数据湖数据进行实时查问。

在这个解决方案中,解决了之前咱们提到的一些挑战。比方在 Spark 作业中,依照主键进行去重,并且更新相应的工夫戳。通过 Iceberg 解决方案,岂但隔离上下游的读写,而且提供了小时级别的更新。

但这个计划也有很多有余。比方占用 Infra 资源太多,计算资源的节约。通过计算发现,须要更新的行只占总体的 1%~2% 左右。除此之外,还有存储资源节约的问题。Spark 每次从 overwrite 提交到 Iceberg 的过程中,都须要重写整个数据。对于并行提交到 Iceberg 的锁问题,每个最小的 Spark 执行单元,会同时执行提交 Iceberg 操作。在向 Iceberg 提交的过程中,会先从 Hive 里拿一个锁,导致大家对锁存在竞争,造成了资源的节约。

三、基于 Flink 实时更新的数据湖 (Iceberg v2)解决方案

咱们通过充沛的调研之后,决定采纳 Flink+Iceberg V2 的形式,进行实时更新。这个解决方案利用了 Iceberg V2 反对行级别更新,其次是 Flink 的实时写入。因为 Flink 在写入 Iceberg 的过程中,应用了 Merge On Read。所以 Flink 只会写入须要更新的数据。

因为咱们只有 1% 的数据量须要更新,所以 Merge On Read 模式非常适合以后的业务场景。

除此之外,咱们心愿通过 MySQL CDC 的流式解决方案,解决 dimension join 维表查问,能够更快、更精确的将维表信息写入数据库。

在这个新的解决方案中,咱们将上游的数据源都流式化,发送到 Kafka 中。与此同时,将 MySQL 的维表信息通过 CDC 的形式,输出到 Flink 工作里。Flink 再将这些维表信息通过 broadcast 到 State 中,供上游查问。

Flink 在通过 Iceberg Sink 的 Upsert Mode 来将数据实时写入到数据湖中。offline 的话,咱们再通过 airflow 来定时启动一些 Spark 工作来做数据文件的合并,次要是为了解决小文件的问题,咱们在前面的章节也会有具体介绍。

比照上述两种不同的解决方案,能够看出以下区别。首先,Spark + Iceberg v1 的写入形式是 overwrite。每次会将所有的数据集从新计算,而后从新放到数据湖中。Flink + Iceberg v2 的写入形式是 Upsert,只是将更新的数据写入到数据湖中。

从输入文件数量的角度来讲,Spark + Iceberg v1 的文件大小可控,数量可控。因为每次输出的都是这个小时的全量数据,能够依照需要来管制文件大小,管制文件数量。Flink + Iceberg v2 会产生大量的小文件,带来微小的挑战。

从计算形式的角度来讲,Spark 须要全副从新计算。Flink + Iceberg v2 仅须要计算更新的数据。

从时效性的角度来讲,Spark + Iceberg v1 提供的是小时级别的解决方案。Flink + Iceberg v2 提供的是分钟级别的解决方案,给上游查问 ETL 带来了极大劣势。

四、实时更新小文件问题的优化

方才提到的实时小文件问题,会在很大水平上影响上游查问工作的性能。接下来,着重介绍一下咱们如何解决小文件问题的。首先,介绍一下 Iceberg Sink 的写入模式。因为存在更新数据的状况,所以抉择应用 Upsert Mode。

在每次写入数据的过程中,会生成两条 Record 数据,即 Delete 和 Insert。在肯定水平上,这种形式造成了存储空间的节约。上游 Writer 算子会有 CPU 压力,它须要解决的数据量更多,须要写入的数据更多。

通过引入 Flink State 的形式,在肯定水平上解决了 Upsert 写入多行的问题。首先,依照广告主键进行 KeyBy Stream。如果以后主键不在 Flink State 中,这条数据是第一次写入,会向上游输入一条 RowKind INSERT 数据,表明这是一条全新的数据。

如果该数据主键曾经存在于 Flink State 中,会向上游输入两条记录。一条是 UPDATE_BEFORE,另一条是 UPDATE_AFTER。在这一环节,会更加具体的查看是否须要输入,比方是否有工夫戳的更新,是否有维表信息更新等等。

通过这些操作,能够在肯定水平上,缩小一部分的小文件。但在理论状况下,咱们发现该办法仍有有余,仍然会有很多的小文件生成。基于 Iceberg Flink Sink 原理,大量的小文件通过 IcebergStreamWriter 生成的。

Iceberg 反对两种不同的 Distribution 模式,将数据从上游的 input stream,传输到上游的 Writer 算子中。第一种是 Equality Field KeySelector,行将 RowData 的 equality filed 进行 hash。第二种是 PartitionKeySelector,行将 RowData 的 parition field 进行 hash。

这两种形式有什么区别呢?Equality Field KeySelector 从语义上能够了解为将 RowData 以主键 hash 的形式传输到上游,这样能够最大化应用上游 Writer 算子的写出速度。而 partitionKeySelector,能够将具备雷同 Parition 的 RowData 输入到同一个 Writer,确保同一个 Partition 的数据都是通过同一个 Writer 写出。

StreamWriter 负责将所有收到的数据输入到 DFS,比方 S3 上,这里会依据表上是否带有 Partition 信息来辨别到底是输入到同一个文件还是多个文件。

在咱们这个用例中,数据湖是依照 Partition 来进行物理分区,即同一个小时的数据只会存在同一个门路上面,而同一个数据文件不能蕴含多个 Partition 的数据。上游的 Writer 在收到数据当前,就会依照 Partition 的信息来写出文件。

所有的 Writer 在 Checkpoint 阶段会将写出去的文件统计信息发送到最初的 Committer 算子。Commit 算子再将所有的批改提交到 Iceberg 中。

Equality Field KeySelector 是依照 Record 主键,Shuffle 到上游 Writer 中。

在同一个 Partition 门路上面,会有多个 Writer 同时写入。次要起因就是上游 Writer 接管到的 RowData 是依照主键来进行 hash Shuffle 的,所以每个 Writer 算子都有可能接管到同一个 Partition 下的数据。

假如 Checkpoint 的距离为 20 分钟,应用 10 个 Writer 去写文件。实践上,每个小时能够写出 90 个小文件,是十分的典型的长尾型数据分布。由此可见,越凑近以后小时,须要解决的数据量越大的。如果间隔以后小时越远,须要解决的数量十分小。对于这些 Partition 来说,它们须要生成的文件数量根本恒定。

PartitionKeySelector 依照 Record 的 Partition 信息,Shuffle 到上游 Writer。在同一个 Partition 门路下,只有 1 个 Writer 写入。

假如 Checkpoint 的距离为 20 分钟,应用 1 个 Writer 去写文件。越凑近最新工夫,它的反压越重大,导致整个 Flink 作业提早。因为越是凑近以后小时,须要解决的数量级越大。越远离以后小时,须要解决的数据量是越小。

Equality Field KeySelector 的劣势是高效,但问题在于小文件特地多。尤其在长尾末端,均匀都是几十 kb 的小文件。PartitionKeySelector 的劣势在于,小文件数量少,对于数据量较大的 Partition,会造成很大的反压。

为了解决上述问题,咱们引入了 Dynamic Shuffle Operator 算子。它能够依照不同的 Partition,抉择不同的 KeySelector。

比方最近的 Partition 数据量特地大,Dynamic Shuffle Operator 会抉择应用 Equality Field KeySelector。面对长尾的 Partition,Dynamic Shuffle Operator 会抉择 PartitionKeySelector。该计划既保证了大批量的 Partition 数据,能够及时输入到文件中,也缩小了在长尾末端生成的小文件。

在这个解决方案中,通过引入 Dynamic Shuffle Operator,在数据输出到 Writer 前,先通过 Dynamic Shuffle Operator 进行一次物理 Partition,即物理分区。

而 Partition 策略会依照 Shuffle Operator 过来解决的统计信息,进行动静编排。如上图所示,首先通过引入 Shuffle Coordinator 解决不同 Shuffle Subtask 之间的信息通信问题。

其次咱们须要确保的是不同的 Subtask 在输入文件的时候依照同一个 ShuffleStrategy 来进行输入,因为 Iceberg 在解决 Delete 文件时,须要同一个主键的 RowData 在雷同的 Writer 输入,比方咱们现有一条 insert,再来一条 Update,如果这两个 RowData 是依照不同的 ShuffleStrategy 来进行 Shuffle,很有可能这两个数据会 Shuffle 到不同的 Writer 算子,这样会导致反复数据的产生。

除此之外,Shuffle Operator 负责将曾经解决的统计信息发送给 Coordinator。比方各个 Partition 解决的数据量。

其目标是,Coordinator 在收集到 Shuffle Operator 的统计信息之后,能够依照历史信息动静的判断出,最新的 Partition 须要什么样的 Strategy。比方当最新的 Partition 曾经写出了 70% 的数据时,Coordinator 能够让 Shuffle Operator 切换到 PartitionKey,从而缩小小文件的数量生成。

综上所述,Dynamic Shuffle KeySelector 依照以后最大 PartitionKey 来调配 ShuffleStrategy;依照历史数据信息来动态分配 Shuffle Strategy,最终确保所有 Subtask 都应用雷同的 Shuffle Strategy。

接下来,介绍一下相干的试验比照。咱们比照了 24 小时以内,每小时文件生成的数量以及均匀大小。咱们将 Flink 并发设置为 20。

如上表所示,首先咱们比拟雷同 Partition 每小时新增文件数量,+1 示意比最新的小时晚一个小时。No Shuffle 示意是用 Iceberg 的默认 Shuffle,即 EqualityFieldKeyBy。Dynamicshuffle 是新的 Shuffle Strategy。

能够看到不仅在最新的几个 Partition 中,Dynamic Shuffle 写出了更少的文件数量,而且在长尾的 Partition 也有更好的成果。

一般来说 当过了 1 个小时候之后,Dynamic Shuffle Operator 就会将该 Partition 的 Strategy 切换为 Partitionkeyby,因而以后小时的文件增长速率就是根本恒定的。

右侧的图也能够在反馈这个长尾的景象:能够看出文件生成的顶峰个别都是在第一个小时,而后续长尾小时根本是固定的。

对于文件的均匀大小,Dynamic Shuffle Operator 也有更好的体现。因为这里采取的指标是均匀文件大小,而一次 Writer 的写入可能会有很大的 Data 文件,但 Delete 文件通常较小。因为只蕴含了局部主键或者地位信息。最近小时的均匀大小成果比较显著。

五、总结与瞻望

点击查看原文视频 & 演讲 PPT


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://free.aliyun.com/?pipCode=sc

正文完
 0