
Spark: Spark 3.0 Source Code Walkthrough, Environment Preparation (YARN Cluster)

A Spark job is submitted by calling spark-submit.
spark-submit invokes the spark-submit script in the bin directory. Let's open that script:

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

As we can see, spark-submit executes the bin/spark-class script, which ends by executing the command it has assembled:

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

Let's add a line that prints the command:

......
CMD=("${CMD[@]:0:$LAST}")
echo "${CMD[@]}"
exec "${CMD[@]}"

Now we submit a job:

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--total-executor-cores 2 \
/home/hadoop/examples/jars/spark-examples_2.12-3.0.0.jar

The printed command:

java -cp
......
-Xmx512m org.apache.spark.deploy.SparkSubmit 
--master yarn --deploy-mode client 
--conf spark.driver.memory=512m 
--class MySpark --executor-memory 512m 
--total-executor-cores 2 / 
/home/hadoop/examples/jars/spark-examples_2.12-3.0.0.jar

java starts a SparkSubmit process and runs the main method of SparkSubmit.

override def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit() {
      self =>

      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {
          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)

          override protected def logError(msg: => String): Unit = self.logError(msg)
        }
      }

      override protected def logInfo(msg: => String): Unit = printMessage(msg)

      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

      override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")

      override def doSubmit(args: Array[String]): Unit = {
        try {
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
        }
      }

    }

    submit.doSubmit(args)
  }

The main method creates a SparkSubmit object and calls its doSubmit method:

def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }

doSubmit first parses the arguments:

val appArgs = parseArguments(args)

protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
    new SparkSubmitArguments(args)
  }

Open the SparkSubmitArguments class:

// Set parameters from command line arguments
  parse(args.asJava)

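It first parses the parameters from the command-line arguments. Options written in the --name=value form are split with a regular expression: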

Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
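
To make the role of this pattern concrete, here is a minimal sketch in Scala that applies the same regular expression to a few arguments. The object name EqSeparatedOptDemo and the helper split are hypothetical and exist only for this illustration; only the pattern string is taken from the line above.

```scala
import java.util.regex.Pattern

object EqSeparatedOptDemo {
  // Same pattern string as the one quoted above.
  private val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")

  /** Splits "--name=value" into (name, value); returns None for any other shape. */
  def split(arg: String): Option[(String, String)] = {
    val m = eqSeparatedOpt.matcher(arg)
    if (m.matches()) Some((m.group(1), m.group(2))) else None
  }

  def main(args: Array[String]): Unit = {
    println(split("--master=yarn"))    // Some((--master,yarn))
    println(split("--master"))         // None
    println(split("--conf=spark.a=b")) // Some((--conf,spark.a=b))
  }
}
```

Because the first group excludes "=", only the first "=" separates the option name from its value, so a value such as spark.a=b stays intact.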

The handle method looks like this:

override protected def handle(opt: String, value: String): Boolean = {
    opt match {
      case NAME =>
        name = value

      case MASTER =>
        master = value

      case CLASS =>
        mainClass = value

      case DEPLOY_MODE =>
        if (value != "client" && value != "cluster") {
          error("--deploy-mode must be either \"client\" or \"cluster\"")
        }
        deployMode = value

      case NUM_EXECUTORS =>
        numExecutors = value

      case TOTAL_EXECUTOR_CORES =>
        totalExecutorCores = value

      case EXECUTOR_CORES =>
        executorCores = value

      case EXECUTOR_MEMORY =>
        executorMemory = value

      case DRIVER_MEMORY =>
        driverMemory = value

      case DRIVER_CORES =>
        driverCores = value
   ......

The code

case MASTER =>
        master = value

corresponds to

protected final String MASTER = "--master";
// command-line argument
--master yarn 

The code

case CLASS =>
        mainClass = value

corresponds to

protected final String CLASS = "--class";
// command-line argument
--class org.apache.spark.examples.SparkPi 

The code

case DEPLOY_MODE =>
  if (value != "client" && value != "cluster") {
    error("--deploy-mode must be either \"client\" or \"cluster\"")
  }
  deployMode = value

corresponds to

protected final String DEPLOY_MODE = "--deploy-mode";
// command-line argument
--deploy-mode client 

The code

case DRIVER_MEMORY =>
  driverMemory = value

corresponds to

protected final String DRIVER_MEMORY = "--driver-memory";
// command-line argument
--driver-memory 512m

The code

case EXECUTOR_MEMORY =>
  executorMemory = value

corresponds to

protected final String EXECUTOR_MEMORY = "--executor-memory";
// command-line argument
--executor-memory 512m 

The code

case TOTAL_EXECUTOR_CORES =>
  totalExecutorCores = value

corresponds to

protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
// command-line argument
--total-executor-cores 2 

With that, all the arguments passed to spark-submit have been parsed.
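
The option-to-field mapping walked through above can be sketched as a small standalone parser. This is a simplified model, not Spark's parser: the names MiniSubmitArgs, Parsed and parse are hypothetical, and only the option strings and the --deploy-mode check are taken from the code quoted above.

```scala
object MiniSubmitArgs {
  // Option names mirror the constants quoted above.
  val MASTER = "--master"
  val CLASS = "--class"
  val DEPLOY_MODE = "--deploy-mode"
  val DRIVER_MEMORY = "--driver-memory"
  val EXECUTOR_MEMORY = "--executor-memory"

  final case class Parsed(
      master: Option[String] = None,
      mainClass: Option[String] = None,
      deployMode: Option[String] = None,
      driverMemory: Option[String] = None,
      executorMemory: Option[String] = None,
      rest: List[String] = Nil)

  /** Consumes "--opt value" pairs; the first unrecognized token starts the remaining args. */
  @annotation.tailrec
  def parse(args: List[String], acc: Parsed = Parsed()): Parsed = args match {
    case MASTER :: v :: tail => parse(tail, acc.copy(master = Some(v)))
    case CLASS :: v :: tail => parse(tail, acc.copy(mainClass = Some(v)))
    case DEPLOY_MODE :: v :: tail =>
      require(v == "client" || v == "cluster",
        "--deploy-mode must be either \"client\" or \"cluster\"")
      parse(tail, acc.copy(deployMode = Some(v)))
    case DRIVER_MEMORY :: v :: tail => parse(tail, acc.copy(driverMemory = Some(v)))
    case EXECUTOR_MEMORY :: v :: tail => parse(tail, acc.copy(executorMemory = Some(v)))
    case remaining => acc.copy(rest = remaining)
  }
}
```

Calling MiniSubmitArgs.parse on the options of the command submitted earlier fills one field per option and leaves the jar path in rest.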

The SparkSubmitArguments class has a field named action:

var action: SparkSubmitAction = null

Its initial value is null; when no other action is requested, it defaults to SUBMIT.

appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
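
The "null falls back to SUBMIT" behaviour and the dispatch on the action can be modeled in a few lines. This is a sketch under assumptions: the Action hierarchy and the names resolve and dispatch are hypothetical stand-ins for illustration, not Spark's own types.

```scala
object ActionDefault {
  sealed trait Action
  case object Submit extends Action
  case object Kill extends Action
  case object RequestStatus extends Action
  case object PrintVersion extends Action

  /** A null action (nothing requested explicitly) falls back to Submit. */
  def resolve(action: Action): Action = Option(action).getOrElse(Submit)

  /** Returns the name of the branch that would handle the action. */
  def dispatch(action: Action): String = resolve(action) match {
    case Submit => "submit"
    case Kill => "kill"
    case RequestStatus => "requestStatus"
    case PrintVersion => "printVersion"
  }
}
```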

Drill into the submit method:

private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(args, uninitLog)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            } else {
              throw e
            }
        }
      } else {
        runMain(args, uninitLog)
      }
    }

    // In standalone cluster mode, there are two submission gateways:
    //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
    //   (2) The new REST-based gateway introduced in Spark 1.3
    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
    // to use the legacy gateway if the master endpoint turns out to be not a REST server.
    if (args.isStandaloneCluster && args.useRest) {
      try {
        logInfo("Running Spark using the REST application submission protocol.")
        doRunMain()
      } catch {
        // Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
          logWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args, false)
      }
    // In all other modes, just run the main class as prepared
    } else {
      doRunMain()
    }
  }

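submit first checks whether the target is a standalone cluster that uses the REST gateway. We are submitting to YARN, so it goes straight to doRunMain(). doRunMain checks whether a proxy user is configured: if so, runMain is executed as that user; otherwise runMain is called directly.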
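
The "try REST first, fall back to the legacy gateway" control flow in submit can be illustrated in isolation. The sketch below is a simplified model: GatewayFallback, RestConnectionException and the run callback are hypothetical names, and the real method mutates args.useRest rather than passing a flag.

```scala
object GatewayFallback {
  /** Hypothetical stand-in for the REST connection failure. */
  final class RestConnectionException(msg: String) extends Exception(msg)

  /**
   * Tries the REST path first when requested; on a connection failure it
   * retries exactly once with REST disabled. Other modes call run directly.
   */
  def submit(useRest: Boolean, run: Boolean => String): String =
    if (useRest) {
      try {
        run(true)
      } catch {
        case _: RestConnectionException => submit(useRest = false, run)
      }
    } else {
      run(false)
    }
}
```

The recursion terminates because the retry always passes useRest = false, which takes the branch without a try.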

Drill into runMain().
One very important line in runMain prepares the submit environment:

 val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

Then it obtains the class loader:

val loader = getSubmitClassLoader(sparkConf)

and loads the class by reflection:

mainClass = Utils.classForName(childMainClass)

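Next it checks whether mainClass extends SparkApplication. If it does, an instance is created directly through its constructor;
if not, the class is wrapped in a new object, new JavaMainApplication(mainClass):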

val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
  new JavaMainApplication(mainClass)
}

Then the start method of the SparkApplication is called:

try {
  app.start(childArgs.toArray, sparkConf)
} catch {
  case t: Throwable =>
    throw findCause(t)
}
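
The reflection step above (instantiate the class if it implements the application trait, otherwise wrap its main method) can be sketched without Spark. All names here (AppLauncher, MiniApplication, MainMethodApplication, DirectApp) are hypothetical stand-ins; only the isAssignableFrom check and the getConstructor().newInstance() call mirror the code shown above.

```scala
object AppLauncher {
  /** Hypothetical stand-in for the application trait. */
  trait MiniApplication {
    def start(args: Array[String]): Unit
  }

  /** Wraps a class that only exposes a static main(Array[String]) method. */
  final class MainMethodApplication(klass: Class[_]) extends MiniApplication {
    override def start(args: Array[String]): Unit = {
      val mainMethod = klass.getMethod("main", classOf[Array[String]])
      mainMethod.invoke(null, args) // static method, so the receiver is null
      ()
    }
  }

  /** Example class that implements the trait directly. */
  class DirectApp extends MiniApplication {
    override def start(args: Array[String]): Unit =
      println("started with " + args.mkString(","))
  }

  /** Instantiates klass if it implements the trait, otherwise wraps it. */
  def wrap(klass: Class[_]): MiniApplication =
    if (classOf[MiniApplication].isAssignableFrom(klass)) {
      klass.getConstructor().newInstance().asInstanceOf[MiniApplication]
    } else {
      new MainMethodApplication(klass)
    }
}
```

Either way the caller ends up with one uniform start(args) entry point, which is why the launching code does not need to know what kind of class it was given.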

Now let's drill into prepareSubmitEnvironment.

The method defines four variables:

// Return values
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sparkConf = args.toSparkConf()
    var childMainClass = ""

Of these, childMainClass is mutable (a var). Let's find where it is assigned:

if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS

private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.yarn.YarnClusterApplication"

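In YARN cluster mode (isYarnCluster, that is, --master yarn together with --deploy-mode cluster), childMainClass is set to "org.apache.spark.deploy.yarn.YarnClusterApplication". In client mode it is the user's main class instead. The rest of this walkthrough follows the cluster-mode path.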

private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    new Client(new ClientArguments(args), conf, null).run()
  }
}

Drill into Client:

private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf,
    val rpcEnv: RpcEnv)
  extends Logging {

  import Client._
  import YarnSparkHadoopUtil._

  private val yarnClient = YarnClient.createYarnClient
  private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

  private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"

  private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
  private var appMaster: ApplicationMaster = _
  private var stagingDirPath: Path = _

Client creates a YARN client:

private val yarnClient = YarnClient.createYarnClient


Drill into YarnClient.createYarnClient:

public static YarnClient createYarnClient() {
    YarnClient client = new YarnClientImpl();
    return client;
}

createYarnClient creates a YarnClientImpl object. Drill into YarnClientImpl:


public class YarnClientImpl extends YarnClient {
    private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
    protected ApplicationClientProtocol rmClient;
....

rmClient is the client for the ResourceManager.

Back in YarnClusterApplication, step into the run method that it calls on Client:

def run(): Unit = {
    this.appId = submitApplication()
......

submitApplication() submits the application and returns appId. appId is the application's global ID in YARN; later status queries and reports are all looked up through it.

Drill into submitApplication():

def submitApplication(): ApplicationId = {
    ResourceRequestHelper.validateResources(sparkConf)

    var appId: ApplicationId = null
    try {
      launcherBackend.connect()
      yarnClient.init(hadoopConf)
      yarnClient.start()

      logInfo("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

      // Get a new application from our RM
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()

      // The app staging dir based on the STAGING_DIR configuration if configured
      // otherwise based on the users home directory.
      val appStagingBaseDir = sparkConf.get(STAGING_DIR)
        .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
      stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appId.toString)).setCurrentContext()

      // Verify whether the cluster has enough resources for our AM
      verifyClusterResources(newAppResponse)

      // Set up the appropriate contexts to launch our AM
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

      // Finally, submit and monitor the application
      logInfo(s"Submitting application $appId to ResourceManager")
      yarnClient.submitApplication(appContext)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)
      ......
      ......

The code

try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

connects the launcher backend, then initializes and starts the YARN client, which establishes the connection to YARN.

// Get a new application from our RM
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()

This asks our ResourceManager to create a new application and reads the application ID from the response.

// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)

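The code above creates the container launch context and the application submission context.

Next, the application is submitted to the ResourceManager: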

// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)


Submitting the application means submitting appContext, which was created by

val appContext = createApplicationSubmissionContext(newApp, containerContext)

The container launch context is created first, and the application submission context is then prepared from it:

val containerContext = createContainerLaunchContext(newAppResponse)

Drill into createContainerLaunchContext:

val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    if (args.primaryRFile != null &&
        (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {
      args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
    }
    val userArgs = args.userArgs.flatMap { arg =>
      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
    }
    val amArgs =
      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
      Seq("--properties-file",
        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
      Seq("--dist-cache-conf",
        buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))

    // Command for the ApplicationMaster
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    amContainer.setCommands(printableCommands.asJava)

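We are in cluster mode, so amClass is "org.apache.spark.deploy.yarn.ApplicationMaster". It is combined with the other arguments into amArgs, and the resulting command starts a JVM. The commands are then wrapped up and set on the container: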

// send the acl settings into YARN to control who has access via YARN interfaces
    val securityManager = new SecurityManager(sparkConf)
    amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
    setupSecurityToken(amContainer)
    amContainer

Finally, amContainer is returned.

The RM then starts the AM in a container on an NM; that is, the container runs "java org.apache.spark.deploy.yarn.ApplicationMaster", which starts an AM process.
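
To show how such a launch command is put together as a plain sequence of strings, here is a simplified sketch. The object AmCommand and its parameters (javaHome, logDir and so on) are assumptions made for this illustration; the real code uses YARN environment placeholders and adds more arguments, which are omitted here. Only the two class names and the "--class" / "--arg" options come from the code quoted in this article.

```scala
object AmCommand {
  /** Picks the class the AM container will run, depending on the deploy mode. */
  def amClass(isClusterMode: Boolean): String =
    if (isClusterMode) "org.apache.spark.deploy.yarn.ApplicationMaster"
    else "org.apache.spark.deploy.yarn.ExecutorLauncher"

  /** Assembles java + options + AM class + user arguments + output redirection. */
  def build(
      isClusterMode: Boolean,
      javaHome: String,
      javaOpts: Seq[String],
      userClass: String,
      userArgs: Seq[String],
      logDir: String): Seq[String] = {
    val amArgs = Seq(amClass(isClusterMode), "--class", userClass) ++
      userArgs.flatMap(arg => Seq("--arg", arg))
    Seq(javaHome + "/bin/java", "-server") ++ javaOpts ++ amArgs ++
      Seq("1>", logDir + "/stdout", "2>", logDir + "/stderr")
  }
}
```

Each user argument is prefixed with its own "--arg", so the AM can rebuild the argument list for the user class on the other side.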

Let's find the main method of org.apache.spark.deploy.yarn.ApplicationMaster:

def main(args: Array[String]): Unit = {
    SignalUtils.registerLogger(log)
    val amArgs = new ApplicationMasterArguments(args)
    val sparkConf = new SparkConf()
    if (amArgs.propertiesFile != null) {
      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
        sparkConf.set(k, v)
      }
    }
    ......

main first creates an object:

val amArgs = new ApplicationMasterArguments(args)

which wraps the command-line arguments in an ApplicationMasterArguments object.

 val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
    master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

The ApplicationMaster object is created from sparkConf and yarnConf.

Drill into ApplicationMaster: it creates a YarnRMClient object:

private val client = new YarnRMClient()

Drill further into YarnRMClient:

private[spark] class YarnRMClient extends Logging {

  private var amClient: AMRMClient[ContainerRequest] = _
  private var uiHistoryAddress: String = _
  private var registered: Boolean = false

This AMRMClient is the client through which the ApplicationMaster connects to the ResourceManager.

Continue in the org.apache.spark.deploy.yarn.ApplicationMaster class:

ugi.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = System.exit(master.run())
    })

Here master calls its run method. Drill into run:

......
if (isClusterMode) {
        runDriver()
      } else {
        runExecutorLauncher()
      }
......      

There is a branch here: in cluster mode it calls runDriver(), otherwise runExecutorLauncher().
We are in cluster mode, so runDriver() is executed:

private def runDriver(): Unit = {
    addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
    userClassThread = startUserApplication()

First it starts the user application thread;


 val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
    try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        val rpcEnv = sc.env.rpcEnv
......

then it waits for the context object (the SparkContext).
Drill into startUserApplication():

val mainMethod = userClassLoader.loadClass(args.userClass)
      .getMethod("main", classOf[Array[String]])

Here a class is loaded through the class loader.

case ("--class") :: value :: tail =>
          userClass = value
          args = tail

userClass comes from the "--class" argument. The "--class org.apache.spark.examples.SparkPi" we passed in is used here: the "main" method is looked up in the specified class.

            sparkContextPromise.tryFailure(e.getCause())
        } finally {
          // Notify the thread waiting for the SparkContext, in case the application did not
          // instantiate one. This will do nothing when the user code instantiates a SparkContext
          // (with the correct master), or when the user code throws an exception (due to the
          // tryFailure above).
          sparkContextPromise.trySuccess(null)

The user code initializes the SparkContext; sparkContextPromise is how the waiting thread gets notified.

    userThread.setContextClassLoader(userClassLoader)
    userThread.setName("Driver")
    userThread.start()
    userThread

Then the thread is started, with the name "Driver".
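
The hand-off between the "Driver" thread and the thread that waits for the context can be sketched with a plain scala.concurrent.Promise. This is a minimal model under assumptions: DriverHandshake, FakeContext and the userMain callback are hypothetical, and in this model the user code completes the promise itself. The tryFailure / trySuccess(null) pattern follows the snippet shown above.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object DriverHandshake {
  /** Hypothetical stand-in for the context object the AM waits for. */
  final case class FakeContext(appName: String)

  /**
   * Runs userMain on a thread named "Driver" and waits until it either
   * publishes a context, fails, or finishes without creating one (None).
   */
  def runDriver(
      userMain: Promise[FakeContext] => Unit,
      timeout: FiniteDuration): Option[FakeContext] = {
    val contextPromise = Promise[FakeContext]()
    val userThread = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          userMain(contextPromise)
        } catch {
          case e: Throwable => contextPromise.tryFailure(e)
        } finally {
          // No effect if the promise was already completed above.
          contextPromise.trySuccess(null)
        }
      }
    })
    userThread.setName("Driver")
    userThread.start()
    // Rethrows the user failure, or times out if nothing completes the promise.
    Option(Await.result(contextPromise.future, timeout))
  }
}
```

The null check on the awaited value plays the same role as the "if (sc != null)" test in runDriver: a user program that never creates a context still unblocks the waiting thread.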

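Back in runDriver: the thread has been started, and once the SparkContext object has been initialized successfully the following code continues: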

val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
    try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        val rpcEnv = sc.env.rpcEnv

        val userConf = sc.getConf
        val host = userConf.get(DRIVER_HOST_ADDRESS)
        val port = userConf.get(DRIVER_PORT)
        // Register the ApplicationMaster
        // The purpose of registering is to connect to the RM and request resources
        registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

        val driverRef = rpcEnv.setupEndpointRef(RpcAddress(host, port),
          YarnSchedulerBackend.ENDPOINT_NAME)
        createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)

createAllocator creates the allocator. Drill into the code:

 allocator = client.createAllocator(
      yarnConf,
      _sparkConf,
      appAttemptId,
      driverUrl,
      driverRef,
      securityMgr,
      localResources)

    // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
    // that when the driver sends an initial executor request (e.g. after an AM restart),
    // the allocator is ready to service requests.
    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

    allocator.allocateResources()

The code creates the allocator and obtains the resources that can be allocated.

Drill into allocateResources:

val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()

This fetches the containers that have been allocated.

if (allocatedContainers.size > 0) {
      logDebug(("Allocated containers: %d. Current executor count: %d. " +
        "Launching executor count: %d. Cluster resources: %s.")
        .format(
          allocatedContainers.size,
          runningExecutors.size,
          numExecutorsStarting.get,
          allocateResponse.getAvailableResources))

      handleAllocatedContainers(allocatedContainers.asScala)
    }

If the number of allocated containers is greater than 0, they can be assigned. Drill into handleAllocatedContainers:

def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

    // Match incoming requests by host
    val remainingAfterHostMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- allocatedContainers) {
      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
        containersToUse, remainingAfterHostMatches)
    }

    // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
    // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
    // a separate thread to perform the operation.
    val remainingAfterRackMatches = new ArrayBuffer[Container]
    if (remainingAfterHostMatches.nonEmpty) {
      var exception: Option[Throwable] = None
      val thread = new Thread("spark-rack-resolver") {
        override def run(): Unit = {
          try {
            for (allocatedContainer <- remainingAfterHostMatches) {
              val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
              matchContainerToRequest(allocatedContainer, rack, containersToUse,
                remainingAfterRackMatches)
            }
          } catch {
            case e: Throwable =>
              exception = Some(e)
          }
        }
      }
      thread.setDaemon(true)
      thread.start()

      try {
        thread.join()
      } catch {
        case e: InterruptedException =>
          thread.interrupt()
          throw e
      }

      if (exception.isDefined) {
        throw exception.get
      }
    }

    // Assign remaining that are neither node-local nor rack-local
    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- remainingAfterRackMatches) {
      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
        remainingAfterOffRackMatches)
    }

    if (remainingAfterOffRackMatches.nonEmpty) {
      logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
        s"allocated to us")
      for (container <- remainingAfterOffRackMatches) {
        internalReleaseContainer(container)
      }
    }

    runAllocatedContainers(containersToUse)

The code matches the allocated containers against requests by host first and then by rack.
Once that is done, it calls

runAllocatedContainers(containersToUse)
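
The three matching passes (host, then rack, then any location, with the leftovers released) can be sketched on plain collections. This is a simplified model: LocalityMatching, its Container case class, the pending map of outstanding requests per location, and the rackOf function are all hypothetical; the real code matches against the YARN client's outstanding container requests.

```scala
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}

object LocalityMatching {
  final case class Container(id: Int, host: String)

  /** Wildcard location used for requests without a locality preference. */
  val AnyHost = "*"

  /**
   * Returns (containers to use, containers to release). `pending` maps a
   * location (host, rack, or AnyHost) to its number of outstanding requests.
   */
  def assign(
      containers: Seq[Container],
      pending: Map[String, Int],
      rackOf: String => String): (List[Container], List[Container]) = {
    val outstanding = MutableMap.empty[String, Int]
    outstanding ++= pending
    val toUse = ArrayBuffer.empty[Container]

    // One pass: a container is used if its location still has a pending request.
    def matchPass(candidates: Seq[Container], locationOf: Container => String): List[Container] = {
      val unmatched = ArrayBuffer.empty[Container]
      for (c <- candidates) {
        val location = locationOf(c)
        val n = outstanding.getOrElse(location, 0)
        if (n > 0) {
          outstanding(location) = n - 1
          toUse += c
        } else {
          unmatched += c
        }
      }
      unmatched.toList
    }

    val afterHost = matchPass(containers, _.host)
    val afterRack = matchPass(afterHost, c => rackOf(c.host))
    val toRelease = matchPass(afterRack, _ => AnyHost)
    (toUse.toList, toRelease)
  }
}
```

Running the passes in this order means a container is only counted against a less specific request when no more specific request could use it.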

Drill into runAllocatedContainers:

private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
    for (container <- containersToUse) {
      executorIdCounter += 1
      val executorHostname = container.getNodeId.getHost
      val containerId = container.getId
      val executorId = executorIdCounter.toString
      assert(container.getResource.getMemory >= resource.getMemory)
      logInfo(s"Launching container $containerId on host $executorHostname " +
        s"for executor with ID $executorId")

      def updateInternalState(): Unit = synchronized {
        runningExecutors.add(executorId)
        numExecutorsStarting.decrementAndGet()
        executorIdToContainer(executorId) = container
        containerIdToExecutorId(container.getId) = executorId

        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
          new HashSet[ContainerId])
        containerSet += containerId
        allocatedContainerToHostMap.put(containerId, executorHostname)
      }

      if (runningExecutors.size() < targetNumExecutors) {
        numExecutorsStarting.incrementAndGet()
        if (launchContainers) {
          launcherPool.execute(() => {
            try {
              new ExecutorRunnable(Some(container),
                conf,
                sparkConf,
                driverUrl,
                executorId,
                executorHostname,
                executorMemory,
                executorCores,
                appAttemptId.getApplicationId.toString,
                securityMgr,
                localResources,
                ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
              ).run()

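The code iterates over the usable containers and checks whether the number of running executors is below the target number of executors. If it is, an executor is launched in the container.
launcherPool is a thread pool. It executes the ExecutorRunnable; let's open its run method: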

def run(): Unit = {
    logDebug("Starting Executor Container")
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    startContainer()
  }

run creates nmClient, initializes it, starts it (establishing the connection), and finally starts the container.
Drill into startContainer:

val commands = prepareCommand()
ctx.setCommands(commands.asJava)

// Send the start request to the ContainerManager
    try {
      nmClient.startContainer(container.get, ctx)
    } catch {
      case ex: Exception =>
        throw new SparkException(s"Exception while starting container ${container.get.getId} " +
          s"on host $hostname", ex)
    }

It first prepares the commands and the context, then asks an NM to start the container.
Drill into prepareCommand():

val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId,
        "--resourceProfileId", resourceProfileId.toString) ++
      userClassPath ++
      Seq(s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

This command starts a YarnCoarseGrainedExecutorBackend process. So "starting an Executor" really means starting
"org.apache.spark.executor.YarnCoarseGrainedExecutorBackend".

Drill into YarnCoarseGrainedExecutorBackend:

def main(args: Array[String]): Unit = {
    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
      new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
        arguments.resourcesFileOpt, resourceProfile)
    }
    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
      this.getClass.getCanonicalName.stripSuffix("$"))
    CoarseGrainedExecutorBackend.run(backendArgs, createFn)
    System.exit(0)
  }

It creates the YarnCoarseGrainedExecutorBackend object (through createFn) and then calls the run method:

 var driver: RpcEndpointRef = null
      val nTries = 3
      for (i <- 0 until nTries if driver == null) {
        try {
          driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
        } catch {
          case e: Throwable => if (i == nTries - 1) {
            throw e
          }
        }
      }

Inside run, the executor connects to the driver, trying up to three times.
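
The retry loop above has a specific shape: failures are swallowed on every attempt except the last one, and the loop guard stops as soon as a result is available. A generic sketch of that pattern follows; ConnectWithRetry and retry are hypothetical names introduced for this illustration.

```scala
object ConnectWithRetry {
  /**
   * Calls connect up to nTries times, passing the attempt index. Failures of
   * earlier attempts are ignored; the failure of the last attempt is rethrown.
   */
  def retry[A <: AnyRef](nTries: Int)(connect: Int => A): A = {
    var result: A = null.asInstanceOf[A]
    // The guard is re-evaluated before each iteration, so the loop ends on success.
    for (i <- 0 until nTries if result == null) {
      try {
        result = connect(i)
      } catch {
        case e: Throwable => if (i == nTries - 1) throw e
      }
    }
    result
  }
}
```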

 val driverConf = new SparkConf()
      for ((key, value) <- props) {
        // this is required for SSL in standalone mode
        if (SparkConf.isExecutorStartupConf(key)) {
          driverConf.setIfMissing(key, value)
        } else {
          driverConf.set(key, value)
        }
      }

      cfg.hadoopDelegationCreds.foreach { tokens =>
        SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
      }

      driverConf.set(EXECUTOR_ID, arguments.executorId)
      val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

The code creates the execution environment env.

env.rpcEnv.setupEndpoint("Executor",
        backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
      arguments.workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      env.rpcEnv.awaitTermination()

This code installs a communication endpoint named "Executor" in the RPC environment.

Drill into setupEndpoint:

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint)
  }

This registers the RPC endpoint. Drill into registerRpcEndpoint:

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      if (endpoints.containsKey(name)) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }

      // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
      // active when registering, and endpointRef must be put into endpointRefs before onStart is
      // called.
      endpointRefs.put(endpoint, endpointRef)

      var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
            new DedicatedMessageLoop(name, e, this)
          case _ =>
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {
        case NonFatal(e) =>
          endpointRefs.remove(endpoint)
          throw e
      }
    }
    endpointRef

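The code declares a message loop and pattern-matches on the endpoint: an IsolatedRpcEndpoint gets its own DedicatedMessageLoop(name, e, this) object, while any other endpoint is registered with the shared loop.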

Drill into DedicatedMessageLoop:

private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher)
  extends MessageLoop(dispatcher) {

  private val inbox = new Inbox(name, endpoint)

  override protected val threadpool = if (endpoint.threadCount() > 1) {
    ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())
  } else {
    ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")
  }

It creates an Inbox and a thread pool, threadpool.

The inbox is exactly what the name suggests: the place where incoming messages are queued.

private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  /** True if the inbox (and its associated endpoint) is stopped. */
  @GuardedBy("this")
  private var stopped = false

  /** Allow multiple threads to process messages at the same time. */
  @GuardedBy("this")
  private var enableConcurrent = false

  /** The number of threads processing messages for this inbox. */
  @GuardedBy("this")
  private var numActiveThreads = 0

  // OnStart should be the first message to process
  inbox.synchronized {
    messages.add(OnStart)
  }

An endpoint has a life cycle:

constructor -> onStart -> receive* -> onStop
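
The life cycle above follows from how the inbox is built: OnStart is queued at construction time, so it is always the first message processed. A minimal single-threaded sketch of that idea follows. InboxLifecycle and its Endpoint trait are simplified, hypothetical stand-ins; the real Inbox also handles concurrency settings, RPC replies and error callbacks.

```scala
import scala.collection.mutable

object InboxLifecycle {
  sealed trait InboxMessage
  case object OnStart extends InboxMessage
  case object OnStop extends InboxMessage
  final case class OneWay(content: Any) extends InboxMessage

  /** Hypothetical, reduced endpoint interface with the three life-cycle hooks. */
  trait Endpoint {
    def onStart(): Unit
    def receive(content: Any): Unit
    def onStop(): Unit
  }

  final class Inbox(endpoint: Endpoint) {
    // OnStart is queued at construction time, so it is processed first.
    private val messages = mutable.Queue[InboxMessage](OnStart)
    private var stopped = false

    /** Queues a message unless the inbox has already been stopped. */
    def post(message: InboxMessage): Unit = synchronized {
      if (!stopped) messages.enqueue(message)
    }

    /** Drains the queue in FIFO order, dispatching each message to the endpoint. */
    def process(): Unit = synchronized {
      while (messages.nonEmpty) {
        messages.dequeue() match {
          case OnStart => endpoint.onStart()
          case OneWay(content) => endpoint.receive(content)
          case OnStop =>
            stopped = true
            messages.clear()
            endpoint.onStop()
        }
      }
    }
  }
}
```

Posting a OneWay message and then OnStop before calling process() yields the calls onStart, receive, onStop in that order, matching the life cycle line above.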

The current node has an inbox, and the inbox sends itself a message called OnStart.
When that message is processed, CoarseGrainedExecutorBackend receives it and runs the following onStart code:

override def onStart(): Unit = {
    logInfo("Connecting to driver: " + driverUrl)
    try {
      _resources = parseOrFindResources(resourcesFileOpt)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
        extractAttributes, _resources, resourceProfile.id))
    }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

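The code connects to the driver and sends it a request (ask) to register the executor (RegisterExecutor). In cluster mode the driver runs inside the ApplicationMaster, so the registration request is received by the SparkContext of the driver in the ApplicationMaster.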

SparkContext has a SchedulerBackend field:

private var _schedulerBackend: SchedulerBackend = _

private[spark] trait SchedulerBackend {
  private val appId = "spark-application-" + System.currentTimeMillis

  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int

Open CoarseGrainedSchedulerBackend, which is a communication endpoint.

onStart():

override def onStart(): Unit = {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)

      reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
        Option(self).foreach(_.send(ReviveOffers))
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }

receive():

override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data, resources) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              resources.foreach { case (k, v) =>
                executorInfo.resourcesInfo.get(k).foreach { r =>
                  r.release(v.addresses)
                }
              }
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

      case ReviveOffers =>
        makeOffers()
        ......

receiveAndReply():

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
          attributes, resources, resourceProfileId) =>
        if (executorDataMap.contains(executorId)) {
          context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
        } else if (scheduler.nodeBlacklist.contains(hostname) ||
            isBlacklisted(executorId, hostname)) {
          // If the cluster manager gives us an executor on a blacklisted node (because it
          // already started allocating those resources before we informed it of our blacklist,
          // or if it ignored our blacklist), then we reject that executor immediately.
          logInfo(s"Rejecting $executorId as it has been blacklisted.")
          context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
        } else {
          ......

The request is received at the case RegisterExecutor branch:

logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
          addressToExecutorId(executorAddress) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val resourcesInfo = resources.map { case (k, v) =>
            (v.name,
             new ExecutorResourceInfo(v.name, v.addresses,
               // tell the executor it can schedule resources up to numParts times,
               // as configured by the user, or set to 1 as that is the default (1 task/resource)
               taskResourceNumParts.getOrElse(v.name, 1)))

After the message is received, the driver updates its bookkeeping: totalRegisteredExecutors is incremented by 1.

 listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          // Note: some tests expect the reply to come after we put the executor in the map
          context.reply(true)
        }

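Here true is sent back as the reply: registration succeeded.
Back in CoarseGrainedExecutorBackend, on success the executor sends itself a message to record that it has been registered: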


    }(ThreadUtils.sameThread).onComplete {
      case Success(_) =>
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

It sends the message to itself with "self.send(RegisteredExecutor)" and then receives it in its own receive method:

override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
          resources = _resources)
        driver.get.send(LaunchedExecutor(executorId))
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

This object is the real executor:

executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,resources = _resources)

Once the executor has been created successfully, it sends the "LaunchedExecutor" message to the driver:

driver.get.send(LaunchedExecutor(executorId))

CoarseGrainedSchedulerBackend receives it:

 case LaunchedExecutor(executorId) =>
        executorDataMap.get(executorId).foreach { data =>
          data.freeCores = data.totalCores
        }
        makeOffers(executorId)


At this point, the whole submission has succeeded and the executor is up.
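
The registration handshake described in the last steps can be condensed into a small in-memory model. This is a sketch under assumptions: RegistrationFlow, DriverSide and startExecutor are hypothetical names, direct method calls replace the RPC messages, and the model assumes an executor has no free cores until the launched notification arrives, which is what the LaunchedExecutor handler above suggests.

```scala
import scala.collection.mutable

object RegistrationFlow {
  final case class ExecutorData(totalCores: Int, var freeCores: Int)

  /** Driver-side bookkeeping, loosely modeled on the scheduler backend above. */
  final class DriverSide {
    val executors = mutable.Map.empty[String, ExecutorData]
    val offers = mutable.ListBuffer.empty[String]

    /** Handles a register request: replies true, or fails on a duplicate ID. */
    def registerExecutor(id: String, cores: Int): Boolean = {
      if (executors.contains(id)) {
        throw new IllegalStateException(s"Duplicate executor ID: $id")
      }
      // Assumption of this model: no cores are free before the executor reports back.
      executors(id) = ExecutorData(totalCores = cores, freeCores = 0)
      true
    }

    /** Handles the launched notification: all cores become free, then offers are made. */
    def launchedExecutor(id: String): Unit = {
      executors.get(id).foreach(data => data.freeCores = data.totalCores)
      offers += id
      ()
    }
  }

  /** Executor side: register first; on success, report that the executor is launched. */
  def startExecutor(driver: DriverSide, id: String, cores: Int): Boolean = {
    val registered = driver.registerExecutor(id, cores)
    if (registered) driver.launchedExecutor(id)
    registered
  }
}
```

After startExecutor returns true, the driver-side entry for that executor has all of its cores marked free, which is the precondition for offering it tasks.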
