关于spark:Sparksubmit执行流程了解一下

摘要:本文次要是通过Spark代码走读来理解spark-submit的流程。

1.工作命令提交

咱们在进行Spark工作提交时,会应用“spark-submit -class …..”款式的命令来提交工作,该命令为Spark目录下的shell脚本。它的作用是查问spark-home,调用spark-class命令。

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

随后会执行spark-class命令,以SparkSubmit类为参数进行工作向Spark程序的提交,而Spark-class的shell脚本次要是执行以下几个步骤:

(1)加载spark环境参数,从conf中获取

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# 寻找javahome
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

(2)载入java,jar包等

# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

(3)调用org.apache.spark.launcher中的Main进行参数注入

build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d0" $?
}

(4)shell脚本监测工作执行状态,是否实现或者退出工作,通过执行返回值,判断是否完结

if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

2.工作检测及提交工作到Spark

检测执行模式(class or submit)构建cmd,在submit中进行参数的查看(SparkSubmitOptionParser),构建命令行并且打印回spark-class中,最初调用exec执行spark命令行提交工作。通过组装而成cmd内容如下所示:

/usr/local/java/jdk1.8.0_91/bin/java-cp
/data/spark-1.6.0-bin-hadoop2.6/conf/:/data/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/data/hadoop-2.6.5/etc/hadoop/
-Xms1g-Xmx1g -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=1234
org.apache.spark.deploy.SparkSubmit
--classorg.apache.spark.repl.Main
--nameSpark shell
--masterspark://localhost:7077
--verbose/tool/jarDir/maven_scala-1.0-SNAPSHOT.jar

3.SparkSubmit函数的执行

(1)Spark工作在提交之后会执行SparkSubmit中的main办法

 def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit()
    submit.doSubmit(args)
  }

(2)doSubmit()对log进行初始化,增加spark工作参数,通过参数类型执行工作:

 def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }

SUBMIT:应用提供的参数提交application

KILL(Standalone and Mesos cluster mode only):通过REST协定终止工作

REQUEST_STATUS(Standalone and Mesos cluster mode only):通过REST协定申请曾经提交工作的状态

PRINT_VERSION:对log输入版本信息

(3)调用submit函数:

def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(args, uninitLog)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            } else {
              throw e
            }
        }
      } else {
        runMain(args, uninitLog)
      }
    }

doRunMain为集群调用子main class筹备参数,而后调用runMain()执行工作invoke main

4.总结

Spark在作业提交中会采纳多种不同的参数及模式,都会依据不同的参数抉择不同的分支执行,因而在最初提交的runMain中会将所须要的参数传递给执行函数。

本文分享自华为云社区《Spark内核解析之Spark-submit》,原文作者:笨熊爱喝cola。

点击关注,第一工夫理解华为云陈腐技术~

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理