关于spark:Spark源代码Spark多线程NettyRpcEnvask解读

88次阅读

共计 12682 个字符,预计需要花费 32 分钟才能阅读完成。

背景

Spark 中有很多异步解决的例子,每一个中央都值得好好去扫视一番,对辅助了解 spark 的机理以及为本人写出优雅的代码都会有很大的帮忙。

NettyRpcEnv.ask 解读

RpcEnv 作用

NettyRpcEnvRpcEnv 的在 spark 中的惟一一个实现。RpcEnv是什么呢,能够先看一下它的 class 头信息

/**
 * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
 * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
 * nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by
 * [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the
 * sender, or logging them if no such sender or `NotSerializableException`.
 *
 * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
 */

就是一句话,RPC 的环境。在这里,最重要的 2 个操作莫过于

  • 能够去注册RpcEndpoint
  • 能够去异步获取RpcEndpointRef

RpcEndpointRpcEndpointRef是什么呢,在这里不做具体赘述,其余的文章中会具体阐明,简略来讲一下

简略回顾 RpcEndpointRpcEndpointRef

RpcEndpoint

  • RpcEndpoint

    家喻户晓,spark 外部会有 executordriver 等角色,他们之间的通信都采纳利用 Netty,在 executor 或者 driver 上并不是只启动 1 个 Netty 的服务,针对不同的性能会有多个 Netty 的 RPC 服务开启,利用不同的端口号进行辨别。服务间通信后,通的“信”被很多种逻辑单元来解决,如 Inbox,如EventLoop 等,这些都是工具级别的单元,而被形象进去作为可插拔可扩大的大的逻辑功能模块在 Spark 中就叫做 RpcEndpoint,它是用来解决从其余client 端发送或者 server 端返回过去的 message 的模块。RpcEndpoint自身是一个 trait,它能够有多种的实现

RpcEndpointRef

  • RpcEndpointRef

    spark 之前的网络通信都是采纳 akka,改版后采纳的是 Netty,在 akka 中,如果一个两个节点间的通信是利用目标方的 actorRef 来进行的通信的,即 AActor 心愿发送音讯到 BActor,须要 BActorRef 来发送音讯。Spark 的网络通信降级到 Netty 后,Endpoint 就能够间接理解成原来的 Actor,那么发送音讯到另一个 Actor 的话,也须要 RpcEndpoint 的 Ref,即RpcEndpointRef。这个概念乍一看有点懵,试想,从 A 发送音讯到 B,能发送的前提是 A 先领有了一个 B 的”援用“,这在一般的 Http 服务中貌似很不能被了解,我想拜访某一台机器按说只须要晓得对方的 IP 和 Port 不就 OK 了,当初还须要对方的一个“替身”?这是什么鬼?带着问题咱们能够继续往下看即可,这里你只须要这样意识即可:

    • 用来拜访 B machine 的 RpcEndpointRef 你了解成就是 B machine 的 IP 和 Port 的一个被包装后的实例即可

图解 RpcEndpointRpcEndpointRef

  • 图解一下

    A machine 能够是物理机能够是虚拟机,B machine 能够是和 A 同一台物理机、虚拟机(端口号不同),也能够是不同的(在 spark 中甚至于有本人发给本人的 msg,后续会讲)。那么从 A 发送音讯到 B 的话,应用的是 B 的RpcEndpointRef,通过它发送音讯到 B machine

  • 【图 1】要如何拜访

  • 【图 2】外部的原理

  • 【图 3】B machine 的 RpcEndpointRef 的实例是啥(简化版)

简略回顾 Driver 和 Executor

Ask,顾名思义——问。可能是打个招呼,看看在不在,询问一下,等等。这个就是 NettyRpcEnv.ask 的作用所在。为了讲 NettyRpcEnv.ask 的作用,还须要简略的串一下一下概念和流程

Driver 线程和 Executor 过程

首先,须要明确两个事件,在 yarn 环境下

  • Driver是在 ApplicatioMaster 过程中执行的一个线程

    严格来说,其实这个说法也不太正确,因为 Driver 其实是在用户的 class 的时候,在造成 sparkContext 上下文环境的一个产物,自身执行的其实是用户 class 线程,在这个线程中建设了 SparkEnv 以及 RpcEnv 等等,并且建设了 Driver 的 Netty 的 Service 等等,与 Executor 互相通信

  • Executor则是一个个的过程,通过 java 命令在每一个节点上启动的

Yarn 系列以及 ApplicatioMaster是什么这里不做赘述,其余文章中会细讲。

其次,在这里只须要理解到,Driver自身是一个协调调度节点,它能够去分配任务给 Executor,并且把握着Executor 的状况,调配就是把 Task 发送给 Executor,把握则指的是须要晓得Executor 的运行状况等等。

  • 【图 4】图解一下

    举个栗子,1 个 Driver 和 2 个 Executor 进行交互通信,Driver手握 2 个 Executor(一个叫做 E1,一个叫做 E2)的RpcEndpointRef,权且简称为 E1Ref 和 E2Ref,通过这 2 个 Ref 发送 msg 到 E1 节点和 E2 节点,这 2 个节点自身通过本身的RpcEndpoint 来解决 msg。而 E1 和 E2 自身还要定期起的向 Driver 汇报本身的状况,这里叫做 heartbeat 心跳,那么反过来则是利用各自外部把握的 DriverEpcEndpointRef 来发送 heartbeatDriver,而 Driver 利用其本人的 DriverRpcEndpoint 来解决 heartbeat 的 msg。所有节点的下面的组建则都在本身的 NettyRpcEnv 中,也就是 RpcEnv 的实现。

举例:在 RpcEnv 中建设一个 DriverRpcEndpointRef

背景

终于要说到本篇的内容了,NettyRpcEnv.ask的解读,须要有一个场景调用 NettyRpcEnv.ask 的办法才能够,那能够在题中所述的 RpcEnv 中建设一个 DriverRpcEndpointRef 这个场景中形容

RpcEnv 中为啥建设 DriverRpcEndpointRef

下面的【图 4】介绍了一个 Driver 和和 Executor 之间通信的过程。其实,在 ApplicationMaster 中构建 Driver 线程的时候,有一部分的通信是须要通过 DriverRpcEndpointRef 进行的,即利用 DriverRpcEndpointRef 发送 msg 给 DriverRpcEndpointDriverRpcEndpoint 做出解决并响应

  • 【图 5】图解一下

    • ApplicationMaster 中启动【Run】Driver 的线程后,从 Driver 线程中拿到了NettyRpcEnv
    • 并且利用 NettyRpcEnvsetupEndpointRef办法【Get】到两个DriverEndpointRef
    • 后续通过【Use】这个 DriverEndpointRef 去拜访 Driver 的DriverEndpoint
    • 有一点须要阐明的是,ApplicationMaster的节点自身也是 Driver 的节点,其实拜访 DriverDriverEndpoint按说是能够间接拜访的(Spark 源代码中没有这样实现,还是为了隔离和封装的更好,缩小耦合,今后 Driver 如果作为过程执行,不再 ApplicationMaster 上运行也会批改的较为简单),然而这里还是采纳了 Netty 的 Rpc 拜访形式

看看源代码

这部分代码在 ApplicationMaster.scala 中,关注办法 runDriver 即可

  • 【图 6】图解一下

    • (I) 有一台 IP 是 10.1.2.5 的服务器,启动了ApplicationMaster
    • (II) a 过程,在这个节点上启动了 Driver 的线程,并且初始化了用户的 class,并且在 10.1.2.5 节点上启动了一个 Netty 的 serviec,IP 和 Port 为 10.1.2.5:13200
    • (III) b过程,ApplicationMaster 节点上持续调用 RpcEnv.setupEndpointRef,目标是 setup 一个DriverDriverEndpointRefRpcEnv 中,这个 setup 的过程就是去 10.1.2.5:13200 拜访一下,如果服务通了,则构建出 DriverEndpointRef,这个“拜访一下”即本文所述要用到的NettyRpcEnv.ask 的办法。
    • 能够看到调用程序为

      • (ApplicationMaster.scala) rpcEnv.setupEndpointRef ↓
      • (NettyRpcEnv.scala) NettyRpcEnv.asyncSetupEndpointRefByURI ↓
      • (NettyRpcEndpointRef.scala) NettyRpcEndpointRef.ask ↓
      • (NettyRpcEnv.scala) NettyRpcEnv.ask — — — — ↓(通过多个步骤,两头局部省略,其余文章会讲)
      • 10.1.2.5:13200 的 netty 服务

  • 代码如下

    private def runDriver(): Unit = {addAmIpFilter(None)
    
            /*
            这里,调用 startUserApplication 办法来执行用户的 class,也就是咱们的 jar 包,invoke 咱们的 main 办法,从而启动了 sparkContext,外部启动一系列的 scheduler 以及
            backend,以及 taskscheduler 等等等等 core 的内容,其余篇章会具体解说
            */    
        userClassThread = startUserApplication()
    
        // This a bit hacky, but we need to wait until the spark.driver.port property has
        // been set by the Thread executing the user class.
        logInfo("Waiting for spark context initialization...")
        val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
        try {
    
          /*
                这里,阻塞的期待 SparkContext 从 Driver 线程中返回回来
                */
          val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
            Duration(totalWaitTime, TimeUnit.MILLISECONDS))
          if (sc != null) {
            rpcEnv = sc.env.rpcEnv
    
            val userConf = sc.getConf
            val host = userConf.get("spark.driver.host")
            val port = userConf.get("spark.driver.port").toInt
            registerAM(host, port, userConf, sc.ui.map(_.webUrl))
    
                    /*
                    ** 这里,演出了好戏,通过 NettyRpcEnv 的 setupEndpointRef 办法来获取到 driverRef
                    这个外面其实是去 ask 一下 Driver 你在吗?是否存在这个 Driver 的服务,如果存在,则
                    返回 OK,构建出 Driver 的 Ref**
                    */
            val driverRef = rpcEnv.setupEndpointRef(RpcAddress(host, port),
              YarnSchedulerBackend.ENDPOINT_NAME)
            createAllocator(driverRef, userConf)
          } else {
            // Sanity check; should never happen in normal operation, since sc should only be null
            // if the user app did not create a SparkContext.
            throw new IllegalStateException("User did not initialize spark context!")
          }
          resumeDriver()
          userClassThread.join()} catch {case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
            logError(
              s"SparkContext did not initialize after waiting for $totalWaitTime ms." +
               "Please check earlier log output for errors. Failing the application.")
            finish(FinalApplicationStatus.FAILED,
              ApplicationMaster.EXIT_SC_NOT_INITED,
              "Timed out waiting for SparkContext.")
        } finally {resumeDriver()
        }
      }

解读 NettyRpcEnv.ask

回顾 Future

如何了解 Future 呢,从字面意思能够很好的了解,Future 即将来,也是期货的意思。

说到期货,就充斥了不确定性,因为毕竟没有产生,谁也不晓得将来会怎么。所以,定义一个 Future 就是定义了一个不在当初这个时空(线程)产生的(将来)的另一个(另一个线程的)事件,相比 java 的鸡肋的 Future,scala 的 Future 堪称是十分优雅且完满,搜寻我的博客能够看到针对 scala 的 Future 的具体介绍。

  • 官网文章:https://docs.scala-lang.org/zh-cn/overviews/core/futures.html
  • 这里不从源代码的角度去构建 Future 和 Promise 的认知观点,会有其余的文章再做解释
  • 【图 7】图解一下

    在 java 中定义一个线程是右侧的做法,而在左侧的 scala 中,利用 Future 则优雅了很多

  • 代码

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Future
    
    /**
     * 解读 Future 的根底
     */
    object DocFutureTest {def apply(): Unit = {println("I am DocFutureTest")
      }
    
      def main(args: Array[String]): Unit = {
    
        val sleeping = 3000;
        val main_thread = Thread.currentThread().getName;
        
            /*
            定义另一个线程产生的事件
            这个事件相当于 java 中的如下的代码块:从整体的间接性上看,scala 的更为优雅一些,间接一个 Future 能够包裹住左右须要解决的内容
            后续如果须要进行异样解决的话还能够依据 Success 和 Failture 进行模式匹配
            public class JavaThreading {public static void main(String[] args) throws InterruptedException {
                    new Thread(() -> System.out.println("这是一条产生在另一个叫做叫做" + Thread.currentThread().getName() + "线程的故事")
                    ).start();
            
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(3000);
                }
            }
            */
        var future_run = Future {Thread.sleep(1000)
          println("这是一条产生在另一个叫做叫做" + Thread.currentThread().getName +"线程的故事")
        }
    
        // 主线程劳动 3000ms
            // 如果不劳动的话,main 线程会先进行,导致下面的 Future 定义的 thread 还没有被执行到就完结了
        Thread.sleep(sleeping)
        println(s"$main_thread 线程劳动 $sleeping 毫秒")
    
      }
    
    }
  • Future + callback(截取局部)

    case class ExceptionError(error: String) extends Exception(error)
    
      def main(args: Array[String]): Unit = {
    
        val sleeping = 3000;
        val main_thread = Thread.currentThread().getName;
    
        // 定义另一个线程产生的事件
        var future_run = Future {Thread.sleep(1000)
          prntln("这是一条产生在另一个叫做叫做" + Thread.currentThread().getName + "线程的故事")
            // 如果须要 onFailure 的话 则开释此句
            // throw ExceptionError("error") 
    
        future_run onFailure {case t => println("exception" + t.getMessage)
        }
    
        future_run onSuccess {case _ => println("success")
        }
  • 留神点

    • 定义了 Future,则定义了须要执行的线程的执行体(body),那么执行也是立即马上,相似于 java 定义了一个 Thread,而后间接调用了 start() 一样
    • 在 Future 中大量使用了 scala 的 Try[],如果呈现了异样,没有做 onFailure 的解决,那么可能看不到异样被抛出来,这点和 java 有较大区别

回顾 Promise

从皮毛简略的说完了 Future,那 Promise 又是什么呢?其实在 Future 的实现中蕴含了 Promise 的实现,也就是说没有 Promise,Future 是无奈被运行的。从字面的了解,Promise 是承诺,有了 Future 的将来的定义,那么须要给出一个确切的承诺才能够进行,否则都是空口无凭天马行空无奈兑现的大话。

  • 官网文章:https://docs.scala-lang.org/zh-cn/overviews/core/futures.html

说到当初,包含看完以上的 Future 的介绍,很多人必定还是懵 b 状态,因为我刚开始接触的时候也是这样,但我喜爱的就是用最直观的图和设想来形容一个形象的问题,二话不说,持续上图

  • 【图 8】图解一下 FuturePromise的关系

    • Future 的含意

      • 主线人生是你的 Main Thread,在 spark 中可能是某一个解决的 Thread
      • 在 Now 这个时刻,你开启了变成 star 之路(become star 的 Thread)
      • 在 Now 这个时刻,你开启了变帅之路(become handsome 的 Thread)
      • 一旦你开启这两条路,只有你的 Main Thread 没有完结,那么你能够始终继续的去走完这两个“之路”,直到 Success 或者Failure,这就是 Future,能够了解为,开启了一条新的轨迹

- **Promiose 的含意 **
    - 当你开启了两条新的“之路”的时候,我能够在你两条路的重点给你不同的承诺
    - 当你 success 的实现了 Future 的时候,我 promise 你一个后果
    - 当你 failure 的实现了 Future 的时候,我 promise 你另一个后果

- **Future 与 Promise 的比照 **
    - Future 是一条线,蕴含执行过程的一条线,依照 Timeline 要去走上来
    - Promise 是一个点,一个被触发的点,想达到这个点必须又一个 Future 搭出一条门路才能够

    下面两句话如何了解呢,你能够这么想,人生(Main Thread)是一个数轴,你如果心愿依照 timeline 向着右侧始终后退就须要有一条间断的路线,这个“路线”就是一个 Thread,也能够是 Future 定义出的路线,咱们只能好高鹜远的通过路线走到指标起点,而不能间接跳到起点。Promise 相似于一个 milestone 点,如果只有一个 Promise,不定义出“路线”也就是不定义出一个 Future(或 Main Thread)的话,是无奈实现这个 Promise 的。只定义了 Promise,不去思考直线门路(Future),无奈实现,但只定义 Future,不定义 Promise(其实在 Future 中是内置了 Default 的 Promise 的)是能够间接执行 Future 的。如下图所示,开出了两张空头支票,没有定义具体的路线(Future 实现形式),那么这两个 Promise 是无奈兑现的。须要留神一点,这张图只是画出了定义了 Promise,然而如果想对象这个 Promise 的话,是能够通过 Promise 中的办法来搭建出一个 Future 来执行的,与 Future 不同的是,Future 只有定义了就能够马上执行,Promise 定义了的话,必须要显式的触发“搭建 Future”的操作才能够。

  • 看看 Promise 不执行的代码

    这里,咱们定义了一个 Promise,并且“承诺”在 Promise 对应的 future 完结后调用一个 map 操作打印出一句话 future:….

    但咱们执行以下语句的时候会发现什么都没有执行

    import scala.concurrent.Promise
    import scala.util.{Failure, Success}
    
    object PromiseTest {def main(args: Array[String]): Unit = {
        import scala.concurrent.ExecutionContext.Implicits.global
        val promise = Promise[String]
            promise.future.onComplete(v => println("onComplete" + v))
        promise.future.map(str => println("future:" + str + "==>" + Thread.currentThread().getName))
        promise.future.failed.foreach(e => println(e + "==>" + Thread.currentThread().getName))
        Thread.sleep(3000)
      }
    
    }
  • 看看能够执行的代码

    与下面代码惟一不同的就是,退出了 promise.trySuccess 的解决

    代码细节在其余篇章咱们具体看,这里你能够这样了解,退出和 trySuccess,就是为达到 Promise 搭建了一条 Future 之路,并触发这条路开始执行(start)

    至于 trySuccess,tryComplete 等具体的细节讲 scala 多线程的中央能够细说

    • **promise.future.onComplete**

      在 Future 执行结束后的 callback 解决,无论是 Success 还是 Failure 都能够执行这个 onComplete 解决

    • **promise.future.map**

      promise.future.onComplete 之后对 Future 进行的持续的 map 解决

    • promise.trySuccess

      触发整个 Future 执行的 trigger

    import scala.concurrent.Promise
    import scala.util.{Failure, Success}
    
    object PromiseTest {def main(args: Array[String]): Unit = {
        import scala.concurrent.ExecutionContext.Implicits.global
        val promise = Promise[String]
            promise.future.onComplete(v => println("onComplete" + v))
        promise.future.map(str => println("future:" + str + "==>" + Thread.currentThread().getName))
        promise.future.failed.foreach(e => println(e + "==>" + Thread.currentThread().getName))
            **promise.trySuccess("try success"  + "-->" + Thread.currentThread().getName)**
        Thread.sleep(3000)
      }
    
    }

ask 的代码

其实,讲完了下面的所有的内容后,ask 的代码感觉几句话就能够解说结束了。

ask 自身返回的是 Future,自身是异步解决

  • 【图 9】图解一下

    一台 10.1.1.1 的 client 机器通过 rpc 拜访一台 10.1.1.2 的 Netty 的 service,当 response 正确返回后,在 client 机器中的 TransportResponseHandler 中进行判断解决,并且调用 listener 的 onSuccess 办法,这个 onSuccess 办法则是上面的 ask 代码中定义的办法。在这个办法中自身又去执行了 promise 的 tryComplete,从而触发了 promise 的 future 之路执行

private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
        // 定义了一个 Any 的 promise
    val promise = Promise[Any]()
    val remoteAddr = message.receiver.address

    def onFailure(e: Throwable): Unit = {if (!promise.tryFailure(e)) {
        e match {case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
          case _ => logWarning(s"Ignored failure: $e")
        }
      }
    }

        /* 
        这里申明的 onSuccess 会被填充到 RpcResponseCallback 的 onSuccess 中,这个
        RpcResponseCallback 就是下面【图 9】中的 listener,当咱们从 Server 端获取到 response 后
        留神,获取的不是 RpcFailure 类型的 response,则都会进入到【图 9】的
        else if (message instanceof RpcResponse) { 分支中

        */
    def onSuccess(reply: Any): Unit = reply match {case RpcFailure(e) => onFailure(e)
      case rpcReply =>
                /*
                当返回的 response 是 OK 的没有问题后,onSuccess 被 callback,这里 promise 的 trySuccess 也
                进行 call 操作,这里就是下面所说的,为了一个 promise 铺设了一条 future,从而能够执行
                这个 Future 的线程了
                */
        if (!promise.trySuccess(rpcReply)) {logWarning(s"Ignored message: $reply")
        }
    }

    try {if (remoteAddr == address) {val p = Promise[Any]()
        p.future.onComplete {case Success(response) => onSuccess(response)
          case Failure(e) => onFailure(e)
        }(ThreadUtils.sameThread)
        dispatcher.postLocalMessage(message, p)
      } else {val rpcMessage = RpcOutboxMessage(message.serialize(this),
          onFailure,
          (client, response) => **onSuccess**(deserialize[Any](client, response)))
        postToOutbox(message.receiver, rpcMessage)
                /*
                如果是 callback 了 Failure,则这里会被执行
                */
        promise.future.failed.foreach {case _: TimeoutException => rpcMessage.onTimeout()
          case _ =>
        }(ThreadUtils.sameThread)
      }

      val timeoutCancelable = timeoutScheduler.schedule(new Runnable {override def run(): Unit = {onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr}" +
            s"in ${timeout.duration}"))
        }
      }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)

            /*
            当 promise 的 future 执行后,会调用这里的 onComplete 办法
            */
        promise.future.onComplete { v =>
        timeoutCancelable.cancel(true)
      }(ThreadUtils.sameThread)
    } catch {case NonFatal(e) =>
        onFailure(e)
    }

        /*
        利用 RpcTimeout 中的 addMessageIfTimeout 的偏函数再去模式匹配一下产生的 Throwable 内容
        如果是 RpcTimeoutException 则 间接 throw 这个 ex
        如果是 TimeoutException 则包装成 RpcTimeoutException 后再 throw 进来
        */
    promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
  }

总结

本篇用小篇幅解说了一下 o.a.s.rpc.netty.NettyRpcEnv.ask() 的办法,简略形容了一个 spark 的异步解决的小 case,这个小 case 须要不少的先验知识点,可能忽然间看到这里有点懵,学习须要死记硬背一点点的来积攒才能够,如果不明确能够缓缓积攒其余模块的常识再来这里看流水账会更有播种。

正文完
 0