关于spark:Spark启动及提交流程内部核心原理剖析

6次阅读

共计 10573 个字符,预计需要花费 27 分钟才能阅读完成。

Apache Spark 是专为大规模数据处理而设计的疾速通用的计算引擎,并且领有 Hadoop MapReduce 所具备的长处;但不同于 MapReduce 的是——Job 两头输入后果能够保留在内存中,从而不再须要读写 HDFS,因而 Spark 能更好地实用于须要迭代 MapReduce 的算法。接下来带大家摸索一下 Spark 启动及提交流程的外部外围原理。

Netty

在摸索 Spark 启动及提交流程的外部外围原理之前,咱们得先简略介绍一下 Spark 外部的通信框架 —-Netty

Spark 中通信框架的倒退:

Spark 晚期版本中采纳 Akka 作为外部通信部件。

Spark1.3 中引入 Netty 通信框架,为了解决 Shuffle 的大数据传输问题应用

Spark1.6 中 Akka 和 Netty 能够配置应用。Netty 齐全实现了 Akka 在 Spark 中的性能。

Spark2 系列中,Spark 摈弃 Akka,应用 Netty。

尚硅谷大数据培训_业余的大数据培训机构_值得信赖的大数据教程
大数据
大数据教程
大数据培训
尚硅谷大数据拼课程、论口碑更给力
尚硅谷 IT 培训
立刻征询
Netty 通信架构解析

Netty 通信架构如下:

RpcEndpoint:RPC 通信终端。Spark 针对每个节点(Client/Master/Worker)都称之为一个 RPC 终端,且都实现 RpcEndpoint 接口,外部依据不同端点的需要,设计不同的音讯和不同的业务解决,如果须要发送(询问)则调用 Dispatcher。在 Spark 中,所有的终端都存在生命周期:

Constructor

onStart

receive*

onStop

RpcEnv:RPC 上下文环境,每个 RPC 终端运行时依赖的上下文环境称为 RpcEnv;在把以后 Spark 版本中应用的 NettyRpcEnv(即每个节点都有环境上下文)

Dispatcher:音讯调度(散发)器,针对于 RPC 终端须要发送近程音讯或者从近程 RPC 接管到的音讯,散发至对应的指令收件箱(发件箱)。如果指令接管方是本人则存入收件箱,如果指令接管方不是本人,则放入发件箱;一个环境一个

Inbox:指令音讯收件箱。一个本地 RpcEndpoint 对应一个收件箱,Dispatcher 在每次向 Inbox 存入音讯时,都将对应 EndpointData 退出外部 ReceiverQueue 中,另外 Dispatcher 创立时会启动一个独自线程进行轮询 ReceiverQueue,进行收件箱音讯生产;

RpcEndpointRef:RpcEndpointRef 是对近程 RpcEndpoint 的一个援用。当咱们须要向一个具体的 RpcEndpoint 发送音讯时,个别咱们须要获取到该 RpcEndpoint 的援用,而后通过该利用发送音讯。

OutBox:指令音讯发件箱。对于以后 RpcEndpoint 来说,一个指标 RpcEndpoint 对应一个发件箱,如果向多个指标 RpcEndpoint 发送信息,则有多个 OutBox。当音讯放入 Outbox 后,紧接着通过 TransportClient 将音讯发送进来。音讯放入发件箱以及发送过程是在同一个线程中进行;

RpcAddress:示意近程的 RpcEndpointRef 的地址,Host + Port。

TransportClient:Netty 通信客户端,一个 OutBox 对应一个 TransportClient,TransportClient 一直轮询 OutBox,依据 OutBox 音讯的 receiver 信息,申请对应的近程 TransportServer;(相似 socket)

TransportServer:Netty 通信服务端,一个 RpcEndpoint 对应一个 TransportServer,承受近程音讯后调用 Dispatcher 散发音讯至对应收发件箱(通过本地指令);

Netty 通信流程总结

在一个 rpcEnv 里,RpcEndpoint 通过持有 RpcEndpointRef,向 Dispatcher 发送音讯,Dispatcher 辨认到音讯是近程指令,会把音讯发送到 OutBox。

TransportClient 一直轮询 OutBox 的队列,一旦 OutBox 队列有音讯,就会将音讯发往对应 RpcEndpoint 的 TransportServer。

接管的 RpcEndpoint 的 TransportServer 会把音讯发往 Dispatcher,Dispatcher 辨认到本地指令后,会把音讯给发往本身的 InBox 外面,这样就实现了通信。

Spark 启动流程分析

在分析 Spark 启动流程中,咱们次要通过 StandAlone 模式下的 Master / Work 启动流程来看 Spark 是怎么通信的。

Master 启动流程

咱们首先从启动命令 start-all.sh 登程(因为他会启动 master 和 work),一步一步查看启动的调用流程:

start-all.sh

会加载 sparkhome 作为变量,所以学习 spark 装置多种模式 spark 时最好不配

start-master.sh

CLASS=”org.apache.spark.deploy.master.Master”

“${SPARK_HOME}/sbin”/spark-daemon.sh start $CLASS 1 \

–host $SPARK_MASTER_HOST –port $SPARK_MASTER_PORT –webui-port $SPARK_MASTER_WEBUI_PORT \

$ORIGINAL_ARGS

run_command class “$@” – 运行传过来的所有参数

${SPARK_HOME}”/bin/spark-class “$command” “$@”

java .. org.apache.spark.deploy.master.Master 最终启动这个类,启动 java 虚拟机

— main

— startRpcEnvAndEndpoint // master 和 worker 通信须要现有通信环境, 先创立通信环境和 endpoint

  1. RpcEnv.create // 创立环境

1.1– RpeEnv return new NettyRpcEnvFactory().create(config) // 启动创立环境的工厂

1.2– val nettyEnv = new NettyRpcEnv // 创立 netty 环境

— dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)// 创立 dispatch(一个环境只有一个

— endpoints: ConcurrentMap[String, MessageLoop] // 存储每个 endpoint 的音讯死循环

— endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] // 依据 ref 找到 endpoint 通信实体

— new DedicatedMessageLoop(name, e, this)// 专用音讯循环(每个 endpoint 一个音讯循环

— private val inbox = new Inbox(name, endpoint) // 一个 endpoint 都独自享有一个收件箱

— receiveLoop()// 每个线程都会死循环期待信息

1.3– nettyEnv.startServer(config.bindAddress, actualPort) // 启动 netty 服务

— server = transportContext.createServer // 创立 transportServer

  1. rpcEnv.setupEndpoint(ENDPOINT_NAME, // 将 endpoint(master)放进环境

new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

— dispatcher.registerRpcEndpoint(name, endpoint) // 将创立的 master 注册

— new Master

// 启动了一个线程, 这个线程会每 60s 检测一次所有 worker 的超时状况

— timeOutDeadWorkers()

  1. master 启动结束

Worker 启动流程

Worker 的启动流程还是先从 start-all.sh 触发,会走进 start-woker.sh。

org.apache.spark.deploy.worker.Worker

val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)// 创立 rpcenv

val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)// 与 master 相比, 只多了这一行.

//worker 会从配置找到 master 地址, 被动去找 master 注册.(master 有可能会是 ha, 所以可能找到的是 master 地址数组)

rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, // 注册 worker 为 endpoint

masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))

— onStart

registerWithMaster() // 向 master 注册本人(worker)

sendRegisterMessageToMaster(masterEndpoint) // 发送注册信息

— 注册胜利后, 每 15 秒执行一次, 发送心跳

sendToMaster(Heartbeat(workerId, self))

masterRef.send(message)

尚硅谷大数据培训_业余的大数据培训机构_值得信赖的大数据教程
大数据
大数据教程
大数据培训
尚硅谷大数据拼课程、论口碑更给力
尚硅谷 IT 培训
立刻征询
Spark 启动流程总结

A 跟 B 通信,A 拿到 B 的 EndPointRef,通过 send 办法发送一个样例类进行通信。样例类携带更多信息,相似通信协议

B 会有 receive 办法收到信息通过模式匹配进行匹配信息

Spark 提交流程分析

因为 Spark 能够以多种模式运行,国内多以 YARN 模式进行提交,所以此处以 YARN 的 Cluster 模式下的 Spark 提交流程进行分析。

SparkSubmit

SparkSubmit 的作用次要就是两个:

  1. 解析参数
  2. 提交参数,初始数环境,并获取 ”org.apache.spark.deploy.yarn.YarnClusterApplication” 的对象,调用对象的 start 办法

org.apache.spark.deploy.SparkSubmit

main

— submit.doSubmit(args) // 执行提交

— doSubmit

submit(appArgs, uninitLog)

— doRunMain()

// 执行主办法

runMain(args, uninitLog)

// 筹备提交环境

  1. val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

//”org.apache.spark.deploy.yarn.YarnClusterApplication” 赋给 childMainClass 重点!!

// 如果是 client 模式,则 childMainClass 就是提交 jar 包的主类

childMainClass = YARN_CLUSTER_SUBMIT_CLASS

// 反射加载 childMainClass 类

  1. mainClass = Utils.classForName(childMainClass)

// 创立 mainClass 实例并且转为 SparkApplication 类型

//SparkApplication 是 YarnClusterApplication 的父类

  1. val app = mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]

// 最终调用 YarnClusterApplication 的 start 办法

  1. app.start(childArgs.toArray, sparkConf)

//client 的结构器创立 YarnClient 对象, 用于连贯 ResourceManager

new Client(new ClientArguments(args), conf, null).run()

–run

this.appId = submitApplication()// 提交利用到 ResourceManager, 返回 appid

— submitApplication

val containerContext = createContainerLaunchContext(newAppResponse)//

// 确定 applicationMaster 是谁

// 如果 yarn 模式: 就是 applicationMaster

// 如果是 client 模式: 就是 executorLauncher

— amClass = org.apache.spark.deploy.yarn.ApplicationMaster

val appContext = createApplicationSubmissionContext(newApp, containerContext)

yarnClient.submitApplication(appContext)

// 提交利用到 ResourceManager, 让 resourcemanager 启动 container(applicationMaster)

此时第一个过程 spark submit 曾经实现(能够间接把这个过程 kill 掉也没问题)

ApplicationMaster

此时到 am 过程启动,而这个过程次要的作用如下:

  1. 封装 ApplicationMaster 的参数
  2. 依据参数,创立 ApplicationMaster 对象
  3. 执行 ApplicationMaster 的 run 办法,在 run 办法中,最初调用到 runDriver 办法

// 解析完各种参数,new 一个 applicationMaster

master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

// 执行 applicationMaster 的 run 办法

master.run

if (isClusterMode) {// 集群模式

runDriver()// 集群模式就运行 driver

} else {// client 模式

runExecutorLauncher()

}

//am 启动第一件事就是跑 driver, 启动应用程序

runDriver()

  1. userClassThread = startUserApplication() // 启动应用程序, 也就是执行提交的 jar 包中的主函数

// 加载参数穿过来的用户类 即提交时指定的 –class

1.1 val mainMethod = userClassLoader.loadClass(args.userClass)

.getMethod(“main”, classOf[Array[String]])

1.2 new Thread … // 创立一个线程, 线程名就叫 driver, 并返回这个线程

2.userThread.start()// 执行这个 driver 线程的 run 办法, 线程执行的就是咱们提交应用程序类的主函数

// 期待 sc 初始化, 初始化完后才持续往下执行

  1. val sc: SparkContext = ThreadUtils.awaitResult(sparkContextPromise.future,

Duration(totalWaitTime, TimeUnit.MILLISECONDS))

//cs 初始化完后向 ResourceManager 注册 applicationMaster

// 注册的实质就是向 rm 申请资源运行 executor 过程

  1. registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

— client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)

— amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)

//am 向 rm 注册胜利后, 创立分配器

  1. — createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)

— allocator.allocateResources()// 分配资源

— val allocatedContainers = allocateResponse.getAllocatedContainers()// 通过分配器响应 获取 调配到的容器列表

allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

handleAllocatedContainers(allocatedContainers.asScala)// 资源列表大于 0, 就解决调配到的资源

runAllocatedContainers(containersToUse)// 运行调配后的资源, 封装指令

ExecutorRunnable.run// 启动 executor

— startContainer()

— val commands = prepareCommand()// 封装指令

— bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend

— nmClient.startContainer(container.get, ctx)// 启动容器, 也就是启动 executor

!!AM 启动结束

AM 只有两个子线程, 一个主线程, 一个子线程(driver)

子线程 driver 执行用户类(用户传过来的 jar 包 main 办法)

主线程

1. 次要是注册 am(向 rm 申请分配资源) , rm 返回容器给 am

  1. am 拿到返回容器列表, 让 nm 在容器上执行 java 命令

— bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 这个命令

3. 最终在 nm 上启动 executor 过程

CoarseGrainedExecutorBackend

执行一次 - bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend 这个命令 就会执行一个新的过程,则是属于并行执行的感觉,和之前执行的内容是离开的。相似咱们在 Windows 中开了一个微信和 qq 程序一样,各自执行,互不影响。因为这就是咱们平时说的 executor 过程

  1. commands=/bin/java/org.apache.spark.executor.CoarseGrainedExecutorBackend,

执行这个指令,那么是调用这个类的 main 办法。

  1. main 办法中:

// 1. 首先是对一些参数进行封装

// 2. 执行 run 办法

— run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)

// 1. 通过 driver 的 uri 和 Driver 进行关联

–driver = fetcher.setupEndpointRefByURI(driverUrl)

// 2. 通过通信环境创立了一个终端,名字为 executor,

在底层:Executor 启动后会注册通信,并收到信息 onStart,收到音讯后,会执行通信对象 CoarseGrainedExecutorBackend

的 onStart 办法,点击 CoarseGrainedExecutorBackend

–env.rpcEnv.setupEndpoint(“Executor”, new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

// 1. 获取 driver 的援用

— driver = Some(ref)

// 2.ExecutorBackend 向 driver 发送音讯,注册 executor 的音讯,也称之为反向注册

–ref.askBoolean)

// 3. 在 driver 端会接管到这个音讯,因为在 driver 端,有一个上下文的对象,sparkcontext,在这个类有一个属性:

private var _schedulerBackend: SchedulerBackend = _,点击 SchedulerBackend,是一个 trait,找到

实现类:CoarseGrainedSchedulerBackend,在这个类中,有一个办法:receiveAndReply():

// executor 的援用,在 driver 端,发送音讯给到 ExecutorBackend,注册 executor 胜利

–executorRef.send(RegisteredExecutor)

// ExecutorBackend 类中有一个 recive 办法,用来接管 driver 返回的 executor 注册胜利的音讯,executor 是一个计算对象,在这个对象外面有一个线程池,每一个线程来解决一个从 driver 端发送过去的工作

–executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

整体提交流程图如下图所示:

YARN ClusterClient 模式异同

到此,YARN 模式下的 Cluster 提交流程完结,而 Client 模式的提交与 Cluster 模式提交的不同如下:

SparkSubmit 类中:

childMainClass = args.mainClass// 用户类 即提交时的 –class

// 而在 cluster 模式则是 org.apache.spark.deploy.yarn.YarnClusterApplication

开始执行用户的 main 函数: 其实在执行 drvier

driver 在 sparkSubmit 过程的主线程中运行

// 而 cluster 的 driver 则是 ApplicationMaster 中的子线程, 而 AM 肯定在某一个 NM 上, 所以叫 cluster 模式

//driver 在客户端上运行, 所以叫 client 模式

SparkContext 类中 //driver 中会创立 SparkContext

client.submitApplication()

amClass = org.apache.spark.deploy.yarn.ExecutorLauncher

开始启动 AM, 表明上是 ExecutorLauncher, 实质还是 ApplicationMaster

runExecutorLauncher()


不同:

  1. driver 的地位不一样

cluset: 在 ApplicationMaster 过程中, 是它的一个子线程

client: 在 SparkSubmit 的过程中, 而且是在他的主线程中执行的.

  1. AM 的名字不一样

cluster: ApplicationMaster

client: ExecutorLauntcher

Spark 提交流程总结

用大白话解释提交流程源码就是:

执行 suspark-submit 后会有三个过程

SparkSubmit

ApplicationMaster

YarnCoarseGrainedExecutorBackend:粗粒度执行器后端,也就是 Executor

找个客户端执行 sparksubmit

执行 SparkSubmit 类里的 main 办法,筹备环境,解析一堆参数

获取一个 childMainClass 的值并且反射创立这个类

如果是 cluster 模式他的值就是 YarnClusterApplication

如果是 client 模式他的值就是提交 jar 包的主类

通过反射创立 childMainClass 失去 YarnClusterApplication 并且强转为 SparkApplication

调用 SparkApplication 的 start 办法

创立一个 client 去连贯 rm,并且获取到 rm 返回 appId

封装一个指令让 rm 找一台 nm 启动一个 am

这个指令如果是 cluster 那么启动的类就是 applicationMaster,如果是 client 就是启动 executorLauncher

此时 SparkSubmit 工作实现,如果是 cluster 模式,那么间接把这个过程 kill 掉也没事

ApplicationMaster

启动一个 ApplicationMaster 过程后,解析各种参数后封装一个 ApplicationMaster 对象

封装好的 ApplicationMaster 对象会开启一个线程运行用户类(提交的 jar 包)的 main 函数,这个线程就是 driver 线程,

在 am 过程主办法,会期待获取 SparkContext,等到获取后就会向 rm 注册本人并申请资源,rm 返回容器列表(这里申请资源细节比拟多)

am 拿到容器列表,就会在 nm 启动 executor 过程

YarnCoarseGrainedExecutorBackend 过程启动胜利后

启动后第一件事向 driver 线程反向注册

注册胜利后,executor 过程会创立 executor 计算对象

计算对象里有一个线程池,每一个线程来解决一个 driver 端发过来的工作

关键词:大数据培训

正文完
 0