Apache Spark 是专为大规模数据处理而设计的疾速通用的计算引擎,并且领有Hadoop MapReduce所具备的长处;但不同于MapReduce的是——Job两头输入后果能够保留在内存中,从而不再须要读写HDFS,因而Spark能更好地实用于须要迭代MapReduce的算法。接下来带大家摸索一下Spark启动及提交流程的外部外围原理。

Netty

在摸索Spark启动及提交流程的外部外围原理之前,咱们得先简略介绍一下Spark外部的通信框架----Netty

Spark中通信框架的倒退:

Spark晚期版本中采纳Akka作为外部通信部件。

Spark1.3中引入Netty通信框架,为了解决Shuffle的大数据传输问题应用

Spark1.6中Akka和Netty能够配置应用。Netty齐全实现了Akka在Spark中的性能。

Spark2系列中,Spark摈弃Akka,应用Netty。

尚硅谷大数据培训_业余的大数据培训机构_值得信赖的大数据教程
大数据
大数据教程
大数据培训
尚硅谷大数据拼课程、论口碑更给力
尚硅谷IT培训
立刻征询
Netty通信架构解析

Netty通信架构如下:

RpcEndpoint:RPC通信终端。Spark针对每个节点(Client/Master/Worker)都称之为一个RPC终端,且都实现RpcEndpoint接口,外部依据不同端点的需要,设计不同的音讯和不同的业务解决,如果须要发送(询问)则调用Dispatcher。在Spark中,所有的终端都存在生命周期:

Constructor

onStart

receive*

onStop

RpcEnv:RPC上下文环境,每个RPC终端运行时依赖的上下文环境称为RpcEnv;在把以后Spark版本中应用的NettyRpcEnv(即每个节点都有环境上下文)

Dispatcher:音讯调度(散发)器,针对于RPC终端须要发送近程音讯或者从近程RPC接管到的音讯,散发至对应的指令收件箱(发件箱)。如果指令接管方是本人则存入收件箱,如果指令接管方不是本人,则放入发件箱;一个环境一个

Inbox:指令音讯收件箱。一个本地RpcEndpoint对应一个收件箱,Dispatcher在每次向Inbox存入音讯时,都将对应EndpointData退出外部ReceiverQueue中,另外Dispatcher创立时会启动一个独自线程进行轮询ReceiverQueue,进行收件箱音讯生产;

RpcEndpointRef:RpcEndpointRef是对近程RpcEndpoint的一个援用。当咱们须要向一个具体的RpcEndpoint发送音讯时,个别咱们须要获取到该RpcEndpoint的援用,而后通过该利用发送音讯。

OutBox:指令音讯发件箱。对于以后RpcEndpoint来说,一个指标RpcEndpoint对应一个发件箱,如果向多个指标RpcEndpoint发送信息,则有多个OutBox。当音讯放入Outbox后,紧接着通过TransportClient将音讯发送进来。音讯放入发件箱以及发送过程是在同一个线程中进行;

RpcAddress:示意近程的RpcEndpointRef的地址,Host + Port。

TransportClient:Netty通信客户端,一个OutBox对应一个TransportClient,TransportClient一直轮询OutBox,依据OutBox音讯的receiver信息,申请对应的近程TransportServer;(相似socket)

TransportServer:Netty通信服务端,一个RpcEndpoint对应一个TransportServer,承受近程音讯后调用Dispatcher散发音讯至对应收发件箱(通过本地指令);

Netty通信流程总结

在一个rpcEnv里, RpcEndpoint通过持有RpcEndpointRef,向Dispatcher发送音讯,Dispatcher辨认到音讯是近程指令,会把音讯发送到OutBox。

TransportClient一直轮询OutBox的队列,一旦OutBox队列有音讯,就会将音讯发往对应RpcEndpoint的TransportServer。

接管的RpcEndpoint的TransportServer会把音讯发往Dispatcher,Dispatcher辨认到本地指令后,会把音讯给发往本身的InBox外面,这样就实现了通信。

Spark启动流程分析

在分析Spark启动流程中,咱们次要通过StandAlone模式下的Master / Work启动流程来看Spark是怎么通信的。

Master启动流程

咱们首先从启动命令start-all.sh登程(因为他会启动master和work),一步一步查看启动的调用流程:

start-all.sh

会加载sparkhome作为变量,所以学习spark装置多种模式spark时最好不配

start-master.sh

CLASS="org.apache.spark.deploy.master.Master"

"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \

--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \

$ORIGINAL_ARGS

run_command class "$@" --运行传过来的所有参数

${SPARK_HOME}"/bin/spark-class "$command" "$@"

java .. org.apache.spark.deploy.master.Master 最终启动这个类,启动java虚拟机

-- main

-- startRpcEnvAndEndpoint // master和worker通信须要现有通信环境,先创立通信环境和endpoint

  1. RpcEnv.create //创立环境

1.1-- RpeEnv return new NettyRpcEnvFactory().create(config) //启动创立环境的工厂

1.2-- val nettyEnv = new NettyRpcEnv //创立netty环境

-- dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)// 创立dispatch(一个环境只有一个

-- endpoints: ConcurrentMap[String, MessageLoop] //存储每个endpoint的音讯死循环

-- endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] //依据ref找到endpoint通信实体

-- new DedicatedMessageLoop(name, e, this)//专用音讯循环(每个endpoint一个音讯循环

-- private val inbox = new Inbox(name, endpoint) //一个endpoint都独自享有一个收件箱

-- receiveLoop()//每个线程都会死循环期待信息

1.3-- nettyEnv.startServer(config.bindAddress, actualPort) //启动netty服务

-- server = transportContext.createServer //创立transportServer

  1. rpcEnv.setupEndpoint(ENDPOINT_NAME, //将endpoint(master)放进环境

new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

-- dispatcher.registerRpcEndpoint(name, endpoint) //将创立的master注册

-- new Master

//启动了一个线程, 这个线程会每60s检测一次所有worker的超时状况

-- timeOutDeadWorkers()

  1. master启动结束

Worker启动流程

Worker的启动流程还是先从start-all.sh触发,会走进start-woker.sh。

org.apache.spark.deploy.worker.Worker

val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)//创立rpcenv

val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)//与master相比,只多了这一行.

//worker会从配置找到master地址,被动去找master注册.(master有可能会是ha,所以可能找到的是master地址数组)

rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory, //注册worker为endpoint

masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))

-- onStart

registerWithMaster() //向master注册本人(worker)

sendRegisterMessageToMaster(masterEndpoint) //发送注册信息

-- 注册胜利后,每15秒执行一次,发送心跳

sendToMaster(Heartbeat(workerId, self))

masterRef.send(message)

尚硅谷大数据培训_业余的大数据培训机构_值得信赖的大数据教程
大数据
大数据教程
大数据培训
尚硅谷大数据拼课程、论口碑更给力
尚硅谷IT培训
立刻征询
Spark启动流程总结

A跟B通信,A拿到B的EndPointRef,通过send办法发送一个样例类进行通信。样例类携带更多信息,相似通信协议

B会有receive办法收到信息通过模式匹配进行匹配信息

Spark提交流程分析

因为Spark能够以多种模式运行,国内多以YARN模式进行提交,所以此处以YARN的Cluster模式下的Spark提交流程进行分析。

SparkSubmit

SparkSubmit的作用次要就是两个:

  1. 解析参数
  2. 提交参数,初始数环境,并获取"org.apache.spark.deploy.yarn.YarnClusterApplication"的对象,调用对象的start办法

org.apache.spark.deploy.SparkSubmit

main

-- submit.doSubmit(args) //执行提交

-- doSubmit

submit(appArgs, uninitLog)

-- doRunMain()

//执行主办法

runMain(args, uninitLog)

//筹备提交环境

  1. val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

//"org.apache.spark.deploy.yarn.YarnClusterApplication"赋给childMainClass 重点!!

//如果是client模式,则childMainClass就是提交jar包的主类

childMainClass = YARN_CLUSTER_SUBMIT_CLASS

//反射加载childMainClass类

  1. mainClass = Utils.classForName(childMainClass)

//创立mainClass实例并且转为SparkApplication类型

//SparkApplication是YarnClusterApplication的父类

  1. val app = mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]

//最终调用YarnClusterApplication的start办法

  1. app.start(childArgs.toArray, sparkConf)

//client的结构器创立YarnClient对象,用于连贯ResourceManager

new Client(new ClientArguments(args), conf, null).run()

--run

this.appId = submitApplication()//提交利用到ResourceManager,返回appid

-- submitApplication

val containerContext = createContainerLaunchContext(newAppResponse)//

//确定applicationMaster是谁

//如果yarn模式:就是applicationMaster

//如果是client模式:就是executorLauncher

-- amClass = org.apache.spark.deploy.yarn.ApplicationMaster

val appContext = createApplicationSubmissionContext(newApp, containerContext)

yarnClient.submitApplication(appContext)

//提交利用到ResourceManager,让resourcemanager启动container(applicationMaster)

此时第一个过程spark submit曾经实现(能够间接把这个过程kill掉也没问题)

ApplicationMaster

此时到am过程启动,而这个过程次要的作用如下:

  1. 封装ApplicationMaster的参数
  2. 依据参数,创立ApplicationMaster对象
  3. 执行ApplicationMaster的run办法,在run办法中,最初调用到runDriver办法

//解析完各种参数,new 一个applicationMaster

master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

//执行applicationMaster的run办法

master.run

if (isClusterMode) { //集群模式

runDriver()//集群模式就运行driver

} else { // client 模式

runExecutorLauncher()

}

//am启动第一件事就是跑driver,启动应用程序

runDriver()

  1. userClassThread = startUserApplication() //启动应用程序,也就是执行提交的jar包中的主函数

//加载参数穿过来的用户类 即提交时指定的--class

1.1 val mainMethod = userClassLoader.loadClass(args.userClass)

.getMethod("main", classOf[Array[String]])

1.2 new Thread ... //创立一个线程,线程名就叫driver,并返回这个线程

2.userThread.start()//执行这个driver线程的run办法,线程执行的就是咱们提交应用程序类的主函数

//期待sc初始化,初始化完后才持续往下执行

  1. val sc: SparkContext = ThreadUtils.awaitResult(sparkContextPromise.future,

Duration(totalWaitTime, TimeUnit.MILLISECONDS))

//cs初始化完后向ResourceManager注册applicationMaster

//注册的实质就是向rm申请资源运行executor过程

  1. registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

-- client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)

-- amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)

//am向rm注册胜利后,创立分配器

  1. -- createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)

-- allocator.allocateResources()//分配资源

-- val allocatedContainers = allocateResponse.getAllocatedContainers()//通过分配器响应 获取 调配到的容器列表

allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

handleAllocatedContainers(allocatedContainers.asScala)//资源列表大于0,就解决调配到的资源

runAllocatedContainers(containersToUse)//运行调配后的资源,封装指令

ExecutorRunnable.run//启动executor

-- startContainer()

-- val commands = prepareCommand()//封装指令

-- bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend

-- nmClient.startContainer(container.get, ctx)//启动容器,也就是启动executor

!!AM启动结束

AM只有两个子线程,一个主线程,一个子线程(driver)

子线程driver执行用户类(用户传过来的jar包main办法)

主线程

1.次要是注册am(向rm申请分配资源) , rm返回容器给am

  1. am拿到返回容器列表,让nm在容器上执行java命令

-- bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend这个命令

3.最终在nm上启动executor过程

CoarseGrainedExecutorBackend

执行一次- bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend这个命令 就会执行一个新的过程,则是属于并行执行的感觉,和之前执行的内容是离开的。相似咱们在Windows中开了一个微信和qq程序一样,各自执行,互不影响。因为这就是咱们平时说的executor过程

  1. commands=/bin/java/org.apache.spark.executor.CoarseGrainedExecutorBackend,

执行这个指令,那么是调用这个类的main办法。

  1. main办法中:

// 1. 首先是对一些参数进行封装

// 2. 执行run办法

-- run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)

// 1.通过driver的uri和Driver进行关联

--driver = fetcher.setupEndpointRefByURI(driverUrl)

// 2.通过通信环境创立了一个终端,名字为executor,

在底层:Executor启动后会注册通信,并收到信息onStart,收到音讯后,会执行通信对象CoarseGrainedExecutorBackend

的onStart办法,点击CoarseGrainedExecutorBackend

--env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

// 1.获取driver的援用

-- driver = Some(ref)

// 2.ExecutorBackend向driver发送音讯,注册executor的音讯,也称之为反向注册

--ref.askBoolean)

// 3.在driver端会接管到这个音讯,因为在driver端,有一个上下文的对象,sparkcontext,在这个类有一个属性:

private var _schedulerBackend: SchedulerBackend = _,点击SchedulerBackend,是一个trait,找到

实现类:CoarseGrainedSchedulerBackend,在这个类中,有一个办法:receiveAndReply():

// executor的援用,在driver端,发送音讯给到ExecutorBackend,注册executor胜利

--executorRef.send(RegisteredExecutor)

// ExecutorBackend类中有一个recive办法,用来接管driver返回的executor注册胜利的音讯,executor是一个计算对象,在这个对象外面有一个线程池,每一个线程来解决一个从driver端发送过去的工作

--executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

整体提交流程图如下图所示:

YARN ClusterClient模式异同

到此,YARN模式下的Cluster提交流程完结,而Client模式的提交与Cluster模式提交的不同如下:

SparkSubmit类中:

childMainClass = args.mainClass//用户类 即提交时的 --class

//而在cluster模式则是org.apache.spark.deploy.yarn.YarnClusterApplication

开始执行用户的main函数:其实在执行 drvier

driver在sparkSubmit过程的主线程中运行

//而cluster的driver则是ApplicationMaster中的子线程,而AM肯定在某一个NM上,所以叫cluster模式

//driver在客户端上运行,所以叫client模式

SparkContext类中//driver中会创立SparkContext

client.submitApplication()

amClass = org.apache.spark.deploy.yarn.ExecutorLauncher

开始启动 AM, 表明上是ExecutorLauncher, 实质还是ApplicationMaster

runExecutorLauncher()


不同:

  1. driver的地位不一样

cluset: 在ApplicationMaster过程中, 是它的一个子线程

client: 在SparkSubmit的过程中, 而且是在他的主线程中执行的.

  1. AM的名字不一样

cluster: ApplicationMaster

client: ExecutorLauntcher

Spark提交流程总结

用大白话解释提交流程源码就是:

执行suspark-submit后会有三个过程

SparkSubmit

ApplicationMaster

YarnCoarseGrainedExecutorBackend:粗粒度执行器后端, 也就是Executor

找个客户端执行sparksubmit

执行SparkSubmit类里的main办法,筹备环境,解析一堆参数

获取一个childMainClass的值并且反射创立这个类

如果是cluster模式他的值就是YarnClusterApplication

如果是client模式他的值就是提交jar包的主类

通过反射创立childMainClass失去YarnClusterApplication并且强转为SparkApplication

调用SparkApplication的start办法

创立一个client去连贯rm,并且获取到rm返回appId

封装一个指令让rm找一台nm启动一个am

这个指令如果是cluster那么启动的类就是applicationMaster,如果是client就是启动executorLauncher

此时SparkSubmit工作实现,如果是cluster模式,那么间接把这个过程kill掉也没事

ApplicationMaster

启动一个ApplicationMaster过程后,解析各种参数后封装一个ApplicationMaster对象

封装好的ApplicationMaster对象会开启一个线程运行用户类(提交的jar包)的main函数,这个线程就是driver线程,

在am过程主办法,会期待获取SparkContext,等到获取后就会向rm注册本人并申请资源,rm返回容器列表(这里申请资源细节比拟多)

am拿到容器列表,就会在nm启动executor过程

YarnCoarseGrainedExecutorBackend过程启动胜利后

启动后第一件事向driver线程反向注册

注册胜利后,executor过程会创立executor计算对象

计算对象里有一个线程池,每一个线程来解决一个driver端发过来的工作

关键词:大数据培训