
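Spark series (2): An analysis of AQE (Adaptive Query Execution) in Spark 3.0.1

Introduction to AQE

Judging from the Spark configuration, an early form of AQE already existed as far back as Spark 1.6. In the Spark 2.x era, Intel's big data team built a prototype and put it into practice. In Spark 3.0, Databricks and Intel jointly contributed the new AQE to the community.

AQE configuration in Spark 3.0.1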

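| Property | Default | Description (official docs) | Analysis |
|---|---|---|---|
| spark.sql.adaptive.enabled | false | Whether to enable adaptive query execution | Set to true here to enable AQE |
| spark.sql.adaptive.coalescePartitions.enabled | true | Whether to coalesce contiguous shuffle partitions according to the target size given by 'spark.sql.adaptive.advisoryPartitionSizeInBytes' | Enabled by default; see Analysis 1 |
| spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | Initial number of shuffle partitions before coalescing; defaults to the value of spark.sql.shuffle.partitions | See Analysis 2 |
| spark.sql.adaptive.coalescePartitions.minPartitionNum | (none) | Minimum number of shuffle partitions after coalescing; defaults to the default parallelism of the Spark cluster | See Analysis 3 |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | 64MB | Advisory shuffle partition size; used when coalescing partitions and when handling skewed joins | See Analyses 1 and 3 |
| spark.sql.adaptive.skewJoin.enabled | true | Whether to handle data skew in joins adaptively | |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5 | Skew factor; a partition counts as skewed only if it meets both skewedPartitionFactor and skewedPartitionThresholdInBytes | See Analysis 4 |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | Skew threshold; a partition counts as skewed only if it meets both skewedPartitionFactor and skewedPartitionThresholdInBytes | See Analysis 4 |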

Analysis 1

In OptimizeSkewedJoin.scala, we can see where ADVISORY_PARTITION_SIZE_IN_BYTES (that is, spark.sql.adaptive.advisoryPartitionSizeInBytes) is referenced. OptimizeSkewedJoin is a physical-plan rule.

 /**
   * The goal of skew join optimization is to make the data distribution more even. The target size
   * to split skewed partitions is the average size of non-skewed partition, or the
   * advisory partition size if avg size is smaller than it.
   */
  private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
    val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
    val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
    // It's impossible that all the partitions are skewed, as we use median size to define skew.
    assert(nonSkewSizes.nonEmpty)
    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
  }

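Here:

  1. nonSkewSizes holds the sizes of the non-skewed partitions on one side of the join.
  2. targetSize returns max(average size of the non-skewed partitions, advisorySize), where advisorySize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes. So targetSize is not necessarily equal to spark.sql.adaptive.advisoryPartitionSizeInBytes.
  3. medianSize is the median of the partition sizes on that side.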
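To make the formula concrete, here is a minimal standalone Scala sketch of targetSize. It is not Spark's code. Partition sizes are plain Long values, and the skew test is passed in as a function (the hypothetical `isSkewed` parameter) instead of being read from SQLConf.

```scala
// Hypothetical standalone sketch of the targetSize formula from OptimizeSkewedJoin.
// Sizes are plain Longs; the skew test is injected instead of read from SQLConf.
object SkewTargetSizeSketch {
  def targetSize(sizes: Seq[Long], isSkewed: Long => Boolean, advisorySize: Long): Long = {
    val nonSkewSizes = sizes.filterNot(isSkewed)
    // Spark asserts this, since skew is defined relative to the median size.
    require(nonSkewSizes.nonEmpty, "at least one partition must be non-skewed")
    // Integer average of the non-skewed sizes, but never below the advisory size.
    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
  }
}
```

Take an advisory size of 64 with non-skewed partitions of 10, 20 and 30. Their average (20) is below the advisory size, so the target is 64. With non-skewed partitions of 90, 100 and 110, the average (100) is larger and becomes the target.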

Analysis 2

In SQLConf.scala:

  def numShufflePartitions: Int = {
    if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
      getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
    } else {
      defaultNumShufflePartitions
    }
  }

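In Spark 3.0.1, if both AQE and shuffle partition coalescing are enabled, the initial number of shuffle partitions comes from spark.sql.adaptive.coalescePartitions.initialPartitionNum. If that is not set, it falls back to spark.sql.shuffle.partitions. When a query has multiple shuffle stages, raising this initial partition number can noticeably improve the result of shuffle partition coalescing, since coalescing then starts from smaller partitions.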
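The decision can be restated as a tiny pure function. This is a hypothetical sketch with no Spark dependency. The parameter names are illustrative, and each maps to the configuration named in its comment.

```scala
// Hypothetical sketch of the decision in SQLConf.numShufflePartitions (Spark 3.0.1).
// Parameter names are illustrative; each maps to the configuration in its comment.
object ShufflePartitionNumSketch {
  def numShufflePartitions(
      adaptiveEnabled: Boolean,         // spark.sql.adaptive.enabled
      coalesceEnabled: Boolean,         // spark.sql.adaptive.coalescePartitions.enabled
      initialPartitionNum: Option[Int], // spark.sql.adaptive.coalescePartitions.initialPartitionNum
      defaultShufflePartitions: Int     // spark.sql.shuffle.partitions
  ): Int =
    if (adaptiveEnabled && coalesceEnabled) initialPartitionNum.getOrElse(defaultShufflePartitions)
    else defaultShufflePartitions
}
```

initialPartitionNum is only consulted when both AQE and coalescing are on. In every other case, spark.sql.shuffle.partitions decides.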

Analysis 3

In CoalesceShufflePartitions.scala: CoalesceShufflePartitions is a physical-plan rule that does the following:

    if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) {
      plan
    } else {
      // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions,
      // we should skip it when calculating the `partitionStartIndices`.
      val validMetrics = shuffleStages.flatMap(_.mapStats)

      // We may have different pre-shuffle partition numbers, don't reduce shuffle partition number
      // in that case. For example when we union fully aggregated data (data is arranged to a single
      // partition) and a result of a SortMergeJoin (multiple partitions).
      val distinctNumPreShufflePartitions =
        validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
      if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
        // We fall back to Spark default parallelism if the minimum number of coalesced partitions
        // is not set, so to avoid perf regressions compared to no coalescing.
        val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
          .getOrElse(session.sparkContext.defaultParallelism)
        val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
          validMetrics.toArray,
          advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
          minNumPartitions = minPartitionNum)
        // This transformation adds new nodes, so we must use `transformUp` here.
        val stageIds = shuffleStages.map(_.id).toSet
        plan.transformUp {
          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
          // number of output partitions.
          case stage: ShuffleQueryStageExec if stageIds.contains(stage.id) =>
            CustomShuffleReaderExec(stage, partitionSpecs, COALESCED_SHUFFLE_READER_DESCRIPTION)
        }
      } else {
        plan
      }
    }
  }

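In other words:

  1. For user-specified partitioning, such as a repartition operation, spark.sql.adaptive.coalescePartitions.minPartitionNum has no effect. Such shuffles report canChangeNumPartitions = false, so the rule returns the plan unchanged.
  2. If several shuffle stages are involved and they have different numbers of pre-shuffle partitions, spark.sql.adaptive.coalescePartitions.minPartitionNum also has no effect, because coalescing is skipped.
  3. Otherwise, minPartitionNum is passed to ShufflePartitionsUtil.coalescePartitions. If it is unset, Spark's default parallelism is used instead. See the analysis of ShufflePartitionsUtil.coalescePartitions.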
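The two guards above can be modelled without Spark. The following is a minimal sketch, assuming a hypothetical `StageInfo` case class that stands in for ShuffleQueryStageExec. It keeps only the two facts the rule checks: whether the stage may change its partition number, and its pre-shuffle partition count. A stage whose input RDD has 0 partitions is modelled as having no count.

```scala
// Hypothetical model of the two guards in CoalesceShufflePartitions: coalescing happens
// only if every shuffle stage may change its partition number and all stages that
// report statistics share one pre-shuffle partition count.
object CoalesceGuardSketch {
  // Illustrative stand-in for ShuffleQueryStageExec; not a Spark class.
  final case class StageInfo(canChangeNumPartitions: Boolean, preShufflePartitions: Option[Int])

  def willCoalesce(stages: Seq[StageInfo]): Boolean =
    if (!stages.forall(_.canChangeNumPartitions)) {
      false // e.g. a user-specified repartition
    } else {
      // Stages without statistics (0 input partitions) are skipped, like mapStats == None.
      val counts = stages.flatMap(_.preShufflePartitions)
      counts.nonEmpty && counts.distinct.length == 1
    }
}
```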

Analysis 4

In OptimizeSkewedJoin.scala, we see:

/**
   * A partition is considered as a skewed partition if its size is larger than the median
   * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
   * ADVISORY_PARTITION_SIZE_IN_BYTES.
   */
  private def isSkewed(size: Long, medianSize: Long): Boolean = {
    size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
      size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
  }
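  1. OptimizeSkewedJoin is a physical-plan rule. It uses isSkewed to decide whether a partition is skewed. A partition counts as skewed only if it is larger than both medianSize * SKEW_JOIN_SKEWED_PARTITION_FACTOR and SKEW_JOIN_SKEWED_PARTITION_THRESHOLD. The doc comment still mentions ADVISORY_PARTITION_SIZE_IN_BYTES, but the code compares against SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.
  2. medianSize is the median of the partition sizes on that side of the join.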
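The following is a small standalone sketch of the skew test, assuming sizes in bytes and the default factor (5) and threshold (256MB) from the configuration table. The median helper is written for this example and is not claimed to be Spark's implementation. It takes the mean of the two middle values for an even count and clamps the result to at least 1.

```scala
// Hypothetical standalone sketch of skew detection: a partition is skewed only if it is
// larger than factor * median AND larger than the absolute threshold.
object SkewDetectionSketch {
  val MB: Long = 1024L * 1024L

  // Median of the sizes (mean of the two middle values for an even count), at least 1.
  def medianSize(sizes: Seq[Long]): Long = {
    require(sizes.nonEmpty, "need at least one partition")
    val sorted = sizes.sorted
    val n = sorted.length
    val median =
      if (n % 2 == 0) (sorted(n / 2 - 1) + sorted(n / 2)) / 2
      else sorted(n / 2)
    math.max(median, 1L)
  }

  // Both conditions must hold, mirroring the && in isSkewed above.
  def isSkewed(size: Long, medianSize: Long, factor: Int, thresholdBytes: Long): Boolean =
    size > medianSize * factor && size > thresholdBytes
}
```

Consider partitions of 100, 110, 120, 130 and 2000 MB. The median is 120 MB, so the 2000 MB partition is skewed (above 600 MB and above 256 MB). A 50 MB partition next to a 1 MB median is not skewed: it exceeds 5 times the median but stays below the 256 MB threshold.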

Analysis of ShufflePartitionsUtil.coalescePartitions (the core partition-coalescing code)

coalescePartitions is shown below:

def coalescePartitions(mapOutputStatistics: Array[MapOutputStatistics],
      advisoryTargetSize: Long,
      minNumPartitions: Int): Seq[ShufflePartitionSpec] = {
    // If `minNumPartitions` is very large, it is possible that we need to use a value less than
    // `advisoryTargetSize` as the target size of a coalesced task.
    val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
    // The max at here is to make sure that when we have an empty table, we only have a single
    // coalesced partition.
    // There is no particular reason that we pick 16. We just need a number to prevent
    // `maxTargetSize` from being set to 0.
    val maxTargetSize = math.max(math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
    val targetSize = math.min(maxTargetSize, advisoryTargetSize)

    val shuffleIds = mapOutputStatistics.map(_.shuffleId).mkString(",")
    logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize," +
      s"actual target size $targetSize.")

    // Make sure these shuffles have the same number of partitions.
    val distinctNumShufflePartitions =
      mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
    // The reason that we are expecting a single value of the number of shuffle partitions
    // is that when we add Exchanges, we set the number of shuffle partitions
    // (i.e. map output partitions) using a static setting, which is the value of
    // `spark.sql.shuffle.partitions`. Even if two input RDDs are having different
    // number of partitions, they will have the same number of shuffle partitions
    // (i.e. map output partitions).
    assert(
      distinctNumShufflePartitions.length == 1,
      "There should be only one distinct value of the number of shuffle partitions" +
        "among registered Exchange operators.")

    val numPartitions = distinctNumShufflePartitions.head
    val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]()
    var latestSplitPoint = 0
    var coalescedSize = 0L
    var i = 0
    while (i < numPartitions) {
      // We calculate the total size of i-th shuffle partitions from all shuffles.
      var totalSizeOfCurrentPartition = 0L
      var j = 0
      while (j < mapOutputStatistics.length) {
        totalSizeOfCurrentPartition += mapOutputStatistics(j).bytesByPartitionId(i)
        j += 1
      }

      // If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
      // new coalesced partition.
      if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
        partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
        latestSplitPoint = i
        // reset postShuffleInputSize.
        coalescedSize = totalSizeOfCurrentPartition
      } else {
        coalescedSize += totalSizeOfCurrentPartition
      }
      i += 1
    }
    partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)

    partitionSpecs
  }
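  1. totalPostShuffleInputSize is the total size of the shuffle data across all shuffles.
  2. maxTargetSize is max(ceil(totalPostShuffleInputSize / minNumPartitions), 16). Here minNumPartitions is the value of spark.sql.adaptive.coalescePartitions.minPartitionNum, or the default parallelism if that is unset.
  3. targetSize is min(maxTargetSize, advisoryTargetSize), where advisoryTargetSize is the value of spark.sql.adaptive.advisoryPartitionSizeInBytes. So that setting is only advisory and is not necessarily the final targetSize.
  4. The while loop merges adjacent partitions. For each reducer index i, it sums the size of partition i across all shuffles. It keeps adding adjacent partitions to the current coalesced partition as long as the total does not exceed targetSize. When adding the next one would exceed it, a new coalesced partition starts.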
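The loop is easier to follow on small numbers. Below is a simplified re-implementation in plain Scala. It assumes each shuffle's MapOutputStatistics is reduced to an `Array[Long]` of per-reducer sizes, and returns `(start, end)` tuples (end exclusive) in place of CoalescedPartitionSpec. The object and method names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified version of the coalescing loop in ShufflePartitionsUtil.
// Each inner array stands in for MapOutputStatistics.bytesByPartitionId of one shuffle.
object CoalescePartitionsSketch {
  def coalesce(bytesByShuffle: Seq[Array[Long]], advisoryTargetSize: Long, minNumPartitions: Int): Seq[(Int, Int)] = {
    val totalSize = bytesByShuffle.map(_.sum).sum
    // A large minNumPartitions can push the target below the advisory size (floor of 16).
    val maxTargetSize = math.max(math.ceil(totalSize / minNumPartitions.toDouble).toLong, 16L)
    val targetSize = math.min(maxTargetSize, advisoryTargetSize)
    val numPartitions = bytesByShuffle.map(_.length).distinct match {
      case Seq(n) => n
      case other => throw new IllegalArgumentException(s"shuffles disagree on partition count: $other")
    }
    val specs = ArrayBuffer.empty[(Int, Int)]
    var start = 0
    var coalescedSize = 0L
    for (i <- 0 until numPartitions) {
      // Size of reducer partition i summed over all shuffles.
      val current = bytesByShuffle.map(_(i)).sum
      if (i > start && coalescedSize + current > targetSize) {
        specs += ((start, i)) // close the current group and start a new one at i
        start = i
        coalescedSize = current
      } else {
        coalescedSize += current
      }
    }
    specs += ((start, numPartitions))
    specs.toSeq
  }
}
```

Take one shuffle with sizes 10, 10, 10, 50, 10, 10, with advisoryTargetSize = 30 and minNumPartitions = 1. The target size is 30 and the result is (0,3), (3,4), (4,6). Raising minNumPartitions to 10 lowers the target size to 16, so every partition stays on its own.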

Analysis of OptimizeSkewedJoin.optimizeSkewJoin (the core skew-join optimization code)

optimizeSkewJoin is shown below:

def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
    case smj @ SortMergeJoinExec(_, _, joinType, _,
        s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
        s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
        if supportedJoinTypes.contains(joinType) =>
      assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
      val numPartitions = left.partitionsWithSizes.length
      // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
      val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
      val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
      logDebug(
        s"""
          |Optimizing skewed join.
          |Left side partitions size info:
          |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
          |Right side partitions size info:
          |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
        """.stripMargin)
      val canSplitLeft = canSplitLeftSide(joinType)
      val canSplitRight = canSplitRightSide(joinType)
      // We use the actual partition sizes (may be coalesced) to calculate target size, so that
      // the final data distribution is even (coalesced partitions + split partitions).
      val leftActualSizes = left.partitionsWithSizes.map(_._2)
      val rightActualSizes = right.partitionsWithSizes.map(_._2)
      val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
      val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

      val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
      val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
      val leftSkewDesc = new SkewDesc
      val rightSkewDesc = new SkewDesc
      for (partitionIndex <- 0 until numPartitions) {
        val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
        val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1
        val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

        val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
        val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
        val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

        // A skewed partition should never be coalesced, but skip it here just to be safe.
        val leftParts = if (isLeftSkew && !isLeftCoalesced) {
          val reducerId = leftPartSpec.startReducerIndex
          val skewSpecs = createSkewPartitionSpecs(left.mapStats.shuffleId, reducerId, leftTargetSize)
          if (skewSpecs.isDefined) {
            logDebug(s"Left side partition $partitionIndex is skewed, split it into" +
              s"${skewSpecs.get.length} parts.")
            leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
          }
          skewSpecs.getOrElse(Seq(leftPartSpec))
        } else {
          Seq(leftPartSpec)
        }

        // A skewed partition should never be coalesced, but skip it here just to be safe.
        val rightParts = if (isRightSkew && !isRightCoalesced) {
          val reducerId = rightPartSpec.startReducerIndex
          val skewSpecs = createSkewPartitionSpecs(right.mapStats.shuffleId, reducerId, rightTargetSize)
          if (skewSpecs.isDefined) {
            logDebug(s"Right side partition $partitionIndex is skewed, split it into" +
              s"${skewSpecs.get.length} parts.")
            rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
          }
          skewSpecs.getOrElse(Seq(rightPartSpec))
        } else {
          Seq(rightPartSpec)
        }

        for {
          leftSidePartition <- leftParts
          rightSidePartition <- rightParts
        } {
          leftSidePartitions += leftSidePartition
          rightSidePartitions += rightSidePartition
        }
      }

      logDebug("number of skewed partitions:" +
        s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
      if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
        val newLeft = CustomShuffleReaderExec(left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
        val newRight = CustomShuffleReaderExec(right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
        smj.copy(left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
      } else {
        smj
      }
  }
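  1. The SortMergeJoinExec pattern shows that the rule only applies to sort merge joins.
  2. assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) ensures that the two sides of the join have the same number of partitions.
  3. The median partition sizes of the two sides, leftMedSize and rightMedSize, are computed separately.
  4. The target sizes of the two sides, leftTargetSize and rightTargetSize, are computed separately.
  5. The loop checks every partition index on both sides for skew. A partition is split only if three conditions hold: it is skewed, the join type allows splitting that side (canSplitLeft/canSplitRight), and it has not been coalesced. Otherwise it is left as-is. Every resulting left spec is then paired with every resulting right spec of the same partition index.
  6. createSkewPartitionSpecs works as follows:
    1. It gets the map-output sizes of the corresponding reducer partition on that side of the join.
    2. It splits them into multiple slices according to targetSize, returning None if no split happens.
  7. If any skew is found, both sides are wrapped in CustomShuffleReaderExec and the join is marked with isSkewJoin = true before execution continues. At run time, ShuffledRowRDD.compute matches case PartialReducerPartitionSpec to read each slice. In addition, spark.sql.adaptive.fetchShuffleBlocksInBatch (enabled by default) fetches contiguous shuffle blocks in batches to reduce I/O.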
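Steps 5 and 6 can be sketched in plain Scala. This is a simplified greedy version, not Spark's code. Spark delegates the split to ShufflePartitionsUtil.splitSizeListByTargetSize, whose exact heuristics are not reproduced here. Slices are `(startMapIndex, endMapIndex)` tuples standing in for PartialReducerPartitionSpec, and both method names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified sketch of splitting one skewed reducer partition by its
// map-output sizes, plus the pairing of left and right specs for one partition index.
object SkewSplitSketch {
  // Groups consecutive map outputs while the running size stays within targetSize.
  // Returns (startMap, endMap) pairs (end exclusive), or None when no split happens.
  def splitByMapSizes(mapSizes: Seq[Long], targetSize: Long): Option[Seq[(Int, Int)]] = {
    val starts = ArrayBuffer(0)
    var current = 0L
    for (i <- mapSizes.indices) {
      if (i > starts.last && current + mapSizes(i) > targetSize) {
        starts += i
        current = mapSizes(i)
      } else {
        current += mapSizes(i)
      }
    }
    if (starts.length > 1) {
      val ends = starts.drop(1) :+ mapSizes.length
      Some(starts.zip(ends).toSeq)
    } else None
  }

  // Every left spec is joined with every right spec of the same partition index.
  def pairSlices[A, B](leftParts: Seq[A], rightParts: Seq[B]): Seq[(A, B)] =
    for { l <- leftParts; r <- rightParts } yield (l, r)
}
```

Consider five map outputs of 40 each with a target of 100. They split into (0,2), (2,4), (4,5). If the right side is not split, each of these three slices is paired with the single unsplit right partition, so the right partition is read three times.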

Where OptimizeSkewedJoin and CoalesceShufflePartitions are invoked

In AdaptiveSparkPlanExec:

@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
    ReuseAdaptiveSubquery(conf, context.subqueryCache),
    CoalesceShufflePartitions(context.session),
    // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
    // added by `CoalesceShufflePartitions`. So they must be executed after it.
    OptimizeSkewedJoin(conf),
    OptimizeLocalShuffleReader(conf)
  )

So both rules are invoked in AdaptiveSparkPlanExec, and CoalesceShufflePartitions runs before OptimizeSkewedJoin. AdaptiveSparkPlanExec is in turn created by InsertAdaptiveSparkPlan, which is invoked from QueryExecution.
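The ordering matters because an ordered rule list is applied as a left fold: each rule sees the plan produced by the previous one. The toy sketch below illustrates only that point. Plans are strings and both rules are hypothetical stand-ins; the real OptimizeSkewedJoin does not behave like the "skipped" branch.

```scala
// Toy model of applying an ordered rule list: each rule receives the previous rule's
// output (a left fold). Plans are strings here, not SparkPlans.
object RuleOrderSketch {
  type Rule = String => String

  // Stand-in for a rule that leaves a marker later rules can rely on.
  val coalesce: Rule = plan => s"$plan -> coalesced"
  // Stand-in for a rule that needs the earlier rule's output (toy behavior, not Spark's).
  val skewJoin: Rule = plan =>
    if (plan.contains("coalesced")) s"$plan -> skew-split" else s"$plan -> skipped"

  def applyRules(plan: String, rules: Seq[Rule]): String =
    rules.foldLeft(plan)((current, rule) => rule(current))
}
```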

In InsertAdaptiveSparkPlan, the shouldApplyAQE and supportAdaptive methods show:

private def shouldApplyAQE(plan: SparkPlan, isSubquery: Boolean): Boolean = {
    conf.getConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY) || isSubquery || {
      plan.find {
        case _: Exchange => true
        case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
        case p => p.expressions.exists(_.find {
          case _: SubqueryExpression => true
          case _ => false
        }.isDefined)
      }.isDefined
    }
  }

private def supportAdaptive(plan: SparkPlan): Boolean = {
    // TODO migrate dynamic-partition-pruning onto adaptive execution.
    sanityCheck(plan) &&
      !plan.logicalLink.exists(_.isStreaming) &&
      !plan.expressions.exists(_.find(_.isInstanceOf[DynamicPruningSubquery]).isDefined) &&
    plan.children.forall(supportAdaptive)
  }

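If these conditions are not met, AQE is not applied. To force the shouldApplyAQE check to pass, you can set spark.sql.adaptive.forceApply to true (the documentation notes that it is an internal configuration). supportAdaptive must still hold.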
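The two checks can be mirrored on a toy plan tree. This is a minimal sketch, assuming spark.sql.adaptive.enabled is already true and omitting sanityCheck. The `Node` flags are illustrative and do not correspond one-to-one to Spark classes.

```scala
// Hypothetical, simplified plan tree used to mirror shouldApplyAQE and supportAdaptive.
// Assumes spark.sql.adaptive.enabled = true; sanityCheck is omitted.
object AqeEligibilitySketch {
  final case class Node(
      isExchange: Boolean = false,
      requiresDistribution: Boolean = false,
      hasSubquery: Boolean = false,
      isStreaming: Boolean = false,
      hasDynamicPruning: Boolean = false,
      children: Seq[Node] = Nil) {
    def exists(p: Node => Boolean): Boolean = p(this) || children.exists(_.exists(p))
    def forall(p: Node => Boolean): Boolean = p(this) && children.forall(_.forall(p))
  }

  // Forced, a subquery, or some node has an exchange, a required distribution, or a subquery.
  def shouldApplyAQE(plan: Node, forceApply: Boolean, isSubquery: Boolean): Boolean =
    forceApply || isSubquery ||
      plan.exists(n => n.isExchange || n.requiresDistribution || n.hasSubquery)

  // No streaming node and no dynamic partition pruning anywhere in the tree.
  def supportAdaptive(plan: Node): Boolean =
    plan.forall(n => !n.isStreaming && !n.hasDynamicPruning)

  def aqeApplied(plan: Node, forceApply: Boolean = false, isSubquery: Boolean = false): Boolean =
    shouldApplyAQE(plan, forceApply, isSubquery) && supportAdaptive(plan)
}
```

A plain scan with no exchange is not eligible unless forceApply is set. A plan containing a streaming node or dynamic partition pruning is rejected even when forceApply is set.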

Note:

The following configurations are obsolete in Spark 3.0.1:

spark.sql.adaptive.skewedPartitionMaxSplits    
spark.sql.adaptive.skewedPartitionRowCountThreshold    
spark.sql.adaptive.skewedPartitionSizeThreshold   

Parts of this article draw on:
https://mp.weixin.qq.com/s?__…
https://mp.weixin.qq.com/s/Rv…
