关于java:Spark源代码Spark多线程NettyRpcEnvask解读

private[netty] def askT: ClassTag: Future[T] = {

    // 定义了一个Any的promise
val promise = Promise[Any]()
val remoteAddr = message.receiver.address
def onFailure(e: Throwable): Unit = {
  if (!promise.tryFailure(e)) {
    e match {
      case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
      case _ => logWarning(s"Ignored failure: $e")
    }
  }
}
    /* 
    这里申明的onSuccess会被填充到RpcResponseCallback的onSuccess中,这个
    RpcResponseCallback就是下面【图9】中的listener,当咱们从Server端获取到response后
    留神,获取的不是RpcFailure类型的response,则都会进入到【图9】的
    else if (message instanceof RpcResponse) { 分支中
    */
def onSuccess(reply: Any): Unit = reply match {
  case RpcFailure(e) => onFailure(e)
  case rpcReply =>
            /*
            当返回的response是OK的没有问题后,onSuccess被callback,这里promise的trySuccess也
            进行call操作,[PayPal下载](https://www.gendan5.com/wallet/PayPal.html)这里就是下面所说的,为了一个promise铺设了一条future,从而能够执行
            这个Future的线程了
            */
    if (!promise.trySuccess(rpcReply)) {
      logWarning(s"Ignored message: $reply")
    }
}
try {
  if (remoteAddr == address) {
    val p = Promise[Any]()
    p.future.onComplete {
      case Success(response) => onSuccess(response)
      case Failure(e) => onFailure(e)
    }(ThreadUtils.sameThread)
    dispatcher.postLocalMessage(message, p)
  } else {
    val rpcMessage = RpcOutboxMessage(message.serialize(this),
      onFailure,
      (client, response) => **onSuccess**(deserialize[Any](client, response)))
    postToOutbox(message.receiver, rpcMessage)
            /*
            如果是callback了Failure,则这里会被执行
            */
    promise.future.failed.foreach {
      case _: TimeoutException => rpcMessage.onTimeout()
      case _ =>
    }(ThreadUtils.sameThread)
  }
  val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
    override def run(): Unit = {
      onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +
        s"in ${timeout.duration}"))
    }
  }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
        /*
        当promise的future执行后,会调用这里的onComplete办法
        */
    promise.future.onComplete { v =>
    timeoutCancelable.cancel(true)
  }(ThreadUtils.sameThread)
} catch {
  case NonFatal(e) =>
    onFailure(e)
}
    /*
    利用RpcTimeout中的addMessageIfTimeout的偏函数再去模式匹配一下产生的Throwable内容
    如果是RpcTimeoutException 则 间接throw这个ex
    如果是TimeoutException 则包装成RpcTimeoutException后再throw进来
    */
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)

}

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理