关于data:基于Hudi的湖仓一体技术在Shopee的实践

38次阅读

共计 6433 个字符,预计需要花费 17 分钟才能阅读完成。

关注「Shopee 技术团队」公众号👆,摸索更多 Shopee 技术实际

目录
1. Shopee 数据系统建设中面临的典型问题
2. 为什么抉择 Hudi
3. Shopee 在 Hudi 落地过程中的实际
4. 社区奉献
5. 总结与瞻望

湖仓一体(LakeHouse)作为大数据畛域的重要倒退方向,提供了流批一体和湖仓联合的新场景。目前,企业许多业务中会遇到的数据及时性、准确性,以及存储的老本等问题,都能够通过湖仓一体计划失去解决。

当下,几个支流的湖仓一体开源计划都在一直迭代开发中,业界的利用也都是在摸索中前行,在理论的应用中难免会遇到一些不够欠缺的中央和未反对的个性。Shopee 外部在应用过程中基于开源的 Apache Hudi 定制了本人的版本,以实现企业级的利用和一些外部业务需要的新个性。

通过引入 Hudi 的 Data lake 计划,Shopee 的 Data Mart、举荐、ShopeeVideo 等产品的数据处理流程实现了流批一体、增量解决的个性,很大水平上简化了这一流程,并晋升了性能。

1. Shopee 数据系统建设中面临的典型问题

1.1 Shopee 数据系统简介

上图是 Shopee Data Infrastructure 团队为公司外部业务方提供的一套整体解决方案。

  • 第一步是数据集成(Data Integration),目前咱们提供了基于日志数据、数据库和业务事件流的数据集成形式;
  • 而后通过平台的 ETL(Extract Transform Load)服务 load 到业务的数仓中,业务同学通过咱们提供的开发平台和计算服务进行数据处理;
  • 最初的后果数据通过 Dashboard 进行展现,应用即时查问引擎进行数据摸索,或者通过数据服务反馈到业务零碎中。

上面先来剖析一下 Shopee 数据系统建设中遇到的三个典型问题。

1.2 流批一体的数据集成

第一个问题:在基于数据库的数据集成过程中,存在同一份数据同时面临流解决和批处理的需要。传统的做法是实现全量导出和 CDC 两条链路。全量导出链路满足批处理的需要,CDC 链路用于实时处理和增量解决的场景。

然而,这种做法存在的一个问题是 全量导出效率低,导致数据库负载高 。另外, 数据一致性也难以失去保障

同时,在批数据集构建上有肯定的存储效率优化,所以咱们心愿基于 CDC 数据去构建批数据集,以此同时满足三种解决场景的需要,进步数据时效性。

1.3 状态表明细存储

第二个问题是状态表明细的存储。咱们能够认为,传统批数据集是在某一时间点对业务数据整体状态的一个快照,压缩到一个点的快照会合并掉业务流程中的过程信息。这些变动过程反映了用户应用咱们服务的过程,是十分重要的剖析对象。一旦被合并掉,将无奈开展。

另外,在很多场景下,业务数据每天变动的局部只占全量数据的一小部分,每个批次都全量存储会带来很大的资源节约。

1.4 大宽表创立

第三个问题是大宽表的创立。近实时宽表构建是数据处理中常见的一种场景,它存在的问题是传统的批处理提早过高,应用流式计算引擎资源节约重大。因而,咱们基于多个数据汇合构建了业务宽表,反对 Ad hoc 类 OLAP 查问。

2. 为什么抉择 Hudi

针对上述业务中遇到的问题,基于以下三点考量,最终咱们抉择 Apache Hudi 来作为解决方案。

2.1 生态反对丰盛

咱们冀望应用纯流式的形式建设数据集成环境,而 Hudi 对流式场景有着良好的反对。

第二点是对各个大数据生态的兼容。咱们构建的数据集将会同时存在批处理、流解决、增量解决和动静摸索等多种需要的负载。目前这些工作负载运行在各种计算引擎中,所以,对多种计算引擎的反对也在咱们的思考范畴之内。

另一个考量点则是和 Shopee 业务需要的符合。以后,咱们亟待解决的数据集大部分来源于业务零碎,都带有唯一性标识信息,所以 Hudi 的设计更加合乎咱们的数据个性。

2.2 插件化的能力

目前咱们平台提供 Flink 和 Spark 作为通用计算引擎,作为数据集成和数仓建设负载的承载者,同时也应用 Presto 承载数据摸索的性能。Hudi 对这三者都反对。

在理论应用中,依据业务数据的重要水平不同,咱们也会给用户提供不同的数据索引形式。

2.3 业务个性匹配

在数据集成过程中,用户的 schema 变动是一个十分常见的须要。ODS 的数据变动可能导致上游的计算工作出错。同时,在增量解决时,咱们须要工夫解决的语义。反对主键数据的存储对于咱们业务数据库的数据来说,意义重大。

3. Shopee 在 Hudi 落地过程中的实际

3.1 实时数据集成

目前 Shopee 外部有大量的业务数据来自业务数据库,咱们采纳相似 CDC 的技术获取数据库中的变更数据,给业务方构建反对批处理和近实时增量解决的 ODS 层数据。

当一个业务方的数据须要接入时,咱们会在进行增量实时集成之前先做一次全量 Bootstrap,构建根底表,而后基于新接入的 CDC 数据进行实时构建。

构建的过程中,咱们个别依据用户需要抉择构建的 COW 表或者 MOR 表。

1)问题构建与解决方案

在进行实时构建的过程中,存在以下两种较为常见的问题:

一种是用户将有大量变更的数据集的类型配置为 COW 表,导致数据写放大。此时咱们须要做的事件是建设相应的监控来辨认这种配置。同时,咱们基于 MOR 表的配置化数据合并逻辑,反对数据文件的同步或者异步更新。

第二个问题是默认的 Bloom filter 导致数据存在性判断的问题。这里比拟好的形式是采纳 HBase Index 解决超大数据集的写入问题。

2)问题解决的成果

这是将咱们的某些数据集成链路换成基于 Hudi 的实时集成后的成果。上图是数据可见性占比与时延的关系,目前咱们能保障 80% 的数据在 10 分钟内可见可用,所有的数据 15 分钟内可见可用。

下图是咱们统计的资源耗费占比图。蓝色局部是实时链路的资源耗费,红色是历史的按批数据集成的资源耗费。

因为切换成了实时链路,对于一些大表反复率低的数据缩小了反复解决,同时也缩小了集中式解决效率升高导致的资源耗费。因而,咱们的资源耗费远低于批处理形式。

3.2 增量视图

针对用户须要状态明细的场景,咱们提供了基于 Hudi Savepoint 性能的服务,依照用户须要的工夫周期,定期构建快照(snapshot),这些快照以分区的模式存在元数据管理系统中。

用户能够不便地在 Flink、Spark,或者 Presto 中利用 SQL 去应用这些数据。因为数据存储是残缺且没有合并的明细,所以数据自身反对全量计算,也反对增量解决。

在应用增量视图的存储时,对于一些变动数据占比不大的场景,会获得比拟好的存储节俭成果。

这里有一则简略的公式,用于计算空间使用率:(1 + (t - 1) * p ) / t

其中,P 示意变动数据的占比,t 示意须要保留的工夫周期数。变动数据占比越低,所带来的存储节俭越好。对于长周期数据,也会有一个比拟好的节俭成果。

同时,这种形式对增量计算的资源节俭成果也比拟好。毛病是按批全量计算会有肯定的读放大的问题。

3.3 增量计算

当咱们的数据集基于 Hudi MOR 表来构建时,就能够同时反对批处理、增量解决和近实时处理负载。

以图为例,Table A 是一个增量的 MOR 表,当咱们基于 Table A 来构建后续的表 B 和表 C 时,如果计算逻辑都反对增量的构建,那咱们在计算的过程中,只须要获取新增的数据和变动的数据。这样在计算的过程中就显著缩小了参加计算的数据量。

这里是离线计算平台基于 Hudi 的增量计算来构建的一个近实时的用户作业剖析。当用户提交一个 Spark 工作到集群运行,工作完结后会主动收集用户的日志,并从中提取相干的 Metric 和要害日志写入到 Hudi 表。而后一个解决工作增量读取这些日志,剖析出工作的优化项,以供用户参考。

当一个用户作业运行完后,一分钟之内就能够剖析出用户的作业状况,并造成剖析报告提供给用户。

增量 Join

除了增量计算,增量的 Join 也是一个十分重要的利用场景。

绝对于传统的 Join,增量计算只须要依据增量数据查找到须要读取的数据文件,进行读取,并剖析出须要重写的分区,从新写入。

绝对于全量来说,增量计算显著缩小了参加计算的数据量。

Merge Into

Merge Into 是在 Hudi 中十分实用的一个用于构建实时宽表的技术,它次要基于 Partial update 来实现。

MERGE INTO target_table t0
USING SOURCE TABLE s0
ON t0.id = s0.id
WHEN matched THEN UPDATE SET 
t0.price = s0.price+5, 
_ts = s0.ts;
MERGE INTO target_table_name [target_alias]
USING source_table_reference [source_alias]
ON merge_condition
[WHEN MATCHED [ AND condition] THEN matched_action ] [...]
[WHEN NOT MATCHED [ AND condition]  THEN not_matched_action ] [...]

matched_action
{ DELETE |
UPDATE SET * |
UPDATE SET {column1 = value1} [, ...] }

not_matched_action
{ INSERT * |
INSERT (column1 [, ...] ) VALUES (value1 [, ...])

这里展现了基于 Spark SQL 的 Merge Into 语法,它让用户构建宽表的作业开发变得非常简单。

基于 Merge Into 的增量 Join 实现

Hudi 的实现是采纳 Payload 的形式,在一个 Payload 中能够只存在一张表的局部列。

增量数据的 Payload 被写入到 log 文件中,而后在后续的合并中生成用户应用的宽表。因为后续合并存在时间延迟,所以咱们优化了合并的写入逻辑。

在数据合并实现后,咱们会在元数据管理中写入一个合并的数据工夫和相干的 DML,而后在读取这张 MOR 表的过程中剖析 DML 和工夫,为数据可见性提供保障。

而采纳 Partial Update 的益处是:

  • 显著升高了流式构建大宽表的资源应用;
  • 文件级别的数据批改时,解决效率增高。

4. 社区奉献

在解决解决 Shopee 外部业务问题的同时,咱们也奉献了一批代码到社区,将外部的优化和新个性分享进去,比拟大的 feature 有 meta sync(RFC-55 已实现)snapshot view(RFC-61)partial update(HUDI-3304)FileSystemLocker(HUDI-4065 已实现) 等等;同时也帮忙社区修复了很多 bug。后续也心愿可能用这种形式,更好地满足业务需要的同时,参加社区共建。

4.1 Snapshot View

增量视图(snapshot view)有以下几个典型利用场景:

  • 每天在根底表上生成名称为 compacted-YYYYMMDD 的快照,用户应用快照表生成每日的衍生数据表,并计算报表数据。当用户上游的计算逻辑发生变化时,可能抉择对应快照进行从新计算。还能够设置留存期为 X 天,每天清理掉过期数据。这里其实也能够在多快照的数据上天然地实现 SCD-2。
  • 一个命名为 yyyy-archived 的存档分支能够每年在数据进行压缩和优化之后生成,如果咱们的保留策略有变动(例如要删除敏感信息),那么能够在进行相干的操作之后,在这个分支上生成一个新的快照。
  • 一个命名为 preprod-xx 的快照能够在进行了必要的质量检查之后再正式公布给用户,防止内部工具与 pipeline 自身的耦合。

对于 snapshot view 的需要,Hudi 曾经能够在肯定水平上通过两个要害个性来做反对:

  • Time travel:用户能够提供一个工夫点来查问对应工夫上的 Hudi 表快照数据。
  • Savepoint:能够保障某个 commit 工夫点的快照数据不会被清理,而在 savepoint 之外的两头数据依然能够被清理。

简略的实现如下图所示:

然而在理论的业务场景中,为了满足用户的 snapshot view 需要,还须要从 易用性 可用性 上思考更多。

例如,用户如何得悉一个 snapshot 曾经正确地公布进去了?这其中蕴含的一个问题是可见性,也就是说,用户应该能够在整个 pipeline 中显式地拿到 snapshot 表,这里就须要提供相似 Git 的 tag 性能,加强易用性。

另外,在打快照的场景中,一个常见的需要是数据的精准切分。一个例子就是用户其实不心愿 event time 在 1 号的数据漂移到 2 号的快照之中,更心愿的做法是在每个 FileGroup 下联合 watermark 做精密的 instant 切分。

为了更好地满足生产环境中的需要,咱们实现了以下优化:

  • 扩大了 savepoint metadata,在此基础上实现快照的 tag、branch 以及 lifecycle 治理,和主动的 meta 同步性能;
  • 在 MergeOnRead 表上实现精细化的 ro 表 base file 切分,在 compaction 的时候通过 watermark 切分日志文件,保障 snapshot 的精确性。也就是说,咱们能够在流式写入的根底上,给上游的离线解决提供准确到 0 点的数据。

目前咱们正在将整体性能通过 RFC-61 奉献回社区,理论落地过程的收益后面章节已有介绍,这里不再赘述。

4.2 多源 Partial update

前文简略介绍了多源局部列更新(大宽表拼接)的场景,咱们依赖 Hudi 的多源合并能力在存储层实现 Join 的操作,大大降低了计算层在 state 和 shuffle 上的压力。

目前,咱们次要是通过 Hudi 外部的 Payload 接口实现多源的局部列更新。上面这张图展现了 Payload 在 Hudi 的写端和读端的交互流程。

实现的原理基本上就是通过自定义的 Payload class 来实现雷同 key 不同源数据的合并逻辑,写端会在批次内做多源的合并并写入 log,读端在读时合并时也会调用雷同的逻辑来解决跨批次的状况。

这里须要留神的是 乱序和早退数据(out-of-order and late events)的问题。如果不做解决,在上游常常会导致旧数据笼罩新数据,或者列更新不残缺的状况。

针对乱序和早退数据,咱们对 Hudi 做了 Multiple ordering value 的加强,保障每个源只能更新属于本人那局部列的数据,并且能够依据设置的 event time (ordering value) 列,确保只会让新数据笼罩旧数据。

后续咱们还筹备联合 lock less multiple writers 来实现多 Job 多源的并发写入。

5. 总结与瞻望

针对在 Shopee 数据系统建设中面临的问题,咱们提出了湖仓一体的解决方案,通过比照选型抉择了 Hudi 作为外围组件。

在落地过程中,咱们通过应用 Hudi 的外围个性以及在此之上的扩大革新,别离满足了三个次要的用户需要场景:实时数据集成、增量视图和增量计算。并为用户带来了低延时(约 10 分钟)、升高计算资源耗费、升高存储耗费等收益。

接下来,咱们还将提供更多个性,并针对以下两个方面做进一步欠缺,从而满足用户更多的场景,提供更好的性能。

5.1 跨工作并发写反对

以后 Hudi 反对了基于文件锁的单个工作单 writer 的写入形式。

然而在理论中,有一些场景须要多个工作多 writer 同时写入,且写入分区有穿插,目前的 OCC 对这种状况反对不佳。目前咱们正在与社区单干解决 Flink 与 Spark 多重 writer 的场景。

5.2 性能优化

元数据读取以及 File listing 操作无论是在写入端还是读取端都会有很大的性能耗费,海量的分区对外部元数据系统(比方 HMS)也会造成很大压力。

针对这一问题,咱们打算第一步将 schema 之外的信息存储从 HMS 过渡到 MDT;第二步是在将来应用一个独立的 MetaStore 和 Table service 的 server,不再强耦合于 HDFS。

在这个 server 中,咱们能够更容易地优化读取性能,更灵便地进行资源调整。

本文作者

Jian,大数据技术专家,来自 Shopee Data Infrastructure 团队。

正文完
 0