Apache Hudi 是由 Uber 开发并开源的数据湖框架,它于 2019 年 1 月进入 Apache 孵化器孵化,次年 5 月份顺利毕业晋升为 Apache 顶级我的项目。是以后最为热门的数据湖框架之一。
1. 为何要解耦
Hudi 自诞生至今始终应用 Spark 作为其数据处理引擎。如果用户想应用 Hudi 作为其数据湖框架,就必须在其平台技术栈中引入 Spark。放在几年前,应用 Spark 作为大数据处理引擎能够说是很平时甚至是天经地义的事。因为 Spark 既能够进行批处理也能够应用微批模仿流,流批一体,一套引擎解决流、批问题。然而,近年来,随着大数据技术的倒退,同为大数据处理引擎的 Flink 逐步进入人们的视线,并在计算引擎畛域获占据了肯定的市场,大数据处理引擎不再是一家独大。在大数据技术社区、论坛等领地,Hudi 是否反对应用 Flink 计算引擎的的声音开始逐步呈现,并日渐频繁。所以使 Hudi 反对 Flink 引擎是个有价值的事件,而集成 Flink 引擎的前提是 Hudi 与 Spark 解耦。
同时,纵观大数据畛域成熟、沉闷、有生命力的框架,无一不是设计优雅,能与其余框架互相交融,彼此借力,各专所长。因而将 Hudi 与 Spark 解耦,将其变成一个引擎无关的数据湖框架,无疑是给 Hudi 与其余组件的交融发明了更多的可能,使得 Hudi 能更好的融入大数据生态圈。
2. 解耦难点
Hudi 外部应用 Spark API 像咱们平时开发应用 List 一样稀松平时。自从数据源读取数据,到最终写出数据到表,无处不是应用 Spark RDD 作为次要数据结构,甚至连一般的工具类,都应用 Spark API 实现,能够说 Hudi 就是用 Spark 实现的一个通用数据湖框架,它与 Spark 的绑定堪称是深入骨髓。
此外,此次解耦后集成的首要引擎是 Flink。而 Flink 与 Spark 在外围形象上差别很大。Spark 认为数据是有界的,其外围形象是一个无限的数据汇合。而 Flink 则认为数据的实质是流,其外围形象 DataStream 中蕴含的是各种对数据的操作。同时,Hudi 外部还存在多处同时操作多个 RDD,以及将一个 RDD 的处理结果与另一个 RDD 联结解决的状况,这种形象上的区别以及实现时对于两头后果的复用,使得 Hudi 在解耦形象上难以使用对立的 API 同时操作 RDD 和 DataStream。
3. 解耦思路
实践上,Hudi 应用 Spark 作为其计算引擎无非是为了应用 Spark 的分布式计算能力以及 RDD 丰盛的算子能力。抛开分布式计算能力外,Hudi 更多是把 RDD 作为一个数据结构形象,而 RDD 实质上又是一个有界数据集,因而,把 RDD 换成 List,在实践上齐全可行(当然,可能会就义些性能)。为了尽可能保障 Hudi Spark 版本的性能和稳定性。咱们能够保留将有界数据集作为基本操作单位的设定,Hudi 次要操作 API 不变,将 RDD 抽取为一个泛型,Spark 引擎实现仍旧应用 RDD,其余引擎则依据理论状况应用 List 或者其余有界数据集。
解耦准则:
1)对立泛型。Spark API 用到的 JavaRDD<HoodieRecord>,JavaRDD<HoodieKey>,JavaRDD<WriteStatus> 对立应用泛型 I,K,O 代替;
2)去 Spark 化。形象层所有 API 必须与 Spark 无关。波及到具体操作难以在形象层实现的,改写为形象办法,引入 Spark 子类实现。
例如:Hudi 外部多处应用到了 JavaSparkContext#map() 办法,去 Spark 化,则须要将 JavaSparkContext 暗藏,针对该问题咱们引入了 HoodieEngineContext#map() 办法,该办法会屏蔽 map 的具体实现细节,从而在形象成实现去 Spark 化。
3)形象层尽量减少改变,保障 Hudi 原版性能和性能;
4)应用 HoodieEngineContext 抽象类替换 JavaSparkContext,提供运行环境上下文。
4.Flink 集成设计
Hudi 的写操作在实质上是批处理,DeltaStreamer 的间断模式是通过循环进行批处理实现的。为应用对立 API,Hudi 集成 Flink 时抉择攒一批数据后再进行解决,最初对立进行提交(这里 Flink 咱们应用 List 来攒批数据)。
攒批操作最容易想到的是通过应用工夫窗口来实现,然而,应用窗口,在某个窗口没有数据流入时,将没有输入数据,Sink 端难以判断同一批数据是否曾经解决完。因而咱们应用 Flink 的检查点机制来攒批,每两个 Barrier 之间的数据为一个批次,当某个子工作中没有数据时,mock 后果数据凑数。这样在 Sink 端,当每个子工作都有后果数据下发时即可认为一批数据曾经解决实现,能够执行 commit。
DAG 如下:
- source 接管 Kafka 数据,转换成 List<HoodieRecord>;
- InstantGeneratorOperator 生成全局惟一的 instant.当上一个 instant 未实现或者以后批次无数据时,不创立新的 instant;
- KeyBy partitionPath 依据 partitionPath 分区,防止多个子工作写同一个分区;
- WriteProcessOperator 执行写操作,当以后分区无数据时,向上游发送空的后果数据凑数;
- CommitSink 接管上游工作的计算结果,当收到 parallelism 个后果时,认为上游子工作全副执行实现,执行 commit.
注:InstantGeneratorOperator 和 WriteProcessOperator 均为自定义的 Flink 算子,InstantGeneratorOperator 会在其外部阻塞查看上一个 instant 的状态,保障全局只有一个 inflight(或 requested)状态的 instant.WriteProcessOperator 是理论执行写操作的中央,其写操作在 checkpoint 时触发。
5. 实现示例
1) HoodieTable
/** * Abstract implementation of a HoodieTable. * * @param <T> Sub type of HoodieRecordPayload * @param <I> Type of inputs * @param <K> Type of keys * @param <O> Type of outputs */public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable { protected final HoodieWriteConfig config; protected final HoodieTableMetaClient metaClient; protected final HoodieIndex<T, I, K, O> index; public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime, I records); public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime, I records); public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime, I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner); ......}
HoodieTable 是 Hudi 的外围形象之一,其中定义了表反对的 insert,upsert,bulkInsert 等操作。以 upsert 为例,输出数据由原先的 JavaRDD<HoodieRecord> inputRdds 换成了 I records, 运行时 JavaSparkContext jsc 换成了 HoodieEngineContext context.
从类正文能够看到 T,I,K,O 别离代表了 Hudi 操作的负载数据类型、输出数据类型、主键类型以及输入数据类型。这些泛型将贯通整个形象层。
2) HoodieEngineContext
/** * Base class contains the context information needed by the engine at runtime. It will be extended by different * engine implementation if needed. */public abstract class HoodieEngineContext { public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism); public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism); public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism); ......}
HoodieEngineContext 表演了 JavaSparkContext 的角色,它不仅能提供所有 JavaSparkContext 能提供的信息,还封装了 map,flatMap,foreach 等诸多办法,暗藏了 JavaSparkContext#map(),JavaSparkContext#flatMap(),JavaSparkContext#foreach() 等办法的具体实现。
以 map 办法为例,在 Spark 的实现类 HoodieSparkEngineContext 中,map 办法如下:
@Override public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) { return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect(); }
在操作 List 的引擎中其实现能够为(不同办法需注意线程平安问题,慎用 parallel()):
@Override public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) { return data.stream().parallel().map(func::apply).collect(Collectors.toList()); }
注:map 函数中抛出的异样,能够通过包装 SerializableFunction<I, O> func 解决.这里简要介绍下 SerializableFunction:
@FunctionalInterfacepublic interface SerializableFunction<I, O> extends Serializable { O apply(I v1) throws Exception;}
该办法实际上是 java.util.function.Function 的变种,与java.util.function.Function 不同的是 SerializableFunction 能够序列化,能够抛异样。引入该函数是因为 JavaSparkContext#map() 函数能接管的入参必须可序列,同时在hudi的逻辑中,有多处须要抛异样,而在 Lambda 表达式中进行 try catch 代码会略显臃肿,不太优雅。
6.现状和后续打算
6.1 工作时间轴
2020 年 4 月,T3 出行(杨华@vinoyang,王祥虎@wangxianghu)和阿里巴巴的同学(李少锋@leesf)以及若干其余小伙伴一起设计、敲定了该解耦计划;
2020 年 4 月,T3 出行(王祥虎@wangxianghu)在外部实现了编码实现,并进行了初步验证,得出计划可行的论断;
2020 年 7 月,T3 出行(王祥虎@wangxianghu)将该设计实现和基于新形象实现的 Spark 版本推向社区(HUDI-1089);
2020 年 9 月 26 日,顺丰科技基于 T3 外部分支批改欠缺的版本在 Apache Flink Meetup(深圳站)公开 PR, 使其成为业界第一个在线上应用 Flink 将数据写 Hudi 的企业。
2020 年 10 月 2 日,HUDI-1089 合并入 Hudi 主分支,标记着 Hudi-Spark 解耦实现。
6.2 后续打算
1)推动 Hudi 和 Flink 集成
将 Flink 与 Hudi 的集成尽快推向社区,初期该个性可能只反对 Kafka 数据源。
2)性能优化
为保障 Hudi-Spark 版本的稳定性和性能,此次解耦没有太多思考 Flink 版本可能存在的性能问题。
3)类 flink-connector-hudi 第三方包开发
将 Hudi-Flink 的绑定做成第三方包,用户能够在 Flink 利用中以编码方式读取任意数据源,通过这个第三方包写入 Hudi。