关于java:Netty三Promise的实现分析

一、Promise介绍

JDK的Future接口大家都比拟相熟,它用于示意异步操作的后果。Netty的Future接口继承于JDK原生的Future接口,并在其之上拓展了一些办法,比方注册监听器addListener、移除监听器removeListener、期待异步操作实现await等办法。相干类图如下:

从上图能够看到,Promise接口继承于Netty的Future接口,也在其之上拓展了一些办法,比方设置后果setSuccess、setFailure,设置不可勾销setUncancellable等办法。源码如下:

public interface Promise<V> extends Future<V> {
    // 设置后果为胜利
    Promise<V> setSuccess(V result);
    // 尝试设置后果为胜利
    boolean trySuccess(V result);
    // 设置后果为失败
    Promise<V> setFailure(Throwable cause);
    // 尝试设置后果为失败
    boolean tryFailure(Throwable cause);
    // 尝试为不可勾销
    boolean setUncancellable();
    // 增加监听器
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 移除监听器
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
    // 可中断地期待
    Promise<V> await() throws InterruptedException;
    // 不可中断地期待
    Promise<V> awaitUninterruptibly();
    // 可中断地期待,如果后果为失败,从新抛出异样
    Promise<V> sync() throws InterruptedException;
    // 不可中断地期待,如果后果为失败,从新抛出异样
    Promise<V> syncUninterruptibly();
}

二、DefaultPromise的实现剖析

DefaultPromise是Promise的默认实现,并且前面常常遇到的DefaultChannelPromise也是继承于它,所以接下来让咱们看下DefaultPromise的具体代码实现

2.1 要害属性

DefaultPromise的几个要害属性,如下:

private static final Object SUCCESS = new Object();
private static final Object UNCANCELLABLE = new Object();
private volatile Object result;
private final EventExecutor executor;
private Object listeners;
private short waiters;
private boolean notifyingListeners;
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8, SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
  • SUCCESS:空对象,示意后果为胜利
  • UNCANCELABLE:空对象,示意为不可勾销
  • result:代表后果,它的取值有5种状况(Promise可归为3种状态:未实现、胜利、失败

    • null(状态为未实现)
    • 空对象UNCANCELLABLE(状态为未实现)
    • 空对象SUCCESS(状态为胜利)
    • CauseHolder对象,蕴含了异样信息(状态为失败)
    • 失常执行后果(状态为胜利)
  • executor:执行器,用于回调监听器
  • listeners:监听器汇合
  • waiters:期待以后Promise实现的线程个数
  • notifyingListners:是否正在回调监听器
  • MAX_LISTENER_STACK_DEPTH:字面翻译为监听器最大栈深度,这是当嵌套回调监听器时,避免栈溢出StackOverflowError而设计的,相似如下状况
public class TestPromise {
    public static void main(String[] args) {
        EventExecutor executor = ImmediateEventExecutor.INSTANCE;
        Promise<String> p1 = new DefaultPromise<>(executor);
        Promise<String> p2 = new DefaultPromise<>(executor);
        Promise<String> p3 = new DefaultPromise<>(executor);
        Promise<String> p4 = new DefaultPromise<>(executor);
        Promise<String> p5 = new DefaultPromise<>(executor);
        Promise<String> p6 = new DefaultPromise<>(executor);
        Promise<String> p7 = new DefaultPromise<>(executor);
        Promise<String> p8 = new DefaultPromise<>(executor);
        Promise<String> p9 = new DefaultPromise<>(executor);
        Promise<String> p10 = new DefaultPromise<>(executor);

        p1.addListener(new MyListener(p2));
        p2.addListener(new MyListener(p3));
        p3.addListener(new MyListener(p4));
        p4.addListener(new MyListener(p5));
        p5.addListener(new MyListener(p6));
        p6.addListener(new MyListener(p7));
        p7.addListener(new MyListener(p8));
        p8.addListener(new MyListener(p9));
        p9.addListener(new MyListener(p10));

        p1.setSuccess(null);
    }

    private static class MyListener implements GenericFutureListener {
        private Promise promise;
        MyListener(Promise promise) {
            this.promise = promise;
        }

        @Override
        public void operationComplete(Future future) throws Exception {
            System.out.println("回调胜利");
            promise.setSuccess(null);
        }
    }
}

2.2 设置后果

2.2.1 设置后果为胜利或失败

通过setSuccess、setFailure办法来设置Promise的最终后果result,源码如下

// 设置Promise的最终后果为参数result,如果设置失败,则抛出异样
public Promise<V> setSuccess(V result) {
    if (setSuccess0(result)) {
        return this;
    }
    throw new IllegalStateException("complete already: " + this);
}

// 设置Promise的最终后果为new CauseHolder(cause),如果设置失败,则抛出异样
public Promise<V> setFailure(Throwable cause) {
    if (setFailure0(cause)) {
        return this;
    }
    throw new IllegalStateException("complete already: " + this, cause);
}

private boolean setSuccess0(V result) {
    // 如果传入的result为null,则设置最终后果为SUCCESS
    return setValue0(result == null ? SUCCESS : result);
}

private boolean setFailure0(Throwable cause) {
    // 将传入的cause封装为CauseHolder对象,并设置为最终后果
    return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}

private boolean setValue0(Object objResult) {
    /**
     * 最终后果result为null,示意以后Promise处于未实现状态
     * 最终后果result为UNCANCELLABLE,示意以后Promise不可勾销,也是处于未实现状态
     * 通过CAS操作来更新最终后果,如果更新胜利,唤醒期待以后Promise的线程,并且回调监听器
     * 如果更新失败,间接返回false
     */
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        // 唤醒期待以后Promise的线程    
        if (checkNotifyWaiters()) {
            // 回调监听器
            notifyListeners();
        }
        return true;
    }
    return false;
}

2.2.2 设置为UNCANCELLABLE(不可勾销)

当Promise未实现时,通过setUncancellable()办法将最终后果result临时更新为UNCANCELLABLE,后续调用cancel()将都会返回false,当Promise实现后,会再将最终后果result更新为实现后果。

public boolean setUncancellable() {
    // CAS操作将最终后果result临时更新为UNCANCELLABLE
    if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
        return true;
    }
    Object result = this.result;
    return !isDone0(result) || !isCancelled0(result);
}

2.3 利用wait/notify机制来实现期待唤醒

2.3.1 期待实现await

await办法能够使以后线程期待Promise实现,其大抵流程为:

  1. 判断Promise是否已实现,如果是,间接返回,否则下一步
  2. 判断以后线程是否已被中断,如果是,间接抛出异样,否则下一步
  3. 查看以后线程对应的执行器是否是EventLoop,如果是,间接抛出异样,否则下一步
  4. synchronized加锁,再次判断Promise是否已实现
    如果是,间接返回
    如果不是,waiters加1,并调用wait()办法进入期待唤醒状态,当被唤醒时,waiters减1

源码如下:

public Promise<V> await() throws InterruptedException {
    // 判断是否已实现,如果是,间接返回
    if (isDone()) {
        return this;
    }
    // 判断以后线程是否已被中断,如果是,间接抛出异样
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    // 检测以后线程对应的执行器是否是EventLoop,如果是,则断定为死锁,间接抛出异样
    checkDeadLock();

    synchronized (this) {
        while (!isDone()) {
            incWaiters();        // waiters加1
            try {
                wait();          // 开释锁,并期待唤醒
            } finally {
                decWaiters();    // waiters减1
            }
        }
    }
    return this;
}

2.3.2 唤醒期待的线程

通过checkNotifyWaiters来唤醒期待的线程,如下:

private synchronized boolean checkNotifyWaiters() {
    if (waiters > 0) {    // 如果waiters大于0,示意有期待的线程,唤醒所有期待线程
        notifyAll();
    }
    return listeners != null;
}

那么何时唤醒期待的线程呢?

  1. 当胜利设置最终后果之后,会唤醒期待的线程
  2. 当调用cancel办法来胜利勾销Promise时,会唤醒期待的线程

2.4 注册、移除监听器以及回调监听器

2.4.1 注册、移除监听器

  • 通过addListener来注册监听器,如果Promise已实现,间接回调监听器
  • 通过removeListener来移除监听器
// 注册监听器
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");
    // 加锁
    synchronized (this) {
        addListener0(listener);    // 注册监听器
    }
    // 判断是否已实现,如果已实现,间接回调监听器
    if (isDone()) {
        notifyListeners();
    }

    return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners == null) {    // 首次注册监听器
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {    // 第三次及当前注册监听器
        ((DefaultFutureListeners) listeners).add(listener);
    } else {    // 第二次注册监听器
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
}

// 移除监听器
public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");
    // 加锁
    synchronized (this) {
        removeListener0(listener);    // 移除监听器
    }

    return this;
}

private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners instanceof DefaultFutureListeners) {
        // 移除对应的监听器
        ((DefaultFutureListeners) listeners).remove(listener);
    } else if (listeners == listener) {  // 本来只注册了一个监听器
        listeners = null;
    }
}

2.4.2 回调监听器

当Promise实现时,会回调曾经注册过的监听器,回调监听器的源码如下:

// 回调监听器
private void notifyListeners() {
    EventExecutor executor = executor();
    // 判断以后线程是否EventLoop绑定的线程是同一个
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        // 判断是否小于MAX_LISTENER_STACK_DEPTH
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                // 回调监听器
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }
    // 平安执行,避免以后线程抛出StackOverflowError导致
    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

private void notifyListenersNow() {
    Object listeners;
    // 加锁
    synchronized (this) {
        // 如果以后Promise正在回调监听器,或者没有注册监听器,间接返回
        if (notifyingListeners || this.listeners == null) {
            return;
        }
        notifyingListeners = true;    // 示意正在回调监听器
        listeners = this.listeners;
        this.listeners = null;
    }
    for (;;) {
        if (listeners instanceof DefaultFutureListeners) {
            // 遍历监听器,并逐个回调
            notifyListeners0((DefaultFutureListeners) listeners);
        } else {
            // 回调单个监听器
            notifyListener0(this, (GenericFutureListener<?>) listeners);
        }
        synchronized (this) {
            // 当this.listeners等于null,示意以后Promise曾经没有可回调的监听器了
            if (this.listeners == null) {
                notifyingListeners = false;
                return;
            }
            listeners = this.listeners;
            this.listeners = null;
        }
    }
}

// 回调单个监听器
private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
}

三、总结

Netty中所有的异步操作都是通过Promise来实现的,所以了解Promise对于咱们后续深刻了解Netty是很有必要的。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理