乐趣区

关于spark:spark系列1deltaio到底解决了什么问题

本文转载自 https://mp.weixin.qq.com/s/ZN…

2019 年 10 月 16 日,在荷兰阿姆斯特丹举办的 Spark+AI 欧洲峰会上,Databricks 正式发表将 Delta Lake 捐献给了 Linux 基金会,其成为了该基金会中的一个正式我的项目。咱们期待在往年(2019 年)或者是将来,很快,Delta Lake 将会成为数据湖的支流或者说是事实标准。

在 9 月份颁布的 2019 年最佳开源软件奖名单中,Delta Lake 也榜上有名。正如官网对 Delta Lake 的颁奖评语形容,大家都很诧异,Databricks 公司居然把 Delta Lake 这个外围的拳头产品开源了。Delta Lake 的推出实际上是为了解决 Spark 作为大数据分析平台的诸多痛点,也置信它将会普惠整个 Spark 社区以及其余大数据社区,真正解决数据湖治理的各种关键问题。

很有幸,我参加了 Delta Lake 晚期的开发,尤其是 merge、update、delete 这种要害 DML 的设计和实现。这个我的项目最早启动于 2017 年 6 月。过后,多位用户向咱们埋怨 Spark 的有余和应用的不便,咱们公司的工程师们探讨后发现,是时候去提出咱们本人的存储架构。Spark 作为一种存储和计算拆散的一种计算引擎,之前咱们次要依赖于其余开源或非开源的我的项目去解决各种数据存储的问题,但实际上咱们发现在用户的生产环境中,现有的存储计划都没方法真正的解决数据湖。于是乎,咱们就和客户一起尝试去开发,去解决理论生产环境中的问题。通过四个月的疾速开发,咱们在 2017 年 10 月正式发表了 Delta Lake 产品的诞生。在第二年 6 月份的 Spark+AI 峰会中,Apple 的工程师和咱们的工程师 Michael 一起做了主题演讲,分享了 Apple 在应用 Delta Lake 的一些贵重教训,比如说他们过后用 Delta Lake 解决了 trillion 级别数据的大表的读写。

Summit 之后,咱们失去了多方的好评,目前已有超过 3000 个客户正在将 Delta Lake 用于他们的生产环境中。在这个背景中,咱们认为咱们应该把它推广到整个 Spark 社区,帮忙整个大数据社区解决他们大数据管理的痛点。于是,2019 年 4 月,咱们决定开源了。

但在开源之后,Spark 社区有很多反馈说 Delta Lake 是你们公司的一个的 Github repository,你们之后随时可能会改开源的 License,你们的开源管理模式都不是很通明。于是乎,为了解决这样的纳闷,咱们决定把它捐献给 Linux 基金会,让它成为一个规范凋谢的平台,让更多的人能够参加到 Delta Lake 的开发和应用中来。

明天咱们将分享一些典型的场景,为什么 Delta Lake 能够解决大家的各种痛点,而后也分享一下 Delta Lake 的基本原理和 Delta 架构,以及它如何取代大家正在广泛应用的 Lambda 架构。

数据工程师的纠结与运维的凌乱
项目经理总会跟工程师说,咱们有一个很简略的需要。可是,事实往往却是,这些简略的需要相当之难以实现。

在大数据的生产零碎,往往,作为工程师的你,会面对这样的一个项目经理:“我要有这么一个 Data Pipeline,继续地解决数据,并且是增量解决,只有有新的数据来了就解决,不应该每次把所有的历史数据都重新处理,而是只应该解决增量数据,并且要保障高效疾速。记住,咱们不能让用户在应用中意识到这是批处理还是流解决。总之,就是疾速失去正确后果。”

那么作为数据工程师的你,要建设一个根本的 Data Pipeline [数据处理流水线],依照项目经理的说法,那就很简略。咱们把 Kafka、Kinesis、各种各样数据湖的格局用 Spark 读出来,再用 Spark 做一些数据清理和数据转换,而后再把后果存到一个数据湖,再用另一个 Spark job 把数据湖的内容分析一下,做训练或者做各种各样的剖析,最初产生一个报告给终端用户。这是一个非常简单的 Pipeline。然而这个 Pipeline 有个头痛的问题。如果仅仅用 Spark 的批处理,那么提早可能不达标,而且也不是在做增量解决。

那么第二个计划就进去了,用 Spark Structured Streaming。Structured Streaming 有 Trigger Once,能够帮你记录上次解决到什么中央,这样的话能够把提早升高,只解决增量,你也不须要去记录和治理上次解决到哪里了。可是咱们又遇到了一个新的问题,就是你如果用 Structured Streaming,每个小的 Batch 都会产生多个小的 Spark 的后果文件。小文件越来越多,整个 Pipeline 就越来越慢,提早往往到了最初就无奈承受了。

为此,那咱们就得抉择下一个计划。咱们既然有小文件,就得定期去做压缩。然而在做压缩的过程中整个作业线会下线。为什么?因为不足原子性读写的能力,没方法在写你的压缩的时候同时读数据。压缩的周期太长也会影响到你的生产最初报表的时效性。比如说,业务是不能承受半小时或者一个小时这种提早的。那么,这个时候,大家自然而然会抉择最经典的架构,Lambda 架构。就是说,你同时能够部署一个批处理的和一个流解决的,批能够慢一点,然而后果全面精确,而流解决就是用最快的工夫对最新增量产生后果。而后将批和流的后果汇总,产生一个全局的后果。

然而这种 Lambda 架构须要同时经营两个不同的 pipeline,并且额定资源耗费也大幅增多,经营的人力和资源老本都大幅提高。

并且咱们对这两个 pipeline 都须要做验证。尤其是当数据来源于非构造数据的数据源,数据不是特地洁净和统一。

对于验证发现的谬误,咱们又不心愿将 Pipeline 给宕下来,而是心愿它主动去修复。那么,一种解决方案就是防止对全表做修改,而是对某些分区重新处理。数据的重新处理个别都会影响你整个 pipeline 的提早,而且还进一步减少硬件资源的负荷和 pipeline 的复杂度。

之后兴许会有一些业务上的调整,或者是诸多起因,你可能想把数据湖做一些 update 和 merge。因为以后数据湖不反对 update 和 delete,那么你可能须要本人实现 update 和 merge。咱们发现不同用户的实现办法都不太一样,几乎就是各显神通,这些计划岂但容易出错,复杂度和提早也很高,而且大多数状况还不通用。

简单归简单,然而通过了半年的研发,计划终于能够上线了,应该是一件开心的事件。可是这个 Lambda 架构上线之后你会收到有数的埋怨,比如说你这个数据加载太慢了,咱们做一些元数据操作的时候,其余并行的命令和查问都没方法用,都被 block 了。不得不等这些大的数据加载,或者是元数据处理做完了能力再做别的事件。或者用户做 update 改数据湖的时候会失去大量的报告说 FileNotFound。兴许是你的文件地址被更新了,然而元数据缓冲没有更新,找不到文件还须要 Refresh 缓存,但有时候客户会埋怨说 Refresh 如同不论用,可是什么时候管用呢?如果你用的 Object Store,剖析到最初,可能发现是 Eventual Consistency 的问题,兴许你不得不要过半小时之后才会见到这个文件……总之就是各种各样的错。

运维曾经很不容易了,相煎何太急。这个 Lambda 架构费钱又费劲,将大好的时光节约到了解决零碎的各种有余和局限,而不是花工夫去从数据抽取价值,真是得失相当。

然而咱们再反过来看,最开始第一个计划实际上是很简略,很柔美。那它到底哪里错了?是什么起因导致它最初变得这么简单?咱们缺了什么?如何能够简化来产生一个简略易保护的架构?

这里咱们列出了五点起因:

1)第一,要反对同时读写,就意味着你写的时候还能够读,不应该读到一个谬误的后果。同时还能够反对多个写,且能保证数据的一致性;

2)第二,能够高吞吐地从大表读取数据。大数据计划不能有诸多限度,比方,我据说有些计划里最多只能够反对几个并发读,或者读的文件太多了就不让你提交作业了。如果这样,对业务方来说,你的整个设计是不满足他的需要的;

3)第三,谬误是无可避免,你要能够反对回滚,能够重做,或者能够删改这个后果,不能为了反对删改而要求业务方去做业务逻辑的调整;

4)第四,在从新扭转业务逻辑的时候要对数据做重新处理,这个时候,业务是不能下线的。在数据被重新处理实现之前,数据湖的数据是要始终可被拜访的;

5)第五,因为有诸多起因,数据可能会有晚到的状况,你要能解决早退数据而不推迟下阶段的数据处理。

基于以上五点,咱们基于 Delta Lake 和 Structured Streaming 产生了一个新的架构,叫 Delta 架构,它是对 Lambda 架构的一种颠覆,或者称为一种晋升。

在 Delta 架构下,批流是合并的,并且要继续的进行数据处理,按需来重新处理历史数据,并且利用私有或公有云的个性来对计算或者存储资源按需别离做弹性扩大。

Delta Lake 的基本原理

Delta Lake 的基本原理其实很简略,简略得令人发指。作为一个一般的 Partquet 个别就是 Partition Directories 再加一些 Data Files。Delta Lake 也是基于这个构造的,惟一的区别就是它有一个 Transaction Log 记录你的 Table Version 和变更历史。

当初,让咱们来从新对待什么形成了一张表。表实际上是一堆操作的后果,比如说扭转元数据,扭转名字,扭转 Schema,减少或删除一些 Partitioning,还有另外一种操作是增加或者移除文件。所有表的以后状态或者是后果,都是这一系列 Action 产生的后果。这个后果蕴含了以后的 元数据,文件列表,transaction 的历史,还有版本信息。

那怎么去实现这个原子性?也很简略,只有保障 Commit File 的程序和原子性就能够了。

比如说表的第一个版本,它是减少两个文件,第二个版本就是把这两个文件删掉,减少一个新的文件,作为 Reader 来说,每次只能看到以后曾经 Commit 的后果。

怎么实现多个写入的并发?Spark 的 Pipeline 个别都是高并发读,低并发写。在这种状况下,乐观并发就更加适合了。它实际上很简略,就说你多个用户读的时候,先记录一下以后读用的 data 版本是什么,如果同时有两个人都在 commit,只有一方能够胜利,而另一方就须要去看一下胜利方之前的 commit 里有没有碰他读的文件。如果没有改,他就改一下文件名就行了,如果改了,那就得重做。这个能够是 Delta Lake 主动去重试,也能够是事务提交方 / 业务方,去重做。

Delta Lake 须要解决的另一个经典问题就是大规模元数据的解决。你发现你有大量的 commit log file,因为每次 commit 都会产生一个文件,这其实也是一个经典的小文件解决。如何解决这种元数据处理?标准答案就是应用 Spark。Delta Lake 便是应用 Spark 去解决它的元数据。比方方才说了一个例子,加了两个文件,减了两个文件,之后加了一个 parquet,之后 Spark 会把这些 commit 全副读下来,产生一个新的,咱们称之为叫 Checkpoint。

这就是 Delta Lake,就是这么简略。

Delta 架构
Delta 架构简介
咱们看一下 Delta 架构,怎么用 Delta 架构代替经典的 Lambda 架构。

1)第一,同时读写,并且要保证数据的一致性

就是方才咱们提出的第一个需要,就是要反对 transcation,就是说你只有能实现读写之间的 Snapshot isolation 就行了,这样你能够集中在你的 data flow,而不必放心会不会读到局部后果,不必放心 FileNotFound 的这类谬误,这些事件 Delta Lake 都能够帮你解决。

Delta Lake 提供了流,就是 streaming 和 batch 的读入和写入,规范 API,很容易实现,很容易去用。你能够在文档外面找到具体的 API。

2)能够高吞吐从大表读取数据

可能解决过大数据的同学们就遇到过这个经典痛点,我也解决过客户的这种问题好屡次,在 没有 Delta Lake 的时候,几乎痛不欲生。

如果没有 Delta Lake,读取百万级的 patition 的 location path 是须要用 Hive metastore 一行行地读的,要取一百万行几乎是奇慢无比。而后,在每个 patition 的 地址里还须要通过文件系统 列外面蕴含的所有文件。这在对象存储的零碎里,这种操作也是又贵又慢。

其实这个问题不又是一个典型的大数据问题吗?大数据系统都解决不了大数据问题,那不是贻笑大方?

当然,这里的解决方案很简略,就是规范的 Spark,用 parquet 去存 file path,并且用 Spark 的分布式的向量化的读入去读,这就是 Delta Lake 怎么去解决之前的痛点。咱们客户因为这个性能轻松地进步了几百倍甚至几千倍。其实也就是因为 Hive metastore 和文件系统的 list file 操作切实太慢了。

3)反对回滚和删改

数据这么脏,回滚和删改需要难以避免。Delta Lake 提供了 Time travel,因为 transaction log 理论能看到整个历史变动的后果,所以 Delta Lake 实现这个很不便。咱们提供了两条 API,你能够基于 Timestamp 去做,也能够基于 version number。Time travel 是一个特地好的性能,它能够做很多事件,不单单是纠错,你还能够 Debug,重建过往报告,查账,审计,简单的 temporal query,对疾速更新数据的表做版本查问……

Delta Lake 还反对删改(update/delete/merge),不过目前 Delta 还没有本人的 SQL 语法,当然咱们能够把 Spark 的语法齐全复制过去,然而保护老本也很高。但 Spark 3.0 来了之后这个问题就迎刃而解了。当然,如果要反对 Spark 2.4 的话,Delta 须要加上本人的 SQL parser,咱们还在探讨要不要这样干。

4)在线业务不下线的同时能够重新处理历史数据

你只有对 Delta Lake 做相干后果的删除,从新改一下业务逻辑,历史数据再做批处理,你就能够失去你的最新后果了。与此同时,因为 Delta Lake 反对 ACID,数据的上游实用方还能够同时拜访之前版本的数据。

5)解决早退数据而无需推延下阶段的数据处理

解决早退数据也不是什么问题,只有你能反对 merge,如果存在就 update,不存在就 insert,不影响你现有的 Delta Lake 重写。

如上所述,Delta Lake 完满解决了咱们的需要,让大家的 Data pipeline 从新变得简略而优雅,而不须要用那么简单的 Lambda 架构了。

怎么最好地应用 Delta 架构?基于跟客户的各种的探讨教训,咱们总结出了上面几点。

你须要有多个 stage 的 Delta Lake。咱们的根本 idea 是这样的:第一个 stage 就是你要保障没有原始数据损失。它保留在 Delta Lake 里,万一哪天发现之前的一些数据清理导致失落了很重要的信息,你还能够轻松复原。第二个 stage 就是做数据清理,做一些清理、转换、filter。而后才真正达到一个能够被数据分析的第三个 stage。这是基于数据品质分成多个级别,多个状态。至于理论生产线上须要多少个 stage,这个取决于业务的复杂度,SLA,和对提早的要求。

Delta 架构的个性
来看一下 Delta 架构的个性。

1)继续数据流

这听起来如同很高大上,但实际上略微解释多一点就很容易明确。

批流合并。Streaming 和 batch 用同一个 engine,不必保护多个;同一套 API,甚至都不必 batch 的 API,就用 streaming 的 API 就能解决问题;同样的 user code,无需用到 Lambda 架构,纯正就是一条 pipeline 解决所有问题。高效增量数据载入。如果一直有新数据进来就间接用 Structured Streaming 的 Trigger.Once 去记录上一次你解决到哪,你只须要重启这个 Trigger.Once,就解决了上次之后的新数据, 特地不便。疾速无提早的流解决,你能够抉择不同的 Trigger 的模式,当然 Trigger.Once 最省钱,当然你也能够低提早,比方多长时间 Trigger 一次,也能够低提早用继续 Trigger。你能够把批处理变成一个继续流解决,简略易用。而且 Delta Lake 因为反对原子性,所以它能保障 exactly once,这一点很重要,其余的数据源根本没方法保障。

2)物化两头后果

这一点就有点颠覆传统模式了。咱们倡议屡次物化你的两头后果,也就是之前说的多个 stage。每个 stage 就是把两头后果落地存在文件里,它有以下益处。

容错复原,出问题后能够回到某一个版本,从那个时候再开始,你不须要从最原始的数据开始,这点在 pipeline 里是很重要的事件。不便故障排查,你晓得哪一步出错了,要是不存,业务方报告出错的时候你也不晓得问题出在哪儿,连 debug 都没法 debug,回溯都没方法回溯。一写多读,当你的 pipeline 很多很简单的时候,可能重用两头的一些后果,这真的很不便。这外面比如说图例的两个 pipeline,其实到 T3 之前,都是一样的。咱们就能够复用。

如果你的转换很简单的时候,能够物化屡次。到底物化多少次,取决于你对 Reliability/SLA 和 end-2-end latency 的取舍,你要是 Reliability/SLA 好,你就必须要物化多几次,然而写必定有代价,所以 end-2-end latency 就慢,具体就要看你的需要了。

3)费用和提早的取舍

流解决,继续的数据流入和解决,无需作业调度治理,须要永远在线的 cluster。频繁的批处理,分钟级数据流入和解决,不须要低提早,比方半小时就能够了,须要 warm pool of machine,无事关机,按需启动。可应用 Spark structured streaming 的 Trigger.Once 模式。非频繁批处理,若干小时或若干天的数据批流入和解决,无事关机,按需启动,也可应用 structured streaming 的 Trigger.Once 模式。这样一来,就能够节俭很多资源了。

4)优化数据的物理存储

依据罕用查问的 predicate,为改善读取速度,可优化数据的物理存储。比方,用 partitioning 和 z-ordering。Partitioning 大家都应该很分明了,low cardinality 的 column 比拟适合,就是每个 partition 不要超过 1 GB,个别比如说用 date 这是一种常常被应用的 partition column,每个 date 外面要给予不同的 eventType。这样,每个 partition 不会太大,也不会产生太多 partition。反之如果用 timestamp 做 partition column,产生的 partition value 就是无数个,几乎奇葩无比,能够轻松把 Hive metastore 给撑爆。在 Delta Lake 外面咱们也不倡议,即便咱们不必 metastore。第二就是 Z-Ordering,这个还没到开源的版本,然而这个是能够解决什么问题呢,就是是针对那种 high cardinality,就是 column 里有大量的不一样的 value,这种就适宜做 z-ordering index。

5)重新处理历史数据

每次 keep 住上一个 stage 的益处是什么?你把后果一删,从新用 Tigger.Once 再做一次就好了,后果就进去了。如果你零碎部署在云上,那对你来说也很简略,你如果要疾速回填,你就再多加几台机器,后果就更快地进去了。比方,从原来的十台机器扩张到一百台。

6) 数据品质的调整

这是也是一个须要扭转大家思维形式的中央。

在最开始的时候,咱们最好是保障数据完整性。schema 能够抉择主动合并,就能够防止数据的失落。到了最初阶段,咱们就须要去强制 schema 不能变,data type 不能变,data expectation 也不能。比方,不能有 NULL。数据品质对于数据分析的准确度是至关重要的。

以上个性也不是很难了解,然而须要扭转思维形式。

Delta 架构的长处
1)缩小端到端的 pipeline SLA 多个应用单位(客户)把 data pipeline 的 SLA 从几小时缩小到几分钟。

2)缩小 pipeline 的保护老本原来的 Lambda 架构几乎就是费时费力。要同样达到分钟级的用例提早,Delta Lake 架构并不需要这么简单。

3)更容易的解决数据更新和删除简化了 Change data capture,GDPR,Sessionization,数据去冗。这些都能够用 Delta Lake 去实现,不便很多。

4)通过计算和存储的拆散和可弹缩而升高了 infrastructure 的费用多个应用单位将 infrastructure 的费用升高了超过十倍。

Delta 架构的经典案例
这里分享 3 个 Delta 架构的经典计划。

第一个是 COMCAST,一个像中国移动的通信类公司,它收集了美国海量的用户数据。它的 Petabyte-scale jobs 应用 Delta Lake,从原来须要 640 个服务器降到 64 个,原来是 84 个 job 升高到 34 个 job,提早还降了一半。

第二个是 Sam‘s Club,它们也是应用 Delta Lake,原来基本达不到数据的一致性,当初能够达到。提早从一个小时降到六秒。

第三个就是澳洲的 healthdirect,数据更洁净 \ 更统一了,做数据分析匹配的准确度从 80% 升到 95%,数据加载的时耗从一天降到了 20 分钟。

这都是来自于 Delta Lake 用户在 Spark Summit 上分享的案例。

应用 Delta Lake 特地简略,就把 parquet 的 keywords 一换。

怎么加这个 Delta Lake 呢,把这个 package 加上就好了。具体方法见 demo 演示:Delta Lake Primer(https://dbricks.co/dlw-01)。

Delta 社区
Delta Lake 迭代挺快的,咱们外部实际上还有大量的 feature,只是还没有开源,咱们会逐渐开源并且增强研发。

阿里巴巴的团队在帮 Delta Lake 做让 Hive 能够读 Delta Lake 外面的数据。

Delta Lake 的社区倒退也真的很快,从 2019 年 4 月份开源,目前已有 3700 个客户,maven 的下载量快两万,咱们本人客户使用量曾经达到超过 2 exabyte 的读和写。

欢送大家当初开始发明你本人的 Delta Lake,踊跃加入 Delta Lake 的社区,https://delta.io/ 有个 Slack Channel,大家能够看到各种各样的问题,咱们的工程师和专家们都在踊跃回应各种问题。

退出移动版