private[netty] def askT: ClassTag: Future[T] = {
// 定义了一个Any的promiseval promise = Promise[Any]()val remoteAddr = message.receiver.addressdef onFailure(e: Throwable): Unit = { if (!promise.tryFailure(e)) { e match { case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e") case _ => logWarning(s"Ignored failure: $e") } }} /* 这里申明的onSuccess会被填充到RpcResponseCallback的onSuccess中,这个 RpcResponseCallback就是下面【图9】中的listener,当咱们从Server端获取到response后 留神,获取的不是RpcFailure类型的response,则都会进入到【图9】的 else if (message instanceof RpcResponse) { 分支中 */def onSuccess(reply: Any): Unit = reply match { case RpcFailure(e) => onFailure(e) case rpcReply => /* 当返回的response是OK的没有问题后,onSuccess被callback,这里promise的trySuccess也 进行call操作,[PayPal下载](https://www.gendan5.com/wallet/PayPal.html)这里就是下面所说的,为了一个promise铺设了一条future,从而能够执行 这个Future的线程了 */ if (!promise.trySuccess(rpcReply)) { logWarning(s"Ignored message: $reply") }}try { if (remoteAddr == address) { val p = Promise[Any]() p.future.onComplete { case Success(response) => onSuccess(response) case Failure(e) => onFailure(e) }(ThreadUtils.sameThread) dispatcher.postLocalMessage(message, p) } else { val rpcMessage = RpcOutboxMessage(message.serialize(this), onFailure, (client, response) => **onSuccess**(deserialize[Any](client, response))) postToOutbox(message.receiver, rpcMessage) /* 如果是callback了Failure,则这里会被执行 */ promise.future.failed.foreach { case _: TimeoutException => rpcMessage.onTimeout() case _ => }(ThreadUtils.sameThread) } val timeoutCancelable = timeoutScheduler.schedule(new Runnable { override def run(): Unit = { onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " + s"in ${timeout.duration}")) } }, timeout.duration.toNanos, TimeUnit.NANOSECONDS) /* 当promise的future执行后,会调用这里的onComplete办法 */ promise.future.onComplete { v => timeoutCancelable.cancel(true) }(ThreadUtils.sameThread)} catch { case NonFatal(e) => onFailure(e)} /* 利用RpcTimeout中的addMessageIfTimeout的偏函数再去模式匹配一下产生的Throwable内容 如果是RpcTimeoutException 则 间接throw这个ex 如果是TimeoutException 则包装成RpcTimeoutException后再throw进来 */promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}