Apache Spark 是专为大规模数据处理而设计的疾速通用的计算引擎,并且领有 Hadoop MapReduce 所具备的长处;但不同于 MapReduce 的是——Job 两头输入后果能够保留在内存中,从而不再须要读写 HDFS,因而 Spark 能更好地实用于须要迭代 MapReduce 的算法。接下来带大家摸索一下 Spark 启动及提交流程的外部外围原理。
Netty
在摸索 Spark 启动及提交流程的外部外围原理之前,咱们得先简略介绍一下 Spark 外部的通信框架 —-Netty
Spark 中通信框架的倒退:
Spark 晚期版本中采纳 Akka 作为外部通信部件。
Spark1.3 中引入 Netty 通信框架,为了解决 Shuffle 的大数据传输问题应用
Spark1.6 中 Akka 和 Netty 能够配置应用。Netty 齐全实现了 Akka 在 Spark 中的性能。
Spark2 系列中,Spark 摈弃 Akka,应用 Netty。
尚硅谷大数据培训_业余的大数据培训机构_值得信赖的大数据教程
大数据
大数据教程
大数据培训
尚硅谷大数据拼课程、论口碑更给力
尚硅谷 IT 培训
立刻征询
Netty 通信架构解析
Netty 通信架构如下:
RpcEndpoint:RPC 通信终端。Spark 针对每个节点(Client/Master/Worker)都称之为一个 RPC 终端,且都实现 RpcEndpoint 接口,外部依据不同端点的需要,设计不同的音讯和不同的业务解决,如果须要发送(询问)则调用 Dispatcher。在 Spark 中,所有的终端都存在生命周期:
Constructor
onStart
receive*
onStop
RpcEnv:RPC 上下文环境,每个 RPC 终端运行时依赖的上下文环境称为 RpcEnv;在把以后 Spark 版本中应用的 NettyRpcEnv(即每个节点都有环境上下文)
Dispatcher:音讯调度(散发)器,针对于 RPC 终端须要发送近程音讯或者从近程 RPC 接管到的音讯,散发至对应的指令收件箱(发件箱)。如果指令接管方是本人则存入收件箱,如果指令接管方不是本人,则放入发件箱;一个环境一个
Inbox:指令音讯收件箱。一个本地 RpcEndpoint 对应一个收件箱,Dispatcher 在每次向 Inbox 存入音讯时,都将对应 EndpointData 退出外部 ReceiverQueue 中,另外 Dispatcher 创立时会启动一个独自线程进行轮询 ReceiverQueue,进行收件箱音讯生产;
RpcEndpointRef:RpcEndpointRef 是对近程 RpcEndpoint 的一个援用。当咱们须要向一个具体的 RpcEndpoint 发送音讯时,个别咱们须要获取到该 RpcEndpoint 的援用,而后通过该利用发送音讯。
OutBox:指令音讯发件箱。对于以后 RpcEndpoint 来说,一个指标 RpcEndpoint 对应一个发件箱,如果向多个指标 RpcEndpoint 发送信息,则有多个 OutBox。当音讯放入 Outbox 后,紧接着通过 TransportClient 将音讯发送进来。音讯放入发件箱以及发送过程是在同一个线程中进行;
RpcAddress:示意近程的 RpcEndpointRef 的地址,Host + Port。
TransportClient:Netty 通信客户端,一个 OutBox 对应一个 TransportClient,TransportClient 一直轮询 OutBox,依据 OutBox 音讯的 receiver 信息,申请对应的近程 TransportServer;(相似 socket)
TransportServer:Netty 通信服务端,一个 RpcEndpoint 对应一个 TransportServer,承受近程音讯后调用 Dispatcher 散发音讯至对应收发件箱(通过本地指令);
Netty 通信流程总结
在一个 rpcEnv 里,RpcEndpoint 通过持有 RpcEndpointRef,向 Dispatcher 发送音讯,Dispatcher 辨认到音讯是近程指令,会把音讯发送到 OutBox。
TransportClient 一直轮询 OutBox 的队列,一旦 OutBox 队列有音讯,就会将音讯发往对应 RpcEndpoint 的 TransportServer。
接管的 RpcEndpoint 的 TransportServer 会把音讯发往 Dispatcher,Dispatcher 辨认到本地指令后,会把音讯给发往本身的 InBox 外面,这样就实现了通信。
Spark 启动流程分析
在分析 Spark 启动流程中,咱们次要通过 StandAlone 模式下的 Master / Work 启动流程来看 Spark 是怎么通信的。
Master 启动流程
咱们首先从启动命令 start-all.sh 登程(因为他会启动 master 和 work),一步一步查看启动的调用流程:
start-all.sh
会加载 sparkhome 作为变量,所以学习 spark 装置多种模式 spark 时最好不配
start-master.sh
CLASS=”org.apache.spark.deploy.master.Master”
“${SPARK_HOME}/sbin”/spark-daemon.sh start $CLASS 1 \
–host $SPARK_MASTER_HOST –port $SPARK_MASTER_PORT –webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS
run_command class “$@” – 运行传过来的所有参数
${SPARK_HOME}”/bin/spark-class “$command” “$@”
java .. org.apache.spark.deploy.master.Master 最终启动这个类,启动 java 虚拟机
— main
— startRpcEnvAndEndpoint // master 和 worker 通信须要现有通信环境, 先创立通信环境和 endpoint
- RpcEnv.create // 创立环境
1.1– RpeEnv return new NettyRpcEnvFactory().create(config) // 启动创立环境的工厂
1.2– val nettyEnv = new NettyRpcEnv // 创立 netty 环境
— dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)// 创立 dispatch(一个环境只有一个
— endpoints: ConcurrentMap[String, MessageLoop] // 存储每个 endpoint 的音讯死循环
— endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] // 依据 ref 找到 endpoint 通信实体
— new DedicatedMessageLoop(name, e, this)// 专用音讯循环(每个 endpoint 一个音讯循环
— private val inbox = new Inbox(name, endpoint) // 一个 endpoint 都独自享有一个收件箱
— receiveLoop()// 每个线程都会死循环期待信息
1.3– nettyEnv.startServer(config.bindAddress, actualPort) // 启动 netty 服务
— server = transportContext.createServer // 创立 transportServer
- rpcEnv.setupEndpoint(ENDPOINT_NAME, // 将 endpoint(master)放进环境
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
— dispatcher.registerRpcEndpoint(name, endpoint) // 将创立的 master 注册
— new Master
// 启动了一个线程, 这个线程会每 60s 检测一次所有 worker 的超时状况
— timeOutDeadWorkers()
- master 启动结束
Worker 启动流程
Worker 的启动流程还是先从 start-all.sh 触发,会走进 start-woker.sh。
org.apache.spark.deploy.worker.Worker
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)// 创立 rpcenv
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)// 与 master 相比, 只多了这一行.
//worker 会从配置找到 master 地址, 被动去找 master 注册.(master 有可能会是 ha, 所以可能找到的是 master 地址数组)
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, // 注册 worker 为 endpoint
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))
— onStart
registerWithMaster() // 向 master 注册本人(worker)
sendRegisterMessageToMaster(masterEndpoint) // 发送注册信息
— 注册胜利后, 每 15 秒执行一次, 发送心跳
sendToMaster(Heartbeat(workerId, self))
masterRef.send(message)
尚硅谷大数据培训_业余的大数据培训机构_值得信赖的大数据教程
大数据
大数据教程
大数据培训
尚硅谷大数据拼课程、论口碑更给力
尚硅谷 IT 培训
立刻征询
Spark 启动流程总结
A 跟 B 通信,A 拿到 B 的 EndPointRef,通过 send 办法发送一个样例类进行通信。样例类携带更多信息,相似通信协议
B 会有 receive 办法收到信息通过模式匹配进行匹配信息
Spark 提交流程分析
因为 Spark 能够以多种模式运行,国内多以 YARN 模式进行提交,所以此处以 YARN 的 Cluster 模式下的 Spark 提交流程进行分析。
SparkSubmit
SparkSubmit 的作用次要就是两个:
- 解析参数
- 提交参数,初始数环境,并获取 ”org.apache.spark.deploy.yarn.YarnClusterApplication” 的对象,调用对象的 start 办法
org.apache.spark.deploy.SparkSubmit
main
— submit.doSubmit(args) // 执行提交
— doSubmit
submit(appArgs, uninitLog)
— doRunMain()
// 执行主办法
runMain(args, uninitLog)
// 筹备提交环境
- val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
//”org.apache.spark.deploy.yarn.YarnClusterApplication” 赋给 childMainClass 重点!!
// 如果是 client 模式,则 childMainClass 就是提交 jar 包的主类
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
// 反射加载 childMainClass 类
- mainClass = Utils.classForName(childMainClass)
// 创立 mainClass 实例并且转为 SparkApplication 类型
//SparkApplication 是 YarnClusterApplication 的父类
- val app = mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
// 最终调用 YarnClusterApplication 的 start 办法
- app.start(childArgs.toArray, sparkConf)
//client 的结构器创立 YarnClient 对象, 用于连贯 ResourceManager
new Client(new ClientArguments(args), conf, null).run()
–run
this.appId = submitApplication()// 提交利用到 ResourceManager, 返回 appid
— submitApplication
val containerContext = createContainerLaunchContext(newAppResponse)//
// 确定 applicationMaster 是谁
// 如果 yarn 模式: 就是 applicationMaster
// 如果是 client 模式: 就是 executorLauncher
— amClass = org.apache.spark.deploy.yarn.ApplicationMaster
val appContext = createApplicationSubmissionContext(newApp, containerContext)
yarnClient.submitApplication(appContext)
// 提交利用到 ResourceManager, 让 resourcemanager 启动 container(applicationMaster)
此时第一个过程 spark submit 曾经实现(能够间接把这个过程 kill 掉也没问题)
ApplicationMaster
此时到 am 过程启动,而这个过程次要的作用如下:
- 封装 ApplicationMaster 的参数
- 依据参数,创立 ApplicationMaster 对象
- 执行 ApplicationMaster 的 run 办法,在 run 办法中,最初调用到 runDriver 办法
// 解析完各种参数,new 一个 applicationMaster
master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
// 执行 applicationMaster 的 run 办法
master.run
if (isClusterMode) {// 集群模式
runDriver()// 集群模式就运行 driver
} else {// client 模式
runExecutorLauncher()
}
//am 启动第一件事就是跑 driver, 启动应用程序
runDriver()
- userClassThread = startUserApplication() // 启动应用程序, 也就是执行提交的 jar 包中的主函数
// 加载参数穿过来的用户类 即提交时指定的 –class
1.1 val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod(“main”, classOf[Array[String]])
1.2 new Thread … // 创立一个线程, 线程名就叫 driver, 并返回这个线程
2.userThread.start()// 执行这个 driver 线程的 run 办法, 线程执行的就是咱们提交应用程序类的主函数
// 期待 sc 初始化, 初始化完后才持续往下执行
- val sc: SparkContext = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
//cs 初始化完后向 ResourceManager 注册 applicationMaster
// 注册的实质就是向 rm 申请资源运行 executor 过程
- registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
— client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
— amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
//am 向 rm 注册胜利后, 创立分配器
- — createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
— allocator.allocateResources()// 分配资源
— val allocatedContainers = allocateResponse.getAllocatedContainers()// 通过分配器响应 获取 调配到的容器列表
allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
handleAllocatedContainers(allocatedContainers.asScala)// 资源列表大于 0, 就解决调配到的资源
runAllocatedContainers(containersToUse)// 运行调配后的资源, 封装指令
ExecutorRunnable.run// 启动 executor
— startContainer()
— val commands = prepareCommand()// 封装指令
— bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
— nmClient.startContainer(container.get, ctx)// 启动容器, 也就是启动 executor
!!AM 启动结束
AM 只有两个子线程, 一个主线程, 一个子线程(driver)
子线程 driver 执行用户类(用户传过来的 jar 包 main 办法)
主线程
1. 次要是注册 am(向 rm 申请分配资源) , rm 返回容器给 am
- am 拿到返回容器列表, 让 nm 在容器上执行 java 命令
— bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 这个命令
3. 最终在 nm 上启动 executor 过程
CoarseGrainedExecutorBackend
执行一次 - bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 这个命令 就会执行一个新的过程,则是属于并行执行的感觉,和之前执行的内容是离开的。相似咱们在 Windows 中开了一个微信和 qq 程序一样,各自执行,互不影响。因为这就是咱们平时说的 executor 过程
- commands=/bin/java/org.apache.spark.executor.CoarseGrainedExecutorBackend,
执行这个指令,那么是调用这个类的 main 办法。
- main 办法中:
// 1. 首先是对一些参数进行封装
// 2. 执行 run 办法
— run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
// 1. 通过 driver 的 uri 和 Driver 进行关联
–driver = fetcher.setupEndpointRefByURI(driverUrl)
// 2. 通过通信环境创立了一个终端,名字为 executor,
在底层:Executor 启动后会注册通信,并收到信息 onStart,收到音讯后,会执行通信对象 CoarseGrainedExecutorBackend
的 onStart 办法,点击 CoarseGrainedExecutorBackend
–env.rpcEnv.setupEndpoint(“Executor”, new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
// 1. 获取 driver 的援用
— driver = Some(ref)
// 2.ExecutorBackend 向 driver 发送音讯,注册 executor 的音讯,也称之为反向注册
–ref.askBoolean)
// 3. 在 driver 端会接管到这个音讯,因为在 driver 端,有一个上下文的对象,sparkcontext,在这个类有一个属性:
private var _schedulerBackend: SchedulerBackend = _,点击 SchedulerBackend,是一个 trait,找到
实现类:CoarseGrainedSchedulerBackend,在这个类中,有一个办法:receiveAndReply():
// executor 的援用,在 driver 端,发送音讯给到 ExecutorBackend,注册 executor 胜利
–executorRef.send(RegisteredExecutor)
// ExecutorBackend 类中有一个 recive 办法,用来接管 driver 返回的 executor 注册胜利的音讯,executor 是一个计算对象,在这个对象外面有一个线程池,每一个线程来解决一个从 driver 端发送过去的工作
–executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
整体提交流程图如下图所示:
YARN ClusterClient 模式异同
到此,YARN 模式下的 Cluster 提交流程完结,而 Client 模式的提交与 Cluster 模式提交的不同如下:
SparkSubmit 类中:
childMainClass = args.mainClass// 用户类 即提交时的 –class
// 而在 cluster 模式则是 org.apache.spark.deploy.yarn.YarnClusterApplication
开始执行用户的 main 函数: 其实在执行 drvier
driver 在 sparkSubmit 过程的主线程中运行
// 而 cluster 的 driver 则是 ApplicationMaster 中的子线程, 而 AM 肯定在某一个 NM 上, 所以叫 cluster 模式
//driver 在客户端上运行, 所以叫 client 模式
SparkContext 类中 //driver 中会创立 SparkContext
client.submitApplication()
amClass = org.apache.spark.deploy.yarn.ExecutorLauncher
开始启动 AM, 表明上是 ExecutorLauncher, 实质还是 ApplicationMaster
runExecutorLauncher()
不同:
- driver 的地位不一样
cluset: 在 ApplicationMaster 过程中, 是它的一个子线程
client: 在 SparkSubmit 的过程中, 而且是在他的主线程中执行的.
- AM 的名字不一样
cluster: ApplicationMaster
client: ExecutorLauntcher
Spark 提交流程总结
用大白话解释提交流程源码就是:
执行 suspark-submit 后会有三个过程
SparkSubmit
ApplicationMaster
YarnCoarseGrainedExecutorBackend:粗粒度执行器后端,也就是 Executor
找个客户端执行 sparksubmit
执行 SparkSubmit 类里的 main 办法,筹备环境,解析一堆参数
获取一个 childMainClass 的值并且反射创立这个类
如果是 cluster 模式他的值就是 YarnClusterApplication
如果是 client 模式他的值就是提交 jar 包的主类
通过反射创立 childMainClass 失去 YarnClusterApplication 并且强转为 SparkApplication
调用 SparkApplication 的 start 办法
创立一个 client 去连贯 rm,并且获取到 rm 返回 appId
封装一个指令让 rm 找一台 nm 启动一个 am
这个指令如果是 cluster 那么启动的类就是 applicationMaster,如果是 client 就是启动 executorLauncher
此时 SparkSubmit 工作实现,如果是 cluster 模式,那么间接把这个过程 kill 掉也没事
ApplicationMaster
启动一个 ApplicationMaster 过程后,解析各种参数后封装一个 ApplicationMaster 对象
封装好的 ApplicationMaster 对象会开启一个线程运行用户类(提交的 jar 包)的 main 函数,这个线程就是 driver 线程,
在 am 过程主办法,会期待获取 SparkContext,等到获取后就会向 rm 注册本人并申请资源,rm 返回容器列表(这里申请资源细节比拟多)
am 拿到容器列表,就会在 nm 启动 executor 过程
YarnCoarseGrainedExecutorBackend 过程启动胜利后
启动后第一件事向 driver 线程反向注册
注册胜利后,executor 过程会创立 executor 计算对象
计算对象里有一个线程池,每一个线程来解决一个 driver 端发过来的工作
关键词:大数据培训