乐趣区

关于字节跳动:Hudi-Bucket-Index-在字节跳动的设计与实践

由字节跳动数据湖团队奉献的 RFC-29 Bucket Index 在近期合入 Hudi 主分支,本文具体介绍 Hudi Bucket Index 产生的背景与实践经验。

文 | 字节跳动数据平台数据湖团队

Hudi 是一个流式数据湖平台,提供 ACID 性能,反对实时生产增量数据、离线批量更新数据,并且能够通过 Spark、Flink、Presto 等计算引擎进行写入和查问。Hudi 官网对于文件治理和索引概念的介绍如下:

Hudi 提供相似 Hive 的分区组织形式,与 Hive 不同的是,Hudi 分区由多个 File Group 形成,每个 File Group 由 File ID 进行标识。File Group 内的文件分为 Base File (parquet 格局) 和 Delta File(log 文件),Delta File 记录对 Base File 的批改。Hudi 应用了 MVCC 的设计,能够通过 Compaction 工作把 Delta File 和 Base File 合并成新的 Base File,并通过 Clean 操作删除不须要的旧文件。
Hudi 通过索引机制将给定的 Hudi 记录统一地映射到 File ID,从而提供高效的 Upsert。Record Key 和 File Group/File ID 之间的这种映射关系,一旦在 Record 的第一个版本确定后,就永远不会扭转。简而言之,蕴含一组记录的所有版本必然在同一个 File Group 中。
在本文中,咱们将重点介绍 Hudi 索引机制相干的作用和原理,以及优化实际。

Hudi 索引的作用与类型

索引的作用

在传统 Hive 数仓的场景下,如果须要对一个分区数据做更新,整个更新过程会波及三个很重的操作。举一个更直观的例子。假如一个 Hive 分区存在 100,000 条记录,散布在 400 个文件中,咱们须要更新其中的 100 条数据。这三个很重的操作别离是:

  1. 从 400 个文件中读出 100,000 条数据
  2. 与 100 条更新的数据做分布式关联,取最新值
  3. 将更新后的 100,000 条数据写入长期目录,最初笼罩原先的数据

由此能够引出三个问题:

  1. 读那么多文件是必要的吗?
  2. 更新那么多文件是必要的吗?
  3. 分布式关联是必要的吗?
    假如在数据分布最蹩脚的状况下,须要被更新的 100 条数据分布在 100 个文件中。那咱们理论须要读和更新的文件是多少个?
    答案是 100 个,只占总量的 1/4。
    因而,Hudi 为了打消不必要的读写,引入了索引的实现。在有了索引之后,更新的数据能够疾速被定位到对应的 File Group,以上面的官网的示意图为例,
  • 防止读取不须要的文件
  • 防止更新不必要的文件
    无需将更新数据与历史数据做分布式关联,只须要在 File Group 内做合并

    索引的类型

    索引是独立模块,开源 Hudi 次要提供以下两种索引:

    在本文中,咱们将介绍一个新的 Hudi 索引模块 Bucket Index 在字节跳动的设计与实际。

    Bucket Index 产生背景

    索引带来的性能收益是十分微小的,只管 Hudi 已反对 Bloom Filter Index、Hbase index 类型,但在字节跳动大规模数据入湖、摸索剖析等场景中,咱们依然碰到了现有索引类型无奈解决的挑战,因而在实践中咱们开发了 Bucket Index 的索引形式。

    业务场景挑战

字节跳动某业务部门须要利用实时数据计算各种指标。在其业务场景中存在定期批量写入和流式写入场景,整个流程能够形容如下:

  1. 批量场景会先将 binlog 导入存储到 Hive 离线仓库中,再依照小时 / 天级粒度更新数据湖。
  2. 实时场景则通过 Flink 生产更新的 kafka 数据,写入数据湖,供上游业务应用。
  3. 当源头数据中的记录存在主键反复的状况下,须要保留最新一条数据即可。
  4. 在剖析侧,业务会基于 Hudi 数据集,通过 Presto/Spark 查问引擎,构建可视化的 BI 报表看板,供经营或分析师自助进行近实时数据分析。
    随着入湖的数据量减少,Hudi 中生成了约 40,000 个 File Group。尽管该业务部门应用了 Hudi 索引防止了全局合并操作,然而随着 File Group 的数量以及存储的数据量减少,定位 File Group 的工夫也在减少,这造成了 Upsert 速度逐步迟缓的状况,这重大影响了工作产出工夫,甚至导致工作无奈跑下去。

剖析与对策

为了解决 Upsert 数据场景逐渐迟缓的状况,字节跳动数据湖团队对整体的性能降落起因做了进一步剖析,并针对性地提出了解决方案。

  • 原先的业务场景应用了默认的 Bloom Filter Index 的索引形式。在察看中,团队发现最终在数据量约 30TB 的场景下,定位 Record 的性能会十分蹩脚,此时一共产生了约 5 千亿条记录散布在 40,000 个 File Group 中。
  • 在 5 千亿条记录的数据规模下,团队发现定位迟缓的问题来自 Bloom Filter Index 的假阳性。当 Bloom Filter 产生假阳性时,Hudi 须要确定该 Record Key 是否真的存在。这个操作须要读取文件里的理论数据一条一条做比照,而理论数据量规模很大,这会导致查问 Record Key 跟 File ID 的映射关系代价十分大,因而造成了索引的性能下滑。
  • 团队也调研了 Hudi 的另外一种索引形式 Hbase Index。这是一种 HBase 外置存储系统索引。但因为业务方不心愿引入 HBase 这一额定依赖,且放心运维 Hbase 过程中存在新的问题,认为 Hbase Index 整体不够轻量,因而在整个业务场景中也无奈作为 Bloom Filter 索引的代替。

在这样的场景下,字节跳动须要一个更加轻量且高效的索引形式,并且可能防止在大数据场景下的插入性能问题。

在一直实际中,字节跳动数据湖团队在逻辑层开发了一种基于哈希的索引,使得在插入过程中,定位传入 Record 的待写入文件地位信息时,无需读历史的 Record,并奉献到了社区的 RFC-29。

革新过后,索引层变成了一层简略的哈希操作,能够间接通过对索引键的哈希操作来找到文件所在的地位。

Bucket Index 设计原理

Bucket Index 是一种基于哈希的索引,借鉴了数据库里的 Hash Index。给定 n 个桶,用 Hash 函数决定某个记录属于哪个桶。最终所有分区被分成 N 个桶,每个桶对应一个 File Group。

相比拟 Bloom Filter Index 来说,Hash Index 在逻辑层面提供了 Record Key 跟 File Group 的映射关系,不存在假阳性问题。雷同 key 的数据肯定是落在同一个桶外面。最终一分区内的构造如下,目前一个 Partition 外面 Bucket 和 File Group 是一一对应的关系。

Bucket Index 数据写入原理

Bucket Index 的理论写入流程能够参考上面的过程示意图。以上面的实时插入场景为例,某业务批次新增了 5 条记录,并且须要 Upsert 到已有的分区 partition=20220203 中,对已有数据依据主键 Record 做一个更新,保留最新的数据。
整个过程能够用上面的示意图示意:

  1. 在建表时先预估表的单个分区数据存储大小,设置一个分桶数 numBuckets。
  2. 在数据插入前,首先生成 n 个 File ID, 将 File ID 的前 8 位替换成 bucketId 的数字:

    00000000-e929-4327-8b0c-7d0d66091321
    00000001-e3cd-4756-b311-863803a6cdaf
    00000002-c4ed-4418-90d4-6e348f380636
    00000003-c7bd-4916-78c5-6g787g090636

  3. 在插入过程中,最重要的一步就是标记每条新插入的记录属于哪个文件 File Group,而后找到对应的 File Group 去更新或者合并。在目前的设计中,分桶数跟 File Group 是一一对应的映射关系,因而找到每条 Record 对应的桶 ID,即可确定 Record Key 跟 File Group 的映射关系。
    在具体实现中,咱们会对更新数据的索引键计算哈希,再对分桶数取模疾速定位到每个 Record 对应的桶,整个过程如上面的 Hash 函数所示:

    hashKeyFields.hashCode() & Integer.MAX_VALUE) % numBuckets

    其中 hashKeyFields 能够由用户指定,是 Record Key 的一个子集,当默认不指定时,会以 Record Key 自身作为 hash 键。在计算好后,每条记录即可晓得行将写入的桶。

  4. 实现数据写入
    通过索引层之后,每条数据都会带有一个 File ID,引擎会依据 File ID 进行一次 Shuffle,将雷同 File ID 的数据导入到同一个子工作中。对于 COW 表而言,更新 Update 局部须要和已有的 BaseFile 合并生成新的 BaseFile。而 MOR 表将 Update 的数据间接写入对应 File Group 的 delta log,Insert 局部生成新的 BaseFile,最终实现该批次数据的 Upsert。
    由此可见,整个过程中 Bucket Index 不须要对现有的数据进行扫描组成相似 Bloom Filter 一样的过滤器,因而能够省去整个定位 File Group 的查问工夫,定位 File Group 的工夫也不会随着已有 Record 条数的减少而导致性能降落。同时分桶操作会在每个桶内对分桶列排序,排序后的数据个别能取得更高的压缩率,也能节俭存储。

    Bucket Index 查问优化原理

在查问时,Bucket Index 的查问优化会充分利用支流计算引擎的个性。例如 Spark 会利用表的 Bucket 散布做查问优化,例如晋升查问性能。从 Bucket Index 表中读取数据时,因为数据分布曾经依照按索引字段进行聚类和排序。Spark 能够通过在优化器中利用规定来匹配这种模式,来防止一些 Shuffle 操作。
目前的优化规定次要有上面两种:

  • Bucket Pruning,利用表的 Bucket 散布对读取数据进行剪枝。
    例如,如下的 T1 表的 bucket column 为 city,在执行上面查问时:
    select * from T1 where city = beijing
    在针对索引列 city 的某个值进行查问时,实际上只需读取一个分桶数据 (bucket pruning) , 因为 city= beijing 的 Record 在一个分区中必然是 Hash 到同一个 Bucket,这样对于每个分区来说,被 Scan 读取的 Hudi 数据量会大大减少。
  • Bucket Join,利用表的 Bucket 散布缩小 Aggregate/Join 带来的 shuffle 操作。
    对于 Group by 的场景,例如 city 是其中的一个索引列,在进行上面的聚合操作时:
    select city from T1 group by city
    因为雷同 A 的取值必然是落在同一个 bucket 桶中,因而寻找 city=’beijing’ 时,不须要去拜访其它的 bucket 中去取得,因而能够在 window 操作时能够省去一次 Shuffle 操作。

同理在 Join 的过程中,如果 T1 是一张 bucket 表并且 bucket index 的索引键为 city。而 T2 是一张非 bucket 表。
在 join 时,对于开启 bucket index 的表 T1 能够防止一次额定的 exchange 操作:

select count(*) from T1 join T2 where T1.city = T2.city

总体而言,所以利用 Bucket Index 的 Hudi 表能够做到晋升过滤速度和进步查问效率。

Bucket Index 的实际与将来布局

在实际过程中,咱们也发现了 Bucket Index 的一些实际倡议以及将来的方向。一个要害的问题,是如何确定 numBuckets 的值,目前 Bucket Index 的桶数量,须要依据预估的数据量提前在建表时进行确定,且建表后不可更改,对于这种限度,咱们目前有上面的解决方案。

要设置正当的桶数量,须要预测表的指标大小和将来数据增长状况。

  • 桶的数量过小会升高整体引擎的并行速度,起因不难理解:当数据量增大时,单个 File Group 对应的数据将增大,而 Hudi 表是以 File Group 为单位将数据切割生成 inputSplit 的,单个 File Group 数据过大将导致查问并发升高,性能降落。一般说来倡议单个桶的大小管制在 3GB 左右。
  • 同时咱们也应该防止桶的数量过多,过多的桶数量则会造成单个桶的数据量太小,造成小文件状况。基于这样的范畴,当指标表的大小能够被预测时,咱们能够比拟容易失去一个适合的 Bucket Index 的桶数量值。

当然,咱们也意识到这样的做法并不是一个灵便的办法。在将来,咱们将推出可扩大的 Hash Index 桶办法来彻底解决这个问题。咱们将反对已有的 Hudi 表在建表后间接扩大桶的数量,以防止当业务数据暴增时单个文件太大,影响查问以及 Compaction 性能。咱们的后续优化将利用 Hashmap 的扩容过程,将分桶数按倍数做到轻量级扩容。当桶的数量在初期预测设置较小时,今后也能动静扩容,能够彻底解决预估桶数量不精确带来的懊恼。

总结

总结而言,Hudi Bucket Index 作为一种基于哈希的索引,充沛做到了轻量级。对更新数据的主键计算哈希,再对分桶数取模疾速定位到 File Group,能够稳固的保障导入性能。相比 Bloom Filter Index 而言,在大数据导入 Upsert 场景下有肯定的劣势,帮忙字节跳动的业务部门解决了导入性能随着数据量增长而降落的难题。
同时在查问时,也能充沛跟计算引擎联合,利用表的 Bucket 散布对读取数据进行剪枝,并且利用 Bucket 散布个性缩小 Aggregate/Join 带来的 Shuffle 操作,晋升了查问性能。
对于 Hudi 应用用户来说,也不须要扭转原有的习惯,只需以插拔的形式指定 Hudi 表想应用的索引类型和桶的数量配置即可,充沛做到了易用性与便捷。
目前 Hudi Bucket Index (RFC-29) 的实现曾经合入社区最新的主分支,因而,咱们十分举荐宽广 Hudi 社区用户在实践中应用,并且欢送各位同行在 Hudi 社区进行技术交换与深刻探讨,后续咱们也会基于 Bucket Index 的反馈继续奉献新个性。

产品介绍

火山引擎湖仓一体剖析服务 LAS

湖仓一体剖析服务 LAS(Lakehouse Analytics Service)是面向湖仓一体架构的 Serverless 数据处理剖析服务,提供一站式的海量数据存储计算和交互剖析能力,齐全兼容 Spark、Presto、Flink 生态,帮忙企业轻松实现数据价值洞察。地址

火山引擎 E-MapReduce

反对构建开源 Hadoop 生态的企业级大数据分析系统,齐全兼容开源,提供 Hadoop、Spark、Hive、Flink 集成和治理,帮忙用户轻松实现企业大数据平台的构建,升高运维门槛,疾速造成大数据分析能力。地址

欢送关注 字节跳动数据平台 同名公众号

退出移动版