An RDD is transformed from one shape into another: the code runs, yet nothing actually happens, and only at the very last step does everything fire at once. How is this lazy evaluation achieved?
Open RDD.scala and look at the most basic method, map:
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
Here, val cleanF = sc.clean(f) cleans the closure so that f can be serialized and shipped to each task; what comes back is still the function itself. map then creates a MapPartitionsRDD, and you can see that f is ultimately executed by iter calling its own map method. Note that (context, pid, iter) => iter.map(cleanF) as a whole is just a function. In other words, this RDD's map method merely passes itself and the function into MapPartitionsRDD; nothing is executed. Step into MapPartitionsRDD.scala: it is an implementation class of RDD, but it holds no data, only the function that was passed in.
/**
 * An RDD that applies the provided function to every partition of the parent RDD.
 */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }
}
This is why the code has all run and the RDD has logically been transformed, yet the data has not moved at all.
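To make the wrapping concrete, here is a minimal toy model of the same idea (not Spark code; LazyStep and its methods are made up for illustration). Each step only remembers how to get its parent's iterator plus a function, and nothing runs until the chain is finally pulled by an "action":

// Not Spark code: a stripped-down model of the wrapping trick above.
class LazyStep[T](compute: () => Iterator[T]) {
  def map[U](f: T => U): LazyStep[U] =
    new LazyStep[U](() => compute().map(f))   // wrap only; f is not applied here
  def foreach(f: T => Unit): Unit =
    compute().foreach(f)                      // the "action": pulls the whole chain
}

object LazyStep {
  def apply[T](data: Seq[T]): LazyStep[T] = new LazyStep[T](() => data.iterator)
}

// LazyStep(Seq(1, 2, 3)).map(_ * 2).map(_ + 1).foreach(println)  // prints 3, 5, 7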
You can also see here that whether it is map or mapPartitions, a partition's data is wrapped as an iterator and the iterator's method of the same name is invoked; that map is Scala's Iterator.map, not the RDD's.
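The laziness the chain relies on is already built into Scala's Iterator.map itself: the function is not applied until the iterator is actually consumed. A quick sketch in plain Scala, nothing Spark-specific:

// Scala's Iterator.map is lazy: nothing prints until the iterator is consumed.
val it = Iterator(1, 2, 3).map { x => println(s"mapping $x"); x * 2 }
// no output yet
val result = it.toList   // "mapping 1/2/3" print here; result = List(2, 4, 6)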
That was a transformation operator. Nothing happens until an action operator such as foreach:
/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
Unlike map, there is a call to sc.runJob here, and that is when real execution begins.
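A minimal local demo of that lazy/eager boundary (the setup and names here are illustrative, not from the Spark source):

import org.apache.spark.{SparkConf, SparkContext}

// Assumption: a local SparkContext just for demonstration.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lazy-demo"))
val mapped = sc.parallelize(1 to 4).map { x => println(s"map sees $x"); x * 10 }
// nothing has printed yet: map only built a MapPartitionsRDD
mapped.foreach(x => println(s"foreach got $x"))   // sc.runJob fires here; the map closure runs now
sc.stop()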
Stepping into sc.runJob, it runs the function on each partition:
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 * partitions of the target RDD, e.g. for operations like `first()`
 * @param resultHandler callback to pass each result to
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
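Since this overload is the entry point actions go through, it can also be called directly from user code; actions just do it for us. A hedged usage sketch (the object name and setup are illustrative) that counts elements per partition and hands each result to the callback:

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Illustrative sketch: call runJob directly, the way an action would.
object RunJobSketch {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("runJob-sketch"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    sc.runJob(
      rdd,
      (ctx: TaskContext, iter: Iterator[Int]) => iter.size,          // func run on each partition
      0 until rdd.getNumPartitions,                                  // all partitions
      (partitionId: Int, count: Int) =>                              // resultHandler callback
        println(s"partition $partitionId -> $count elements")
    )
    sc.stop()
  }
}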
Note that the job is actually executed by dagScheduler, which was created when sc was initialized, along with taskScheduler.
Continuing on:
eventProcessLoop.post(JobSubmitted(
  jobId, rdd, func2, partitions.toArray, callSite, waiter,
  SerializationUtils.clone(properties)))
The job is wrapped in a DAGSchedulerEvent and posted to a blocking queue, and a single thread loops over that queue, taking events off it and consuming them.
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }
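Stripped of the Spark specifics, the pattern is just a blocking queue drained by one daemon thread. A rough sketch (MiniEventLoop is made up; println stands in for onReceive):

import java.util.concurrent.LinkedBlockingDeque

// Not Spark code: a stripped-down sketch of the same post/consume pattern.
object MiniEventLoop {
  private val queue = new LinkedBlockingDeque[String]()

  private val eventThread = new Thread("mini-event-loop") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        val event = queue.take()      // blocks until something is posted
        println(s"handling $event")   // stand-in for onReceive(event)
      }
    }
  }

  def start(): Unit = eventThread.start()
  def post(event: String): Unit = queue.put(event)
}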
Finally, there is a waiter that waits for the job to finish and then handles the result:
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
  case scala.util.Success(_) =>
    logInfo("Job %d finished: %s, took %f s".format
      (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
  case scala.util.Failure(exception) =>
    logInfo("Job %d failed: %s, took %f s".format
      (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
    val callerStackTrace = Thread.currentThread().getStackTrace.tail
    exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
    throw exception
}
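The waiter side boils down to a promise that the scheduler completes once the last partition reports back, while the calling thread simply blocks on the corresponding future. A rough sketch of that idea (MiniJobWaiter is made up, not Spark's JobWaiter):

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Not Spark code: a hedged sketch of the waiter idea.
class MiniJobWaiter(totalTasks: Int) {
  private var finished = 0
  private val promise = Promise[Unit]()

  def completionFuture = promise.future

  def taskSucceeded(): Unit = synchronized {
    finished += 1
    if (finished == totalTasks) promise.trySuccess(())   // last task unblocks the caller
  }
}

// caller side, analogous to ThreadUtils.awaitReady above:
// Await.ready(waiter.completionFuture, Duration.Inf)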