An RDD is transformed from one form to the next: the code runs, yet nothing seems to happen, and then at the very last step all the work fires at once. How is this lazy evaluation achieved?
Open RDD.scala and look at the most basic map method:

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

Here, val cleanF = sc.clean(f) cleans the closure f (stripping references it does not need and checking that it is serializable) so that it can later be shipped to every task; what it returns is still f.
map creates a MapPartitionsRDD. You can see that f is ultimately executed by iter calling its own map method, and that (context, pid, iter) => iter.map(cleanF) as a whole is itself a function. In other words, the RDD's map method merely passes this RDD and the function into MapPartitionsRDD; nothing is executed here. Opening MapPartitionsRDD.scala, it is an implementation class of RDD, but it holds no data at all; only the function is passed in.

/**
 * An RDD that applies the provided function to every partition of the parent RDD.
 */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }
}

This is why all the code has run and the RDD has "transformed", yet the data has not moved at all.
You can also see here that whether it is map or mapPartitions, a partition's data is exposed as an iterator and the work is done by calling the iterator's function of the same name: this map is Scala's Iterator.map, not the RDD's.
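
To make this laziness concrete, here is a minimal, self-contained sketch in plain Scala (not Spark source; ToyRDD and all names are invented for illustration) showing how stacking wrappers around an iterator, exactly like chained MapPartitionsRDDs, does no work until someone finally pulls the iterator:

object LazyChainDemo {
  // A toy stand-in for one node of an RDD lineage: it only remembers how to
  // produce an iterator, it never holds computed data.
  final case class ToyRDD[T](compute: () => Iterator[T]) {
    def map[U](f: T => U): ToyRDD[U] =
      ToyRDD(() => compute().map(f))  // just wraps the parent; nothing runs yet
  }

  def main(args: Array[String]): Unit = {
    val source = ToyRDD(() => Iterator(1, 2, 3))
    val chained = source
      .map { x => println(s"*2 on $x"); x * 2 }  // no println happens here
      .map { x => println(s"+1 on $x"); x + 1 }  // nor here

    println("nothing computed yet")
    chained.compute().foreach(println)           // only now do the two maps run
  }
}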

That was a transformation operator. Nothing actually runs until an action operator arrives, such as foreach:

  /**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

Unlike map, there is a call to sc.runJob, and only now does real execution begin.

Stepping into sc.runJob, it runs the function on a given set of partitions:

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @param resultHandler callback to pass each result to
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }
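
The resultHandler callback is how each partition's result gets back to the driver. As a rough illustration (a simplified sketch, not the actual Spark overload; runJobCollect is an invented name and assumes a SparkContext sc plus import scala.reflect.ClassTag), a collecting action could use the partition index to slot each result into an array:

  def runJobCollect[T, U: ClassTag](sc: SparkContext, rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    val results = new Array[U](rdd.partitions.length)
    sc.runJob(
      rdd,
      (_: TaskContext, iter: Iterator[T]) => func(iter),
      0 until rdd.partitions.length,
      (index: Int, res: U) => results(index) = res)  // one slot per partition index
    results
  }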

Notice that the job is executed by dagScheduler, which was created when the SparkContext was initialized, along with a taskScheduler.
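
For reference, the relevant part of the SparkContext constructor looks roughly like this (paraphrased from memory, not a verbatim excerpt; the field names follow the source but the surrounding details are omitted):

    // Simplified sketch of SparkContext initialization
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    // The TaskScheduler is started only after the DAGScheduler holds a reference to it.
    _taskScheduler.start()
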
Continuing into the DAGScheduler, the job is submitted like this:

    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))

The job is wrapped into a DAGSchedulerEvent and posted to a blocking queue; a dedicated thread loops over that queue, taking events off it and consuming them.

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }
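
On the consuming side, the DAGScheduler's concrete event loop dispatches on the event type; roughly (paraphrased, not a verbatim excerpt), the JobSubmitted case hands the job to handleJobSubmitted, which builds the stages from the RDD lineage and submits their tasks:

  // Paraphrased sketch of the dispatch in DAGSchedulerEventProcessLoop
  override def onReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    case _ =>
      // stage completions, task failures, etc. are handled here as well
  }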

Finally there is a waiter that blocks until the job finishes and the result comes back:

    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
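
Putting it all together, a tiny driver program makes the laziness observable (a sketch; local[2], the app name, and the println side effects are just for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object LazyEvalDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lazy-demo"))

    // Transformation only: builds a MapPartitionsRDD, prints nothing, moves no data.
    val doubled = sc.parallelize(1 to 4).map { x =>
      println(s"computing $x")  // does not run yet
      x * 2
    }
    println("after map: no job has run")

    // Action: foreach calls sc.runJob, which hands the job to the DAGScheduler.
    doubled.foreach(x => println(s"got $x"))

    sc.stop()
  }
}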