共计 3726 个字符,预计需要花费 10 分钟才能阅读完成。
摘要: 本文次要是通过 Spark 代码走读来理解 spark-submit 的流程。
1. 工作命令提交
咱们在进行 Spark 工作提交时,会应用“spark-submit -class …..”款式的命令来提交工作,该命令为 Spark 目录下的 shell 脚本。它的作用是查问 spark-home,调用 spark-class 命令。
if [-z "${SPARK_HOME}" ]; then
source "$(dirname"$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
随后会执行 spark-class 命令,以 SparkSubmit 类为参数进行工作向 Spark 程序的提交,而 Spark-class 的 shell 脚本次要是执行以下几个步骤:
(1)加载 spark 环境参数,从 conf 中获取
if [-z "${SPARK_HOME}" ]; then
source "$(dirname"$0")"/find-spark-home
fi
. "${SPARK_HOME}"/bin/load-spark-env.sh
# 寻找 javahome
if [-n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if ["$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
(2)载入 java,jar 包等
# Find Spark jars.
if [-d "${SPARK_HOME}/jars" ]; then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
(3)调用 org.apache.spark.launcher 中的 Main 进行参数注入
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d0" $?
}
(4)shell 脚本监测工作执行状态,是否实现或者退出工作,通过执行返回值,判断是否完结
if ! [[$LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
echo "${CMD[@]}" | head -n-1 1>&2
exit 1
fi
if [$LAUNCHER_EXIT_CODE != 0]; then
exit $LAUNCHER_EXIT_CODE
fi
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
2. 工作检测及提交工作到 Spark
检测执行模式(class or submit)构建 cmd,在 submit 中进行参数的查看(SparkSubmitOptionParser),构建命令行并且打印回 spark-class 中,最初调用 exec 执行 spark 命令行提交工作。通过组装而成 cmd 内容如下所示:
/usr/local/java/jdk1.8.0_91/bin/java-cp
/data/spark-1.6.0-bin-hadoop2.6/conf/:/data/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/data/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/data/hadoop-2.6.5/etc/hadoop/
-Xms1g-Xmx1g -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=1234
org.apache.spark.deploy.SparkSubmit
--classorg.apache.spark.repl.Main
--nameSpark shell
--masterspark://localhost:7077
--verbose/tool/jarDir/maven_scala-1.0-SNAPSHOT.jar
3.SparkSubmit 函数的执行
(1)Spark 工作在提交之后会执行 SparkSubmit 中的 main 办法
def main(args: Array[String]): Unit = {val submit = new SparkSubmit()
submit.doSubmit(args)
}
(2)doSubmit()对 log 进行初始化,增加 spark 工作参数,通过参数类型执行工作:
def doSubmit(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = parseArguments(args)
if (appArgs.verbose) {logInfo(appArgs.toString)
}
appArgs.action match {case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()}
}
SUBMIT:应用提供的参数提交 application
KILL(Standalone and Mesos cluster mode only):通过 REST 协定终止工作
REQUEST_STATUS(Standalone and Mesos cluster mode only):通过 REST 协定申请曾经提交工作的状态
PRINT_VERSION:对 log 输入版本信息
(3)调用 submit 函数:
def doRunMain(): Unit = {if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {override def run(): Unit = {runMain(args, uninitLog)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
} else {throw e}
}
} else {runMain(args, uninitLog)
}
}
doRunMain 为集群调用子 main class 筹备参数,而后调用 runMain() 执行工作 invoke main
4. 总结
Spark 在作业提交中会采纳多种不同的参数及模式,都会依据不同的参数抉择不同的分支执行,因而在最初提交的 runMain 中会将所须要的参数传递给执行函数。
本文分享自华为云社区《Spark 内核解析之 Spark-submit》,原文作者:笨熊爱喝 cola。
点击关注,第一工夫理解华为云陈腐技术~