一、Promise 介绍
JDK 的 Future 接口大家都比拟相熟,它用于示意 异步操作的后果 。Netty 的 Future 接口继承于 JDK 原生的 Future 接口,并在其之上拓展了一些办法,比方 注册监听器 addListener、移除监听器 removeListener、期待异步操作实现 await 等办法。相干类图如下:
从上图能够看到,Promise 接口继承于 Netty 的 Future 接口 ,也在其之上拓展了一些办法,比方 设置后果 setSuccess、setFailure,设置不可勾销 setUncancellable 等办法。源码如下:
public interface Promise<V> extends Future<V> {
// 设置后果为胜利
Promise<V> setSuccess(V result);
// 尝试设置后果为胜利
boolean trySuccess(V result);
// 设置后果为失败
Promise<V> setFailure(Throwable cause);
// 尝试设置后果为失败
boolean tryFailure(Throwable cause);
// 尝试为不可勾销
boolean setUncancellable();
// 增加监听器
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 移除监听器
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 可中断地期待
Promise<V> await() throws InterruptedException;
// 不可中断地期待
Promise<V> awaitUninterruptibly();
// 可中断地期待,如果后果为失败,从新抛出异样
Promise<V> sync() throws InterruptedException;
// 不可中断地期待,如果后果为失败,从新抛出异样
Promise<V> syncUninterruptibly();}
二、DefaultPromise 的实现剖析
DefaultPromise 是 Promise 的 默认实现 ,并且前面常常遇到的DefaultChannelPromise 也是继承于它,所以接下来让咱们看下 DefaultPromise 的具体代码实现
2.1 要害属性
DefaultPromise 的几个要害属性,如下:
private static final Object SUCCESS = new Object();
private static final Object UNCANCELLABLE = new Object();
private volatile Object result;
private final EventExecutor executor;
private Object listeners;
private short waiters;
private boolean notifyingListeners;
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8, SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
- SUCCESS:空对象,示意后果为胜利
- UNCANCELABLE:空对象,示意为不可勾销
-
result:代表后果,它的取值有 5 种状况(Promise 可归为 3 种状态:未实现、胜利、失败)
- null(状态为未实现)
- 空对象 UNCANCELLABLE(状态为未实现)
- 空对象 SUCCESS(状态为胜利)
- CauseHolder 对象,蕴含了异样信息(状态为失败)
- 失常执行后果(状态为胜利)
- executor:执行器,用于回调监听器
- listeners:监听器汇合
- waiters:期待以后 Promise 实现的 线程个数
- notifyingListners:是否正在回调监听器
- MAX_LISTENER_STACK_DEPTH:字面翻译为监听器最大栈深度,这是当 嵌套回调 监听器时,避免栈溢出 StackOverflowError 而设计的,相似如下状况
public class TestPromise {public static void main(String[] args) {
EventExecutor executor = ImmediateEventExecutor.INSTANCE;
Promise<String> p1 = new DefaultPromise<>(executor);
Promise<String> p2 = new DefaultPromise<>(executor);
Promise<String> p3 = new DefaultPromise<>(executor);
Promise<String> p4 = new DefaultPromise<>(executor);
Promise<String> p5 = new DefaultPromise<>(executor);
Promise<String> p6 = new DefaultPromise<>(executor);
Promise<String> p7 = new DefaultPromise<>(executor);
Promise<String> p8 = new DefaultPromise<>(executor);
Promise<String> p9 = new DefaultPromise<>(executor);
Promise<String> p10 = new DefaultPromise<>(executor);
p1.addListener(new MyListener(p2));
p2.addListener(new MyListener(p3));
p3.addListener(new MyListener(p4));
p4.addListener(new MyListener(p5));
p5.addListener(new MyListener(p6));
p6.addListener(new MyListener(p7));
p7.addListener(new MyListener(p8));
p8.addListener(new MyListener(p9));
p9.addListener(new MyListener(p10));
p1.setSuccess(null);
}
private static class MyListener implements GenericFutureListener {
private Promise promise;
MyListener(Promise promise) {this.promise = promise;}
@Override
public void operationComplete(Future future) throws Exception {System.out.println("回调胜利");
promise.setSuccess(null);
}
}
}
2.2 设置后果
2.2.1 设置后果为胜利或失败
通过 setSuccess、setFailure 办法 来设置 Promise 的最终后果 result,源码如下
// 设置 Promise 的最终后果为参数 result,如果设置失败,则抛出异样
public Promise<V> setSuccess(V result) {if (setSuccess0(result)) {return this;}
throw new IllegalStateException("complete already:" + this);
}
// 设置 Promise 的最终后果为 new CauseHolder(cause),如果设置失败,则抛出异样
public Promise<V> setFailure(Throwable cause) {if (setFailure0(cause)) {return this;}
throw new IllegalStateException("complete already:" + this, cause);
}
private boolean setSuccess0(V result) {
// 如果传入的 result 为 null,则设置最终后果为 SUCCESS
return setValue0(result == null ? SUCCESS : result);
}
private boolean setFailure0(Throwable cause) {
// 将传入的 cause 封装为 CauseHolder 对象,并设置为最终后果
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
private boolean setValue0(Object objResult) {
/**
* 最终后果 result 为 null,示意以后 Promise 处于未实现状态
* 最终后果 result 为 UNCANCELLABLE,示意以后 Promise 不可勾销,也是处于未实现状态
* 通过 CAS 操作来更新最终后果,如果更新胜利,唤醒期待以后 Promise 的线程,并且回调监听器
* 如果更新失败,间接返回 false
*/
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 唤醒期待以后 Promise 的线程
if (checkNotifyWaiters()) {
// 回调监听器
notifyListeners();}
return true;
}
return false;
}
2.2.2 设置为 UNCANCELLABLE(不可勾销)
当 Promise 未实现时,通过 setUncancellable()办法将最终后果 result临时更新 为 UNCANCELLABLE,后续调用 cancel()将都会返回 false,当 Promise 实现后,会再将最终后果 result 更新为实现后果。
public boolean setUncancellable() {
// CAS 操作将最终后果 result 临时更新为 UNCANCELLABLE
if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {return true;}
Object result = this.result;
return !isDone0(result) || !isCancelled0(result);
}
2.3 利用 wait/notify 机制来实现期待唤醒
2.3.1 期待实现 await
await 办法能够使以后线程期待 Promise 实现,其大抵流程为:
- 判断 Promise 是否已实现,如果是,间接返回,否则下一步
- 判断以后线程是否已被中断,如果是,间接抛出异样,否则下一步
- 查看以后线程对应的执行器是否是 EventLoop,如果是,间接抛出异样,否则下一步
- synchronized 加锁,再次判断 Promise 是否已实现
如果是,间接返回
如果不是,waiters 加 1,并调用 wait()办法进入期待唤醒状态,当被唤醒时,waiters 减 1
源码如下:
public Promise<V> await() throws InterruptedException {
// 判断是否已实现,如果是,间接返回
if (isDone()) {return this;}
// 判断以后线程是否已被中断,如果是,间接抛出异样
if (Thread.interrupted()) {throw new InterruptedException(toString());
}
// 检测以后线程对应的执行器是否是 EventLoop,如果是,则断定为死锁,间接抛出异样
checkDeadLock();
synchronized (this) {while (!isDone()) {incWaiters(); // waiters 加 1
try {wait(); // 开释锁,并期待唤醒
} finally {decWaiters(); // waiters 减 1
}
}
}
return this;
}
2.3.2 唤醒期待的线程
通过 checkNotifyWaiters 来唤醒期待的线程,如下:
private synchronized boolean checkNotifyWaiters() {if (waiters > 0) { // 如果 waiters 大于 0,示意有期待的线程,唤醒所有期待线程
notifyAll();}
return listeners != null;
}
那么 何时唤醒 期待的线程呢?
- 当胜利设置最终后果之后,会唤醒期待的线程
- 当调用 cancel 办法来胜利勾销 Promise 时,会唤醒期待的线程
2.4 注册、移除监听器以及回调监听器
2.4.1 注册、移除监听器
- 通过 addListener 来注册监听器,如果 Promise 已实现,间接回调监听器
- 通过 removeListener 来移除监听器
// 注册监听器
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {checkNotNull(listener, "listener");
// 加锁
synchronized (this) {addListener0(listener); // 注册监听器
}
// 判断是否已实现,如果已实现,间接回调监听器
if (isDone()) {notifyListeners();
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {if (listeners == null) { // 首次注册监听器
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) { // 第三次及当前注册监听器
((DefaultFutureListeners) listeners).add(listener);
} else { // 第二次注册监听器
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
// 移除监听器
public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {checkNotNull(listener, "listener");
// 加锁
synchronized (this) {removeListener0(listener); // 移除监听器
}
return this;
}
private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {if (listeners instanceof DefaultFutureListeners) {
// 移除对应的监听器
((DefaultFutureListeners) listeners).remove(listener);
} else if (listeners == listener) { // 本来只注册了一个监听器
listeners = null;
}
}
2.4.2 回调监听器
当 Promise 实现时,会回调曾经注册过的监听器,回调监听器的源码如下:
// 回调监听器
private void notifyListeners() {EventExecutor executor = executor();
// 判断以后线程是否 EventLoop 绑定的线程是同一个
if (executor.inEventLoop()) {final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
// 判断是否小于 MAX_LISTENER_STACK_DEPTH
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
// 回调监听器
notifyListenersNow();} finally {threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 平安执行,避免以后线程抛出 StackOverflowError 导致
safeExecute(executor, new Runnable() {
@Override
public void run() {notifyListenersNow();
}
});
}
private void notifyListenersNow() {
Object listeners;
// 加锁
synchronized (this) {
// 如果以后 Promise 正在回调监听器,或者没有注册监听器,间接返回
if (notifyingListeners || this.listeners == null) {return;}
notifyingListeners = true; // 示意正在回调监听器
listeners = this.listeners;
this.listeners = null;
}
for (;;) {if (listeners instanceof DefaultFutureListeners) {
// 遍历监听器,并逐个回调
notifyListeners0((DefaultFutureListeners) listeners);
} else {
// 回调单个监听器
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
// 当 this.listeners 等于 null,示意以后 Promise 曾经没有可回调的监听器了
if (this.listeners == null) {
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
// 回调单个监听器
private static void notifyListener0(Future future, GenericFutureListener l) {
try {l.operationComplete(future);
} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("An exception was thrown by" + l.getClass().getName() + ".operationComplete()", t);
}
}
}
三、总结
Netty 中所有的异步操作都是通过 Promise 来实现的,所以了解 Promise 对于咱们后续深刻了解 Netty 是很有必要的。