一、Promise介绍
JDK的Future接口大家都比拟相熟,它用于示意异步操作的后果。Netty的Future接口继承于JDK原生的Future接口,并在其之上拓展了一些办法,比方注册监听器addListener、移除监听器removeListener、期待异步操作实现await等办法。相干类图如下:
从上图能够看到,Promise接口继承于Netty的Future接口,也在其之上拓展了一些办法,比方设置后果setSuccess、setFailure,设置不可勾销setUncancellable等办法。源码如下:
public interface Promise<V> extends Future<V> { // 设置后果为胜利 Promise<V> setSuccess(V result); // 尝试设置后果为胜利 boolean trySuccess(V result); // 设置后果为失败 Promise<V> setFailure(Throwable cause); // 尝试设置后果为失败 boolean tryFailure(Throwable cause); // 尝试为不可勾销 boolean setUncancellable(); // 增加监听器 Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 移除监听器 Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); // 可中断地期待 Promise<V> await() throws InterruptedException; // 不可中断地期待 Promise<V> awaitUninterruptibly(); // 可中断地期待,如果后果为失败,从新抛出异样 Promise<V> sync() throws InterruptedException; // 不可中断地期待,如果后果为失败,从新抛出异样 Promise<V> syncUninterruptibly();}
二、DefaultPromise的实现剖析
DefaultPromise是Promise的默认实现,并且前面常常遇到的DefaultChannelPromise也是继承于它,所以接下来让咱们看下DefaultPromise的具体代码实现
2.1 要害属性
DefaultPromise的几个要害属性,如下:
private static final Object SUCCESS = new Object();private static final Object UNCANCELLABLE = new Object();private volatile Object result;private final EventExecutor executor;private Object listeners;private short waiters;private boolean notifyingListeners;private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8, SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
- SUCCESS:空对象,示意后果为胜利
- UNCANCELABLE:空对象,示意为不可勾销
result:代表后果,它的取值有5种状况(Promise可归为3种状态:未实现、胜利、失败)
- null(状态为未实现)
- 空对象UNCANCELLABLE(状态为未实现)
- 空对象SUCCESS(状态为胜利)
- CauseHolder对象,蕴含了异样信息(状态为失败)
- 失常执行后果(状态为胜利)
- executor:执行器,用于回调监听器
- listeners:监听器汇合
- waiters:期待以后Promise实现的线程个数
- notifyingListners:是否正在回调监听器
- MAX_LISTENER_STACK_DEPTH:字面翻译为监听器最大栈深度,这是当嵌套回调监听器时,避免栈溢出StackOverflowError而设计的,相似如下状况
public class TestPromise { public static void main(String[] args) { EventExecutor executor = ImmediateEventExecutor.INSTANCE; Promise<String> p1 = new DefaultPromise<>(executor); Promise<String> p2 = new DefaultPromise<>(executor); Promise<String> p3 = new DefaultPromise<>(executor); Promise<String> p4 = new DefaultPromise<>(executor); Promise<String> p5 = new DefaultPromise<>(executor); Promise<String> p6 = new DefaultPromise<>(executor); Promise<String> p7 = new DefaultPromise<>(executor); Promise<String> p8 = new DefaultPromise<>(executor); Promise<String> p9 = new DefaultPromise<>(executor); Promise<String> p10 = new DefaultPromise<>(executor); p1.addListener(new MyListener(p2)); p2.addListener(new MyListener(p3)); p3.addListener(new MyListener(p4)); p4.addListener(new MyListener(p5)); p5.addListener(new MyListener(p6)); p6.addListener(new MyListener(p7)); p7.addListener(new MyListener(p8)); p8.addListener(new MyListener(p9)); p9.addListener(new MyListener(p10)); p1.setSuccess(null); } private static class MyListener implements GenericFutureListener { private Promise promise; MyListener(Promise promise) { this.promise = promise; } @Override public void operationComplete(Future future) throws Exception { System.out.println("回调胜利"); promise.setSuccess(null); } }}
2.2 设置后果
2.2.1 设置后果为胜利或失败
通过setSuccess、setFailure办法来设置Promise的最终后果result,源码如下
// 设置Promise的最终后果为参数result,如果设置失败,则抛出异样public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { return this; } throw new IllegalStateException("complete already: " + this);}// 设置Promise的最终后果为new CauseHolder(cause),如果设置失败,则抛出异样public Promise<V> setFailure(Throwable cause) { if (setFailure0(cause)) { return this; } throw new IllegalStateException("complete already: " + this, cause);}private boolean setSuccess0(V result) { // 如果传入的result为null,则设置最终后果为SUCCESS return setValue0(result == null ? SUCCESS : result);}private boolean setFailure0(Throwable cause) { // 将传入的cause封装为CauseHolder对象,并设置为最终后果 return setValue0(new CauseHolder(checkNotNull(cause, "cause")));}private boolean setValue0(Object objResult) { /** * 最终后果result为null,示意以后Promise处于未实现状态 * 最终后果result为UNCANCELLABLE,示意以后Promise不可勾销,也是处于未实现状态 * 通过CAS操作来更新最终后果,如果更新胜利,唤醒期待以后Promise的线程,并且回调监听器 * 如果更新失败,间接返回false */ if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { // 唤醒期待以后Promise的线程 if (checkNotifyWaiters()) { // 回调监听器 notifyListeners(); } return true; } return false;}
2.2.2 设置为UNCANCELLABLE(不可勾销)
当Promise未实现时,通过setUncancellable()办法将最终后果result临时更新为UNCANCELLABLE,后续调用cancel()将都会返回false,当Promise实现后,会再将最终后果result更新为实现后果。
public boolean setUncancellable() { // CAS操作将最终后果result临时更新为UNCANCELLABLE if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) { return true; } Object result = this.result; return !isDone0(result) || !isCancelled0(result);}
2.3 利用wait/notify机制来实现期待唤醒
2.3.1 期待实现await
await办法能够使以后线程期待Promise实现,其大抵流程为:
- 判断Promise是否已实现,如果是,间接返回,否则下一步
- 判断以后线程是否已被中断,如果是,间接抛出异样,否则下一步
- 查看以后线程对应的执行器是否是EventLoop,如果是,间接抛出异样,否则下一步
- synchronized加锁,再次判断Promise是否已实现
如果是,间接返回
如果不是,waiters加1,并调用wait()办法进入期待唤醒状态,当被唤醒时,waiters减1
源码如下:
public Promise<V> await() throws InterruptedException { // 判断是否已实现,如果是,间接返回 if (isDone()) { return this; } // 判断以后线程是否已被中断,如果是,间接抛出异样 if (Thread.interrupted()) { throw new InterruptedException(toString()); } // 检测以后线程对应的执行器是否是EventLoop,如果是,则断定为死锁,间接抛出异样 checkDeadLock(); synchronized (this) { while (!isDone()) { incWaiters(); // waiters加1 try { wait(); // 开释锁,并期待唤醒 } finally { decWaiters(); // waiters减1 } } } return this;}
2.3.2 唤醒期待的线程
通过checkNotifyWaiters来唤醒期待的线程,如下:
private synchronized boolean checkNotifyWaiters() { if (waiters > 0) { // 如果waiters大于0,示意有期待的线程,唤醒所有期待线程 notifyAll(); } return listeners != null;}
那么何时唤醒期待的线程呢?
- 当胜利设置最终后果之后,会唤醒期待的线程
- 当调用cancel办法来胜利勾销Promise时,会唤醒期待的线程
2.4 注册、移除监听器以及回调监听器
2.4.1 注册、移除监听器
- 通过addListener来注册监听器,如果Promise已实现,间接回调监听器
- 通过removeListener来移除监听器
// 注册监听器public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); // 加锁 synchronized (this) { addListener0(listener); // 注册监听器 } // 判断是否已实现,如果已实现,间接回调监听器 if (isDone()) { notifyListeners(); } return this;}private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners == null) { // 首次注册监听器 listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { // 第三次及当前注册监听器 ((DefaultFutureListeners) listeners).add(listener); } else { // 第二次注册监听器 listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); }}// 移除监听器public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); // 加锁 synchronized (this) { removeListener0(listener); // 移除监听器 } return this;}private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) { if (listeners instanceof DefaultFutureListeners) { // 移除对应的监听器 ((DefaultFutureListeners) listeners).remove(listener); } else if (listeners == listener) { // 本来只注册了一个监听器 listeners = null; }}
2.4.2 回调监听器
当Promise实现时,会回调曾经注册过的监听器,回调监听器的源码如下:
// 回调监听器private void notifyListeners() { EventExecutor executor = executor(); // 判断以后线程是否EventLoop绑定的线程是同一个 if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); // 判断是否小于MAX_LISTENER_STACK_DEPTH if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { // 回调监听器 notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } // 平安执行,避免以后线程抛出StackOverflowError导致 safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } });}private void notifyListenersNow() { Object listeners; // 加锁 synchronized (this) { // 如果以后Promise正在回调监听器,或者没有注册监听器,间接返回 if (notifyingListeners || this.listeners == null) { return; } notifyingListeners = true; // 示意正在回调监听器 listeners = this.listeners; this.listeners = null; } for (;;) { if (listeners instanceof DefaultFutureListeners) { // 遍历监听器,并逐个回调 notifyListeners0((DefaultFutureListeners) listeners); } else { // 回调单个监听器 notifyListener0(this, (GenericFutureListener<?>) listeners); } synchronized (this) { // 当this.listeners等于null,示意以后Promise曾经没有可回调的监听器了 if (this.listeners == null) { notifyingListeners = false; return; } listeners = this.listeners; this.listeners = null; } }}// 回调单个监听器private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } }}
三、总结
Netty中所有的异步操作都是通过Promise来实现的,所以了解Promise对于咱们后续深刻了解Netty是很有必要的。