本文首发于微信公众号“Shopee 技术团队”
摘要
Apache Hudi 是业内基于 Lakehouse 解决方案中的典型组件,相比于传统基于 HDFS 和 Hive 的数据仓库架构,基于 Apache Hudi 的 Lakehouse 解决方案有泛滥劣势,例如:低提早的数据刷新,高度的数据新鲜度;小文件自动化治理;反对数据文件的多版本读写;与大数据生态内 Hive/Spark/Presto 等引擎的无缝连接。基于这些个性,咱们开始尝试对以后次要基于 Hive 的数仓架构进行降级革新。
本文将重点介绍 Shopee Marketplace 业务应用 Flink + Hudi 构建实时数据仓库的解决方案、实际案例以及下一步布局。
1. 背景介绍
1.1 现状
在 Shopee 外部,Data Infra 团队曾经反对了数据入湖到 Hudi 的过程,提供了大量具备较高实时性和稳定性的数据源。咱们 Marketplace Data Engineering 团队也基于这些 Hudi 表构建了订单、商品和用户的小时数据 Pipeline,这些小时数据不仅在大促期间为业务人员提供数据反对,也被用于日常的风控业务中。
以后,咱们采纳了相似于 mini batch 的思维,每小时对最近 3 小时的数据进行计算和刷新,在保证数据及时更新的状况下,解决数据提早、JOIN 工夫不对齐等问题。但随着 数据量的迅速增长,小时级数据 SLA 的保障难度和计算资源的耗费都在一直减少。咱们开始摸索晋升数据产出及时性并缩小资源耗费的解决方案。
1.2 痛点剖析
通过对小时工作的架构进行梳理和剖析之后,咱们发现:
- 在小时表不同小时的批计算中存在较多的反复计算,因为一些未变更的记录也参加了计算;
- 存在较多的大表全量 JOIN 操作,比方通过商品次要信息表关联商品价格表和商品库存表;
- 读取的 Hudi 表数据时延从几分钟到几十分钟不等,大表的提早较高。
可通过防止不必要的反复计算,用增量数据 JOIN 代替大表全量 JOIN,以及采纳及时性更好的数据源的形式来缓解或者解决以后的痛点。通过相干的技术调研之后,咱们拟定了以应用 Flink + Hudi 为外围的技术计划,通过实时计算和数据湖存储构建实时数据仓库。
1.3 典型实时计算利用与实时数仓的差别
在介绍具体计划介绍,咱们先比照典型实时计算利用和实时数仓,以理解和辨别他们的不同点和侧重点。
其中,典型实时计算利用包含:
1)事件驱动型利用
- 定义:它从一个或多个事件流提取数据,并依据事件触发计算、状态更新或其余内部动作。
- 特点:偏重对输出 Event 的业务逻辑解决,会下发新 Event 给其余利用。
- 案例:应用 CEP(Complex Event Processing)进行实时反欺诈。
2)数据分析利用
- 定义:对输出数据流进行实时剖析型计算,并将实时更新的后果数据写入内部存储系统,而后基于后果数据提供数据服务。
- 特点:罕用于计算聚合类指标,用以满足特定的需要;可实现几个关联数据流的繁难复合指标计算;计算结果大多输入到外存零碎;多流场景下的计算结果不肯定完全正确,或者要失去最终完全正确的后果老本会极高。
- 案例:实时数据统计监控。
3)数据管道利用
- 定义:实时转换、丰盛数据,并将其从某个存储系统挪动到另一个存储系统中。
- 特点:从一个一直生成数据的源头读取记录,并将它们以低提早挪动到起点。
- 案例:实时数据入湖。
实时数仓可了解为将经典离线数仓实时化,给用户提供与离线数仓类似的数据应用体验,包含但不限于表的 schema,数据查问形式等。为了达到这一指标,实时数仓需满足以下要求:
- 易用性:实时数仓需存储在已有的数据仓库中,因为咱们冀望能在 Spark 和 Presto 中拜访它,并且能便捷地与离线数据仓库数据联合应用。
- 完整性:实时数仓须要反对各类简单指标的计算,因为咱们冀望它能笼罩以后所有小时数据的指标,甚至未来会齐全替换天数据,走流批一体化架构。
- 准确性:实时数仓须要提供具备全局(所有字段)最终一致性的数据,因为在多数据源的状况下,很难保障刹时的全局一致性,而全局最终一致性则是对数据及时性和准确性的折衷。
- 及时性:实时数仓须要提供尽量及时的数据,让端到端的时延降到尽可能低。
和典型实时计算相比,实时数仓有如下的侧重点或者劣势,例如:
- 升高了实时计算中简单业务指标的计算艰难;
- 升高了实时计算中多流 JOIN 中数据不统一的危险,进步了可维护性。
但,实时数仓为此付出了升高时效性的代价。
2. 基于 Flink+Hudi 的实时数仓架构设计
2.1 DataFlow 简介
基于以上剖析,咱们设计了满足实时数据仓库场景需要的如下 DataFlow,数据会从 Kafka①,经 Flink 计算后②,写入 Partial Updated Hudi 表(局部列更新)③,而后与离线 Hive 表或其余实时入湖的 Hudi 表④ 一起,经周期性的 Flink/Spark 批计算⑤ 后写入后果 Hudi 表⑥。
能够看到 Data Flow 中联合了流计算② 与批计算⑤,并有两个局部应用了 Hudi③⑥。实时计算用来减速数据处理,晋升全局数据的 及时性 ;批计算会计算简单指标并更新以后最新的数据,用来确保数据的 完整性 和准确性 ;而 Partial Update Hudi 表③ 和 Multi-version Hudi 表⑥ 都能在 Spark 和 Presto 中便捷拜访,保障了数据的 易用性。
2.2 DataFlow 详情
2.2.1 分组 Kafka Topics
- 性能:将领有雷同主键的多个 Kafka Topic 造成一个逻辑上的 topic 组,一个 topic 组会被一个 Flink 作业生产。
- 阐明:每个 topic 的音讯相当于该组所有 topic 形成的逻辑宽表的一部分字段。例如,Kafka 组
G
有T1
和T2
两个 topic,其中,T1
topic 有主键pk
和字段col-1
,T2
topic 有主键pk
和字段col-2
,那么 Kafka 组G
逻辑上包含主键pk
,字段col-1
和字段col-2
,即T1(pk, col-1) + T2(pk, col-2) => G(pk, col-1, col-2)
- 输出:无
- 输入:具备雷同主键的多个 Kafka Topic 音讯。
2.2.2 通用流式 ETL
- 性能:进行简略的数据 ETL,例如,常见的
project
,filter
,map
或自定义Scalar Function
和Table Function
。 - 阐明 : 防止应用
group by
、rank
等任何会应用 Flink State 的操作,因为当 Kafka Topic 组的主键的基数较大时(比方全量商品数),同时解决多个 topic(10+)的数据须要的计算资源极大,而且微小的 State 会使得作业的稳定性难以保障。 - 输出:具备雷同主键的多个 Kafka Topic 音讯。
- 输入:经 ETL 解决后的具备雷同主键的局部列的音讯。
2.2.3 Partial Update Hudi 表
- 性能:依据写入 Hudi 表的音讯,更新音讯主键所在行的局部数据列。
- 阐明:Partial Update Hudi 表是一个物理上的宽表,Kafka Topic 组中的任意一个 topic 的音讯在通过第二步的通用流式 ETL 之后,会失去该音讯的局部列最新的数据,Hudi 会通过
PartialUpdateAvroPayload
更新主键行对应的局部列。能够发现 Partial Update Hudi 表实际上实现了将整个 Kafka Topic 组的所有 topic 的数据依照雷同的主键 JOIN 成一行残缺记录的性能,即多流 JOIN。PartialUpdateAvroPayload
是 Shopee Data Infra 团队开发的 Payload,在社区OverwriteNonDefaultsWithLatestAvroPayload
的根底上反对了 MOR 表的 Partial Update。 - 输出:经 ETL 解决后的具备雷同主键的局部列的音讯。
- 输入:行记录为所有字段以后能失去的最新值的 Hudi 表。
2.2.4 其余 Hudi 表和 Hive 表
- 性能:作为批计算的局部数据起源。
-
阐明:
- 首先,并非所有的数据都有实时数据源,例如:有一部分数据源是人工保护,或者来自其余数据仓库。
- 其次,维度信息的变更在次要数据流上不肯定有事件驱动,例如:在商品信息数据流上只能捕捉商品类目 ID 变更的事件,而类目名称因为不在商品信息数据流中,就无奈捕捉商品类目变更,只能通过稍后修改的形式更新。
- 最初,一些在无奈或者难以实时计算的指标的数据源也属于这一部分,能够是 Hive 表或 Data Infra 团队保护的实时数据入湖的 Hudi 表。
- 输出:无
- 输入:其余 Hudi 表和 Hive 表。
2.2.5 周期性批处理
- 性能:基于以后最新的数据计算最新的指标数据,将各数据源的数据 JOIN 在一起,计算简单指标和关联多个数据源的衍生指标。
- 阐明:批处理的执行周期应该尽量的短。
- 输出:Partial Update Hudi 表,Hive 表和其余 Hudi 表。
- 输入:以后最新的后果数据快照。
2.2.6 Multi-version Hudi 表
- 性能:贮存每一次批处理写入的以后最新的后果数据快照。
- 阐明:Hudi 的多版本个性,能够确保数据的写入对正在执行的数据查问无影响。
- 输出:以后最新的后果数据快照。
- 输入:蕴含以后最新后果数据快照的多版本 Hudi 表。
2.3 DataFlow 示例
下图是实时数据仓库中店铺维表的 DataFlow,仅示意。
-
分组 Kafka Topics
- 第一组以
shop_id
为主键,包含三个 topic。 - 第二组以
user_id
为主键,包含两个 topic。
- 第一组以
-
通用流式 ETL
- 仅执行
project
,选出局部字段。 - 将不同 topic 的数据 UNION 起来,非该 topic 提供的字段设为
NULL
。
- 仅执行
-
Partial Update Hudi 表
- 执行 Partial Update,将雷同主键的不同音讯的数据合并和更新。
-
其余 Hudi 表和 Hive 表
- 其余实时入湖的 Hudi 表,蕴含 tag 信息。
- 其余数据仓库产出的 Hive 表,获取用户的回复率。
-
周期性批处理
- 将数据源 JOIN 在一起。
- 进行简单计算,比方
ROW_NUMBER() OVER (PARTITION BY is_sbs ORDER BY item_count DESC) AS item_cnt_rank
。 - 进行跨数据源的衍生指标计算,比方
IF(is_sbs=1 and uea1 > x, 1, 0) AS is_uea1_sbs_shop
。
-
Multi-version Hudi 表
- 不同的调度批次生成过后最新的店铺维表数据,00:00 开始第一次调度,并于 00:10 生成第一个版本的数据;00:15 开始第二次调度,并于 00:25 生成第二个版本的数据,以此类推。
2.4 咱们的思考
1)为什么不构建成齐全实时的作业,而增加额定的批处理过程?
- 10+ 个实时数据流的 JOIN 老本昂扬,即便把局部数据作为维表或用 API 点查,都有较高的老本;而且对于局部数据提早的状况解决老本也较高,不仅须要保护微小的 State,还有反复申请数据时的申请放大问题。
- 维表或点查 API 的数据更新,在主数据流上不肯定有事件触发,即不会有 Event 去驱动实时处理并或取最新的维度信息,这会导致一些关联的维度信息将始终无奈更新,直至主数据流上有更新的事件产生。
- 批处理尽管会使得数据的及时性升高,但能够解决以上两点问题。能够将流计算与批计算了解为分级计算的策略,实时计算提供大多数可用的数据,时延较低,但更简单和精确的数据由批计算实现,时延绝对较高,不同用户可按需抉择应用流计算更新的 Partial Update Hudi 表或者批计算更新的 Multi-version Hudi 表。
2)为什么要应用一个 Flink 作业生产一个 Kafka Topic 组,而不是一个 Flink 作业生产一个 topic?
- 写入 Partial Update Hudi 表是对多个 topic 的数据
UNION ALL
之后写入,非该 topic 生成的字段设置为NULL
。如果用一个 Flink 作业生产一个 topic,当新增一个字段时,生产该 Topic 组的所有 Flink 作业都须要增加字段,而后重启,带来了不必要的额定保护老本。 - Hudi 在 0.8 开始反对通过 Zookeeper 或者 HiveMetastore 进行锁管制的形式并发写入,后续如果一个 Kafka Topic 组中的 topic 数量太多,咱们可能思考一分为二。
3)为什么不把一些维表关联的操作放在 Flink 作业外面执行?
- 如果维表是和 Partial Update Hudi 表雷同的主键,则 Partial Update Hudi 表其实曾经实现了维表 JOIN 的操作。
- 如果维表的主键与 Partial Update Hudi 表不同,则维表有更新时,主数据流不肯定有事件驱动去关联最新的维度属性,这样就会导致维度属性的不精确。尽管能够通过 State 缓存数据,通过特定事件触发维表关联,然而在维表数量较多时,整个作业的资源耗费极大,稳定性也会降落。通过批计算就能够比拟不便的解决这种状况。
4)Partial Update Hudi 表和 Multi-version Hudi 表别离是什么类型的表?
- Partial Update Hudi 表采纳数据时延较低的 MOR 表,每次 Checkpoint 胜利后数据即可见,而且异步 Compaction 对作业的性能影响很小。用户能够依据不同的数据时延和查问性能要求对 MOR 表应用 Read Optimized Query 或 Snapshot Query。批计算读取 Partial Update Hudi 表是采纳数据时延较低的 Snapshot Query,尽量减少端到端的数据时延。
- Multi-version Hudi 表目前采纳的是 COW 表,因为当初的批处理都采纳的是
INSERT OVERWRITE
形式生成最新的文件 Snapshot 版本。后续如果批处理能够优化成增量解决INSERT INTO
的形式时,会采纳 MOR 表。
5)如何确保数据的全局最终一致性?
- 对于 Partial Update Hudi 表,只有 Kafka Topic 组中的不同 topic 数据都被失常生产,在 Partial Update Hudi 表中的终究会更新所有数据,防止了多流 JOIN 时,局部流提早较大或者流损坏导致的数据失落问题。
- 同理,对于 Multi-version Hudi 表,只有上游的各数据源能够确保最终一致性,通过批处理计算也终究会失去最终统一的后果,因为在批处理的下一次调度中会依据最新的上游数据生成最新版本的快照。
6)相比于小时数据,哪些局部有减速?
- 之前小时作业中的多表 JOIN 操作,被 Partial Update Hudi 表代替,Partial Update Hudi 表等效于 JOIN 后的后果表。
- 第二步的 Flink 流计算只解决增量数据缩小了反复计算。
- 尽可能地将字段转换也在第二步 Flink 流计算中实现,如常见的
project
、filter
、map
或自定义Scalar Function
和Table Function
,升高了第五步批计算的复杂度。
7)端到端的时延怎么评估?
- 端到端的时延为时延最大的 Partial Update Hudi 表的时延,加上第五步批处理的时延。这里咱们疏忽第四步的数据源时延,因为第四局部会有一些离线的数据源,其余非常重要数据源咱们会以 Partial Update Hudi 表的形式生成。
- 若第 i 组 Kafka Topic 组对应 Flink 作业更新 Partial Update Hudi 表的 Checkpoint 的执行时长
ci
,Checkpoint 距离时长为di
,则 Partial Update Hudi 表的时延为ci ~ ci + di + ci
,其中在 Checkpoint 过程中生产的数据,须要到下一次 Checkpoint 完结才可读,故最大时延为ci + di + ci
。定义所有的 Kafka Topic 组中,时延最大的 Partial Update Hudi 表时延为c ~ c + d + c
。 - 若批处理的调度周期为
b
,执行工夫为e
,则批处理的时延为e ~ b + e
,同样在一次批处理过程中数据源更新的数据,须要到下一次执行完结才可读,故最大时延为b + e
。 - 端到端的时延为
c + e ~ (c + d + c) + (b + e)
。
3. 实时数据仓库施行
3.1 Partial Update Hudi 表
3.1.1 Bootstrap 配置
Partial Update Hudi 表,须要先通过批处理写入历史数据,而后再实时处理在 Kafka 中的增量数据。
批处理写入历史数据时应用 Bulk Insert 的形式写入,设置 hoodie.sql.bulk.insert.enable=true
开启 Bulk Insert,设置 hoodie.bulkinsert.shuffle.parallelism
能够管制写入的并行度,每个分区产生的文件数也和这个参数无关,当设置的过大时会产生小文件的问题。
在此基础上设置 Clustering 相干参数能够实现小文件的合并,设置 hoodie.clustering.inline=true
开启 Clustering,设置 hoodie.clustering.inline.max.commits=1
能够在 Bulk Insert 之后立刻执行 Clustering 操作。hoodie.clustering.plan.strategy.max.bytes.per.group
,hoodie.clustering.plan.strategy.target.file.max.bytes
和 hoodie.clustering.plan.strategy.small.file.limit
用来管制 Clustering 输入的文件大小和数量。
3.1.2 Bootstrap 执行
实现以上的配置后就能够执行 INSERT INTO
脚本,在 INSERT INTO
中须要将 Kafka Topic 组对应的离线或者实时入湖的 Hudi 表,进行简略的计算解决后以 UNION ALL 的模式写入。
这里咱们通过结构不同数据类型的 MAP,将全字段的 UNION ALL 转换成几个不同类型 MAP 的 UNION ALL 和从对应的 MAP 中取出字段的形式,这样能极大地晋升代码的可读性和可维护性,尤其是当 Kafka Topic 组中的 topic 较多时。
3.1.3 Bootstrap 后果
Bulk Insert 和 Clustering 对应两次 commit,其中 Bulk Insert 对应下图中的第一局部,为 deltacommit;而 Clustering 对应的是一次 replacecommit。而且从 commit 的工夫能够看出,是先进行了 Bulk Insert,再 Clustering。
Bulk Insert 和 Clustering 都只生成 parquet 文件,其中 Bulk Insert 的文件是第一个版本,文件工夫为 11:31,会有大量小文件;而 Clustering 会生成小文件合并之后的文件作为第二个版本,文件工夫为 11:32。Bulk Insert 产生的第一个版本的小文件会在之后实时作业中依照数据的保留策略清理。
3.1.4 Flink Indexing
在 Flink 作业中执行 Indexing 用于构建 Bootstrap 后的文件索引信息并存入 state 中。作业中设置 index.bootstrap.enabled = true
开启 indexing,write.index_bootstrap.tasks
用来指定 indexing 的并行度,write.bucket_assign.tasks
能够指定 bucket_assign 算子的并行度,待第一个 Checkpoint 实现后,能够应用 Savepoint 并退出作业,这就实现了 Flink 作业的 Indexing。
3.1.5 Flink Insert
正式运行的 Flink Insert 作业中,须要去掉 index.bootstrap.enabled
参数(默认是 false),来敞开 Indexing,而后从之前 Indexing 最初的 Savepoint 启动即可失常写入数据。
Insert 中比拟要害的参数有 compaction.tasks
示意执行 Compaction 的并行度,compaction.delta_commits
用来管制执行 Compaction 的周期,这也决定了 Read Optimized Query 和 RO 表的数据时延。
另外,hoodie.cleaner.commits.retained
,hoodie.keep.min.commits
和 hoodie.keep.max.commits
这三个和 Cleaner 相干的参数用来配置数据的版本淘汰策略,用户的查问时长如果超过 hoodie.keep.min.commits
的时长之后,可能会失败。
3.2 Multi-version Hudi 表
3.2.1 周期性批作业
在之前曾经介绍过周期性批作业的时延状况,应尽量减少每次批处理执行的工夫,也须要尽量使用各种批处理的优化策略,这样才可能缩短调度周期,升高端到端的时延。
3.2.2 Multi-version Hudi 表
Multi-version Hudi 表是一个 COW 表,需设置失当的 hoodie.cleaner.commits.retained
值,来确保反对的最长用户查问耗时,在该时长内的查问能确保数据文件未被清理,设置得太大会有较大的存储老本压力;设置得太小,可能会因为查问文件被清理,而导致用户的查问失败。
3.3 实际效果
3.3.1 用户维表
用户维表目前只须要第 1-3 步来生成 Partial Update Hudi 表,是一个纯正的实时 Flink 作业,只有一个 Kafka Topic 组被 Flink 作业生产,主键为 user_id。Flink 作业的 Checkpoint 周期为 1 分钟,Checkpoint 的距离为 1 分钟,Checkpoint 耗时约 5 秒。用户维表的端到端时延约为 2 分钟, 设置 hoodie.cleaner.commits.retained=50
,反对用户查问的时长约 2*50=100 分钟。
相比于小时数据,在资源耗费上升高了约 40%,端到端时延从约 90 分钟升高到近 2 分钟。
3.3.2 店铺维表
简介:店铺维表须要包含流计算和批计算的所有步骤,一个 Flink 作业生产一个 Kafka Topic 组,主键为 shop_id,并将后果数据写入 Partial Update Hudi 表,Flink 作业的 Checkpoint 周期为 1 分钟,Checkpoint 的距离为 1 分钟,Checkpoint 耗时约 10 秒,Partial Update Hudi 表的时延约 2 分钟;周期性批处理的调度周期为 15 分钟,每次执行时长约 10 分钟,Multi-version Hudi 表的端到端时延约 27 分钟,设置 hoodie.cleaner.commits.retained=5
,反对用户查问的时长约 (5+1)*15=90 分钟。
因为店铺维表之前没有小时数据,先将天工作的资源耗费换算到小时工作后比照发现,新计划在资源耗费上升高了 54%,端到端的时延从约 90 分钟升高到了 30 分钟。
4. 总结与瞻望
本文介绍了基于 Flink + Hudi 的实时数据仓库解决方案,一方面通过实时计算来减速计算,另一方面通过与批处理技术的联合来确保数据的最终一致性。并通过提供分级的后果表来满足不同场景的及时性要求,实时计算产出的 Partial Update Hudi 表提供局部外围实时数据,批处理产出的 Multi-version Hudi 表提供残缺且更精确的数据。总的来说,该计划在易用性、完整性、准确性和及时性这四个方面都合乎预期,同时升高了计算资源的耗费。
前面咱们会持续在如下几个方面进行尝试和摸索:
- 此计划曾经在数仓的 DIM 层表中失去了验证,后续还要对 DWD 层以及 DWS 层的解决方案进行摸索。
- 如果在监控告警、故障复原、Bootstrap 流程简化等方面失去进一步优化,使得作业稳定性更好,就能够用该计划齐全代替目前的离线天工作,降本增效。
- 是否基于 FlinkSQL 提供一种相似于 Watermark 的机制,在 Checkpoint 或者 Compaction 产生的时候,生成相应的数据 marker,供上游用户做调度依赖。
本文作者
Wanglong,数据研发工程师,来自 Shopee Marketplace Data Engineering 团队。继续深耕大数据畛域 7+ 年,专一于数据仓库建模、实时离线数仓架构、湖仓一体架构等技术。