Spark提交一个计算是调用spark-submit。
spark-submit 调用的是 bin目录下的spark-submit脚本,咱们关上spark-submit脚本;

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

能够看到 spark-submit执行的是bin/spark-class文件。

CMD=("${CMD[@]:0:$LAST}")exec "${CMD[@]}"

咱们加一行打印命令:

......CMD=("${CMD[@]:0:$LAST}")echo "${CMD[@]}"exec "${CMD[@]}"

咱们提交一个计算:

 spark-submit \--class org.apache.spark.examples.SparkPi\--master yarn \--deploy-mode client \--driver-memory 512m \--executor-memory 512m \--total-executor-cores 2 \/home/hadoop/examples/jars/spark-examples_2.12-3.0.0.jar

打印进去的代码:

java -cp......-Xmx512m org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode client --conf spark.driver.memory=512m --class MySpark --executor-memory 512m --total-executor-cores 2 / /home/hadoop/examples/jars/spark-examples_2.12-3.0.0.jar

java会启动一个SparkSubmit过程,执行SparkSubmit main 办法。

override def main(args: Array[String]): Unit = {    val submit = new SparkSubmit() {      self =>      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {        new SparkSubmitArguments(args) {          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)          override protected def logError(msg: => String): Unit = self.logError(msg)        }      }      override protected def logInfo(msg: => String): Unit = printMessage(msg)      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")      override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")      override def doSubmit(args: Array[String]): Unit = {        try {          super.doSubmit(args)        } catch {          case e: SparkUserAppException =>            exitFn(e.exitCode)        }      }    }    submit.doSubmit(args)  }

main办法会创立SparkSubmit对象,并且执行doSubmit办法;

def doSubmit(args: Array[String]): Unit = {    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to    // be reset before the application starts.    val uninitLog = initializeLogIfNecessary(true, silent = true)    val appArgs = parseArguments(args)    if (appArgs.verbose) {      logInfo(appArgs.toString)    }    appArgs.action match {      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)      case SparkSubmitAction.KILL => kill(appArgs)      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)      case SparkSubmitAction.PRINT_VERSION => printVersion()    }  }

doSubmit办法首先进行参数解析;

val appArgs = parseArguments(args)
protected def parseArguments(args: Array[String]): SparkSubmitArguments = {    new SparkSubmitArguments(args)  }

关上SparkSubmitArguments类;

// Set parameters from command line arguments  parse(args.asJava)

首先会从命令行参数里解析参数,应用正则表达式;

Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

具体handle办法如下:

override protected def handle(opt: String, value: String): Boolean = {    opt match {      case NAME =>        name = value      case MASTER =>        master = value      case CLASS =>        mainClass = value      case DEPLOY_MODE =>        if (value != "client" && value != "cluster") {          error("--deploy-mode must be either \"client\" or \"cluster\"")        }        deployMode = value      case NUM_EXECUTORS =>        numExecutors = value      case TOTAL_EXECUTOR_CORES =>        totalExecutorCores = value      case EXECUTOR_CORES =>        executorCores = value      case EXECUTOR_MEMORY =>        executorMemory = value      case DRIVER_MEMORY =>        driverMemory = value      case DRIVER_CORES =>        driverCores = value   ......

代码

case MASTER =>        master = value

对应

protected final String MASTER = "--master";//命令行代码--master yarn 

代码

case CLASS =>        mainClass = value

对应

protected final String CLASS = "--class";//命令行代码--class org.apache.spark.examples.SparkPi 

代码

case DEPLOY_MODE =>  if (value != "client" && value != "cluster") {    error("--deploy-mode must be either \"client\" or \"cluster\"")  }  deployMode = value

对应

protected final String DEPLOY_MODE = "--deploy-mode";//命令行代码--deploy-mode client 

代码

case DRIVER_MEMORY =>  driverMemory = value

对应

protected final String DRIVER_MEMORY = "--driver-memory";//命令行代码--driver-memory 512m

代码

case EXECUTOR_MEMORY =>--executorMemory = value

对应

protected final String EXECUTOR_MEMORY = "--executor-memory";//命令行代码--executor-memory 512m 

代码

case TOTAL_EXECUTOR_CORES =>  totalExecutorCores = value

对应

protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";//命令行代码--total-executor-cores 2 

这样,咱们就把 spark-submit提交的参数全副解析进去了。

SparkSubmitArguments类有个参数叫action

var action: SparkSubmitAction = null

默认值是null,默认被赋值为"SUBMIT"

appArgs.action match {      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)      case SparkSubmitAction.KILL => kill(appArgs)      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)      case SparkSubmitAction.PRINT_VERSION => printVersion()    }

深刻submit办法,

private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {    def doRunMain(): Unit = {      if (args.proxyUser != null) {        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,          UserGroupInformation.getCurrentUser())        try {          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {            override def run(): Unit = {              runMain(args, uninitLog)            }          })        } catch {          case e: Exception =>            // Hadoop's AuthorizationException suppresses the exception's stack trace, which            // makes the message printed to the output by the JVM not very helpful. Instead,            // detect exceptions with empty stack traces here, and treat them differently.            if (e.getStackTrace().length == 0) {              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")            } else {              throw e            }        }      } else {        runMain(args, uninitLog)      }    }    // In standalone cluster mode, there are two submission gateways:    //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper    //   (2) The new REST-based gateway introduced in Spark 1.3    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over    // to use the legacy gateway if the master endpoint turns out to be not a REST server.    if (args.isStandaloneCluster && args.useRest) {      try {        logInfo("Running Spark using the REST application submission protocol.")        doRunMain()      } catch {        // Fail over to use the legacy submission gateway        case e: SubmitRestConnectionException =>          logWarning(s"Master endpoint ${args.master} was not a REST server. " +            "Falling back to legacy submission gateway instead.")          args.useRest = false          submit(args, false)      }    // In all other modes, just run the main class as prepared    } else {      doRunMain()    }  }

submit办法首先会判断是不是standalone集群,咱们提交的是yarn集群,那会执行doRunMain()办法。doRunMain办法有个判断是否应用代理用户,否则执行主程序。

深刻runMain(),
runMain办法有个很重要的代码,筹备提交环境;

 val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

类加载器,

val loader = getSubmitClassLoader(sparkConf)

通过反射找出类的信息,

mainClass = Utils.classForName(childMainClass)

mainClass 有没有继承SparkApplication,如果有,则通过结构器间接创立实例
如果没有,间接new一个对象, new JavaMainApplication(mainClass)

val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {  mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]} else {  new JavaMainApplication(mainClass)}

调用SparkApplication的start办法。

try {  app.start(childArgs.toArray, sparkConf)} catch {  case t: Throwable =>    throw findCause(t)}

咱们深刻 prepareSubmitEnvironment办法,

办法定义了四个变量,

// Return values    val childArgs = new ArrayBuffer[String]()    val childClasspath = new ArrayBuffer[String]()    val sparkConf = args.toSparkConf()    var childMainClass = ""

其中 childMainClass 是可变的,咱们找到给childMainClass 赋值的中央,

 if (isYarnCluster) {      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
 private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =    "org.apache.spark.deploy.yarn.YarnClusterApplication"

在Yarn框架下,childMainClass 赋值 "org.apache.spark.deploy.yarn.YarnClusterApplication"

private[spark] class YarnClusterApplication extends SparkApplication {  override def start(args: Array[String], conf: SparkConf): Unit = {    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,    // so remove them from sparkConf here for yarn mode.    conf.remove(JARS)    conf.remove(FILES)    new Client(new ClientArguments(args), conf, null).run()  }}  

深刻Client

private[spark] class Client(    val args: ClientArguments,    val sparkConf: SparkConf,    val rpcEnv: RpcEnv)  extends Logging {  import Client._  import YarnSparkHadoopUtil._  private val yarnClient = YarnClient.createYarnClient  private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))  private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"  private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode  private var appMaster: ApplicationMaster = _  private var stagingDirPath: Path = _

Client会创立Yarn的客户端,

private val yarnClient = YarnClient.createYarnClient


深刻YarnClient.createYarnClient,

public static YarnClient createYarnClient() {    YarnClient client = new YarnClientImpl();    return client;}

createYarnClient创立一个对象 YarnClientImpl(),深刻 YarnClientImpl(),

public class YarnClientImpl extends YarnClient {    private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);    protected ApplicationClientProtocol rmClient;....

rmClient就是resourcemanager的客户端。

咱们回到YarnClusterApplication,进入run办法,

def run(): Unit = {    this.appId = submitApplication()......

submitApplication()办法就是提交应用程序,返回appId ,appId 是全局yarn的利用id,后续的状态、报告等都能够通过 appId 失去。

深刻submitApplication(),

def submitApplication(): ApplicationId = {    ResourceRequestHelper.validateResources(sparkConf)    var appId: ApplicationId = null    try {      launcherBackend.connect()      yarnClient.init(hadoopConf)      yarnClient.start()      logInfo("Requesting a new application from cluster with %d NodeManagers"        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))      // Get a new application from our RM      val newApp = yarnClient.createApplication()      val newAppResponse = newApp.getNewApplicationResponse()      appId = newAppResponse.getApplicationId()      // The app staging dir based on the STAGING_DIR configuration if configured      // otherwise based on the users home directory.      val appStagingBaseDir = sparkConf.get(STAGING_DIR)        .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())      stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),        Option(appId.toString)).setCurrentContext()      // Verify whether the cluster has enough resources for our AM      verifyClusterResources(newAppResponse)      // Set up the appropriate contexts to launch our AM      val containerContext = createContainerLaunchContext(newAppResponse)      val appContext = createApplicationSubmissionContext(newApp, containerContext)      // Finally, submit and monitor the application      logInfo(s"Submitting application $appId to ResourceManager")      yarnClient.submitApplication(appContext)      launcherBackend.setAppId(appId.toString)      reportLauncherState(SparkAppHandle.State.SUBMITTED)      ......      ......

代码

try {    launcherBackend.connect()    yarnClient.init(hadoopConf)    yarnClient.start()

示意连贯Yarn胜利,Yarn客户端开启;

// Get a new application from our RM      val newApp = yarnClient.createApplication()      val newAppResponse = newApp.getNewApplicationResponse()      appId = newAppResponse.getApplicationId()

从咱们的 resourcemanager开启一个利用,并且取得响应ID;

// Set up the appropriate contexts to launch our AM//设置适当的上下文以启动咱们的AMval containerContext = createContainerLaunchContext(newAppResponse)val appContext = createApplicationSubmissionContext(newApp, containerContext)

下面代码示意创立容器的启动环境和提交环境;

提交应用程序到ResourceManager

// Finally, submit and monitor the application//最初,提交并监督应用程序logInfo(s"Submitting application $appId to ResourceManager")yarnClient.submitApplication(appContext)launcherBackend.setAppId(appId.toString)reportLauncherState(SparkAppHandle.State.SUBMITTED)


咱们提交应用程序,也就是提交appContext,提交的是

val appContext = createApplicationSubmissionContext(newApp, containerContext)

先创立容器上下文,再筹备利用提交上下文;

val containerContext = createContainerLaunchContext(newAppResponse)

深刻createContainerLaunchContext,

val amClass =      if (isClusterMode) {        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName      } else {        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName      }    if (args.primaryRFile != null &&        (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {      args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs    }    val userArgs = args.userArgs.flatMap { arg =>      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))    }    val amArgs =      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++      Seq("--properties-file",        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++      Seq("--dist-cache-conf",        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))    // Command for the ApplicationMaster    // AppMaster负责启动指令    val commands = prefixEnv ++      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++      javaOpts ++ amArgs ++      Seq(        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")    // TODO: it would be nicer to just make sure there are no null commands here    val printableCommands = commands.map(s => if (s == null) "null" else s).toList    amContainer.setCommands(printableCommands.asJava)

咱们是集群环境,amClass 抉择 "org.apache.spark.deploy.yarn.ApplicationMaster",而后组合成amArgs 启动一个JVM。commands 指令会包装一下,放到容器中,

// send the acl settings into YARN to control who has access via YARN interfaces    val securityManager = new SecurityManager(sparkConf)    amContainer.setApplicationACLs(      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)    setupSecurityToken(amContainer)    amContainer

最初间接返回amContainer 容器。

RM会在NM的容器中启动AM 也就是在 容器中执行 "java org.apache.spark.deploy.yarn.ApplicationMaster",启动一个AM过程。

咱们找到org.apache.spark.deploy.yarn.ApplicationMaster的 main 办法;

def main(args: Array[String]): Unit = {    SignalUtils.registerLogger(log)    val amArgs = new ApplicationMasterArguments(args)    val sparkConf = new SparkConf()    if (amArgs.propertiesFile != null) {      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>        sparkConf.set(k, v)      }    }    ......

main办法首先会new一个对象

val amArgs = new ApplicationMasterArguments(args)

该对象会把命令行参数进行封装成ApplicationMasterArguments(args) 对象。

 val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))    master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

传入 SparkConf和yarnConf创立AppMaster对象;

深刻ApplicationMaster对象,对象里会创立YarnRMClient()对象

private val client = new YarnRMClient()

持续深刻YarnRMClient;

private[spark] class YarnRMClient extends Logging {  private var amClient: AMRMClient[ContainerRequest] = _  private var uiHistoryAddress: String = _  private var registered: Boolean = false

这个 AMRMClient 是 ApplicationMaster 连贯 ResourceManager的客户端。

持续深刻 org.apache.spark.deploy.yarn.ApplicationMaster 类。

ugi.doAs(new PrivilegedExceptionAction[Unit]() {      override def run(): Unit = System.exit(master.run())    })

这里master会执行run办法,咱们深刻run办法。

......if (isClusterMode) {        runDriver()      } else {        runExecutorLauncher()      }......      

这里有个判断,如果是集群模式,则就运行runDriver(),否则运行runExecutorLauncher(),
咱们是集群模式,执行runDriver();

private def runDriver(): Unit = {    addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))    userClassThread = startUserApplication()

首先会开启用户利用线程,

 val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)    try {      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,        Duration(totalWaitTime, TimeUnit.MILLISECONDS))      if (sc != null) {        val rpcEnv = sc.env.rpcEnv......

而后期待上下文环境对象;
咱们深刻 startUserApplication(),

val mainMethod = userClassLoader.loadClass(args.userClass)      .getMethod("main", classOf[Array[String]])

这里会应用类加载器加载一个类,

case ("--class") :: value :: tail =>          userClass = value          args = tail

userClass 来自传入的参数 "--class",咱们传入参数"--class org.apache.spark.examples.SparkPi"就在这里被用到,从指定的类当中找到"main"办法。

            sparkContextPromise.tryFailure(e.getCause())        } finally {          // Notify the thread waiting for the SparkContext, in case the application did not          // instantiate one. This will do nothing when the user code instantiates a SparkContext          // (with the correct master), or when the user code throws an exception (due to the          // tryFailure above).          sparkContextPromise.trySuccess(null)

初始化SparkContext。

    userThread.setContextClassLoader(userClassLoader)    userThread.setName("Driver")    userThread.start()    userThread

随后启动线程,线程名字为"Driver";

咱们回到 runDriver,启动线程,SparkContext对象初始化胜利;

val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)    try {      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,        Duration(totalWaitTime, TimeUnit.MILLISECONDS))      if (sc != null) {        val rpcEnv = sc.env.rpcEnv        val userConf = sc.getConf        val host = userConf.get(DRIVER_HOST_ADDRESS)        val port = userConf.get(DRIVER_PORT)        //注册 ApplicationMaster        //注册的目标是跟RM连贯,申请资源        registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)        val driverRef = rpcEnv.setupEndpointRef(          RpcAddress(host, port),          YarnSchedulerBackend.ENDPOINT_NAME)        createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)

createAllocator创立分配器,咱们深刻代码,

 allocator = client.createAllocator(      yarnConf,      _sparkConf,      appAttemptId,      driverUrl,      driverRef,      securityMgr,      localResources)    // Initialize the AM endpoint *after* the allocator has been initialized. This ensures    // that when the driver sends an initial executor request (e.g. after an AM restart),    // the allocator is ready to service requests.    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))    allocator.allocateResources()

代码创立分配器,并且取得可分配资源。

深刻allocateResources,

val allocateResponse = amClient.allocate(progressIndicator)val allocatedContainers = allocateResponse.getAllocatedContainers()

示意拿到可调配的容器;

if (allocatedContainers.size > 0) {      logDebug(("Allocated containers: %d. Current executor count: %d. " +        "Launching executor count: %d. Cluster resources: %s.")        .format(          allocatedContainers.size,          runningExecutors.size,          numExecutorsStarting.get,          allocateResponse.getAvailableResources))      handleAllocatedContainers(allocatedContainers.asScala)    }

如果容器容量大于0,就能够调配容器;深刻handleAllocatedContainers;

def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)    // Match incoming requests by host    val remainingAfterHostMatches = new ArrayBuffer[Container]    for (allocatedContainer <- allocatedContainers) {      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,        containersToUse, remainingAfterHostMatches)    }    // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts    // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use    // a separate thread to perform the operation.    val remainingAfterRackMatches = new ArrayBuffer[Container]    if (remainingAfterHostMatches.nonEmpty) {      var exception: Option[Throwable] = None      val thread = new Thread("spark-rack-resolver") {        override def run(): Unit = {          try {            for (allocatedContainer <- remainingAfterHostMatches) {              val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)              matchContainerToRequest(allocatedContainer, rack, containersToUse,                remainingAfterRackMatches)            }          } catch {            case e: Throwable =>              exception = Some(e)          }        }      }      thread.setDaemon(true)      thread.start()      try {        thread.join()      } catch {        case e: InterruptedException =>          thread.interrupt()          throw e      }      if (exception.isDefined) {        throw exception.get      }    }    // Assign remaining that are neither node-local nor rack-local    val remainingAfterOffRackMatches = new ArrayBuffer[Container]    for (allocatedContainer <- remainingAfterRackMatches) {      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,        remainingAfterOffRackMatches)    }    if (remainingAfterOffRackMatches.nonEmpty) {      logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +        s"allocated to us")      for (container <- remainingAfterOffRackMatches) {        internalReleaseContainer(container)      }    }    runAllocatedContainers(containersToUse)

代码依据主机信息,机架信息来解决可调配信息。
解决完之后,执行

runAllocatedContainers(containersToUse)

深刻runAllocatedContainers

private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {    for (container <- containersToUse) {      executorIdCounter += 1      val executorHostname = container.getNodeId.getHost      val containerId = container.getId      val executorId = executorIdCounter.toString      assert(container.getResource.getMemory >= resource.getMemory)      logInfo(s"Launching container $containerId on host $executorHostname " +        s"for executor with ID $executorId")      def updateInternalState(): Unit = synchronized {        runningExecutors.add(executorId)        numExecutorsStarting.decrementAndGet()        executorIdToContainer(executorId) = container        containerIdToExecutorId(container.getId) = executorId        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,          new HashSet[ContainerId])        containerSet += containerId        allocatedContainerToHostMap.put(containerId, executorHostname)      }      if (runningExecutors.size() < targetNumExecutors) {        numExecutorsStarting.incrementAndGet()        if (launchContainers) {          launcherPool.execute(() => {            try {              new ExecutorRunnable(                Some(container),                conf,                sparkConf,                driverUrl,                executorId,                executorHostname,                executorMemory,                executorCores,                appAttemptId.getApplicationId.toString,                securityMgr,                localResources,                ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported              ).run()

代码会遍历可用的容器,判断正在运行的容器大小 ,是否小于指标容器大小。如果小于,则就创立容器。
launcherPool是线程池。线程池启动execute,咱们点开run办法。

def run(): Unit = {    logDebug("Starting Executor Container")    nmClient = NMClient.createNMClient()    nmClient.init(conf)    nmClient.start()    startContainer()  }

执行run的时候会创立nmClient,而后初始化,建设连贯,最初启动容器。
深刻startContainer;

val commands = prepareCommand()ctx.setCommands(commands.asJava)// Send the start request to the ContainerManager    try {      nmClient.startContainer(container.get, ctx)    } catch {      case ex: Exception =>        throw new SparkException(s"Exception while starting container ${container.get.getId}" +          s" on host $hostname", ex)    }

首先筹备commands,上下文,而后让一个NM启动Container;
深刻prepareCommand(),

val commands = prefixEnv ++      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++      javaOpts ++      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",        "--driver-url", masterAddress,        "--executor-id", executorId,        "--hostname", hostname,        "--cores", executorCores.toString,        "--app-id", appId,        "--resourceProfileId", resourceProfileId.toString) ++      userClassPath ++      Seq(        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

代码启动一个过程YarnCoarseGrainedExecutorBackend,说到启动Executor。=,就是启动
"org.apache.spark.executor.YarnCoarseGrainedExecutorBackend";

深刻 YarnCoarseGrainedExecutorBackend,

def main(args: Array[String]): Unit = {    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>      new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,        arguments.resourcesFileOpt, resourceProfile)    }    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,      this.getClass.getCanonicalName.stripSuffix("$"))    CoarseGrainedExecutorBackend.run(backendArgs, createFn)    System.exit(0)  }

创立YarnCoarseGrainedExecutorBackend对象,而后执行run办法;

 var driver: RpcEndpointRef = null      val nTries = 3      for (i <- 0 until nTries if driver == null) {        try {          driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)        } catch {          case e: Throwable => if (i == nTries - 1) {            throw e          }        }      }

run办法外部连贯了driver,

 val driverConf = new SparkConf()      for ((key, value) <- props) {        // this is required for SSL in standalone mode        if (SparkConf.isExecutorStartupConf(key)) {          driverConf.setIfMissing(key, value)        } else {          driverConf.set(key, value)        }      }      cfg.hadoopDelegationCreds.foreach { tokens =>        SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)      }      driverConf.set(EXECUTOR_ID, arguments.executorId)      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

代码创立了执行环境 env ,

env.rpcEnv.setupEndpoint("Executor",        backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))      arguments.workerUrl.foreach { url =>        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))      }      env.rpcEnv.awaitTermination()

代码的意思是,在整个环境中装置一个通信终端。

深刻setupEndpoint,

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {    dispatcher.registerRpcEndpoint(name, endpoint)  }

这里注册了RPC的通信终端,深刻registerRpcEndpoint,

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {    val addr = RpcEndpointAddress(nettyEnv.address, name)    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)    synchronized {      if (stopped) {        throw new IllegalStateException("RpcEnv has been stopped")      }      if (endpoints.containsKey(name)) {        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")      }      // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be      // active when registering, and endpointRef must be put into endpointRefs before onStart is      // called.      endpointRefs.put(endpoint, endpointRef)      var messageLoop: MessageLoop = null      try {        messageLoop = endpoint match {          case e: IsolatedRpcEndpoint =>            new DedicatedMessageLoop(name, e, this)          case _ =>            sharedLoop.register(name, endpoint)            sharedLoop        }        endpoints.put(name, messageLoop)      } catch {        case NonFatal(e) =>          endpointRefs.remove(endpoint)          throw e      }    }    endpointRef

代码会定义一个音讯循环器,进行模式匹配,匹配胜利会创立DedicatedMessageLoop(name, e, this)对象。

深刻DedicatedMessageLoop,

private class DedicatedMessageLoop(    name: String,    endpoint: IsolatedRpcEndpoint,    dispatcher: Dispatcher)  extends MessageLoop(dispatcher) {  private val inbox = new Inbox(name, endpoint)  override protected val threadpool = if (endpoint.threadCount() > 1) {    ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())  } else {    ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")  }

外部有创立Inbox和线程池threadpool,

inbox是收件箱的意思,

private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)  extends Logging {  inbox =>  // Give this an alias so we can use it more clearly in closures.  @GuardedBy("this")  protected val messages = new java.util.LinkedList[InboxMessage]()  /** True if the inbox (and its associated endpoint) is stopped. */  @GuardedBy("this")  private var stopped = false  /** Allow multiple threads to process messages at the same time. */  @GuardedBy("this")  private var enableConcurrent = false  /** The number of threads processing messages for this inbox. */  @GuardedBy("this")  private var numActiveThreads = 0  // OnStart should be the first message to process  inbox.synchronized {    messages.add(OnStart)  }

通信环境有一个生命周期:

constructor -> onStart -> receive* -> onStop

咱们以后节点有个收件箱,收件箱会发消息给本人,音讯就叫OnStart。
当音讯发送后,CoarseGrainedExecutorBackend会收到音讯,会执行上面的OnStart代码,

override def onStart(): Unit = {    logInfo("Connecting to driver: " + driverUrl)    try {      _resources = parseOrFindResources(resourcesFileOpt)    } catch {      case NonFatal(e) =>        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)    }    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>      // This is a very fast action so we can use "ThreadUtils.sameThread"      driver = Some(ref)      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,        extractAttributes, _resources, resourceProfile.id))    }(ThreadUtils.sameThread).onComplete {      case Success(_) =>        self.send(RegisteredExecutor)      case Failure(e) =>        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)    }(ThreadUtils.sameThread)  }

代码连贯driver,并且发送申请(ask),申请注册executor(RegisterExecutor)(向AppMaster注册)。注册executor会被ApplicationMaster的Driver的SparkContext收到。

SparkContext有个SchedulerBackend属性,

private var _schedulerBackend: SchedulerBackend = _
private[spark] trait SchedulerBackend {  private val appId = "spark-application-" + System.currentTimeMillis  def start(): Unit  def stop(): Unit  def reviveOffers(): Unit  def defaultParallelism(): Int

关上CoarseGrainedSchedulerBackend,这是一个通信终端。

onStart(),

override def onStart(): Unit = {      // Periodically revive offers to allow delay scheduling to work      val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)      reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {        Option(self).foreach(_.send(ReviveOffers))      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)    }

receive(),

override def receive: PartialFunction[Any, Unit] = {      case StatusUpdate(executorId, taskId, state, data, resources) =>        scheduler.statusUpdate(taskId, state, data.value)        if (TaskState.isFinished(state)) {          executorDataMap.get(executorId) match {            case Some(executorInfo) =>              executorInfo.freeCores += scheduler.CPUS_PER_TASK              resources.foreach { case (k, v) =>                executorInfo.resourcesInfo.get(k).foreach { r =>                  r.release(v.addresses)                }              }              makeOffers(executorId)            case None =>              // Ignoring the update since we don't know about the executor.              logWarning(s"Ignored task status update ($taskId state $state) " +                s"from unknown executor with ID $executorId")          }        }      case ReviveOffers =>        makeOffers()        ......

receiveAndReply,

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,          attributes, resources, resourceProfileId) =>        if (executorDataMap.contains(executorId)) {          context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))        } else if (scheduler.nodeBlacklist.contains(hostname) ||            isBlacklisted(executorId, hostname)) {          // If the cluster manager gives us an executor on a blacklisted node (because it          // already started allocating those resources before we informed it of our blacklist,          // or if it ignored our blacklist), then we reject that executor immediately.          logInfo(s"Rejecting $executorId as it has been blacklisted.")          context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))        } else {......

在代码的 case RegisterExecutor处收到申请;

logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")          addressToExecutorId(executorAddress) = executorId          totalCoreCount.addAndGet(cores)          totalRegisteredExecutors.addAndGet(1)          val resourcesInfo = resources.map{ case (k, v) =>            (v.name,             new ExecutorResourceInfo(v.name, v.addresses,               // tell the executor it can schedule resources up to numParts times,               // as configured by the user, or set to 1 as that is the default (1 task/resource)               taskResourceNumParts.getOrElse(v.name, 1)))

收到音讯后,会在以后环境中做一次减少,totalRegisteredExecutors加1个;

 listenerBus.post(            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))          // Note: some tests expect the reply to come after we put the executor in the map          context.reply(true)        }

这里返回音讯True,注册胜利;
胜利之后在CoarseGrainedExecutorBackend中,给本人发消息示意注册胜利,

 }(ThreadUtils.sameThread).onComplete {      case Success(_) =>        self.send(RegisteredExecutor)      case Failure(e) =>        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)    }(ThreadUtils.sameThread)  }

本人给本人发消息"self.send(RegisteredExecutor)",本人会接管到,

override def receive: PartialFunction[Any, Unit] = {    case RegisteredExecutor =>      logInfo("Successfully registered with driver")      try {        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,          resources = _resources)        driver.get.send(LaunchedExecutor(executorId))      } catch {        case NonFatal(e) =>          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)      }

这个对象就是货真价实的executor

executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,resources = _resources)

启动胜利driver后会发送音讯"LaunchedExecutor"

driver.get.send(LaunchedExecutor(executorId))

CoarseGrainedSchedulerBackend会接管到,

 case LaunchedExecutor(executorId) =>        executorDataMap.get(executorId).foreach { data =>          data.freeCores = data.totalCores        }        makeOffers(executorId)


到这里,就胜利了。