乐趣区

关于flink:数据湖有新解Apache-Hudi-与-Apache-Flink-集成

Apache Hudi 是由 Uber 开发并开源的数据湖框架,它于 2019 年 1 月进入 Apache 孵化器孵化,次年 5 月份顺利毕业晋升为 Apache 顶级我的项目。是以后最为热门的数据湖框架之一。

1. 为何要解耦

Hudi 自诞生至今始终应用 Spark 作为其数据处理引擎。如果用户想应用 Hudi 作为其数据湖框架,就必须在其平台技术栈中引入 Spark。放在几年前,应用 Spark 作为大数据处理引擎能够说是很平时甚至是天经地义的事。因为 Spark 既能够进行批处理也能够应用微批模仿流,流批一体,一套引擎解决流、批问题。然而,近年来,随着大数据技术的倒退,同为大数据处理引擎的 Flink 逐步进入人们的视线,并在计算引擎畛域获占据了肯定的市场,大数据处理引擎不再是一家独大。在大数据技术社区、论坛等领地,Hudi 是否反对应用 Flink 计算引擎的的声音开始逐步呈现,并日渐频繁。所以使 Hudi 反对 Flink 引擎是个有价值的事件,而集成 Flink 引擎的前提是 Hudi 与 Spark 解耦。

同时,纵观大数据畛域成熟、沉闷、有生命力的框架,无一不是设计优雅,能与其余框架互相交融,彼此借力,各专所长。因而将 Hudi 与 Spark 解耦,将其变成一个引擎无关的数据湖框架,无疑是给 Hudi 与其余组件的交融发明了更多的可能,使得 Hudi 能更好的融入大数据生态圈。

2. 解耦难点

Hudi 外部应用 Spark API 像咱们平时开发应用 List 一样稀松平时。自从数据源读取数据,到最终写出数据到表,无处不是应用 Spark RDD 作为次要数据结构,甚至连一般的工具类,都应用 Spark API 实现,能够说 Hudi 就是用 Spark 实现的一个通用数据湖框架,它与 Spark 的绑定堪称是深入骨髓。

此外,此次解耦后集成的首要引擎是 Flink。而 Flink 与 Spark 在外围形象上差别很大。Spark 认为数据是有界的,其外围形象是一个无限的数据汇合。而 Flink 则认为数据的实质是流,其外围形象 DataStream 中蕴含的是各种对数据的操作。同时,Hudi 外部还存在多处同时操作多个 RDD, 以及将一个 RDD 的处理结果与另一个 RDD 联结解决的状况,这种形象上的区别以及实现时对于两头后果的复用,使得 Hudi 在解耦形象上难以使用对立的 API 同时操作 RDD 和 DataStream。

3. 解耦思路

实践上,Hudi 应用 Spark 作为其计算引擎无非是为了应用 Spark 的分布式计算能力以及 RDD 丰盛的算子能力。抛开分布式计算能力外,Hudi 更多是把 RDD 作为一个数据结构形象,而 RDD 实质上又是一个有界数据集,因而,把 RDD 换成 List, 在实践上齐全可行(当然,可能会就义些性能)。为了尽可能保障 Hudi Spark 版本的性能和稳定性。咱们能够保留将有界数据集作为基本操作单位的设定,Hudi 次要操作 API 不变,将 RDD 抽取为一个泛型,Spark 引擎实现仍旧应用 RDD,其余引擎则依据理论状况应用 List 或者其余有界数据集。

解耦准则:

1)对立泛型。Spark API 用到的 JavaRDD<HoodieRecord>,JavaRDD<HoodieKey>,JavaRDD<WriteStatus> 对立应用泛型 I,K,O 代替;

2)去 Spark 化。形象层所有 API 必须与 Spark 无关。波及到具体操作难以在形象层实现的,改写为形象办法,引入 Spark 子类实现。

例如:Hudi 外部多处应用到了 JavaSparkContext#map() 办法,去 Spark 化,则须要将 JavaSparkContext 暗藏,针对该问题咱们引入了 HoodieEngineContext#map() 办法,该办法会屏蔽 map 的具体实现细节,从而在形象成实现去 Spark 化。

3)形象层尽量减少改变,保障 Hudi 原版性能和性能;

4)应用 HoodieEngineContext 抽象类替换 JavaSparkContext,提供运行环境上下文。

4.Flink 集成设计

Hudi 的写操作在实质上是批处理,DeltaStreamer 的间断模式是通过循环进行批处理实现的。为应用对立 API,Hudi 集成 Flink 时抉择攒一批数据后再进行解决,最初对立进行提交(这里 Flink 咱们应用 List 来攒批数据)。

攒批操作最容易想到的是通过应用工夫窗口来实现,然而,应用窗口,在某个窗口没有数据流入时,将没有输入数据,Sink 端难以判断同一批数据是否曾经解决完。因而咱们应用 Flink 的检查点机制来攒批,每两个 Barrier 之间的数据为一个批次,当某个子工作中没有数据时,mock 后果数据凑数。这样在 Sink 端,当每个子工作都有后果数据下发时即可认为一批数据曾经解决实现,能够执行 commit。

DAG 如下:

  • source 接管 Kafka 数据,转换成 List<HoodieRecord>;
  • InstantGeneratorOperator 生成全局惟一的 instant. 当上一个 instant 未实现或者以后批次无数据时,不创立新的 instant;
  • KeyBy partitionPath 依据 partitionPath 分区,防止多个子工作写同一个分区;
  • WriteProcessOperator 执行写操作,当以后分区无数据时,向上游发送空的后果数据凑数;
  • CommitSink 接管上游工作的计算结果,当收到 parallelism 个后果时,认为上游子工作全副执行实现,执行 commit.

注:InstantGeneratorOperator 和 WriteProcessOperator 均为自定义的 Flink 算子,InstantGeneratorOperator 会在其外部阻塞查看上一个 instant 的状态,保障全局只有一个 inflight(或 requested)状态的 instant.WriteProcessOperator 是理论执行写操作的中央,其写操作在 checkpoint 时触发。

5. 实现示例

1) HoodieTable


/**
 * Abstract implementation of a HoodieTable.
 *
 * @param <T> Sub type of HoodieRecordPayload
 * @param <I> Type of inputs
 * @param <K> Type of keys
 * @param <O> Type of outputs
 */
public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {

  protected final HoodieWriteConfig config;
  protected final HoodieTableMetaClient metaClient;
  protected final HoodieIndex<T, I, K, O> index;

  public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
      I records);

  public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
      I records);

  public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
      I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);

  ......
}

HoodieTable 是 Hudi 的外围形象之一,其中定义了表反对的 insert,upsert,bulkInsert 等操作。以 upsert 为例,输出数据由原先的 JavaRDD<HoodieRecord> inputRdds 换成了 I records, 运行时 JavaSparkContext jsc 换成了 HoodieEngineContext context.

从类正文能够看到 T,I,K,O 别离代表了 Hudi 操作的负载数据类型、输出数据类型、主键类型以及输入数据类型。这些泛型将贯通整个形象层。

2) HoodieEngineContext


/**
 * Base class contains the context information needed by the engine at runtime. It will be extended by different
 * engine implementation if needed.
 */
public abstract class HoodieEngineContext {public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);

  public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);

  public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);

  ......
}

HoodieEngineContext 表演了 JavaSparkContext 的角色,它不仅能提供所有 JavaSparkContext 能提供的信息,还封装了 map,flatMap,foreach 等诸多办法,暗藏了 JavaSparkContext#map(),JavaSparkContext#flatMap(),JavaSparkContext#foreach() 等办法的具体实现。

以 map 办法为例,在 Spark 的实现类 HoodieSparkEngineContext 中,map 办法如下:

  @Override
  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();}

在操作 List 的引擎中其实现能够为(不同办法需注意线程平安问题,慎用 parallel()):

  @Override
  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {return data.stream().parallel().map(func::apply).collect(Collectors.toList());
  }

注:map 函数中抛出的异样,能够通过包装 SerializableFunction<I, O> func 解决. 这里简要介绍下 SerializableFunction:

@FunctionalInterface
public interface SerializableFunction<I, O> extends Serializable {O apply(I v1) throws Exception;
}

该办法实际上是 java.util.function.Function 的变种,与 java.util.function.Function 不同的是 SerializableFunction 能够序列化,能够抛异样。引入该函数是因为 JavaSparkContext#map() 函数能接管的入参必须可序列,同时在 hudi 的逻辑中,有多处须要抛异样,而在 Lambda 表达式中进行 try catch 代码会略显臃肿,不太优雅。

6. 现状和后续打算

6.1 工作时间轴

2020 年 4 月,T3 出行(杨华 @vinoyang,王祥虎 @wangxianghu)和阿里巴巴的同学(李少锋 @leesf)以及若干其余小伙伴一起设计、敲定了该解耦计划;

2020 年 4 月,T3 出行 (王祥虎 @wangxianghu) 在外部实现了编码实现,并进行了初步验证,得出计划可行的论断;

2020 年 7 月,T3 出行 (王祥虎 @wangxianghu) 将该设计实现和基于新形象实现的 Spark 版本推向社区(HUDI-1089);

2020 年 9 月 26 日,顺丰科技基于 T3 外部分支批改欠缺的版本在 Apache Flink Meetup(深圳站)公开 PR, 使其成为业界第一个在线上应用 Flink 将数据写 Hudi 的企业。

2020 年 10 月 2 日,HUDI-1089 合并入 Hudi 主分支,标记着 Hudi-Spark 解耦实现。

6.2 后续打算

1)推动 Hudi 和 Flink 集成

将 Flink 与 Hudi 的集成尽快推向社区,初期该个性可能只反对 Kafka 数据源。

2)性能优化

为保障 Hudi-Spark 版本的稳定性和性能,此次解耦没有太多思考 Flink 版本可能存在的性能问题。

3)类 flink-connector-hudi 第三方包开发

将 Hudi-Flink 的绑定做成第三方包,用户能够在 Flink 利用中以编码方式读取任意数据源,通过这个第三方包写入 Hudi。

退出移动版