Introduction to AQE

Judging from the Spark configuration, AQE already existed as early as Spark 1.6; in the Spark 2.x era, Intel's big data team built a prototype and put it into practice; and in the Spark 3.0 era, Databricks and Intel together contributed the new AQE to the community.

AQE configuration in Spark 3.0.1

Configuration | Default | Official description | Analysis
spark.sql.adaptive.enabled | false | Whether to enable adaptive query execution | Set it to true to enable AQE
spark.sql.adaptive.coalescePartitions.enabled | true | Whether to coalesce contiguous shuffle partitions (merged according to the spark.sql.adaptive.advisoryPartitionSizeInBytes target) | Enabled by default; see Analysis 1
spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | Initial number of shuffle partitions before coalescing; defaults to spark.sql.shuffle.partitions | See Analysis 2
spark.sql.adaptive.coalescePartitions.minPartitionNum | (none) | Minimum number of shuffle partitions after coalescing; defaults to the cluster's default parallelism | See Analysis 3
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Advisory size of a shuffle partition, used when coalescing partitions and when handling skewed joins | See Analysis 3
spark.sql.adaptive.skewJoin.enabled | true | Whether to enable adaptive handling of skewed joins |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Skew detection factor; both skewedPartitionFactor and skewedPartitionThresholdInBytes must be exceeded | See Analysis 4
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Skew detection threshold; both skewedPartitionFactor and skewedPartitionThresholdInBytes must be exceeded | See Analysis 4
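
For reference, a minimal sketch (not from the original article) of how these options could be set on a Spark 3.0.1 session; the values simply restate the defaults or the recommendation above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-config-sketch")
  .config("spark.sql.adaptive.enabled", "true")                      // enable AQE
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")   // coalesce small shuffle partitions
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB") // advisory partition size
  .config("spark.sql.adaptive.skewJoin.enabled", "true")             // handle skewed joins
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
  .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
  .getOrCreate()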

Analysis 1

In OptimizeSkewedJoin.scala we can see where ADVISORY_PARTITION_SIZE_IN_BYTES, i.e. spark.sql.adaptive.advisoryPartitionSizeInBytes, is referenced (OptimizeSkewedJoin is a physical-plan rule):

/**
 * The goal of skew join optimization is to make the data distribution more even. The target size
 * to split skewed partitions is the average size of non-skewed partition, or the
 * advisory partition size if avg size is smaller than it.
 */
private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
  val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
  val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
  // It's impossible that all the partitions are skewed, as we use median size to define skew.
  assert(nonSkewSizes.nonEmpty)
  math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
}

Here:

  1. nonSkewSizes are the sizes of the task's non-skewed partitions.
  2. targetSize returns max(average size of the non-skewed partitions, advisorySize), where advisorySize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes; so targetSize is not necessarily the spark.sql.adaptive.advisoryPartitionSizeInBytes value (see the small sketch after this list).
  3. medianSize is the median of the task's partition sizes.
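
A tiny standalone sketch of that rule with hypothetical sizes, to make the max() behaviour concrete:

// Hypothetical sizes: advisory size 64MB, non-skewed partitions of 70MB, 90MB and 80MB.
val mb = 1024L * 1024L
val advisorySize = 64 * mb
val nonSkewSizes = Seq(70 * mb, 90 * mb, 80 * mb)
// Same formula as targetSize above: max(advisory size, average of the non-skewed sizes).
val target = math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
// target is 80MB here, i.e. larger than spark.sql.adaptive.advisoryPartitionSizeInBytes.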

Analysis 2

In SQLConf.scala:

def numShufflePartitions: Int = {
  if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
    getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
  } else {
    defaultNumShufflePartitions
  }
}

Starting from Spark 3.0.1, if both AQE and shuffle partition coalescing are enabled, spark.sql.adaptive.coalescePartitions.initialPartitionNum is used instead. When a query has multiple shuffle stages, increasing this initial partition number can effectively improve the result of shuffle partition coalescing.
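
For example (a sketch assuming an existing SparkSession named spark; the numbers are arbitrary), one can keep spark.sql.shuffle.partitions at its usual value but give AQE a much larger starting point to coalesce from:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "200")
// Shuffles now start from 1000 partitions and AQE coalesces them down based on actual sizes.
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")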

Analysis 3

In CoalesceShufflePartitions.scala (CoalesceShufflePartitions is a physical-plan rule), the following is executed:

if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
  plan
} else {
  // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
  // we should skip it when calculating the `partitionStartIndices`.
  val validMetrics = shuffleStages.flatMap(_.mapStats)
  // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
  // in that case. For example when we union fully aggregated data (data is arranged to a single
  // partition) and a result of a SortMergeJoin (multiple partitions).
  val distinctNumPreShufflePartitions =
    validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
  if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
    // We fall back to Spark default parallelism if the minimum number of coalesced partitions
    // is not set, so to avoid perf regressions compared to no coalescing.
    val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
      .getOrElse(session.sparkContext.defaultParallelism)
    val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
      validMetrics.toArray,
      advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
      minNumPartitions = minPartitionNum)
    // This transformation adds new nodes, so we must use `transformUp` here.
    val stageIds = shuffleStages.map(_.id).toSet
    plan.transformUp {
      // even for shuffle exchange whose input RDD has 0 partition, we should still update its
      // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
      // number of output partitions.
      case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
        CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
    }
  } else {
    plan
  }
}

In other words:

  1. If the partitioning was explicitly specified by the user, e.g. via a repartition call, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect (see the illustration after this list).
  2. If a stage reads from multiple shuffles and their pre-shuffle partition counts differ, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect either, because no coalescing is performed in that case.
  3. See the analysis of ShufflePartitionsUtil.coalescePartitions below.
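
A small illustration of point 1 (a sketch assuming an existing SparkSession named spark): an explicit repartition(100) pins the shuffle to 100 partitions, so AQE leaves it alone, while the shuffle introduced by the groupBy below remains eligible for coalescing:

import org.apache.spark.sql.functions.col

// User-specified partition count: the exchange cannot change its partition number,
// so minPartitionNum has no effect here.
val pinned = spark.range(0, 1000000L).repartition(100)

// AQE-managed shuffle: its partitions can be coalesced according to the configs above.
val coalesced = spark.range(0, 1000000L).groupBy(col("id") % 10).count()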

Analysis 4

In OptimizeSkewedJoin.scala we see:

/**
 * A partition is considered as a skewed partition if its size is larger than the median
 * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
 * ADVISORY_PARTITION_SIZE_IN_BYTES.
 */
private def isSkewed(size: Long, medianSize: Long): Boolean = {
  size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
    size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
}

  1. OptimizeSkewedJoin is a physical-plan rule; it uses isSkewed to decide whether the data is skewed, and a partition is only treated as skewed when both the SKEW_JOIN_SKEWED_PARTITION_FACTOR and the SKEW_JOIN_SKEWED_PARTITION_THRESHOLD conditions are met (see the sketch after this list).
  2. medianSize is the median of the task's partition sizes.
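
A standalone sketch of that check with the default factor (5) and threshold (256MB), using made-up sizes:

val mb = 1024L * 1024L
// Same shape as isSkewed above, with the defaults inlined for illustration only.
def isSkewedSketch(size: Long, medianSize: Long, factor: Long = 5, threshold: Long = 256 * mb): Boolean =
  size > medianSize * factor && size > threshold

isSkewedSketch(size = 2048 * mb, medianSize = 100 * mb) // true: 2GB > 100MB * 5 and 2GB > 256MB
isSkewedSketch(size = 300 * mb, medianSize = 100 * mb)  // false: 300MB is not > 100MB * 5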

Analysis of ShufflePartitionsUtil.coalescePartitions (the core partition-coalescing code)

See coalescePartitions below:

def coalescePartitions(
    mapOutputStatistics: Array[MapOutputStatistics],
    advisoryTargetSize: Long,
    minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
  // If `minNumPartitions` is very large, it is possible that we need to use a value less than
  // `advisoryTargetSize` as the target size of a coalesced task.
  val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
  // The max at here is to make sure that when we have an empty table, we only have a single
  // coalesced partition.
  // There is no particular reason that we pick 16. We just need a number to prevent
  // `maxTargetSize` from being set to 0.
  val maxTargetSize = math.max(
    math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
  val targetSize = math.min(maxTargetSize, advisoryTargetSize)

  val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(", ")
  logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
    s"actual target size $targetSize.")

  // Make sure these shuffles have the same number of partitions.
  val distinctNumShufflePartitions =
    mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
  // The reason that we are expecting a single value of the number of shuffle partitions
  // is that when we add Exchanges, we set the number of shuffle partitions
  // (i.e. map output partitions) using a static setting, which is the value of
  // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
  // number of partitions, they will have the same number of shuffle partitions
  // (i.e. map output partitions).
  assert(
    distinctNumShufflePartitions.length == 1,
    "There should be only one distinct value of the number of shuffle partitions " +
      "among registered Exchange operators.")

  val numPartitions = distinctNumShufflePartitions.head
  val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
  var latestSplitPoint = 0
  var coalescedSize = 0L
  var i = 0
  while (i < numPartitions) {
    // We calculate the total size of i-th shuffle partitions from all shuffles.
    var totalSizeOfCurrentPartition = 0L
    var j = 0
    while (j < mapOutputStatistics.length) {
      totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
      j += 1
    }
    // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
    // new coalesced partition.
    if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
      partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
      latestSplitPoint = i
      // reset postShuffleInputSize.
      coalescedSize = totalSizeOfCurrentPartition
    } else {
      coalescedSize += totalSizeOfCurrentPartition
    }
    i += 1
  }
  partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)
  partitionSpecs
}

  1. totalPostShuffleInputSize first computes the total size of the shuffle data.
  2. maxTargetSize is max(totalPostShuffleInputSize / minNumPartitions, 16), where minNumPartitions is the value of spark.sql.adaptive.coalescePartitions.minPartitionNum.
  3. targetSize is min(maxTargetSize, advisoryTargetSize), where advisoryTargetSize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes; this is why that value is only advisory and is not necessarily the final targetSize.
  4. The while loop merges adjacent shuffle partitions (summing each partition's size across all input shuffles), packing neighbours together until adding the next one would exceed targetSize (see the self-contained sketch after this list).
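
To make the merge loop concrete, here is a self-contained sketch of the same greedy pass over hypothetical partition sizes; CoalescedRange stands in for CoalescedPartitionSpec's start/end reducer indices:

import scala.collection.mutable.ArrayBuffer

final case class CoalescedRange(start: Int, end: Int) // reducer index range [start, end)

def coalesceSketch(sizes: Seq[Long], targetSize: Long): Seq[CoalescedRange] = {
  val specs = ArrayBuffer[CoalescedRange]()
  var latestSplitPoint = 0
  var coalescedSize = 0L
  for (i <- sizes.indices) {
    // Start a new coalesced partition once adding partition i would exceed the target.
    if (i > latestSplitPoint && coalescedSize + sizes(i) > targetSize) {
      specs += CoalescedRange(latestSplitPoint, i)
      latestSplitPoint = i
      coalescedSize = sizes(i)
    } else {
      coalescedSize += sizes(i)
    }
  }
  specs += CoalescedRange(latestSplitPoint, sizes.length)
  specs.toSeq
}

val mb = 1024L * 1024L
// The three 10MB partitions are packed together, the 70MB one stays alone, and the trailing 5MB closes the list.
coalesceSketch(Seq(10 * mb, 10 * mb, 10 * mb, 70 * mb, 5 * mb), targetSize = 64 * mb)
// result: CoalescedRange(0,3), CoalescedRange(3,4), CoalescedRange(4,5)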

Analysis of OptimizeSkewedJoin.optimizeSkewJoin (the core skew-optimization code)

See optimizeSkewJoin below:

def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
  case smj @ SortMergeJoinExec(_, _, joinType, _,
      s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
      s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
      if supportedJoinTypes.contains(joinType) =>
    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
    val numPartitions = left.partitionsWithSizes.length
    // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
    val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
    val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
    logDebug(
      s"""
        |Optimizing skewed join.
        |Left side partitions size info:
        |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
        |Right side partitions size info:
        |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
      """.stripMargin)
    val canSplitLeft = canSplitLeftSide(joinType)
    val canSplitRight = canSplitRightSide(joinType)
    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
    // the final data distribution is even (coalesced partitions + split partitions).
    val leftActualSizes = left.partitionsWithSizes.map(_._2)
    val rightActualSizes = right.partitionsWithSizes.map(_._2)
    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
    val leftSkewDesc = new SkewDesc
    val rightSkewDesc = new SkewDesc
    for (partitionIndex <- 0 until numPartitions) {
      val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
      val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

      val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

      // A skewed partition should never be coalesced, but skip it here just to be safe.
      val leftParts = if (isLeftSkew && !isLeftCoalesced) {
        val reducerId = leftPartSpec.startReducerIndex
        val skewSpecs = createSkewPartitionSpecs(
          left.mapStats.shuffleId, reducerId, leftTargetSize)
        if (skewSpecs.isDefined) {
          logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
            s"${skewSpecs.get.length} parts.")
          leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
        }
        skewSpecs.getOrElse(Seq(leftPartSpec))
      } else {
        Seq(leftPartSpec)
      }

      // A skewed partition should never be coalesced, but skip it here just to be safe.
      val rightParts = if (isRightSkew && !isRightCoalesced) {
        val reducerId = rightPartSpec.startReducerIndex
        val skewSpecs = createSkewPartitionSpecs(
          right.mapStats.shuffleId, reducerId, rightTargetSize)
        if (skewSpecs.isDefined) {
          logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
            s"${skewSpecs.get.length} parts.")
          rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
        }
        skewSpecs.getOrElse(Seq(rightPartSpec))
      } else {
        Seq(rightPartSpec)
      }

      for {
        leftSidePartition <- leftParts
        rightSidePartition <- rightParts
      } {
        leftSidePartitions += leftSidePartition
        rightSidePartitions += rightSidePartition
      }
    }

    logDebug("number of skewed partitions: " +
      s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
    if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
      val newLeft = CustomShuffleReaderExec(
        left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
      val newRight = CustomShuffleReaderExec(
        right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
      smj.copy(
        left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
    } else {
      smj
    }
}

  1. The SortMergeJoinExec pattern shows that this optimization applies to sort-merge joins.
  2. assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) guarantees that the two sides of the join have the same number of partitions.
  3. The median partition sizes of the two join sides, leftMedSize and rightMedSize, are computed.
  4. The target partition sizes of the two join sides, leftTargetSize and rightTargetSize, are computed.
  5. The loop checks every partition on both sides for skew; a partition is only split if it is skewed and has not already been coalesced, otherwise it is left untouched.
  6. The createSkewPartitionSpecs method:
    1. fetches the map output sizes of the corresponding partition of that join side;
    2. splits it into multiple slices according to targetSize (see the sketch after this list).
  7. If skew is found, the shuffle stages are wrapped in CustomShuffleReaderExec nodes for the subsequent execution; eventually ShuffledRowRDD's compute method matches the PartialReducerPartitionSpec case to read the data, and spark.sql.adaptive.fetchShuffleBlocksInBatch can additionally batch the shuffle-block fetches to reduce IO.
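
The splitting in point 6 can be pictured with a rough standalone sketch (not the exact Spark code): the skewed reducer partition's per-map output sizes are grouped consecutively so that each slice stays close to targetSize, and each slice then gets its own reader task:

// (startMapIndex, endMapIndex) ranges, analogous in spirit to PartialReducerPartitionSpec.
def splitByTargetSketch(mapSizes: Seq[Long], targetSize: Long): Seq[(Int, Int)] = {
  val slices = scala.collection.mutable.ArrayBuffer[(Int, Int)]()
  var start = 0
  var current = 0L
  for (i <- mapSizes.indices) {
    // Close the current slice once adding map output i would push it past the target.
    if (i > start && current + mapSizes(i) > targetSize) {
      slices += ((start, i))
      start = i
      current = mapSizes(i)
    } else {
      current += mapSizes(i)
    }
  }
  slices += ((start, mapSizes.length))
  slices.toSeq
}

val mb = 1024L * 1024L
// A skewed partition built from six 30MB map outputs with a 64MB target becomes three slices.
splitByTargetSketch(Seq.fill(6)(30 * mb), targetSize = 64 * mb) // Seq((0,2), (2,4), (4,6))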

Where OptimizeSkewedJoin / CoalesceShufflePartitions are invoked

For example, in AdaptiveSparkPlanExec:

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
  ReuseAdaptiveSubquery(conf, context.subqueryCache),
  CoalesceShufflePartitions(context.session),
  // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
  // added by `CoalesceShufflePartitions`. So they must be executed after it.
  OptimizeSkewedJoin(conf),
  OptimizeLocalShuffleReader(conf)
)

So both rules are invoked from AdaptiveSparkPlanExec, and CoalesceShufflePartitions runs before OptimizeSkewedJoin. AdaptiveSparkPlanExec itself is inserted by the InsertAdaptiveSparkPlan rule, and InsertAdaptiveSparkPlan is applied in QueryExecution.

In the InsertAdaptiveSparkPlan.shouldApplyAQE and supportAdaptive methods we see:

private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
  conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
    plan.find {
      case _: Exchange => true
      case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
      case p => p.expressions.exists(_.find {
        case _: SubqueryExpression => true
        case _ => false
      }.isDefined)
    }.isDefined
  }
}

private def supportAdaptive(plan: SparkPlan): Boolean = {
  // TODO migrate dynamic-partition-pruning onto adaptive execution.
  sanityCheck(plan) &&
    !plan.logicalLink.exists(_.isStreaming) &&
    !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
    plan.children.forall(supportAdaptive)
}

If these conditions are not met, AQE will not be applied even when enabled. To force it, you can set spark.sql.adaptive.forceApply to true (the documentation notes that this is an internal configuration).
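
For example (assuming an existing SparkSession named spark):

// spark.sql.adaptive.forceApply is marked as internal; use it with care.
spark.conf.set("spark.sql.adaptive.forceApply", "true")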

Note:

The following configurations have been dropped in Spark 3.0.1:

spark.sql.adaptive.skewedPartitionMaxSplits
spark.sql.adaptive.skewedPartitionRowCountThreshold
spark.sql.adaptive.skewedPartitionSizeThreshold

Parts of this article reference:
https://mp.weixin.qq.com/s?__...
https://mp.weixin.qq.com/s/Rv...