How Spark Splits Source-Side Data


Introduction

When a typical Spark job reads a Parquet table stored on OSS, how is the source-side parallelism (the number of tasks/partitions) determined? Several questions came up while running TPC-H tests: how is the parallelism of the source file scan decided? Does one Parquet file map to one partition, do multiple Parquet files map to one partition, or does one Parquet file map to multiple partitions? This article answers these questions by walking through the Spark source code.

Analysis

The physical execution node for reading a data source is FileSourceScanExec, and the code that reads the data is as follows:

lazy val inputRDD: RDD[InternalRow] = {

val readFile: (PartitionedFile) => Iterator[InternalRow] =
  relation.fileFormat.buildReaderWithPartitionValues(
    sparkSession = relation.sparkSession,
    dataSchema = relation.dataSchema,
    partitionSchema = relation.partitionSchema,
    requiredSchema = requiredSchema,
    filters = pushedDownFilters,
    options = relation.options,
    hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
val readRDD = if (bucketedScan) {
  createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
    relation)
} else {
  createReadRDD(readFile, dynamicallySelectedPartitions, relation)
}
sendDriverMetrics()
readRDD

}

We mainly focus on the non-bucketed path. For a non-bucketed scan, createReadRDD is called; it is defined as follows:

/**
 * Create an RDD for non-bucketed reads.
 * The bucketed variant of this function is [[createBucketedReadRDD]].
 *
 * @param readFile a function to read each (part of a) file.
 * @param selectedPartitions Hive-style partition that are part of the read.
 * @param fsRelation [[HadoopFsRelation]] associated with the read.
 */

private def createReadRDD(

  readFile: (PartitionedFile) => Iterator[InternalRow],
  selectedPartitions: Array[PartitionDirectory],
  fsRelation: HadoopFsRelation): RDD[InternalRow] = {
// File open cost: the minimum number of bytes assumed to be read each time a file is opened
val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
// Maximum split size per slice
val maxSplitBytes =
  FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes," +
  s"open cost is considered as scanning $openCostInBytes bytes.")
// Filter files with bucket pruning if possible
val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
val shouldProcess: Path => Boolean = optionalBucketSet match {
  case Some(bucketSet) if bucketingEnabled =>
    // Do not prune the file if bucket file name is invalid
    filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
  case _ =>
    _ => true
}
// Split the files under each selected partition and sort the slices from largest to smallest
val splitFiles = selectedPartitions.flatMap { partition =>
  partition.files.flatMap { file =>
    // getPath() is very expensive so we only want to call it once in this block:
    val filePath = file.getPath
    if (shouldProcess(filePath)) {
      // Whether the file is splittable; parquet/orc/avro are all splittable
      val isSplitable = relation.fileFormat.isSplitable(relation.sparkSession, relation.options, filePath)
      // Split the file
      PartitionedFileUtil.splitFiles(
        sparkSession = relation.sparkSession,
        file = file,
        filePath = filePath,
        isSplitable = isSplitable,
        maxSplitBytes = maxSplitBytes,
        partitionValues = partition.values
      )
    } else {
      Seq.empty
    }
  }
}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
val partitions =
  FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
new FileScanRDD(fsRelation.sparkSession, readFile, partitions)

}

As you can see, the maximum split size maxSplitBytes determines how many slices the files end up being cut into, so it matters a great deal for the rest of the process. Its core logic is as follows:

def maxSplitBytes(

  sparkSession: SparkSession,
  selectedPartitions: Seq[PartitionDirectory]): Long = {
// Maximum size a partition is packed to when reading files; 128MB by default, which corresponds to one block
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
// Cost of opening each file; 4MB by default
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// Suggested (not guaranteed) minimum number of file split partitions; unset by default,
// in which case it falls back to leafNodeDefaultParallelism. The call chain is
// SparkSession#leafNodeDefaultParallelism -> SparkContext#defaultParallelism
// -> TaskSchedulerImpl#defaultParallelism -> CoarseGrainedSchedulerBackend#defaultParallelism,
// i.e. max(total executor cores, 2), so at least 2
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
  .getOrElse(sparkSession.leafNodeDefaultParallelism)
// Total bytes to read
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
// Bytes to read per core
val bytesPerCore = totalBytes / minPartitionNum
// The resulting size never exceeds the configured 128MB
Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

}
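
To make this formula concrete, here is a minimal standalone Scala sketch (my own simplification, not Spark code; the file sizes and core count in the example are made up) that mirrors the computation above:

object MaxSplitBytesSketch {
  // Defaults of spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes
  val defaultMaxSplitBytes: Long = 128L * 1024 * 1024 // 128MB
  val openCostInBytes: Long = 4L * 1024 * 1024        // 4MB

  // Mirrors FilePartition.maxSplitBytes: spread the total bytes (one open cost added
  // per file) over minPartitionNum cores, floor the result at the open cost and cap
  // it at 128MB.
  def maxSplitBytes(fileSizes: Seq[Long], minPartitionNum: Int): Long = {
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / minPartitionNum
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    // Hypothetical input: eight ~15MB files read with 2 available cores -> 76MB
    println(maxSplitBytes(Seq.fill(8)(15 * mb), minPartitionNum = 2) / mb.toDouble)
  }
}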

As for PartitionedFileUtil#splitFiles, its core logic is fairly simple: it slices large files directly according to the maximum split size.

def splitFiles(
    sparkSession: SparkSession,
    file: FileStatus,
    filePath: Path,
    isSplitable: Boolean,
    maxSplitBytes: Long,
    partitionValues: InternalRow): Seq[PartitionedFile] = {
  if (isSplitable) {
    // Split into multiple slices
  (0L until file.getLen by maxSplitBytes).map { offset =>
    val remaining = file.getLen - offset
    val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
    val hosts = getBlockHosts(getBlockLocations(file), offset, size)
    PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
  }
  } else {
    Seq(getPartitionedFile(file, filePath, partitionValues))
  }

}
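
The slicing itself just walks through the file in steps of maxSplitBytes. A minimal Scala sketch of that loop (my own illustration, using plain (offset, size) pairs in place of PartitionedFile):

// Offsets and sizes of the slices produced for one splittable file.
def sliceOffsets(fileLen: Long, maxSplitBytes: Long): Seq[(Long, Long)] =
  (0L until fileLen by maxSplitBytes).map { offset =>
    (offset, math.min(maxSplitBytes, fileLen - offset))
  }

// For example, a 15.698MB file sliced at 14.5918MB yields two slices,
// (0, 14.5918MB) and (14.5918MB, ~1.106MB), while a splittable file smaller
// than maxSplitBytes comes back as a single slice covering the whole file.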

After the Seq[PartitionedFile] list has been produced, the splitting work is still not done: FilePartition#getFilePartitions has to be called for the final packing step. Its core logic is as follows:

def getFilePartitions(
    sparkSession: SparkSession,
    partitionedFiles: Seq[PartitionedFile],
    maxSplitBytes: Long): Seq[FilePartition] = {
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L
/** Close the current partition and move to the next. */
def closePartition(): Unit = {
  if (currentFiles.nonEmpty) {
    // Copy to a new Array.
    // Create a new FilePartition from the accumulated files
    val newPartition = FilePartition(partitions.size, currentFiles.toArray)
    partitions += newPartition
  }
  currentFiles.clear()
  currentSize = 0
}
// File open cost, 4MB by default
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// Assign files to partitions using "Next Fit Decreasing"
partitionedFiles.foreach { file =>
  if (currentSize + file.length > maxSplitBytes) {
    // If the accumulated size would exceed the maximum split size, close the current
    // partition; this finishes the slice of data that one task will read
    closePartition()
  }
  // Add the given file to the current partition.
  currentSize += file.length + openCostInBytes
  currentFiles += file
}
// Close the partition one last time; the remaining files may be small
closePartition()
partitions.toSeq

}

After this step, small files are merged together into FilePartitions of roughly maxSplitBytes, which avoids launching too many tasks just to read lots of small files. The parallelism of the resulting FileScanRDD (new FileScanRDD(fsRelation.sparkSession, readFile, partitions)) is the length of partitions, which is also the number of tasks Spark eventually launches:

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray

The overall flow is shown in the figure below.

The split-and-merge process is shown in the figure below.
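
To make the split-and-merge flow concrete, here is a small standalone Scala sketch (my own simplification, not the actual Spark code) of the "Next Fit Decreasing" packing performed by getFilePartitions: sort the slices by descending size, keep appending to the current partition, and start a new one whenever the next slice would push it past maxSplitBytes.

import scala.collection.mutable.ArrayBuffer

object BinPackSketch {
  // A slice of a file: the file it came from and the number of bytes it covers.
  case class Slice(file: String, length: Long)

  // "Next Fit Decreasing" as in FilePartition.getFilePartitions: iterate over the
  // slices from largest to smallest, accumulate them into the current partition
  // (adding an open cost per slice), and close the partition when the next slice
  // would exceed maxSplitBytes.
  def pack(slices: Seq[Slice], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Slice]] = {
    val partitions = ArrayBuffer[Seq[Slice]]()
    val current = ArrayBuffer[Slice]()
    var currentSize = 0L

    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current.toSeq
      current.clear()
      currentSize = 0L
    }

    slices.sortBy(-_.length).foreach { slice =>
      if (currentSize + slice.length > maxSplitBytes) closePartition()
      currentSize += slice.length + openCostInBytes
      current += slice
    }
    closePartition()
    partitions.toSeq
  }
}

The hands-on section below reuses this sketch to cross-check the partition counts observed in the experiments.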

Hands-on

Take the customer Parquet table generated for TPC-H 10G (https://oss.console.aliyun.co…): it consists of 8 Parquet files with a total size of 113.918MB.

The Spark job is configured as follows, with a single 1-core executor:

conf spark.driver.resourceSpec=small;
conf spark.executor.instances=1;
conf spark.executor.resourceSpec=small;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;

Plugging the numbers into the formula above:

defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(1, 2) = 2
totalBytes = 113.918MB + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 2 = 72.959MB
maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) = 72.959MB

So maxSplitBytes is 72.959MB, and the same value shows up in the logs.

After sorting, the file order is (00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007). Merging then yields 3 FilePartitions:

FilePartition 1: 00000, 00001, 00002
FilePartition 2: 00003, 00004, 00006
FilePartition 3: 00005, 00007

So 3 tasks are generated in total, and the Spark UI indeed shows 3 tasks.

The logs also show 3 tasks being generated.
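
As a cross-check, feeding BinPackSketch from the analysis section with the eight customer file sizes (approximate values inferred from the split offsets listed in the second experiment below) and maxSplitBytes = 72.959MB reproduces the same three partitions:

val mb = 1024L * 1024
// Approximate sizes of the customer files 00000..00007, in MB
val customerFiles = Seq(
  "00000" -> 15.698, "00001" -> 15.632, "00002" -> 15.629, "00003" -> 15.624,
  "00004" -> 15.617, "00005" -> 15.536, "00006" -> 15.539, "00007" -> 4.634)
val slices = customerFiles.map { case (name, sizeMb) =>
  BinPackSketch.Slice(name, (sizeMb * mb).toLong)
}
val parts = BinPackSketch.pack(slices, maxSplitBytes = (72.959 * mb).toLong,
  openCostInBytes = 4 * mb)
println(parts.map(_.map(_.file)))
// List(List(00000, 00001, 00002), List(00003, 00004, 00006), List(00005, 00007))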

Now change the Spark job configuration to 5 executors with 10 cores in total:

conf spark.driver.resourceSpec=small;
conf spark.executor.instances=5;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;

Plugging the numbers into the formula again:

defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(10, 2) = 10
totalBytes = 113.918MB + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 10 = 14.5918MB
maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) = 14.5918MB

Checking the logs:

With maxSplitBytes now 14.5918MB, the source files do get split: files 00000 through 00006 are each cut into two slices, while 00007 is smaller than 14.5918MB and is left whole. After PartitionedFileUtil#splitFiles there are 7 * 2 + 1 = 15 PartitionedFiles:

00000(0 -> 14.5918MB), 00000(14.5918MB -> 15.698MB)
00001(0 -> 14.5918MB), 00001(14.5918MB -> 15.632MB)
00002(0 -> 14.5918MB), 00002(14.5918MB -> 15.629MB)
00003(0 -> 14.5918MB), 00003(14.5918MB -> 15.624MB)
00004(0 -> 14.5918MB), 00004(14.5918MB -> 15.617MB)
00005(0 -> 14.5918MB), 00005(14.5918MB -> 15.536MB)
00006(0 -> 14.5918MB), 00006(14.5918MB -> 15.539MB)
00007(0 -> 4.634MB)

After sorting and merging, this gives 10 FilePartitions:

FilePartition 1: 00000(0 -> 14.5918MB)
FilePartition 2: 00001(0 -> 14.5918MB)
FilePartition 3: 00002(0 -> 14.5918MB)
FilePartition 4: 00003(0 -> 14.5918MB)
FilePartition 5: 00004(0 -> 14.5918MB)
FilePartition 6: 00005(0 -> 14.5918MB)
FilePartition 7: 00006(0 -> 14.5918MB)
FilePartition 8: 00007(0 -> 4.634MB), 00000(14.5918MB -> 15.698MB)
FilePartition 9: 00001(14.5918MB -> 15.632MB), 00002(14.5918MB -> 15.629MB), 00003(14.5918MB -> 15.624MB)
FilePartition 10: 00004(14.5918MB -> 15.617MB), 00005(14.5918MB -> 15.536MB), 00006(14.5918MB -> 15.539MB)

So 10 tasks are generated in total, which the Spark UI confirms.

Checking the logs: 00004(14.5918MB -> 15.617MB), 00005(14.5918MB -> 15.536MB), 00006(14.5918MB -> 15.539MB) are in the same task.

00007(0 -> 4.634MB), 00000(14.5918MB -> 15.698MB) are in the same task.

00001(14.5918MB -> 15.632MB), 00002(14.5918MB -> 15.629MB), 00003(14.5918MB -> 15.624MB) are in the same task.
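
Reusing sliceOffsets and BinPackSketch from the analysis section together with the customerFiles list above, the same grouping falls out of the sketch: slicing every file at 14.5918MB first and then packing the resulting 15 slices yields seven partitions holding one 14.5918MB head slice each, plus the three tail partitions seen in the logs.

val maxSplit = (14.5918 * mb).toLong
val slices2 = customerFiles.flatMap { case (name, sizeMb) =>
  sliceOffsets((sizeMb * mb).toLong, maxSplit).map { case (offset, len) =>
    BinPackSketch.Slice(s"$name@$offset", len)
  }
}
println(BinPackSketch.pack(slices2, maxSplit, openCostInBytes = 4 * mb).map(_.map(_.file)))
// 10 partitions: seven single head slices 00000@0 .. 00006@0, then
// (00007@0 plus 00000's tail), (the tails of 00001/00002/00003),
// and (the tails of 00004/00006/00005, ordered by descending size)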

Summary

Walking through the source code shows that when Spark splits source-side partitions, it takes into account the sizes of all files under the selected partitions as well as the cost of opening each file; it both splits large files and merges small ones, and ends up with a reasonably sized set of partitions.

Original link: http://click.aliyun.com/m/100…

This article is original content from Alibaba Cloud and may not be reproduced without permission.
