共计 5913 个字符,预计需要花费 15 分钟才能阅读完成。
简介:作业帮是一家以科技为载体的在线教育公司,其大数据中台作为根底零碎中台,次要负责建设公司级数仓,向各个产品线提供面向业务主题的数据信息。本文次要分享了作业帮基于 DeltaLake 的数据湖建设最佳实际。
作者:
刘晋 作业帮 - 大数据平台技术部负责人
王滨 作业帮 - 大数据平台技术部高级架构师
毕岩 阿里云 - 计算平台开源大数据平台技术专家
内容框架:
- 业务背景
- 问题 & 痛点
- 解决方案
- 基于 DeltaLake 的离线数仓
- 将来布局
- 致谢
一、业务背景
作业帮是一家以科技为载体的在线教育公司。目前旗下领有工具类产品作业帮、作业帮口算,K12 直播课产品作业帮直播课,素质教育产品小鹿编程、小鹿写字、小鹿美术等,以及喵喵机等智能学习硬件。作业帮教研中台、教学中台、辅导经营中台、大数据中台等数个业务零碎,继续赋能更多素质教育产品,一直为用户带来更好的学习和应用体验。其中大数据中台作为根底零碎中台,次要负责建设公司级数仓,向各个产品线提供面向业务主题的数据信息,如留存率、到课率、沉闷人数等,进步经营决策效率和品质。
上图为作业帮数据中台总览。次要分为三层:
- 第一层是数据产品以及赋能层
次要是基于主题数据域构建的数据工具以及产品,撑持商业智能、趋势剖析等利用场景。
- 第二层是全域数据层
通过 OneModel 对立建模,咱们对接入的数据进行了标准化建模,针对不同时效性的场景构建了业务域的主题数据,进步下层产品的应用效率和品质。
- 第三层是数据开发层
构建了一系列的零碎和平台来反对公司内所有的数据开发工程,包含数据集成、工作开发、数据品质、数据服务、数据治理等。
本次分享的内容次要是面向离线数仓(天级、小时级)解决其生产、应用过程中的性能问题。
二、问题 & 痛点
作业帮离线数仓基于 Hive 提供从 ODS 层到 ADS 层的数据构建能力,当 ADS 表生成后,会通过数据集成写入 OLAP 零碎面向管理人员提供 BI 服务;此外,DWD、DWS、ADS 表,也会面向分析师提供线下的数据探查以及取数服务。
随着业务逐渐倒退以及对应的数据量越来越多,离线数仓零碎突显如下次要问题:
- ADS 表产出提早越来越长
因为数据量增多,从 ODS 层到 ADS 层的全链路构建工夫越来越长。尽管对于十分外围的 ADS 表链路能够通过歪斜资源的模式来短期解决,然而其实这个实质上就是丢车保帅的模式,该模式无奈规模化复制,影响了其余重要的 ADS 表的及时产出,如对于分析师来说,因为数据表的提早,对于 T + 1 的表最差需等到 T + 2 才能够看到。
- 小时级表需要难以承接
有些场景是小时级产出的表,如局部流动须要小时级反馈来及时调整经营策略。对于这类场景,随着数据量增多、计算集群的资源缓和,小时级表很多时候难以保障及时性,而为了进步计算性能,往往须要提前准备足够的资源来做,尤其是须要小时级计算天级数据的时候,最差状况下计算资源须要扩充 24 倍。
- 数据探查慢、取数稳定性差
数据产出后很多时候是面向分析师应用的,间接拜访 Hive 则须要几十分钟甚至小时级,齐全不能承受,常常会收到用户的吐槽反馈,而采纳 Presto 来减速 Hive 表的查问,因为 Presto 的架构特点,导致查问的数据表不能太大、逻辑不能太简单,否则会导致 Presto 内存 OOM,且 Hive 已有的 UDF 和 VIEW 等在 Presto 中也没法间接应用,这也十分限度分析师的应用场景。
三、解决方案
问题剖析
不论是惯例的 ODS 层到 ADS 层全链路产出慢、或者是面对具体表的探查取数慢,实质上都是在说 Hive 层的计算性能有余。从上述场景剖析来看:
链路计算慢的起因:因为 Hive 不反对增量更新,而来自业务层数据源的 Mysql-Binlog 则蕴含大量的更新信息,因而在 ODS 这一层,就须要用增量数据和历史的全量数据做去重后造成新的全量数据,其后 DWD、DWS、ADS 均是相似的原理。这个过程带来了数据的大量反复计算,同时也带来了数据产出的提早。
数据查问慢的起因:因为 Hive 自身短少必要的索引数据,因而不论是重吞吐的计算还是心愿保障分钟级提早的查问,均会翻译为 MR-Job 进行计算,这就导致在数据疾速探查场景下,查问后果产出变慢。
计划调研
从下面剖析来看,如果能够解决离线数仓的数据增量更新问题就能够进步链路计算的性能,而对于数据表反对索引能力,就能够在保障查问性能不降级的前提下升高查问的提早。
基于 HBase+ORC 的解决方案
解决数据的更新问题,能够采纳 HBase 来做。对 RowKey 设置为主键,对各列设置为 Column,这样就能够提供数据实时写入的能力。然而受限于 HBase 的架构,对于非主键列的查问性能则十分差。为了解决其查问性能,须要定期(如小时表则小时级、天级表则天级)将 HBase 的表依照特定字段排序后导出到 HDFS 并存储为 ORC 格局,然而 ORC 格局只反对单列的 min、max 索引,查问性能仍然无奈满足需要,且因为 HBase 的数据写入始终在继续产生,导出的机会难以管制,在导出过程中数据还可能发生变化,如咱们心愿导出 12 月 11 日 21 点前的数据作为数据表 21 点分区的数据就须要思考版本数、存储容量、筛选带来的计算性能等因素,零碎复杂度陡增,同时也引入了 HBase 零碎减少了运维老本。
数据湖
数据湖实际上是一种数据格式,能够集成在支流的计算引擎(如 Flink/Spark)和数据存储 (如对象存储) 两头,不引入额定的服务,同时反对实时 Upsert,提供了多版本反对,能够读取任意版本的数据。
目前数据湖计划次要有 DeltaLake、Iceberg、Hudi。咱们调研了阿里云上这三种计划,其区别和特点如下:
此外,思考到易用性(DeltaLake 语义清晰,阿里云提供全功能 SQL 语法反对,应用简略;后两者的应用门槛较高)、功能性(仅 DeltaLake 反对 Zorder/Dataskipping 查问减速)等方面,联合咱们的场景综合思考,咱们最初抉择 DeltaLake 作为数据湖解决方案。
四、基于 DeltaLake 的离线数仓
引入 DeltaLake 后,咱们的离线数仓架构如下:
首先 Binlog 通过 Canal 采集后通过咱们自研的数据散发零碎写入 Kafka,这里须要提前阐明的是,咱们的散发零碎须要对 Binlog 依照 Table 级严格保序,起因上面详述。其后应用 Spark 将数据分批写入 DeltaLake。最初咱们降级了数据取数平台,应用 Spark SQL 从 DeltaLake 中进行取数。
在应用 DeltaLake 的过程中,咱们须要解决如下关键技术点:
流数据转批
业务场景下,对于离线数仓的 ETL 工作,均是依照数据表分区就绪来触发的,如 2021-12-31 日的工作会依赖 2021-12-30 日的数据表分区就绪前方可触发运行。这个场景在 Hive 的零碎上是很容易反对的,因为 Hive 人造反对依照日期字段(如 dt)进行分区。然而对于 DeltaLake 来说,咱们数据写入是流式写入的,因而就须要将流数据转为批数据,即某天数据齐全就绪后,方可对外提供对应天级分区的读取能力。
如何界定数据齐全就绪
流式数据个别会有乱序的状况,在乱序的状况下,即便采纳 watermark 的机制,也只能保障肯定工夫范畴内的数据有序,而对于离线数仓来说,数据须要 100% 牢靠不丢。而如果咱们能够解决数据源的有序性问题,那么数据就绪问题的解决就会简化很多:如果数据依照天级分区,那么当呈现 12-31 的数据时,就能够认为 12-30 的数据都就绪了。
因而,咱们的计划拆解为两个子问题:
- 流数据有序后界定批数据边界
- 保障流数据有序的机制
首先对于前者,总体方案如下:
设定数据表的逻辑分区字段 dt 以及对应的工夫单位信息。
当 Spark 读取某一个 batch 数据后,根据上述表元数据应用数据中的 event time 生成对应的 dt 值,如数据流中 event time 的值均属于 T +1,则会触发生成数据版本 T 的 snapshot,数据读取时依据 snapshot 找到对应的数据版本信息进行读取。
如何解决流数据的乱序问题
不论是 app-log 还是 MySQL-Binlog,对于日志自身都是有序的,以 MySQL-Binlog 举例,单个物理表的 Binlog 必然有序,然而理论业务场景下,业务零碎会常常进行分库分表的应用,对于应用分表的场景,一张逻辑表 Table 会分为 Table1、Table2、……几张表,对于离线数仓的 ODS 表,则须要屏蔽掉业务侧 MySQL 分表的细节和逻辑,这样,问题就聚焦为如何解决分表场景下数据有序的问题。
保障分库分表,甚至不同分表在不同集群的状况下,数据写入到 Kafka 后的有序性。即写入 DeltaLake 的 Spark 从某个 topic 读取到逻辑表的数据是 partition 粒度有序的。
保障 ODS 表就绪的时效性,如辨别无 Binlog 数据的状况下,ODS 层数据也能够按期就绪。
此处须要对原有零碎进行降级革新,计划如下:
如上图所示:某个 MySQL 集群的 Binlog 经 Canal 采集后写入到特定的 Kafka-topic,然而因为写入时依照 db 和 Table(去分表_* 后缀)做 hash 确定 partition,因而单个 partition 外部会存在多个物理表的 Binlog,对于写入 DeltaLake 来说十分不敌对。思考到对其余数据利用方的兼容性,咱们新增了数据散发服务:
- 将逻辑表名(去分表_* 后缀)的数据写入到对应的 topic,并应用物理表名进行 hash。保障单 partition 外部数据始终有序,单 topic 内仅包含一张逻辑表的数据。
- 在 MySQL 集群内构建了外部的心跳表,来做 Canal 采集的提早异样监控,并基于此性能设置肯定的阈值来判断当零碎没有 Binlog 数据时是零碎出问题了还是真的没数据了。如果是后者,也会触发 DeltaLake 进行 savepoint,进而及时触发 snapshot 来保障 ODS 表的及时就绪。
通过上述计划,咱们将 Binlog 数据流式的写入 DeltaLake 中,且表分区就绪时间延迟 <10mins。
读写性能优化
上面讲下咱们在应用 DeltaLake 过程中遇到的性能问题以及对应的解法。
通过 DPP 进步写性能
DeltaLake 反对通过 SparkStreamingSQL 的形式来写入数据。
因为要做记录的合并去重,因而这里须要通过 merge into 的形式写入。DeltaLake 更新数据时分为两步:
- 定位到要更新的文件,默认状况下须要读取全副的文件和 Spark 内 batch 的增量数据做 Join,关联出须要更新的文件来。
- Merge 后从新写入这些文件,把老的文件标记为删除。
如上左图所示,因为 DeltaLake 默认会读取上个版本的全量文件,因而导致写入性能极低,一次合并操作无奈在 Spark 一个 batch 内实现。
针对这种场景,对 DeltaLake 做了降级:应用 DPP 做分区剪枝来优化 Megre into 的性能,如上右图所示:
- 剖析 Merge-on 条件,失去 source 表中对应到 DeltaLake 表分区字段的字段。
- 统计失去分区字段的枚举列表。
- 将上步后果转化成 Filter 对象并利用,进一步过滤裁剪数据文件列表。
- 读取最终的数据文件列表和 batch 的 source 数据关联失去最终需更新的文件列表。
通过 DPP 优化后,Spark 一个 batch(5min 粒度)的解决提早由最大 20mins+ 缩小到 最大~3mins,齐全打消了过来因为解决工夫过长导致提早一直叠加的问题。
应用 Zorder 进步读性能
在解决了数据的写入性能后,咱们又遇到了数据读取性能的问题。
咱们应用同样的数据(200 亿 +),应用 Hive 计算,均匀提早 10min+,而应用 DeltaLake 后,均匀提早竟然高达~11mins+。剖析后发现次要是没有对筛选列应用 Zorder 排序,当开启 Zorder 后,提早则升高到了~24s,进步了近 25X 性能。
基于 Zorder 对 DeltaLake 表进行查问优化,次要会波及两个方面的晋升:
- Dataskipping
DeltaLake 会依照文件粒度统计各个字段的 max/min 值,用于间接过滤数据文件。
- Zorder
一种数据 layout 的形式,能够对数据重排列尽可能保障 Zorder 字段的数据局部性。
Zorder 构建耗时优化
对哪些列开启 Zorder 是按需构建的,惯例状况构建时长~30mins,数据歪斜下,构建 Zorder 时长高达~90mins。
针对这两种状况,对 Zorder 进行了优化:
- 惯例状况下,对于多列的 Zorder,由屡次遍历数据集改为遍历一次数据集来晋升构建效率。构建时长从均匀~30mins 升高到~20mins。
- 数据歪斜下,对于歪斜列所在的 bucket 做了热点扩散,构建时长从均匀~90mins 升高到~30mins。
总体成果
通过了近半年多的开发和优化,近期基于 DeltaLake 的离线数仓曾经上线,重点是晋升剖析的查问优化,同时针对有小时全量需要的场景,也同样提供了反对,整体看的成果如下:
就绪工夫更快:ODS 替换到 DeltaLake 后,产出工夫从之前凌晨 2:00 – 3:00 提前到凌晨 00:10 左右,产出工夫提前了 2 个多小时。
能力扩大更广:大数据具备了反对小时全量表的能力,利用 DeltaLake 增量更新的个性,低成本的实现了小时全量的需要,防止了传统计划下读取全量数据的耗费。目前曾经利用到了局部外围业务中来,构建小时级全量表,同时时效性上保障从过来的~40mins 升高到~10mins。
查问速度晋升:咱们重点晋升的分析师的即席查问效率,通过将分析师罕用的数仓表迁徙到 Deltalake 之后,利用 Zorder 实现了查问减速,查问速度从过来的数十分钟升高到~3mins。
五、将来布局
随着 DeltaLake 在作业帮的应用,以后还有一些问题有待解决:
- 进步修数效力。
应用 Hive 时咱们能够不便的针对某个历史分区独立修复,然而 DeltaLake 表修数时须要通过回退故障版本后的所有版本。
- 齐全反对 Hive 引擎。
目前咱们应用 DeltaLake,次要解决了过来应用 Hive 查问慢、应用 Presto 限度简单查问的问题,在简单查问、低提早上提供了解决方案,但后面提到的 GSCD、Dataskipping 等个性 Hive 还不反对,导致用户无奈像应用 Hive 一样应用 DeltaLake。
- 反对 Flink 接入。
咱们流计算零碎生态次要围绕 Flink 构建,引入 DeltaLake 后,也同时应用 Spark,会导致咱们的流计算生态保护老本减轻。
六、致谢
最初,非常感谢阿里云 EMR 数据湖团队,凭借他们在 DeltaLake 中的业余能力和单干过程中的高效反对,在咱们这次数据湖迁徙过程中,帮忙咱们解决了很多关键性问题。
原文链接
本文为阿里云原创内容,未经容许不得转载。