共计 1970 个字符,预计需要花费 5 分钟才能阅读完成。
private[netty] def askT: ClassTag: Future[T] = {
// 定义了一个 Any 的 promise
val promise = Promise[Any]()
val remoteAddr = message.receiver.address
def onFailure(e: Throwable): Unit = {if (!promise.tryFailure(e)) {
e match {case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
case _ => logWarning(s"Ignored failure: $e")
}
}
}
/*
这里申明的 onSuccess 会被填充到 RpcResponseCallback 的 onSuccess 中,这个
RpcResponseCallback 就是下面【图 9】中的 listener,当咱们从 Server 端获取到 response 后
留神,获取的不是 RpcFailure 类型的 response,则都会进入到【图 9】的
else if (message instanceof RpcResponse) { 分支中
*/
def onSuccess(reply: Any): Unit = reply match {case RpcFailure(e) => onFailure(e)
case rpcReply =>
/*
当返回的 response 是 OK 的没有问题后,onSuccess 被 callback,这里 promise 的 trySuccess 也
进行 call 操作,[PayPal 下载](https://www.gendan5.com/wallet/PayPal.html) 这里就是下面所说的,为了一个 promise 铺设了一条 future,从而能够执行
这个 Future 的线程了
*/
if (!promise.trySuccess(rpcReply)) {logWarning(s"Ignored message: $reply")
}
}
try {if (remoteAddr == address) {val p = Promise[Any]()
p.future.onComplete {case Success(response) => onSuccess(response)
case Failure(e) => onFailure(e)
}(ThreadUtils.sameThread)
dispatcher.postLocalMessage(message, p)
} else {val rpcMessage = RpcOutboxMessage(message.serialize(this),
onFailure,
(client, response) => **onSuccess**(deserialize[Any](client, response)))
postToOutbox(message.receiver, rpcMessage)
/*
如果是 callback 了 Failure,则这里会被执行
*/
promise.future.failed.foreach {case _: TimeoutException => rpcMessage.onTimeout()
case _ =>
}(ThreadUtils.sameThread)
}
val timeoutCancelable = timeoutScheduler.schedule(new Runnable {override def run(): Unit = {onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr}" +
s"in ${timeout.duration}"))
}
}, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
/*
当 promise 的 future 执行后,会调用这里的 onComplete 办法
*/
promise.future.onComplete { v =>
timeoutCancelable.cancel(true)
}(ThreadUtils.sameThread)
} catch {case NonFatal(e) =>
onFailure(e)
}
/*
利用 RpcTimeout 中的 addMessageIfTimeout 的偏函数再去模式匹配一下产生的 Throwable 内容
如果是 RpcTimeoutException 则 间接 throw 这个 ex
如果是 TimeoutException 则包装成 RpcTimeoutException 后再 throw 进来
*/
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}
正文完