sortByKey引发的疑问jobshufflecache

39次阅读

共计 2689 个字符,预计需要花费 7 分钟才能阅读完成。

Just for fun,写了一个 demo,

val rdd = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (2, "c")))
val sorted = rdd.sortByKey()
sorted.foreach(println)
val c = sorted.count()

1.job

打开 Spark UI,如图:

sortByKey,一个 transform 算子。为什么 transform 算子会引发一个 job 呢?
翻看源码,

  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope
  {val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }

有一个RangePartitioner,点进去,

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {if (partitions <= 1) {Array.empty} else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

有一个 sketch 方法,点进去,

  def sketch[K : ClassTag](rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex {(idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }

有个 collect,这个collect 就是 rdd 的 action 算子。所以触发了一个 job。但是它仍然是一个 transform 算子。点开佛 reach 算子触发的 job,如图,经过了 sortByKey


这段 RangePartitioner 里的代码是干嘛呢?就是根据 key 划分各分区的边界,以决定后续 shuffle 重新分区的数据去向。

2.shuffle

点开 count 触发的 job,


stage3 被 skip 掉了。代码并没有缓存却能跳过一个 stage。
这是因为 sortByKey 是个宽依赖算子,发生 shuffle,shuffle 的过程是上游 stage 把 rdd 的数据写出到临时文件里,再由下游 stage 去读取。sparkContext 的生命周期里,这些临时文件(中间结果)一直存在,所以在下一个 job 触发的时候,根据 rdd 的依赖会找到这些临时文件,从而起到了“缓存”的作用。
于是,我在 sortByKey 后加了cache。UI 图没变(这里不贴了,下面有讲)。意味着 sortByKey 似乎又执行了一次。cache 没用还是 UI 显示方式就这样?

3.cache

为了验证这个问题,我把代码改了

val rddA = sc.parallelize(Seq((1, "a"), (2, "c"), (3, "b"), (2, "c")))
  .filter(x => x._1 > 1).aggregateByKey(0)((x, y) => {println("agg:" + y); 0
}, (x1, x2) => 0).cache() // 缓存 agg 后的 rdd
val c = rddA.count()
println("总数:" + c)
rddA.foreach(println)

看 UI,

如上,似乎 aggregateByKey 又执行了一遍!我代码中在 aggregateByKey 里打印了。

可以看到,只打印了一次,说明 aggregateByKey 只执行了一次,但是在 UI 中只能整个 stage 为灰色或蓝色。
并且这个 stage3 不会去读取 shuffle 生成的临时文件,而是直接从 cache 中读取 ShuffledRDD。有图为证,


Shuffle Read 没有数据。

PS:的字体确实比 win 的好看!Ayuthaya 或 Manaco 都比 Console 好看~

正文完
 0