关于spark:isEmpty类型的action算子需要cache吗

97次阅读

共计 2061 个字符,预计需要花费 6 分钟才能阅读完成。

有的时候,须要判断 rdd.isEmpty(),以决定是否须要后续操作。而这个isEmpty 办法是个 action 算子。也就是说如果 rdd 不为空,须要做后续操作的话,那么这个 rdd 的创立过程可能就执行了两遍。那么 rdd 须要 cache 吗?

进入 isEmpty 办法

  def isEmpty(): Boolean = withScope {partitions.length == 0 || take(1).length == 0
  }

如果这个 rdd 是从 kafka 读出来的,那么 partitions.length == 0 这个判断就为 false, 会进入take(num = 1) 办法,

  def take(num: Int): Array[T] = withScope {
    // 扫描范畴扩充因子
    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
    if (num == 0) {new Array[T](0)
    } else {
      // 保留 take 的后果
      val buf = new ArrayBuffer[T]
      // 总共的分区个数
      val totalParts = this.partitions.length
      // 浏览过的分区个数
      var partsScanned = 0
      // 后果中的记录数小于 take 须要的 num,并且浏览过的分区数小于总分区数
      while (buf.size < num && partsScanned < totalParts) {
        // 应该浏览的分区个数
        // 最开始为 1,也就是先尝试从第 0 个分区取记录,如果一个这个分区的记录数不够,再浏览其余分区
        var numPartsToTry = 1L
       
        val left = num - buf.size
        if (partsScanned > 0) {
          // 进入到这个判断里阐明不是第一次循环,上次浏览的分区取出来的记录数量还不够 num,这时就须要扩充应该本次应该浏览的分区数了
          if (buf.isEmpty) {numPartsToTry = partsScanned * scaleUpFactor} else {// 曾经浏览过的分区个数 * (残余要拜访的记录数与曾经拜访过的记录数的比值),再扩充 50%,得出还须要浏览的分区个数
            numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
            numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
          }
        }
        // p 是应该浏览的分区索引数组,表明哪些分区应该被浏览
        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
        // 按指定的分区执行“小规模”job
        // 这里 it.take(left)会让各分区的迭代器只迭代以后 buf 所须要的记录数。依据迭代器模式,可知这里并不会遍历整个分区的数据再从中拿出 left 条记录
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
        // 将 job 的后果塞进 buf 中
        // 这里应用_.take(num - buf.size),保障 buf 的记录数量不会超过 num
        res.foreach(buf ++= _.take(num - buf.size))
        partsScanned += p.size
      }

      buf.toArray

从源码中可见,
如果 take 的 num 不超过第 0 个分区里的记录数,那么会产生一次“小规模 job”,总共拜访过的记录数 =num;
如果超过了,就会再在更大的范畴(更多分区中)查找更少的残余须要 take 进去的记录数,从而产生一个“中等规模 job”,可能使总共拜访过的记录数 >num;

举个例子

val rdd = sc.makeRDD(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 5)
rdd.cache().take(5).foreach(println)

1. 取出第 0 个分区的 (1,2),buf 的 size=2,left=3
2. 算出还须要从 2.25≈3 个分区(第 1,2,3 号分区) 中各取 3 条记录(但各分区只有两条,所以取了 2 条)
3. 第 2 步取出的 (3,4),(5,6),(7,8) 这三组共 6 条数据,塞入 buf 中。buf 只还须要 3 条,所以只塞进 (3,4,5)
4. 返回后果 buf(1,2,3,4,5)


由图可见,总共执行了两次 job,第一次 1 个分区,第二次 3 个分区。并且缓存了 4 个分区。
顺便提一点,cache()是以分区为最小单位的,如果只须要遍历的某个分区的一小部分数据,用了 cache,也会把整个分区都遍历一次缓存起来。

答复最后的问题

1.isEmpty(),只会从 rdd 产生的源头中遍历第一条数据,如果不cache(),它只会从数据源拜访一条数据。如果 cache 了,会遍历第 0 个分区的所有数据并缓存;
2. 这个 rdd 如果在前面是的代码中不被应用的话,就不要 cache,否则能够 cache。

正文完
 0