有的时候,须要判断 rdd.isEmpty()
,以决定是否须要后续操作。而这个isEmpty
办法是个 action 算子。也就是说如果 rdd 不为空,须要做后续操作的话,那么这个 rdd 的创立过程可能就执行了两遍。那么 rdd 须要 cache
吗?
进入 isEmpty 办法
def isEmpty(): Boolean = withScope {partitions.length == 0 || take(1).length == 0
}
如果这个 rdd 是从 kafka 读出来的,那么 partitions.length == 0
这个判断就为 false
, 会进入take(num = 1)
办法,
def take(num: Int): Array[T] = withScope {
// 扫描范畴扩充因子
val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
if (num == 0) {new Array[T](0)
} else {
// 保留 take 的后果
val buf = new ArrayBuffer[T]
// 总共的分区个数
val totalParts = this.partitions.length
// 浏览过的分区个数
var partsScanned = 0
// 后果中的记录数小于 take 须要的 num,并且浏览过的分区数小于总分区数
while (buf.size < num && partsScanned < totalParts) {
// 应该浏览的分区个数
// 最开始为 1,也就是先尝试从第 0 个分区取记录,如果一个这个分区的记录数不够,再浏览其余分区
var numPartsToTry = 1L
val left = num - buf.size
if (partsScanned > 0) {
// 进入到这个判断里阐明不是第一次循环,上次浏览的分区取出来的记录数量还不够 num,这时就须要扩充应该本次应该浏览的分区数了
if (buf.isEmpty) {numPartsToTry = partsScanned * scaleUpFactor} else {// 曾经浏览过的分区个数 * (残余要拜访的记录数与曾经拜访过的记录数的比值),再扩充 50%,得出还须要浏览的分区个数
numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
}
}
// p 是应该浏览的分区索引数组,表明哪些分区应该被浏览
val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
// 按指定的分区执行“小规模”job
// 这里 it.take(left)会让各分区的迭代器只迭代以后 buf 所须要的记录数。依据迭代器模式,可知这里并不会遍历整个分区的数据再从中拿出 left 条记录
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
// 将 job 的后果塞进 buf 中
// 这里应用_.take(num - buf.size),保障 buf 的记录数量不会超过 num
res.foreach(buf ++= _.take(num - buf.size))
partsScanned += p.size
}
buf.toArray
从源码中可见,
如果 take 的 num 不超过第 0 个分区里的记录数,那么会产生一次“小规模 job”,总共拜访过的记录数 =num;
如果超过了,就会再在更大的范畴(更多分区中)查找更少的残余须要 take 进去的记录数,从而产生一个“中等规模 job”,可能使总共拜访过的记录数 >num;
举个例子
val rdd = sc.makeRDD(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 5)
rdd.cache().take(5).foreach(println)
1. 取出第 0 个分区的 (1,2),buf 的 size=2,left=3
2. 算出还须要从 2.25≈3 个分区(第 1,2,3 号分区) 中各取 3 条记录(但各分区只有两条,所以取了 2 条)
3. 第 2 步取出的 (3,4),(5,6),(7,8) 这三组共 6 条数据,塞入 buf 中。buf 只还须要 3 条,所以只塞进 (3,4,5)
4. 返回后果 buf(1,2,3,4,5)
由图可见,总共执行了两次 job,第一次 1 个分区,第二次 3 个分区。并且缓存了 4 个分区。
顺便提一点,cache()
是以分区为最小单位的,如果只须要遍历的某个分区的一小部分数据,用了 cache,也会把整个分区都遍历一次缓存起来。
答复最后的问题
1.isEmpty()
,只会从 rdd 产生的源头中遍历第一条数据,如果不cache()
,它只会从数据源拜访一条数据。如果 cache 了,会遍历第 0 个分区的所有数据并缓存;
2. 这个 rdd 如果在前面是的代码中不被应用的话,就不要 cache,否则能够 cache。