Introduction: When a typical Spark job reads a Parquet table stored on OSS, how is the source-side parallelism (tasks/partitions) determined? This question comes up especially when running TPCH benchmarks: how is the parallelism of the source file scan decided? Does one Parquet file map to one partition? Do multiple Parquet files map to one partition? Or does one Parquet file map to multiple partitions? This article walks through the Spark source code to answer these questions.

Analysis

The physical execution node that reads the data source is FileSourceScanExec, and the code that reads the data is as follows:

lazy val inputRDD: RDD[InternalRow] = {

  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  val readRDD = if (bucketedScan) {
    createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
      relation)
  } else {
    createReadRDD(readFile, dynamicallySelectedPartitions, relation)
  }
  sendDriverMetrics()
  readRDD

}

We focus on the non-bucketed case here. For a non-bucketed scan, the createReadRDD method is called; it is defined as follows:

/**
 * Create an RDD for non-bucketed reads.
 * The bucketed variant of this function is [[createBucketedReadRDD]].
 *
 * @param readFile a function to read each (part of a) file.
 * @param selectedPartitions Hive-style partition that are part of the read.
 * @param fsRelation [[HadoopFsRelation]] associated with the read.
 */

private def createReadRDD(

    readFile: (PartitionedFile) => Iterator[InternalRow],
    selectedPartitions: Array[PartitionDirectory],
    fsRelation: HadoopFsRelation): RDD[InternalRow] = {
  // File open cost: the minimum number of bytes each file open is charged as reading
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  // Maximum split size
  val maxSplitBytes =
    FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
  logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
    s"open cost is considered as scanning $openCostInBytes bytes.")

  // Filter files with bucket pruning if possible
  val bucketingEnabled = fsRelation.sparkSession.sessionState.conf.bucketingEnabled
  val shouldProcess: Path => Boolean = optionalBucketSet match {
    case Some(bucketSet) if bucketingEnabled =>
      // Do not prune the file if bucket file name is invalid
      filePath => BucketingUtils.getBucketId(filePath.getName).forall(bucketSet.get)
    case _ =>
      _ => true
  }

  // Split the files under each partition and sort the splits from largest to smallest
  val splitFiles = selectedPartitions.flatMap { partition =>
    partition.files.flatMap { file =>
      // getPath() is very expensive so we only want to call it once in this block:
      val filePath = file.getPath
      if (shouldProcess(filePath)) {
        // Whether the file is splittable; parquet/orc/avro are all splittable
        val isSplitable = relation.fileFormat.isSplitable(
          relation.sparkSession, relation.options, filePath)
        // Split the file
        PartitionedFileUtil.splitFiles(
          sparkSession = relation.sparkSession,
          file = file,
          filePath = filePath,
          isSplitable = isSplitable,
          maxSplitBytes = maxSplitBytes,
          partitionValues = partition.values
        )
      } else {
        Seq.empty
      }
    }
  }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

  val partitions =
    FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
  new FileScanRDD(fsRelation.sparkSession, readFile, partitions)

}

As you can see, the maximum split size maxSplitBytes is crucial for how many splits the files end up in. Its core logic is as follows:

def maxSplitBytes(

    sparkSession: SparkSession,
    selectedPartitions: Seq[PartitionDirectory]): Long = {
  // Maximum partition size when packing files for reading, 128MB by default (one block)
  val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
  // Cost of opening each file, 4MB by default
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // Suggested (not guaranteed) minimum number of split file partitions; unset by default,
  // in which case it falls back to leafNodeDefaultParallelism.
  // Call chain: SparkSession#leafNodeDefaultParallelism -> SparkContext#defaultParallelism
  // -> TaskSchedulerImpl#defaultParallelism -> CoarseGrainedSchedulerBackend#defaultParallelism
  // -> max(total executor cores, 2), i.e. at least 2
  val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
    .getOrElse(sparkSession.leafNodeDefaultParallelism)
  // Total bytes to read
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  // Bytes read per core
  val bytesPerCore = totalBytes / minPartitionNum
  // Final size, never more than the configured 128MB
  Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

}

The core logic of PartitionedFileUtil#splitFiles is fairly simple: it just slices large files into splits no larger than the maximum split size:

def splitFiles(

    sparkSession: SparkSession,
    file: FileStatus,
    filePath: Path,
    isSplitable: Boolean,
    maxSplitBytes: Long,
    partitionValues: InternalRow): Seq[PartitionedFile] = {
  if (isSplitable) {
    // Slice the file into multiple splits
    (0L until file.getLen by maxSplitBytes).map { offset =>
      val remaining = file.getLen - offset
      val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
      val hosts = getBlockHosts(getBlockLocations(file), offset, size)
      PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
    }
  } else {
    Seq(getPartitionedFile(file, filePath, partitionValues))
  }

}

Obtaining the Seq[PartitionedFile] list does not finish the job: FilePartition#getFilePartitions is then called for the final packing step. Its core logic is as follows:

def getFilePartitions(

    sparkSession: SparkSession,
    partitionedFiles: Seq[PartitionedFile],
    maxSplitBytes: Long): Seq[FilePartition] = {
  val partitions = new ArrayBuffer[FilePartition]
  val currentFiles = new ArrayBuffer[PartitionedFile]
  var currentSize = 0L

  /** Close the current partition and move to the next. */
  def closePartition(): Unit = {
    if (currentFiles.nonEmpty) {
      // Copy to a new Array.
      // Build a new FilePartition from the accumulated files
      val newPartition = FilePartition(partitions.size, currentFiles.toArray)
      partitions += newPartition
    }
    currentFiles.clear()
    currentSize = 0
  }

  // Cost of opening a file, 4MB by default
  val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
  // Assign files to partitions using "Next Fit Decreasing"
  partitionedFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      // If the accumulated size would exceed the maximum split size, close this partition;
      // it becomes the slice of data read by one Task
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }
  // Close the partition one last time; the remaining files may be small
  closePartition()
  partitions.toSeq

}

As you can see, this step merges small files, producing FilePartitions of roughly maxSplitBytes each, which avoids launching too many tasks that each read only tiny files. The parallelism of the resulting FileScanRDD (new FileScanRDD(fsRelation.sparkSession, readFile, partitions)) is the length of partitions, which is also the number of Tasks Spark ultimately launches:

override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray

The overall flow is shown in the figure below.

The split and merge process is illustrated in the figure below.
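In practice, all three inputs to this formula can be tuned through session configuration. The sketch below is a minimal illustration (not from the original article) of the relevant knobs; the config keys are standard Spark SQL options (spark.sql.files.minPartitionNum requires Spark 3.1+), while the SparkSession name and the OSS path are assumptions for the example.

// Minimal sketch: the session options that feed maxSplitBytes.
// Assumes an existing SparkSession named `spark`; the path below is hypothetical.
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728")  // defaultMaxSplitBytes, 128MB
spark.conf.set("spark.sql.files.openCostInBytes", "4194304")      // openCostInBytes, 4MB
spark.conf.set("spark.sql.files.minPartitionNum", "10")           // minPartitionNum, Spark 3.1+

// Lowering maxPartitionBytes or raising minPartitionNum increases scan parallelism;
// raising openCostInBytes makes small files "look" bigger, so fewer get packed per task.
val df = spark.read.parquet("oss://some-bucket/tpch/customer")    // hypothetical table location
println(df.rdd.getNumPartitions)                                  // roughly the number of scan tasks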

Hands-on

Take the customer Parquet table generated for TPCH 10G (https://oss.console.aliyun.co...): 8 Parquet files in total, with a total size of 113.918MB.

The Spark job is configured as follows, with a single executor core:

conf spark.driver.resourceSpec=small;
conf spark.executor.instances=1;
conf spark.executor.resourceSpec=small;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;

Plugging into the formula above:

defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(1, 2) = 2
totalBytes = 113.918MB + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 2 = 72.959MB
maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) = 72.959MB

So maxSplitBytes comes out to 72.959MB, which also matches the size reported in the logs.
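As a quick sanity check (not part of the original article), the same arithmetic can be reproduced with a small helper; the inputs are the defaults and the measured total file size quoted above.

// Minimal sketch reproducing FilePartition.maxSplitBytes for this example.
def maxSplitBytes(totalFileBytes: Long, numFiles: Int, minPartitionNum: Int,
                  defaultMaxSplitBytes: Long = 128L << 20,   // 128MB default
                  openCostInBytes: Long = 4L << 20): Long = { // 4MB default
  val totalBytes = totalFileBytes + numFiles * openCostInBytes
  val bytesPerCore = totalBytes / minPartitionNum
  math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
}

val oneCore = maxSplitBytes(totalFileBytes = (113.918 * 1024 * 1024).toLong,
                            numFiles = 8, minPartitionNum = 2)
println(oneCore / 1024.0 / 1024.0)  // ~72.96MB, matching the 72.959MB above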

After sorting, the file order is (00000, 00001, 00002, 00003, 00004, 00006, 00005, 00007). Packing then yields 3 FilePartitions:

FilePartition 1: 00000, 00001, 00002
FilePartition 2: 00003, 00004, 00006
FilePartition 3: 00005, 00007

That is, 3 Tasks are generated in total, and the Spark UI indeed shows 3 Tasks.

The logs also show 3 Tasks being generated.
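To see how this packing falls out, here is a simplified, self-contained re-implementation of the split plus "Next Fit Decreasing" packing (an illustrative sketch working in MB, not the actual Spark code). The per-file sizes are taken from the split offsets listed in the second experiment below; they sum to the 113.918MB quoted above.

import scala.collection.mutable.ArrayBuffer

// Simplified model of PartitionedFileUtil.splitFiles + FilePartition.getFilePartitions.
def pack(files: Seq[(String, Double)], maxSplitMB: Double, openCostMB: Double = 4.0): Seq[Seq[String]] = {
  // 1. Split every file at maxSplitMB boundaries
  val splits = files.flatMap { case (name, sizeMB) =>
    Iterator.iterate(0.0)(_ + maxSplitMB).takeWhile(_ < sizeMB).map { offset =>
      val len = math.min(maxSplitMB, sizeMB - offset)
      (f"$name($offset%.2f -> ${offset + len}%.2fMB)", len)
    }.toSeq
  }
  // 2. Sort splits from largest to smallest, then pack with "Next Fit Decreasing"
  val partitions = ArrayBuffer[Seq[String]]()
  val current = ArrayBuffer[String]()
  var currentSize = 0.0
  def closePartition(): Unit = {
    if (current.nonEmpty) partitions += current.toSeq
    current.clear(); currentSize = 0.0
  }
  splits.sortBy(-_._2).foreach { case (split, len) =>
    if (currentSize + len > maxSplitMB) closePartition()
    current += split; currentSize += len + openCostMB
  }
  closePartition()
  partitions.toSeq
}

// Approximate sizes (MB) of the 8 customer files in this example
val files = Seq("00000" -> 15.698, "00001" -> 15.632, "00002" -> 15.629, "00003" -> 15.624,
                "00004" -> 15.617, "00005" -> 15.536, "00006" -> 15.539, "00007" -> 4.634)
pack(files, maxSplitMB = 72.959).zipWithIndex.foreach { case (p, i) =>
  println(s"FilePartition ${i + 1}: ${p.mkString(", ")}")
}
// Prints 3 partitions: (00000, 00001, 00002), (00003, 00004, 00006), (00005, 00007)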


Now change the Spark job configuration to 5 executors with 10 cores in total:

conf spark.driver.resourceSpec=small;
conf spark.executor.instances=5;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
use tpcd;
select * from customer order by C_CUSTKEY desc limit 100;

Plugging into the formula above:

defaultMaxSplitBytes = 128MB
openCostInBytes = 4MB
minPartitionNum = max(10, 2) = 10
totalBytes = 113.918MB + 8 * 4MB = 145.918MB
bytesPerCore = 145.918MB / 10 = 14.5918MB
maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) = 14.5918MB

This can also be confirmed in the logs.
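Re-using the small maxSplitBytes helper sketched in the first experiment, the same number drops out:

val tenCores = maxSplitBytes(totalFileBytes = (113.918 * 1024 * 1024).toLong,
                             numFiles = 8, minPartitionNum = 10)
println(tenCores / 1024.0 / 1024.0)  // ~14.59MB, matching the 14.5918MB above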

With maxSplitBytes now at 14.5918MB, the source files get split: 00000 through 00006 are each cut into two pieces, while 00007 is smaller than 14.5918MB and is not split. After PartitionedFileUtil#splitFiles there are 7 * 2 + 1 = 15 PartitionedFiles in total:

00000(0 -> 14.5918MB), 00000(14.5918MB -> 15.698MB)
00001(0 -> 14.5918MB), 00001(14.5918MB -> 15.632MB)
00002(0 -> 14.5918MB), 00002(14.5918MB -> 15.629MB)
00003(0 -> 14.5918MB), 00003(14.5918MB -> 15.624MB)
00004(0 -> 14.5918MB), 00004(14.5918MB -> 15.617MB)
00005(0 -> 14.5918MB), 00005(14.5918MB -> 15.536MB)
00006(0 -> 14.5918MB), 00006(14.5918MB -> 15.539MB)
00007(0 -> 4.634MB)

After sorting and packing, this yields 10 FilePartitions:

FilePartition 1: 00000(0 -> 14.5918MB)
FilePartition 2: 00001(0 -> 14.5918MB)
FilePartition 3: 00002(0 -> 14.5918MB)
FilePartition 4: 00003(0 -> 14.5918MB)
FilePartition 5: 00004(0 -> 14.5918MB)
FilePartition 6: 00005(0 -> 14.5918MB)
FilePartition 7: 00006(0 -> 14.5918MB)
FilePartition 8: 00007(0 -> 4.634MB), 00000(14.5918MB -> 15.698MB)
FilePartition 9: 00001(14.5918MB -> 15.632MB), 00002(14.5918MB -> 15.629MB), 00003(14.5918MB -> 15.624MB)
FilePartition 10: 00004(14.5918MB -> 15.617MB), 00005(14.5918MB -> 15.536MB), 00006(14.5918MB -> 15.539MB)

That is, 10 Tasks are generated in total, and the Spark UI confirms 10 Tasks.
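Running the same packing sketch from the first experiment with maxSplitMB = 14.5918 reproduces exactly this layout of 15 splits packed into 10 partitions:

// Re-using the pack() helper and file list sketched in the first experiment
pack(files, maxSplitMB = 14.5918).zipWithIndex.foreach { case (p, i) =>
  println(s"FilePartition ${i + 1}: ${p.mkString(", ")}")
}
// Partitions 1-7 each hold one 14.59MB split; partitions 8-10 hold 00007 plus the small tails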

Checking the logs, 00004(14.5918MB -> 15.617MB), 00005(14.5918MB -> 15.536MB), 00006(14.5918MB -> 15.539MB) are in the same Task,

00007(0 -> 4.634MB), 00000(14.5918MB -> 15.698MB) are in the same Task,

and 00001(14.5918MB -> 15.632MB), 00002(14.5918MB -> 15.629MB), 00003(14.5918MB -> 15.624MB) are in the same Task.

Summary

From the source code we can see that when Spark splits source-side partitions, it takes into account the total size of all files under the partition as well as the cost of opening each file; large files are split and small files are merged, yielding a reasonably balanced set of partitions.

Original link: http://click.aliyun.com/m/100...
This article is original content from Alibaba Cloud and may not be reproduced without permission.