大数据实践解析下Spark的读写流程分析

6次阅读

共计 2679 个字符,预计需要花费 7 分钟才能阅读完成。

导读:

众所周知,在大数据 / 数据库领域,数据的存储格式直接影响着系统的读写性能。spark 是一种基于内存的快速、通用、可扩展的大数据计算引擎,适用于新时代的数据处理场景。在“大数据实践解析(上):聊一聊 spark 的文件组织方式”中,我们分析了 spark 的多种文件存储格式,以及分区和分桶的设计。接下来,本文通过简单的例子来分析在 Spark 中的读写流程,主要聚焦于 Spark 中的高效并行读写以及在写过程中如何保证事务性。

1、文件读

如何在 Spark 中做到高效的查询处理呢?这里主要有两个优化手段:

1)减少不必要的数据处理。数据处理涉及文件的 IO 以及计算,它们分别需要耗费大量的 IO 带宽和 CPU 计算。在实际的生产环境中,这两类资源都是有限的,同时这些操作十分耗时,很容易成为瓶颈,所以减少不必要的数据处理能有效提高查询的效率;

以下面的查询为例:

spark.read.parquet("/data/events")
.where("year = 2019")
.where("city ='Amsterdam'")
.select("timestamp")

由于在 events 表中按照 year 字段做了分区,那么首先通过 year 字段我们就可以过滤掉所有 year 字段不为 2019 的分区:

因为文件是 parquet 的文件格式,通过谓词下推可以帮助我们过滤掉 city 字段不是 “Amsterdam” 的 row groups;同时,由于我们的查询最终需要输出的投影字段只有 “timestamp”,所以我们可以进行列裁剪优化,不用读取其他不需要的字段,所以最终整个查询所读的数据只有剩下的少部分,过滤掉了大部分的数据,提升了整体的查询效率:

2)并行处理,这里主流的思想分为两类:任务并行和数据并行。任务并行指充分利用多核处理器的优势,将大的任务分为一个个小的任务交给多个处理器执行并行处理;数据并行指现如今越来越丰富的 SIMD 指令,一次动作中处理多个数据,比如 AVX-512 可以一次处理 16 个 32bit 的整型数,这种也称为向量化执行。当然,随着其他新硬件的发展,并行也经常和 GPU 联系在一起。本文主要分析 Spark 读流程中的任务并行。

下面是 Spark 中一个读任务的过程,它主要分为三个步骤:

(1)将数据按照某个字段进行 hash,将数据尽可能均匀地分为多个大小一致的 Partition;

(2)发起多个任务,每个任务对应到图中的一个 Executor;

(3)任务之间并行地进行各自负责的 Partition 数据读操作,提升读文件效率。

2 文件写

Spark 写过程的目标主要是两个:并行和事务性。其中并行的思想和读流程一样,将任务分配给不同的 Executor 进行写操作,每个任务写各自负责的数据,互不干扰。

为了保证写过程的事务性,Spark 在写过程中,任何未完成的写都是在临时文件夹中进行写文件操作。如下图所示:写过程中,results 文件夹下只存在一个临时的文件夹_temporary;不同的 job 拥有各自 job id 的文件目录,相互隔离;同时在各目录未完成的写操作都是存在临时文件夹下,task 的每次执行都视为一个 taskAttempt,并会分配一个 task attempt id,该目录下的文件是未 commit 之前的写文件。

当 task 完成自己的写任务时,会进行 commit 操作,commit 成功后,该任务目录下的临时文件夹会移动,写文件移到对应的位置,表示该任务已经写完成。

当写任务失败时,首先需要删除之前写任务的临时文件夹和未完成的文件,之后重新发起该写任务(relaunch),直到写任务 commit 提交完成。

整个任务的描述可用下图表示,如果 commit 成功,将写完成文件移动到最终的文件夹;如果未 commit 成功,写失败,删除对应的文件,重新发起写任务。当写未完成时,所有写数据都存在对应的临时文件中,其他任务不可见,直到整个写 commit 成功,保证了写操作的事务性。

当所有任务完成时,所有的临时文件夹都移动,留下最终的数据文件,它是最终 commitJob 之后的结果。

本文介绍的算法是 FileOutputCommitter v1 的实现,它的 commitJob 阶段由 Driver 负责依次移动数据到最终的目录。但是在当前广泛应用的云环境下,通常采取存算分离的架构,这时数据一般存放在对象存储中(如 AWS S3,华为云 OBS),Spark FileOutputCommitter 中的数据移动并不像 HDFS 文件系统移动那么高效,v1 的 commitJob 过程耗时可能会非常长。为了提升 FileOutputCommitter 的性能,业界提出了 FileOutputCommitter v2 的实现,它们可以通过 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version = 1 或 2 配置项来设置,它和 v1 的不同点在于,每个 Task 在 commitTask 时就将文件移动到最终的目录,而在 commitJob 时,Driver 只需要负责将 Task 留下来的空目录删除,这样相比 v1 带来好处是性能提升,但是由于 commit task 时直接写最终目录,在执行未完成时,部分数据就对外可见。同时,如果 job 失败了,成功的那部分 task 产生的数据也会残留下来。这些情况导致 spark 写作业的事务性和一致性无法得到保障。

其实 v1 也不完全一定能保证数据一致性,文件移动过程中完成的数据对外是可见的,这部分数据外部已经可以读取,但是正在移动和还未移动的数据对外是不可见的,而在云环境下,这个移动耗时会进一步加长,加重数据不一致的情况。

那么有没有能够使得 Spark 分析在云环境下也可以保证数据的事务性和一致性的解决方案呢?华为云数据湖探索 DLI(Data Lake Insight)改进了 v1 和 v2 这两种算法,使得 Spark 分析在云环境下也可以保证数据的事务性和一致性,同时做到高性能,并且完全兼容 Apache Spark 和 Apache Flink 生态,是实现批流一体的 Serverless 大数据计算分析服务,欢迎点击体验。

参考

【1】Databricks. 2020. Apache Spark’s Built-In File Sources In Depth – Databricks. [online] Available at: <https://databricks.com/session_eu19/apache-sparks-built-in-file-sources-in-depth>.

点击关注,第一时间了解华为云新鲜技术~

正文完
 0