有的时候,须要判断rdd.isEmpty(),以决定是否须要后续操作。而这个isEmpty办法是个action算子。也就是说如果rdd不为空,须要做后续操作的话,那么这个rdd的创立过程可能就执行了两遍。那么rdd须要cache吗?

进入isEmpty办法

  def isEmpty(): Boolean = withScope {    partitions.length == 0 || take(1).length == 0  }

如果这个rdd是从kafka读出来的,那么partitions.length == 0这个判断就为false,会进入take(num = 1)办法,

  def take(num: Int): Array[T] = withScope {    // 扫描范畴扩充因子    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)    if (num == 0) {      new Array[T](0)    } else {      // 保留take的后果      val buf = new ArrayBuffer[T]      // 总共的分区个数      val totalParts = this.partitions.length      // 浏览过的分区个数      var partsScanned = 0      // 后果中的记录数小于take须要的num,并且浏览过的分区数小于总分区数      while (buf.size < num && partsScanned < totalParts) {        // 应该浏览的分区个数        // 最开始为1,也就是先尝试从第0个分区取记录,如果一个这个分区的记录数不够,再浏览其余分区        var numPartsToTry = 1L               val left = num - buf.size        if (partsScanned > 0) {          // 进入到这个判断里阐明不是第一次循环,上次浏览的分区取出来的记录数量还不够num,这时就须要扩充应该本次应该浏览的分区数了          if (buf.isEmpty) {            numPartsToTry = partsScanned * scaleUpFactor          } else {            // 曾经浏览过的分区个数 * (残余要拜访的记录数与曾经拜访过的记录数的比值),再扩充50%,得出还须要浏览的分区个数            numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt            numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)          }        }        // p是应该浏览的分区索引数组,表明哪些分区应该被浏览        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)        // 按指定的分区执行“小规模”job        // 这里it.take(left)会让各分区的迭代器只迭代以后buf所须要的记录数。依据迭代器模式,可知这里并不会遍历整个分区的数据再从中拿出left条记录        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)        // 将job的后果塞进buf中        // 这里应用_.take(num - buf.size),保障buf的记录数量不会超过num        res.foreach(buf ++= _.take(num - buf.size))        partsScanned += p.size      }      buf.toArray

从源码中可见,
如果take的num不超过第0个分区里的记录数,那么会产生一次“小规模job”,总共拜访过的记录数=num;
如果超过了,就会再在更大的范畴(更多分区中)查找更少的残余须要take进去的记录数,从而产生一个“中等规模job”,可能使总共拜访过的记录数>num;

举个例子

val rdd = sc.makeRDD(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 5)rdd.cache().take(5).foreach(println)

1.取出第0个分区的(1,2),buf的size=2,left=3
2.算出还须要从2.25≈3个分区(第1,2,3号分区)中各取3条记录(但各分区只有两条,所以取了2条)
3.第2步取出的(3,4),(5,6),(7,8)这三组共6条数据,塞入buf中。buf只还须要3条,所以只塞进(3,4,5)
4.返回后果buf(1,2,3,4,5)


由图可见,总共执行了两次job,第一次1个分区,第二次3个分区。并且缓存了4个分区。
顺便提一点,cache()是以分区为最小单位的,如果只须要遍历的某个分区的一小部分数据,用了cache,也会把整个分区都遍历一次缓存起来。

答复最后的问题

1.isEmpty(),只会从rdd产生的源头中遍历第一条数据,如果不cache(),它只会从数据源拜访一条数据。如果cache了,会遍历第0个分区的所有数据并缓存;
2.这个rdd如果在前面是的代码中不被应用的话,就不要cache,否则能够cache。