背景

Spark中有很多异步解决的例子,每一个中央都值得好好去扫视一番,对辅助了解spark的机理以及为本人写出优雅的代码都会有很大的帮忙。

NettyRpcEnv.ask解读

RpcEnv作用

NettyRpcEnvRpcEnv的在spark中的惟一一个实现。RpcEnv是什么呢,能够先看一下它的class头信息

/** * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote * nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by * [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the * sender, or logging them if no such sender or `NotSerializableException`. * * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. */

就是一句话,RPC的环境。在这里,最重要的2个操作莫过于

  • 能够去注册RpcEndpoint
  • 能够去异步获取RpcEndpointRef

RpcEndpointRpcEndpointRef是什么呢,在这里不做具体赘述,其余的文章中会具体阐明,简略来讲一下

简略回顾RpcEndpointRpcEndpointRef

RpcEndpoint

  • RpcEndpoint

    家喻户晓,spark外部会有executordriver等角色,他们之间的通信都采纳利用Netty,在executor或者driver上并不是只启动1个Netty的服务,针对不同的性能会有多个Netty的RPC服务开启,利用不同的端口号进行辨别。服务间通信后,通的“信”被很多种逻辑单元来解决,如Inbox,如EventLoop等,这些都是工具级别的单元,而被形象进去作为可插拔可扩大的大的逻辑功能模块在Spark中就叫做RpcEndpoint,它是用来解决从其余client端发送或者server端返回过去的message的模块。RpcEndpoint自身是一个trait,它能够有多种的实现

RpcEndpointRef

  • RpcEndpointRef

    spark之前的网络通信都是采纳akka,改版后采纳的是Netty,在akka中,如果一个两个节点间的通信是利用目标方的actorRef来进行的通信的,即AActor 心愿发送音讯到 BActor,须要BActorRef来发送音讯。Spark的网络通信降级到Netty后,Endpoint就能够间接理解成原来的Actor,那么发送音讯到另一个Actor的话,也须要RpcEndpoint的Ref,即RpcEndpointRef。这个概念乍一看有点懵,试想,从A发送音讯到B,能发送的前提是A先领有了一个B的”援用“,这在一般的Http服务中貌似很不能被了解,我想拜访某一台机器按说只须要晓得对方的IP和Port不就OK了,当初还须要对方的一个“替身”?这是什么鬼?带着问题咱们能够继续往下看即可,这里你只须要这样意识即可:

    • 用来拜访B machine的RpcEndpointRef你了解成就是B machine的IP和Port的一个被包装后的实例即可

图解RpcEndpointRpcEndpointRef

  • 图解一下

    A machine能够是物理机能够是虚拟机,B machine能够是和A同一台物理机、虚拟机(端口号不同),也能够是不同的(在spark中甚至于有本人发给本人的msg,后续会讲)。那么从A发送音讯到B的话,应用的是B的RpcEndpointRef,通过它发送音讯到B machine

  • 【图1】要如何拜访

  • 【图2】外部的原理

  • 【图3】B machine的RpcEndpointRef的实例是啥(简化版)

简略回顾Driver和Executor

Ask,顾名思义——问。可能是打个招呼,看看在不在,询问一下,等等。这个就是NettyRpcEnv.ask的作用所在。为了讲NettyRpcEnv.ask的作用,还须要简略的串一下一下概念和流程

Driver线程和Executor过程

首先,须要明确两个事件,在yarn环境下

  • Driver是在ApplicatioMaster过程中执行的一个线程

    严格来说,其实这个说法也不太正确,因为Driver其实是在用户的class的时候,在造成sparkContext上下文环境的一个产物,自身执行的其实是用户class线程,在这个线程中建设了SparkEnv以及RpcEnv等等,并且建设了Driver的Netty的Service等等,与Executor互相通信

  • Executor则是一个个的过程,通过java命令在每一个节点上启动的

Yarn系列以及 ApplicatioMaster是什么这里不做赘述,其余文章中会细讲。

其次,在这里只须要理解到,Driver自身是一个协调调度节点,它能够去分配任务给Executor,并且把握着Executor的状况,调配就是把Task发送给Executor,把握则指的是须要晓得Executor的运行状况等等。

  • 【图4】图解一下

    举个栗子,1个Driver和2个Executor进行交互通信,Driver手握2个Executor(一个叫做E1,一个叫做E2)的RpcEndpointRef,权且简称为E1Ref和E2Ref,通过这2个Ref发送msg到E1节点和E2节点,这2个节点自身通过本身的RpcEndpoint来解决msg。而E1和E2自身还要定期起的向Driver汇报本身的状况,这里叫做heartbeat心跳,那么反过来则是利用各自外部把握的DriverEpcEndpointRef来发送heartbeatDriver,而Driver利用其本人的DriverRpcEndpoint来解决heartbeat的msg。所有节点的下面的组建则都在本身的NettyRpcEnv中,也就是RpcEnv的实现。

举例:在RpcEnv中建设一个DriverRpcEndpointRef

背景

终于要说到本篇的内容了,NettyRpcEnv.ask的解读,须要有一个场景调用NettyRpcEnv.ask的办法才能够,那能够在题中所述的RpcEnv中建设一个DriverRpcEndpointRef这个场景中形容

RpcEnv中为啥建设DriverRpcEndpointRef

下面的【图4】介绍了一个Driver和和Executor之间通信的过程。其实,在ApplicationMaster中构建Driver线程的时候,有一部分的通信是须要通过DriverRpcEndpointRef进行的,即利用DriverRpcEndpointRef发送msg给DriverRpcEndpointDriverRpcEndpoint做出解决并响应

  • 【图5】图解一下

    • ApplicationMaster 中启动【Run】Driver的线程后,从Driver线程中拿到了NettyRpcEnv
    • 并且利用NettyRpcEnvsetupEndpointRef办法【Get】到两个DriverEndpointRef
    • 后续通过【Use】这个DriverEndpointRef去拜访Driver的DriverEndpoint
    • 有一点须要阐明的是,ApplicationMaster的节点自身也是Driver的节点,其实拜访DriverDriverEndpoint按说是能够间接拜访的(Spark源代码中没有这样实现,还是为了隔离和封装的更好,缩小耦合,今后Driver如果作为过程执行,不再ApplicationMaster上运行也会批改的较为简单),然而这里还是采纳了Netty的Rpc拜访形式

看看源代码

这部分代码在ApplicationMaster.scala中,关注办法runDriver即可

  • 【图6】图解一下

    • (I) 有一台IP是10.1.2.5的服务器,启动了ApplicationMaster
    • (II) a过程,在这个节点上启动了Driver的线程,并且初始化了用户的class,并且在10.1.2.5节点上启动了一个Netty的serviec,IP和Port为10.1.2.5:13200
    • (III) b过程,ApplicationMaster节点上持续调用RpcEnv.setupEndpointRef,目标是setup一个DriverDriverEndpointRefRpcEnv中,这个setup的过程就是去10.1.2.5:13200拜访一下,如果服务通了,则构建出DriverEndpointRef,这个“拜访一下”即本文所述要用到的NettyRpcEnv.ask的办法。
    • 能够看到调用程序为

      • (ApplicationMaster.scala) rpcEnv.setupEndpointRef ↓
      • (NettyRpcEnv.scala) NettyRpcEnv.asyncSetupEndpointRefByURI ↓
      • (NettyRpcEndpointRef.scala) NettyRpcEndpointRef.ask ↓
      • (NettyRpcEnv.scala) NettyRpcEnv.ask — — — — ↓ (通过多个步骤,两头局部省略,其余文章会讲)
      • 10.1.2.5:13200 的netty服务

  • 代码如下

    private def runDriver(): Unit = {    addAmIpFilter(None)        /*        这里,调用startUserApplication办法来执行用户的class,也就是咱们的jar包,        invoke咱们的main办法,从而启动了sparkContext,外部启动一系列的scheduler以及        backend,以及taskscheduler等等等等core的内容,其余篇章会具体解说        */        userClassThread = startUserApplication()    // This a bit hacky, but we need to wait until the spark.driver.port property has    // been set by the Thread executing the user class.    logInfo("Waiting for spark context initialization...")    val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)    try {      /*            这里,阻塞的期待SparkContext从Driver线程中返回回来            */      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,        Duration(totalWaitTime, TimeUnit.MILLISECONDS))      if (sc != null) {        rpcEnv = sc.env.rpcEnv        val userConf = sc.getConf        val host = userConf.get("spark.driver.host")        val port = userConf.get("spark.driver.port").toInt        registerAM(host, port, userConf, sc.ui.map(_.webUrl))                /*                **这里,演出了好戏,通过NettyRpcEnv的setupEndpointRef办法来获取到driverRef                这个外面其实是去ask一下Driver你在吗?是否存在这个Driver的服务,如果存在,则                返回OK,构建出Driver的Ref**                */        val driverRef = rpcEnv.setupEndpointRef(          RpcAddress(host, port),          YarnSchedulerBackend.ENDPOINT_NAME)        createAllocator(driverRef, userConf)      } else {        // Sanity check; should never happen in normal operation, since sc should only be null        // if the user app did not create a SparkContext.        throw new IllegalStateException("User did not initialize spark context!")      }      resumeDriver()      userClassThread.join()    } catch {      case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>        logError(          s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +           "Please check earlier log output for errors. Failing the application.")        finish(FinalApplicationStatus.FAILED,          ApplicationMaster.EXIT_SC_NOT_INITED,          "Timed out waiting for SparkContext.")    } finally {      resumeDriver()    }  }

解读NettyRpcEnv.ask

回顾Future

如何了解Future呢,从字面意思能够很好的了解,Future即将来,也是期货的意思。

说到期货,就充斥了不确定性,因为毕竟没有产生,谁也不晓得将来会怎么。所以,定义一个Future就是定义了一个不在当初这个时空(线程)产生的(将来)的另一个(另一个线程的)事件,相比java的鸡肋的Future,scala的Future堪称是十分优雅且完满,搜寻我的博客能够看到针对scala的Future的具体介绍。

  • 官网文章:https://docs.scala-lang.org/zh-cn/overviews/core/futures.html
  • 这里不从源代码的角度去构建Future和Promise的认知观点,会有其余的文章再做解释
  • 【图7】图解一下

    在java中定义一个线程是右侧的做法,而在左侧的scala中,利用Future则优雅了很多

  • 代码

    import scala.concurrent.ExecutionContext.Implicits.globalimport scala.concurrent.Future/** * 解读Future的根底 */object DocFutureTest {  def apply(): Unit = {    println("I am DocFutureTest")  }  def main(args: Array[String]): Unit = {    val sleeping = 3000;    val main_thread = Thread.currentThread().getName;            /*        定义另一个线程产生的事件        这个事件相当于java中的如下的代码块:        从整体的间接性上看,scala的更为优雅一些,间接一个Future能够包裹住左右须要解决的内容        后续如果须要进行异样解决的话还能够依据Success和Failture进行模式匹配        public class JavaThreading {                    public static void main(String[] args) throws InterruptedException {                new Thread(                        () -> System.out.println("这是一条产生在另一个叫做叫做" + Thread.currentThread().getName() + " 线程的故事")                ).start();                        System.out.println(Thread.currentThread().getName());                Thread.sleep(3000);            }        }        */    var future_run = Future {      Thread.sleep(1000)      println("这是一条产生在另一个叫做叫做" + Thread.currentThread().getName +" 线程的故事")    }    // 主线程劳动3000ms        // 如果不劳动的话,main线程会先进行,导致下面的Future定义的thread还没有被执行到就完结了    Thread.sleep(sleeping)    println(s"$main_thread 线程劳动 $sleeping 毫秒")  }}
  • Future + callback(截取局部)

    case class ExceptionError(error: String) extends Exception(error)  def main(args: Array[String]): Unit = {    val sleeping = 3000;    val main_thread = Thread.currentThread().getName;    // 定义另一个线程产生的事件    var future_run = Future {      Thread.sleep(1000)      prntln("这是一条产生在另一个叫做叫做" + Thread.currentThread().getName + " 线程的故事")        // 如果须要onFailure的话 则开释此句        // throw ExceptionError("error")     future_run onFailure {      case t => println("exception " + t.getMessage)    }    future_run onSuccess {      case _ => println("success")    }
  • 留神点

    • 定义了Future,则定义了须要执行的线程的执行体(body),那么执行也是立即马上,相似于java定义了一个Thread,而后间接调用了start()一样
    • 在Future中大量使用了scala的Try[],如果呈现了异样,没有做onFailure的解决,那么可能看不到异样被抛出来,这点和java有较大区别

回顾Promise

从皮毛简略的说完了Future,那Promise又是什么呢?其实在Future的实现中蕴含了Promise的实现,也就是说没有Promise,Future是无奈被运行的。从字面的了解,Promise是承诺,有了Future的将来的定义,那么须要给出一个确切的承诺才能够进行,否则都是空口无凭天马行空无奈兑现的大话。

  • 官网文章:https://docs.scala-lang.org/zh-cn/overviews/core/futures.html

说到当初,包含看完以上的Future的介绍,很多人必定还是懵b状态,因为我刚开始接触的时候也是这样,但我喜爱的就是用最直观的图和设想来形容一个形象的问题,二话不说,持续上图

  • 【图8】图解一下FuturePromise的关系

    • Future的含意

      • 主线人生是你的Main Thread,在spark中可能是某一个解决的Thread
      • 在Now这个时刻,你开启了变成star之路(become star的Thread)
      • 在Now这个时刻,你开启了变帅之路(become handsome的Thread)
      • 一旦你开启这两条路,只有你的Main Thread没有完结,那么你能够始终继续的去走完这两个“之路”,直到Success或者Failure,这就是Future,能够了解为,开启了一条新的轨迹

- **Promiose的含意**    - 当你开启了两条新的“之路”的时候,我能够在你两条路的重点给你不同的承诺    - 当你success的实现了Future的时候,我promise你一个后果    - 当你failure的实现了Future的时候,我promise你另一个后果

- **Future与Promise的比照**    - Future是一条线,蕴含执行过程的一条线,依照Timeline要去走上来    - Promise是一个点,一个被触发的点,想达到这个点必须又一个Future搭出一条门路才能够    下面两句话如何了解呢,你能够这么想,人生(Main Thread)是一个数轴,你如果心愿依照timeline向着右侧始终后退就须要有一条间断的路线,这个“路线”就是一个Thread,也能够是Future定义出的路线,咱们只能好高鹜远的通过路线走到指标起点,而不能间接跳到起点。Promise相似于一个milestone点,如果只有一个Promise,不定义出“路线”也就是不定义出一个Future(或Main Thread)的话,是无奈实现这个Promise的。只定义了Promise,不去思考直线门路(Future),无奈实现,但只定义Future,不定义Promise(其实在Future中是内置了Default的Promise的)是能够间接执行Future的。如下图所示,开出了两张空头支票,没有定义具体的路线(Future 实现形式),那么这两个Promise是无奈兑现的。    须要留神一点,这张图只是画出了定义了Promise,然而如果想对象这个Promise的话,是能够通过Promise中的办法来搭建出一个Future来执行的,与Future不同的是,Future只有定义了就能够马上执行,Promise定义了的话,必须要显式的触发“搭建Future”的操作才能够。

  • 看看Promise不执行的代码

    这里,咱们定义了一个Promise,并且“承诺”在Promise对应的future完结后调用一个map操作打印出一句话future:....

    但咱们执行以下语句的时候会发现什么都没有执行

    import scala.concurrent.Promiseimport scala.util.{Failure, Success}object PromiseTest {  def main(args: Array[String]): Unit = {    import scala.concurrent.ExecutionContext.Implicits.global    val promise = Promise[String]        promise.future.onComplete(v => println("onComplete " + v))    promise.future.map(str => println("future: " + str + " ==> " + Thread.currentThread().getName))    promise.future.failed.foreach(e => println(e + " ==> " + Thread.currentThread().getName))    Thread.sleep(3000)  }}
  • 看看能够执行的代码

    与下面代码惟一不同的就是,退出了promise.trySuccess的解决

    代码细节在其余篇章咱们具体看,这里你能够这样了解,退出和trySuccess,就是为达到Promise搭建了一条Future之路,并触发这条路开始执行(start)

    至于trySuccess,tryComplete等具体的细节讲scala多线程的中央能够细说

    • **promise.future.onComplete**

      在Future执行结束后的callback解决,无论是Success还是Failure都能够执行这个onComplete解决

    • **promise.future.map**

      promise.future.onComplete之后对Future进行的持续的map解决

    • promise.trySuccess

      触发整个Future执行的trigger

    import scala.concurrent.Promiseimport scala.util.{Failure, Success}object PromiseTest {  def main(args: Array[String]): Unit = {    import scala.concurrent.ExecutionContext.Implicits.global    val promise = Promise[String]        promise.future.onComplete(v => println("onComplete " + v))    promise.future.map(str => println("future: " + str + " ==> " + Thread.currentThread().getName))    promise.future.failed.foreach(e => println(e + " ==> " + Thread.currentThread().getName))        **promise.trySuccess("try success "  + " --> " + Thread.currentThread().getName)**    Thread.sleep(3000)  }}

ask的代码

其实,讲完了下面的所有的内容后,ask的代码感觉几句话就能够解说结束了。

ask自身返回的是Future,自身是异步解决

  • 【图9】图解一下

    一台10.1.1.1的client机器通过rpc拜访一台10.1.1.2的Netty的service,当response正确返回后,在client机器中的TransportResponseHandler中进行判断解决,并且调用listener的onSuccess办法,这个onSuccess办法则是上面的ask代码中定义的办法。在这个办法中自身又去执行了promise的tryComplete,从而触发了promise的future之路执行

private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {        // 定义了一个Any的promise    val promise = Promise[Any]()    val remoteAddr = message.receiver.address    def onFailure(e: Throwable): Unit = {      if (!promise.tryFailure(e)) {        e match {          case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")          case _ => logWarning(s"Ignored failure: $e")        }      }    }        /*         这里申明的onSuccess会被填充到RpcResponseCallback的onSuccess中,这个        RpcResponseCallback就是下面【图9】中的listener,当咱们从Server端获取到response后        留神,获取的不是RpcFailure类型的response,则都会进入到【图9】的        else if (message instanceof RpcResponse) { 分支中        */    def onSuccess(reply: Any): Unit = reply match {      case RpcFailure(e) => onFailure(e)      case rpcReply =>                /*                当返回的response是OK的没有问题后,onSuccess被callback,这里promise的trySuccess也                进行call操作,这里就是下面所说的,为了一个promise铺设了一条future,从而能够执行                这个Future的线程了                */        if (!promise.trySuccess(rpcReply)) {          logWarning(s"Ignored message: $reply")        }    }    try {      if (remoteAddr == address) {        val p = Promise[Any]()        p.future.onComplete {          case Success(response) => onSuccess(response)          case Failure(e) => onFailure(e)        }(ThreadUtils.sameThread)        dispatcher.postLocalMessage(message, p)      } else {        val rpcMessage = RpcOutboxMessage(message.serialize(this),          onFailure,          (client, response) => **onSuccess**(deserialize[Any](client, response)))        postToOutbox(message.receiver, rpcMessage)                /*                如果是callback了Failure,则这里会被执行                */        promise.future.failed.foreach {          case _: TimeoutException => rpcMessage.onTimeout()          case _ =>        }(ThreadUtils.sameThread)      }      val timeoutCancelable = timeoutScheduler.schedule(new Runnable {        override def run(): Unit = {          onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +            s"in ${timeout.duration}"))        }      }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)            /*            当promise的future执行后,会调用这里的onComplete办法            */        promise.future.onComplete { v =>        timeoutCancelable.cancel(true)      }(ThreadUtils.sameThread)    } catch {      case NonFatal(e) =>        onFailure(e)    }        /*        利用RpcTimeout中的addMessageIfTimeout的偏函数再去模式匹配一下产生的Throwable内容        如果是RpcTimeoutException 则 间接throw这个ex        如果是TimeoutException 则包装成RpcTimeoutException后再throw进来        */    promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)  }

总结

本篇用小篇幅解说了一下o.a.s.rpc.netty.NettyRpcEnv.ask()的办法,简略形容了一个spark的异步解决的小case,这个小case须要不少的先验知识点,可能忽然间看到这里有点懵,学习须要死记硬背一点点的来积攒才能够,如果不明确能够缓缓积攒其余模块的常识再来这里看流水账会更有播种。