作者:vivo 互联网搜寻团队 - Deng Jie
随着技术的一直的倒退,大数据畛域对于海量数据的存储和解决的技术框架越来越多。在离线数据处理生态系统最具代表性的分布式解决引擎当属 Hive 和 Spark,它们在分区策略方面有着一些相似之处,但也存在一些不同之处。
一、概述
随着技术的一直的倒退,大数据畛域对于海量数据的存储和解决的技术框架越来越多。在离线数据处理生态系统最具代表性的分布式解决引擎当属 Hive 和 Spark,它们在分区策略方面有着一些相似之处,但也存在一些不同之处。本篇文章将剖析 Hive 与 Spark 分区策略的异同点、它们各自的优缺点,以及一些优化措施。
二、Hive 和 Spark 分区概念
在理解 Hive 和 Spark 分区内容之前,首先,咱们先来回顾一下 Hive 和 Spark 的分区概念。在 Hive 中,分区是指将表中的数据划分为不同的目录或者子目录,这些目录或子目录的名称通常与表的列名相关联。比方,一个名为“t\_orders\_name”的表能够依照日期分为多个目录,每个目录名称对应一个日期值。这样做的益处是能够大大提高查问效率,因为只有波及到特定日期的查问才须要扫描对应的目录,而不须要去扫描整个表。Spark 的分区概念与 Hive 相似,然而有一些不同之处,咱们将在后文中进行探讨。
在 Hive 中,分区能够基于多个列进行,这些列的值组合造成目录名称。例如,如果咱们将“t\_orders\_name”表依照日期和地区分区,那么目录的名称将蕴含日期和地区值的组合。在 Hive 中,数据存储在分区的目录下,而不是存储在表的目录下。这使得 Hive 能够快速访问须要的数据,而不用扫描整个表。另外,Hive 的分区概念也能够用于数据分桶,分桶是将表中的数据划分为固定数量的桶,每个桶蕴含雷同的行。
而与 Hive 不同的是,Spark 的分区是将数据分成小块以便并行计算解决。在 Spark 中,分区的数量由 Spark 执行引擎依据数据大小和硬件资源主动计算得出。Spark 的分区数越多,能够并行处理的数据也就越多,因而也能更快的实现计算工作。然而,如果分区数太多,将会导致过多的任务调度和数据传输开销,从而升高整体的性能。因而,Spark 分区数的抉择应该思考数据大小、硬件资源和计算工作复杂度等因素。
三、Hive 和 Spark 分区的利用场景
在理解 Hive 和 Spark 的分区概念之后,接下来,咱们来看看 Hive 和 Spark 分区在不同的利用场景中有哪些不同的劣势。
3.1 Hive 分区
Hive 分区实用于大数据场景,能够对数据进行多级分区,以便更细粒度地划分数据,进步查问效率。例如,在游戏平台的充值数据中,能够依照道具购买日期、道具付款状态、游戏用户 ID 等多个维度进行分区。这样能够不便的进行数据统计、剖析和查问操作,同时防止繁多分区数据过大导致的性能问题。
3.2 Spark 分区
Spark 分区实用于大规模数据处理场景,能够充分利用集群资源进行并行计算解决。比方,在机器学习算法的训练过程中,能够将大量数据进行分区,而后并行处理每个分区的数据,从而进步算法的训练速度和效率。另外,Spark 的分布式计算引擎也能够反对在多个节点上进行数据分区和计算,从而进步整个集群的计算能力和效率。
简而言之,Hive 和 Spark 分区在大数据处理和分布式计算场景这都有宽泛的利用,能够通过抉择适合的分区策略和优化措施,进一步提高数据处理的效率和性能。
四、如何抉择分区策略
在相熟了 Hive 和 Spark 的分区概念以及利用场景后。接下来,咱们来看看在 Hive 和 Spark 中如何抉择分区策略。分区策略的抉择对数据处理的效率和性能有着重要的影响。上面将别离论述 Hive 和 Spark 分区策略的优缺点以及如何抉择分区策略。
4.1 Hive 分区策略
长处:
Hive 的分区策略能够进步查问效率和数据处理性能,特地是在大数据集上体现突出。另外,Hive 还反对多级分区,容许更细粒度的数据划分。
毛病:
在 Hive 中,分区是以目录的模式存在的,这会导致大量的目录和子目录,如果分区过多,将会占用过多的存储空间。此外,Hive 的分区策略须要在创立表时进行设置,如果数据分布呈现变动,须要从新设置分区策略。
4.2 Spark 分区策略
长处:
Spark 的分区策略能够依据数据大小和硬件资源主动计算分区数,这使得计算工作能够并行计算解决,从而进步了解决效率和性能。
毛病:
如果分区数设置不当,将会导致过多的任务调度和数据传输开销,从而影响整体性能。此外,Spark 的分区策略也须要依据数据大小、硬件资源和计算工作复杂度等因素进行调整。
4.3 分区策略抉择
在理论我的项目开发应用中,抉择适合的分区策略能够显著进步数据处理的效率和性能。然而,如何抉择分区策略须要依据具体情况进行思考,这里总结了一些分区策略抉择的场景:
数据集大小 :如果数据集较大,能够思考应用 Hive 的多级划分策略,以便更细粒度的划分数据,进步查问效率。如果数据集较小,能够应用 Spark 主动计算分区策略,以便充分利用硬件资源并进步计算效率。
计算工作复杂度 :如果计算工作比较复杂,例如须要进行多个 JOIN 操作,能够应用 Hive 的分桶策略,以便放慢数据访问速度,缩小 JOIN 操作的开销。
硬件资源 :分区策略的抉择也须要思考硬件资源的限度。如果硬件资源比拟短缺,能够减少分区数以进步计算效率。如果硬件资源比拟缓和,须要缩小分区数以防止任务调度和数据传输的开销。
综上所述,抉择适合的分区策略须要依据具体的状况进行思考,包含数据集大小、计算工作复杂度和硬件资源等因素。在理论应用中,能够通过试验和调试来找到最佳的分区策略。
五、如何优化分区性能
除了抉择适合的分区策略之外,还能够通过一些优化措施来进一步提高分区的性能。在 Spark 中,大多数的 Spark 工作能够通过三个阶段来表述,它们别离是读取输出数据、应用 Spark 解决、放弃输入数据。Spark 尽管理论数据处理次要产生在内存中,然而 Spark 应用的是存储在 HDFS 上的数据来作为输出和输入,工作的调度执行会应用大量的 I/O,存在性能瓶颈。
而 Hive 分区数据是存储在 HDFS 上的,然而 HDFS 对于大量小文件反对不太敌对,因为在每个 NameNode 内存中每个文件大略有 150 字节的存储开销,而整个 HDFS 集群的 IOPS 数量是有下限的。当文件写入达到峰值时,会对 HDFS 集群的基础架构的某些局部产生性能瓶颈。
5.1 通过缩小 I/O 带宽来优化性能
在 Hadoop 集群中,它依附大规模并行 I/O 来反对数千个并发工作。比方现有一个大小为 96TB 的数据节点,磁盘的大小有两种,它们别离是 8TB 和 16TB。具备 8TB 磁盘的数据节点有 12 块这样的磁盘,而具备 16TB 磁盘的数据节点有 6 块这样的磁盘。咱们能够假如每个磁盘的均匀读写吞吐量约为 100MB/s,而这两种不同的磁盘散布,它们对应的带宽和 IOPS,具体详情如下表所示:
5.2 通过设置参数来优化性能
在 Hadoop 集群中,每个数据节点为每个卷运行一个卷扫描器,用于扫描块的状态。因为卷扫描器与应用程序竞争磁盘资源,因而限度其磁盘带宽很重要。配置 dfs.block.scanner.volume.bytes.per.second 属性值来定义卷扫描器每秒能够扫描的字节数,默认为 1MB/s。
比方设置带宽为 5MB/s,扫描 12TB 所须要的工夫为:12TB / 5MBps = (12 1024 1024 / (3600 * 24)) = 29.13 天。
5.3 通过优化 Spark 解决分区工作来晋升性能
如果,当初须要从新计算历史分区的数据表,这种场景通常用于修复谬误或者数据品质问题。在解决蕴含一年数据的大型数据集(比方 1TB 以上)时,可能会将数据分成几千个 Spark 分区来进行解决。尽管,从外表上看,这种解决办法并不是最合适的,应用动静分区并将数据后果写入依照日期分区的 Hive 表中将产生多达上百万个文件。
上面,咱们将工作分区数放大,现有一个蕴含 3 个分区的 Spark 工作,并且想将数据写入到蕴含 3 个分区的 Hive 表。在这种状况下,心愿发送的是将 3 个文件写入到 HDFS 中,所有数据都存储在每个分区的单个文件中。最终会生成 9 个文件,并且每个文件都有 1 个记录。应用动静分区写入 Hive 表时,每个 Spark 分区都由执行程序来并行处理。
解决 Spark 分区数据时,每次执行程序在给定的 Spark 分区中遇到新的分区时,它都会关上一个新文件。默认状况下,Spark 对数据会应用 Hash 或者 Round Robin 分区器。当利用于任意数据时,能够假如这两种办法在整个 Spark 分区中绝对平均且随机散布数据。如下图所示:
现实状况下,指标文件大小应该大概是 HDFS 块大小的倍数,默认状况下是 128MB。在 Hive 中,提供了一些配置参数来主动将后果写入到正当大小的文件中,从开发者的角度来看简直是通明的,比方设置属性 hive.merge.smallfiles.avgsize 和 hive.merge.size.per.task。然而,Spark 中不存在此类性能,因而,咱们须要本人开发实现,来确定一个数据集,应该写入多少文件。
5.3.1 基于大小的计算
实践上,这是最间接的办法,设置指标大小,估算数据的大小,而后进行划分。然而,在很多状况下,文件被写入磁盘时会进行压缩,并且其格局与存储在 Java 堆中的记录格局有所不同。这意味着估算写入磁盘时内存的记录大小不是一件容易的事件。尽管能够应用 Spark SizeEstimator 应用程序通过内存中的数据的大小进行估算。然而,SizeEstimator 会思考数据帧、数据集的外部耗费,以及数据的大小。总体来说,这种形式不太容易精确实现。
5.3.2 基于行数的计算
这种办法是设置指标行数,计算数据集的大小,而后执行除法来估算指标。咱们的指标行数能够通过多种形式确定,或者通过为所有数据集抉择一个动态数字,或者通过确定磁盘上单个记录的大小并执行必要的计算。哪种形式最优,取决于你的数据集数量及其复杂性。计算相对来说老本较低,然而须要在计算前缓存以防止从新计算数据集。
5.3.3 动态文件计算
最简略的解决方案是,只要求开发者在每个写入工作的根底上,通知 Spark 总共应该写入多少个文件。这种形式须要给开发者一些其余办法来获取具体的数字,能够通过这种形式来代替低廉的计算。
5.4. 优化 Spark 散发数据形式来晋升性能
即便咱们晓得了如何将文件写入磁盘,然而,咱们仍须让 Spark 以符合实际的形式来构建咱们的分区。在 Spark 中,它提供了许多工具来确定数据在整个分区中的散布形式。然而,各种性能中暗藏着很多复杂性,在某些状况下,它们的含意并不显著,上面将介绍 Spark 提供的一些选项来管制 Spark 输入文件的数量。
5.4.1 合并
Spark Coalesce 是一个非凡版本的从新分区,它只容许缩小总的分区,然而不须要齐全的 Shuffle,因而比从新分区要快得多。它通过无效的合并分区来实现这一点。如下图所示:
Coalesce 在某些状况下看起来是不错的,然而也有一些问题。首先,Coalesce 有一个难以使用的行为,以一个十分根底的 Spark 应用程序为例,代码如下所示:
Spark
load().map(…).filter(…).save()
比方,设置的并行度为 1000,然而最终只想写入 10 个文件,能够设置如下:
Spark
load().map(…).filter(…).coalesce(10).save()
然而,Spark 会尽可能早的无效的将合并操作下推,因而这将执行为如下代码:
Spark
load().coalesce(10).map(…).filter(…).save()
无效的解决这种问题的办法是在转换和合并之间强制执行,代码如下所示:
Spark
val df = load().map(…).filter(…).cache()
df.count()
df.coalesce(10)
在 Spark 中,缓存是必须的,否则,你将不得不从新计算数据,这可能会从新耗费计算资源。而后,缓存是须要生产肯定资源的,如果你的数据集无奈放入内存中,或者无奈开释内存,将数据无效的存储在内存中两次,那么必须应用磁盘缓存,这有其本身的局限性和显著的性能损失。
此外,正如咱们看到的,通常须要执行 Shuffle 来取得咱们想要的更简单的数据集后果。因而,Coalesce 仅实用于特定的状况,比方如下场景:
- 保障只写入一个 Hive 分区;
- 指标文件数少于你用于解决数据的 Spark 分区数;
- 有短缺的缓存资源。
5.4.2 简略从新分区
在 Spark 中,一个简略的从新分区,能够通过设置参数来实现,比方 df.repartition(100)。在这种状况下,应用循环分区器,这意味着惟一的保障是输入数据具备大致相同大小的 Spark 分区,这种分区仅实用于以下状况:
- 保障只须要写入一个 Hive 分区;
- 正在写入的文件数大于你的 Spark 分区数,或者因为某些起因你无奈应用合并。
5.4.3 按列从新分区
按列从新分区接管指标 Spark 分区计数,以及要从新分区的列序列,例如,df.repartition(100,$”date”)。这对于强制要求 Spark 将具备雷同键的数据,散发到同一个分区很有用。一般来说,这对许多 Spark 操作(比方 JOIN)很有用。
按列从新分区应用 HashPartitioner,将具备雷同值的数据,分发给同一个分区,实际上,它将执行以下操作:
然而,这种办法只有在每个分区键都能够平安的写入到一个文件时才无效。这是因为无论有多少特定的 Hash 值,它们最终都会在同一个分区中。按列从新分区仅在你写入一个或者多个小的 Hive 分区时才无效。在任何其余状况下,它都是有效的,因为每个 Hive 分区最终都会生成一个文件,仅实用于最小的数据集。
5.4.4 按具备随机因子的列从新分区
咱们能够通过增加束缚的随机因子来按列批改从新分区,具体代码如下:
Spark
df
.withColumn("rand", rand() % filesPerPartitionKey)
.repartition(100, $"key", $"rand")
实践上,只有满足以下条件,这种办法应该会产生排序规定的数据和大小平均的文件:
- Hive 分区的大小大致相同;
- 晓得每个 Hive 分区的指标文件数并且能够在运行时对其进行编码。
然而,即便咱们满足上述这些条件,还有另外一个问题:散列抵触。假如,当初正在解决一年的数据,日期作为分区的惟一键。如果每个分区须要 5 个文件,能够执行如下代码操作:
Spark
df.withColumn("rand", rand() % 5).repartition(5*365, $"date", $"rand")
在后盾,Scala 将结构一个蕴含日期和随机因子的键,例如(,<0-4>)。而后,如果咱们查看 HashPartitioner 代码,能够发现它将执行以下操作:
Spark
class HashPartitioner(partitions: Int) extends Partitioner {def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
}
实际上,这外面所做的事件,就是获取要害元组的散列,而后应用指标数量的 Spark 分区获取它的 mod。咱们能够剖析一下在这种状况下咱们的数据将如何实现散布,具体代码如下:
Spark
import java.time.LocalDate
def hashCodeTuple(one: String, two: Int, mod: Int): Int = {val rawMod = (one, two).hashCode % mod
rawMod + (if (rawMod < 0) mod else 0)
}
def hashCodeSeq(one: String, two: Int, mod: Int): Int = {val rawMod = Seq(one, two).hashCode % mod
rawMod + (if (rawMod < 0) mod else 0)
}
def iteration(numberDS: Int, filesPerPartition: Int): (Double, Double, Double) = {val hashedRandKeys = (0 to numberDS - 1).map(x => LocalDate.of(2019, 1, 1).plusDays(x)).flatMap(x => (0 to filesPerPartition - 1).map(y => hashCodeTuple(x.toString, y, filesPerPartition*numberDS))
)
hashedRandKeys.size // Number of unique keys, with the random factor
val groupedHashedKeys = hashedRandKeys.groupBy(identity).view.mapValues(_.size).toSeq
groupedHashedKeys.size // number of actual sPartitions used
val sortedKeyCollisions = groupedHashedKeys.filter(_._2 != 1).sortBy(_._2).reverse
val sortedSevereKeyCollisions = groupedHashedKeys.filter(_._2 > 2).sortBy(_._2).reverse
sortedKeyCollisions.size // number of sPartitions with a hashing collision
// (collisions, occurences)
val collisionCounts = sortedKeyCollisions.map(_._2).groupBy(identity).view.mapValues(_.size).toSeq.sortBy(_._2).reverse
(
groupedHashedKeys.size.toDouble / hashedRandKeys.size.toDouble,
sortedKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble,
sortedSevereKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble
)
}
val results = Seq(iteration(365, 1),
iteration(365, 5),
iteration(365, 10),
iteration(365, 100),
iteration(365 * 2, 100),
iteration(365 * 5, 100),
iteration(365 * 10, 100)
)
val avgEfficiency = results.map(_._1).sum / results.length
val avgCollisionRate = results.map(_._2).sum / results.length
val avgSevereCollisionRate = results.map(_._3).sum / results.length
(avgEfficiency, avgCollisionRate, avgSevereCollisionRate) // 63.2%, 42%, 12.6%
下面的脚本计算了 3 个数量:
- 效率 :非空的 Spark 分区与输入文件数量的比率;
- 碰撞率:(date,rand) 的 Hash 值发送抵触的 Spark 分区的百分比;
- 重大抵触率 :同上,然而此键上的抵触次数为 3 或者更多。
抵触很重要,因为它们意味着咱们的 Spark 分区蕴含多个惟一的分区键,而咱们预计每个 Spark 分区只有 1 个。咱们从剖析的后果可知,咱们应用了 63% 的执行器,并且可能会呈现重大的偏差,咱们将近一半的执行正在解决比预期多 2 到 3 倍或者在某些状况下高达 8 倍的数据。
当初,有一个解决办法,即分区缩放。在之前示例中,输入的 Spark 分区数量等于预期的总文件数。如果将 N 个对象随机调配给 N 个插槽,能够预期会有多个插槽蕴含多个对象,并且有几个空插槽。因而,须要解决此问题,必须要升高对象与插槽的比率。
咱们通过缩放输入分区计数来实现这一点,通过将输入 Spark 分区数乘以一个大因子,相似于:
Spark
df
.withColumn("rand", rand() % 5)
.repartition(5*365*SCALING_FACTOR, $"date", $"rand")
具体分析代码如下所示:
Spark
import java.time.LocalDate
def hashCodeTuple(one: String, two: Int, mod: Int): Int = {val rawMod = (one, two).hashCode % mod
rawMod + (if (rawMod < 0) mod else 0)
}
def hashCodeSeq(one: String, two: Int, mod: Int): Int = {val rawMod = Seq(one, two).hashCode % mod
rawMod + (if (rawMod < 0) mod else 0)
}
def iteration(numberDS: Int, filesPerPartition: Int, partitionFactor: Int = 1): (Double, Double, Double, Double) = {
val partitionCount = filesPerPartition*numberDS * partitionFactor
val hashedRandKeys = (0 to numberDS - 1).map(x => LocalDate.of(2019, 1, 1).plusDays(x)).flatMap(x => (0 to filesPerPartition - 1).map(y => hashCodeTuple(x.toString, y, partitionCount))
)
hashedRandKeys.size // Number of unique keys, with the random factor
val groupedHashedKeys = hashedRandKeys.groupBy(identity).view.mapValues(_.size).toSeq
groupedHashedKeys.size // number of unique hashes - and thus, sPartitions with > 0 records
val sortedKeyCollisions = groupedHashedKeys.filter(_._2 != 1).sortBy(_._2).reverse
val sortedSevereKeyCollisions = groupedHashedKeys.filter(_._2 > 2).sortBy(_._2).reverse
sortedKeyCollisions.size // number of sPartitions with a hashing collision
// (collisions, occurences)
val collisionCounts = sortedKeyCollisions.map(_._2).groupBy(identity).view.mapValues(_.size).toSeq.sortBy(_._2).reverse
(
groupedHashedKeys.size.toDouble / partitionCount,
groupedHashedKeys.size.toDouble / hashedRandKeys.size.toDouble,
sortedKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble,
sortedSevereKeyCollisions.size.toDouble / groupedHashedKeys.size.toDouble
)
}
// With a scale factor of 1
val results = Seq(iteration(365, 1),
iteration(365, 5),
iteration(365, 10),
iteration(365, 100),
iteration(365 * 2, 100),
iteration(365 * 5, 100),
iteration(365 * 10, 100)
)
val avgEfficiency = results.map(_._2).sum / results.length // What is the ratio of executors / output files
val avgCollisionRate = results.map(_._3).sum / results.length // What is the average collision rate
val avgSevereCollisionRate = results.map(_._4).sum / results.length // What is the average collision rate where 3 or more hashes collide
(avgEfficiency, avgCollisionRate, avgSevereCollisionRate) // 63.2% Efficiency, 42% collision rate, 12.6% severe collision rate
iteration(365, 5, 2) // 37.7% partitions in-use, 77.4% Efficiency, 24.4% collision rate, 4.2% severe collision rate
iteration(365, 5, 5)
iteration(365, 5, 10)
iteration(365, 5, 100)
随着咱们的比例因子靠近无穷大,碰撞很快靠近于 0,效率靠近 100%。然而,这会产生另外一个问题,即大量 Spark 分区输入将为空。同时这些空的 Spark 分区也会带来一些资源开销,减少 Driver 的内存大小,会使咱们更容易遇到,因为异样谬误而导致分区键空间意外增大的问题。
这里的一个常见办法,是在应用这种办法时不显示设置分区(默认并行度和缩放),如果不提供分区计数,则依赖 Spark 默认的 spark.default.parallelism 值。尽管,通常并行度天然高于总输入文件数(因而,隐式提供大于 1 的缩放因子)。如果满足以下条件,这种形式仍然是一种无效的办法:
- Hive 分区的文件数大抵相等;
- 能够确定均匀分区文件数应该是多少;
- 大抵晓得惟一分区键的总数。
5.4.5 按范畴从新分区
按范畴从新分区是一个特列,它不应用 RoundRobin 和 Hash Partitioner,而是应用一种非凡的办法,叫做 Range Partitioner。
范畴分区器依据某些给定键的程序在 Spark 分区之间进行拆分行,然而,它不仅仅是全局排序,而且还领有以下个性:
- 具备雷同散列的所有记录将在同一个分区中完结;
- 所有 Spark 分区都将有一个最小值和最大值与之关联;
- 最小值和最大值将通过应用采样来检测要害频率和范畴来确定,分区边界将依据这些估计值进行初始设置;
- 分区的大小不能保障齐全相等,它们的相等性基于样本的准确性,因而,预测的每个 Spark 分区的最小值和最大值,分区将依据须要增大或放大来保障前两个条件。
总而言之,范畴分区将导致 Spark 创立与申请的 Spark 分区数量相等的 Bucket 数量,而后它将这些 Bucket 映射到指定分区键的范畴。例如,如果你的分区键是日期,则范畴可能是(最小值 2022-01-01,最大值 2023-01-01)。而后,对于每条记录,将记录的分区键与存储 Bucket 的最小值和最大值进行比拟,并相应的进行调配。如下图所示:
六、总结
在抉择分区策略时,须要依据具体的利用场景和需要进行抉择。常见的分区策略包含依照工夫、地区、用户 ID 等多个维度进行分区。在利用分区策略时,还能够通过一些优化措施来进一步提高分区的性能和效率,例如正当设置分区数、防止过多的分区列、缩小反复数据等。
总之,分区是大数据处理和分布式计算中十分重要的技术,能够帮忙咱们更好的治理和解决大规模的数据,进步数据处理的效率和性能,进而帮忙咱们更好的应答数据分析和业务利用的挑战。
参考:
- https://github.com/apache/spark
- https://github.com/apache/hive
- https://spark.apache.org/
- https://hive.apache.org/