关于future:Future和Promise

第四章 Future和Promise

Netty是一个异步网络解决框架,在实现中大量应用了Future机制,并在Java自带Future的根底上,减少了Promise机制。这两者的目标都是使异步编程更加方便使用。在浏览源码之前,咱们须要对Future的机制有很分明的意识。

4.1 异步编程模型
4.1.1 Future
应用Future机制时,咱们调用耗时工作会立即返回一个Future实例,应用该实例可能以阻塞的形式或者在将来某刻取得耗时工作的执行后果,还能够增加监听事件设置后续程序。

function Future asynchronousFunction(String arg){
Future future = new Future(new Callable(){

  public Object call(){
    return null;
  }

});
return future;
}
ReturnHandler handler = asynchronousFunction(); // 耗时函数,但会立刻返回一个句柄
handler.getResult(); // 通过句柄能够期待后果
handler.addListener(); //通过句柄能够增加实现后执行的事件
handler.cancel(); // 通过句柄勾销耗时工作
4.1.2 Promise
在Future机制中,业务逻辑所在工作执行的状态(胜利或失败)是在Future中实现的,而在Promise中,能够在业务逻辑管制工作的执行后果,相比Future,更加灵便。

// 异步的耗时工作接管一个promise
function Promise asynchronousFunction(String arg){

Promise  promise = new PromiseImpl();
Object result = null;
result = search()  //业务逻辑,
if(success){
     promise.setSuccess(result); // 告诉promise以后异步工作胜利了,并传入后果
}else if(failed){
    promise.setFailure(reason); //// 告诉promise以后异步工作失败了
 }else if(error){
     promise.setFailure(error); //// 告诉promise以后异步工作产生了异样
  }

}

// 调用异步的耗时工作
Promise promise = asynchronousFunction(promise) ;//会立刻返回promise
//增加胜利解决/失败解决/异样解决等事件
promise.addListener();// 例如,能够增加胜利后执行的事件
doOtherThings() ; // 持续做其余事件,不须要理睬asynchronousFunction何时完结
在Netty中,Promise继承了Future,蕴含了这两者的性能。

Java的Future机制

4.2.1 Java的Future机制
Future顾名思义,是一个将来实现的异步操作,能够取得将来返回的值。罕用的场景如:调用一个耗时的办法search()(依据产品名称在全网查问价格,假如须要3s左右能力返回),该办法会立刻返回Future对象,调应用Future.get()能够同步期待耗时办法的返回,也能够调用future的cancel()勾销Future工作。如上面的程序,search办法逻辑会依据名字在全网查找价格,假如须要耗时3s,该办法会立刻返回一个Future对象供用户线程应用;在主办法中能够应用get()期待获取到价格,也能够应用cancel()勾销查问。

public Future<String> search(String prodName) {

    FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
        @Override
        public String call()  {
            try {
            System.out.println(String.format("    >>search price of %s from internet!",prodName));
            Thread.sleep(3000);
            return "$99.99";
            }catch(InterruptedException e){
                System.out.println("search function is Interrupted!");
            }
            return null;
        }
    });
    new Thread(future).start();//交给线程去执行
    return future; // 立即返回future对象
}

JavaFuture jf = new JavaFuture();
Future<String> future = jf.search(“Netty权威指南”);// 返回future
System.out.println(“Begin search,get future!”);

// 测试1-【获取后果】期待3s后会返回
String prods = future.get();//获取prods
System.out.println(“get result:”+prods);

// 测试2-【勾销工作】1s后勾销工作
Thread.sleep(1000);
future.cancel(false);//true时会中断线程,false不会
System.out.println(“Future is canceled? ” + (future.isCancelled()?”yes”:”no”));

Thread.sleep(4000); //期待4s检查一下future所在线程是否还在执行
4.2.2 Future的实现
如果咱们须要实现一个Future,考虑一下须要实现哪些性能:

Future<String> future = jf.search(“Netty权威指南”);

Future search(){
//启动线程或者在线程池中执行业务逻辑
return future; //立即返回future
}
search办法须要立刻返回一个Future对象,并且须要启动一个线程(或线程池)执行业务逻辑;
因为Future对象能够期待线程执行完结或者勾销线程,Future外部须要可能治理业务逻辑的执行状态。
业务逻辑完结或异样时须要通知Future对象,有两种形式:在Future中启动线程执行业务逻辑;或者业务逻辑独自执行,通过创立的Future实例的办法如setSuccess(result)办法告诉Future。Java的FutureTask采纳了第一种办法,其自身继承了Runnable,在run办法中执行传入的业务逻辑。而Netty的Promise中采纳了第二种办法。
get()办法中,如果业务逻辑还未执行结束,须要期待,能够用锁机制实现。
Java中的Future是一个接口,外部有如下办法:

boolean cancel(boolean mayInterruptIfRunning) 试图勾销对此工作的执行。
V get() 如有必要,期待计算实现,而后获取其后果。
V get(long timeout, TimeUnit unit) 如有必要,最多期待为使计算实现所给定的工夫之后,获取其后果(如果后果可用)。
boolean isCancelled() 如果在工作失常实现前将其勾销,则返回 true。
boolean isDone() 如果工作已实现,则返回 true。
上面,咱们本人实现一个Future加深了解,上面定义了一个继承Future的MyFutureTask,初始化时传递一个Callable作为业务逻辑,实现Future接口是为了管制业务逻辑线程,实现Runnable接口是为了业务线程执行时可能批改Future的外部状态。

public class MyFutureTask<V> implements Future<V>,Runnable {

Callable<V> callable; //业务逻辑
boolean running = false ,done = false,cancel = false;// 业务逻辑执行状态
ReentrantLock lock ;//锁
V outcome;//后果

public MyFutureTask(Callable<V> callable) {
    if(callable == null) {
        throw new NullPointerException("callable cannot be null!");
    }
    this.callable = callable;
    this.done = false;
    this.lock = new ReentrantLock();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    callable = null;
    cancel = true;
    return true;
}

@Override
public boolean isCancelled() {
    return cancel;
}

@Override
public boolean isDone() {
    return done;
}

@Override
public V get() throws InterruptedException, ExecutionException {
    try {
        this.lock.lock();//先获取锁,取得后阐明业务逻辑曾经执行结束
        return outcome;
    }finally{
        this.lock.unlock();
    }
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    try {
        this.lock.tryLock(timeout, unit);
        return outcome;
    }catch (InterruptedException e) {
        return null;
    }finally{
        this.lock.unlock();
    }
}
@Override
public void run() {
    try {
        this.lock.lock(); // 启动线程,先上锁,避免get时间接返回
        running = true;
        try {
            outcome = callable.call(); // 业务逻辑
        } catch (Exception e) {
            e.printStackTrace();
        }
        done = true;
        running = false;
    }finally {
        this.lock.unlock(); // 解锁后get可获取
    }
}

}

测试程序如下:

public Future<String> search(String prodName) {

    MyFutureTask<String> future = new MyFutureTask<String>(new Callable<String>() {
        @Override
        public String call()  {
            try {
            System.out.println(String.format("    >>search price of %s from internet!",prodName));
            Thread.sleep(3000);
            return "$99.99";
            }catch(InterruptedException e){
                System.out.println("search function is Interrupted!");
            }
            return null;
        }
    });
    new Thread(future).start();// 或提交到线程池中
    return future;
}

4.2.3 Java的Future实现
当然,下面是本人实现的FutureTask,Java自带的FutureTask要比下面的更加简单和强壮。上面咱们进行一些剖析。

FutureTask外部保护了state,示意运行状态,只能通过set,setException, 和 cancel来批改。

 private static final int NEW          = 0;  //初始状态,
private static final int COMPLETING   = 1; // 业务逻辑曾经完结
private static final int NORMAL       = 2;  // 失常完结
private static final int EXCEPTIONAL  = 3; // 异样完结
private static final int CANCELLED    = 4; // 曾经勾销
private static final int INTERRUPTING = 5; // 中断中
private static final int INTERRUPTED  = 6; // 曾经中断

private volatile WaitNode waiters; 保护了期待的线程,get()办法时,如果业务逻辑还未执行结束,则创立WaitNode q,将其q.next设置为waiters,waiters设置为q;这样组成了一个期待链表。在业务逻辑执行结束(失常或异样完结)时,
run办法

run办法用来执行业务逻辑,在此过程中须要保护好业务逻辑的运行状态

public void run() {

    // 1. 如果state不为初始状态或者runner不为null,阐明曾经在运行了,间接返回
    // 如果为空,应用CAS将runner设置为以后线程,避免并发进入
    //runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) { // 2.业务逻辑不为空并且state为NEW时才运行
            V result;
            boolean ran;
            try {
                result = c.call(); // 3. 执行业务逻辑
                ran = true;  // ran为true示意失常返回
            } catch (Throwable ex) {
                result = null;  // 产生异样,后果为null
                ran = false; // 非正常完结
                setException(ex); // 设置异样
            }
            if (ran)
                set(result); // 失常完结,设置后果
        }
    } finally {
        // 为例避免并发调用run()办法,进入run时应用cas将runner设置为非空,完结时设为null
        runner = null;
        int s = state;  // 以后状态为INTERRUPTING或者INTERRUPTED 阐明要勾销
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);// 如果在中断进行中,则始终期待
    }
}

执行run办法时,要判断Future状态是否正确,必须为NEW;应用CAS将runner对象设置为以后线程,若runner不为null,阐明其余线程曾经执行了run办法,则间接return;
状态为NEW,执行传入的业务逻辑,失常完结时,将后果保留到result,ran设置为true;若产生异样,设置result为空,ran为false,并设置异样setException(ex);
失常完结,调用set(result);设置后果
业务逻辑执行完结,讲runner设置为null,若线程在INTERRUPTING或者INTERRUPTED 阐明要勾销;如果在中断进行中,则始终期待。
setException(ex); 业务逻辑异样时调用
protected void setException(Throwable t) {

     // 若状态为NEW,将其设置为COMPLETING-实现
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t; // 后果为抛出的异样
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最终状态为EXCEPTIONAL-异样
        finishCompletion();
    }
}

set(V v) 业务逻辑失常完结时设置后果
protected void set(V v) {

    // 若状态为NEW,将其设置为COMPLETING-实现
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态为NORMAL-失常完结
        finishCompletion();
    }
}

finishCompletion做了一些收尾性工作,依据waiters链表,唤醒期待的线程。
private void finishCompletion() {

    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) { // 遍历链表
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); // 唤醒线程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}

get办法

get时,如果业务逻辑尚未完结,须要应用LockSupport.park(this);将休眠期待的线程,在业务逻辑实现后,finishCompletion()会唤醒线程,之后返回业务逻辑的处理结果。

public V get() throws InterruptedException, ExecutionException {

    int s = state;
    // 如果状态为NEW或者COMPLETING,阐明还未完结,退出期待链表waiters
    if (s <= COMPLETING)  
        s = awaitDone(false, 0L);
    return report(s); // 返回后果
}

cancel办法

cancel办法会勾销执行业务逻辑的,次要逻辑如下:

public boolean cancel(boolean mayInterruptIfRunning) {

    // mayInterruptIfRunning示意以中断勾销
    // 如果状态为NEW,阐明还未执行,无需勾销;讲状态设置为打断或勾销
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) { // 以中断勾销
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt(); // 执行线程的interrupt办法
            } finally { // 中断实现,批改状态为INTERRUPTED-已中断
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion(); // 唤醒期待线程
    }
    return true;
}

复制
通过剖析,能够看到,java的FutureTask通过state记录业务逻辑的执行状态;多线程时应用CAS避免反复进入;业务逻辑未执行实现时,会将线程退出到waiter链表,应用LockSupport.park()阻塞业务线程;业务逻辑执行结束或产生异样或被勾销时,唤醒期待列表的线程。
与咱们实现时应用的ReentrantLock在原理上是一样的,ReentrantLock的lock在获取不到锁时,也会保护一个链表保留期待列表,开释锁时,唤醒期待列表上的线程。区别在与,Java的实现会同时唤醒所有的期待线程,而unlock时等线程表会顺次取得锁。

Netty的Future机制

4.3.1 Netty的Future
Netty的Future在concurrent包的Future根底上,减少了更多的性能。在Java的Future中,次要是工作的运行/勾销,而Netty的Future减少了更多的性能。

public interface Future<V> extends java.util.concurrent.Future<V>

boolean isSuccess(); 只有IO操作实现时才返回true
boolean isCancellable(); 只有当cancel(boolean)胜利勾销时才返回true
Throwable cause(); IO操作产生异样时,返回导致IO操作以此的起因,如果没有异样,返回null
// 向Future增加事件,future实现时,会执行这些事件,如果add时future曾经实现,会立刻执行监听事件
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 移除监听事件,future实现时,不会触发
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> sync() throws InterruptedException; //期待future done
Future<V> syncUninterruptibly(); // 期待future done,不可打断
Future<V> await() throws InterruptedException; // 期待future实现
Future<V> awaitUninterruptibly(); // 期待future 实现,不可打断
V getNow(); // 立即取得后果,如果没有实现,返回null
boolean cancel(boolean mayInterruptIfRunning); // 如果胜利勾销,future会失败,导致CancellationException
Netty为Future退出的性能次要是增加/删除监听事件,在Promise中会有实例演示。其余的办法是为get()办法服务的,get()办法能够通过调用await/getNow等办法实现。

4.3.2 Netty的Promise机制
Netty的Future与Java自带到Future略有不同,其引入了Promise机制。在Java的Future中,业务逻辑为一个Callable或Runnable实现类,该类的call()或run()执行结束意味着业务逻辑的完结;而在Promise机制中,能够在业务逻辑中人工设置业务逻辑的胜利与失败。

Netty中Promise接口的定义如下:

public interface Promise<V> extends Future<V> {

// 设置future执行后果为胜利
Promise<V> setSuccess(V result);
   // 尝试设置future执行后果为胜利,返回是否设置胜利

boolean trySuccess(V result);
// 设置失败

Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 设置为不能取消
boolean setUncancellable();
//一下省略了笼罩Future的一些办法

}
上面以一个例子来阐明Promise的应用办法,还是以seach()查问产品报价为例:

// main 办法
NettyFuture4Promise test = new NettyFuture4Promise();
Promise<String> promise = test.search(“Netty In Action”);
String result = promise.get();
System.out.println(“price is ” + result);

//
private Promise<String> search(String prod) {

    NioEventLoopGroup loop = new NioEventLoopGroup();
    // 创立一个DefaultPromise并返回
    DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());
    loop.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println(String.format("    >>search price of %s from internet!",prod));
                Thread.sleep(5000);
                promise.setSuccess("$99.99");// 期待5S后设置future为胜利,
               // promise.setFailure(new NullPointerException()); //当然,也能够设置失败
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    },0,TimeUnit.SECONDS);
    
    return promise;
}

能够看到,Promise可能在业务逻辑线程中告诉Future胜利或失败,因为Promise继承了Netty的Future,因而能够退出监听事件。

// main办法中,查问完结后获取promise,退出两个监听事件,别离给小Hong发告诉和Email
Promise<String> promise = test.search(“Netty In Action”);

    promise.addListener(new GenericFutureListener<Future<? super String>>() {
        @Override
        public void operationComplete(Future<? super String> future) throws Exception {
            System.out.println("Listener 1, make a notifice to Hong,price is " + future.get());
        }
        
    });
    promise.addListener(new GenericFutureListener<Future<? super String>>() {
        @Override
        public void operationComplete(Future<? super String> future) throws Exception {
            System.out.println("Listener 2, send a email to Hong,price is " + future.get());
        }
        
    });

Future和Promise的益处在于,获取到Promise对象后能够为其设置异步调用实现后的操作,而后立刻持续去做其余工作。

4.3.3 Netty罕用的Promise类
Netty罕用的纯Future机制的类,有SucceededFuture和FailedFuture,他们不须要设置业务逻辑代码,会立即实现,只须要设置胜利后的返回和抛出的异样。

Netty的罕用Promise类有DefalutPromise类,这是Promise实现的根底,后续会对这个类的实现进行解读;DefaultChannelPromise是DefalutPromise的子类,退出了channel这个属性。

上面对DefaultChannelPromise进行剖析,其类图如下:
NettyFuture类图

DefaultPromise的应用
Netty中波及到异步操做的中央都应用了promise,例如,上面是服务器/客户端启动时的注册工作,最终会调用unsafe的register,调用过程中会传入一个promise,unsafe进行事件的注册时调用promise能够设置胜利/失败。

// SingleThreadEventLoop.java
public ChannelFuture register(final ChannelPromise promise) {

    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {

if (eventLoop == null) {
    throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
    promise.setFailure(new IllegalStateException("registered to an event loop already"));
    return;
}
if (!isCompatible(eventLoop)) {
    promise.setFailure(
            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    return;
}

……
}
DefaultPromise的实现
DefaultChannelPromise提供的性能能够分为两个局部:一方面是为调用者提供get()和addListener()用于获取Future工作执行后果和增加监听事件;另一方面是为业务解决工作提供setSuccess()等办法设置工作的胜利或失败。

get办法

DefaultPromise的get办法有两个,无参数的get会阻塞期待;有参数的get会期待指定事件,若未完结抛出超时异样。这两个get()是在其父类AbstractFuture中实现的,通过调用上面四个办法实现:

await(); // 期待Future工作完结
await(timeout, unit) // 期待Future工作完结,超过事件则抛出异样
cause(); // 返回Future工作的异样
getNow() // /返回Future工作的执行后果

// 先期待,如果有异样则抛出,无异样返回getNow()
public V get() throws InterruptedException, ExecutionException {

    await();

    Throwable cause = cause();
    if (cause == null) {
        return getNow();
    }
    if (cause instanceof CancellationException) {
        throw (CancellationException) cause;
    }
    throw new ExecutionException(cause);
}

await

await()办法判断Future工作是否完结,之后获取this锁,如果工作未实现,则调用Object的wait()期待

public Promise<V> await() throws InterruptedException {

    // 判断Future工作是否完结,外部依据result是否为null判断,setSuccess或setFailure时会通过CAS批改result
    if (isDone()) {  
        return this;
    }

    if (Thread.interrupted()) { // 线程是否被中断
        throw new InterruptedException(toString());
    }

    checkDeadLock(); // 查看以后线程是否与线程池运行的线程是一个

    synchronized (this) {
        while (!isDone()) {
            incWaiters(); // waiters计数加1
            try {
                wait();  // Object的办法,让出cpu,退出期待队列
            } finally {
                decWaiters(); //  waiters计数减1
            }
        }
    }
    return this;
}

await(long timeout, TimeUnit unit)与awite相似,只是调用了Object对象的wait(long timeout, int nanos)办法
awaitUninterruptibly()办法在外部catch住了期待线程的中断异样,因而不会抛出中断异样。

监听事件相干办法
add/remove办法

addListener办法被调用时,将传入的回调类传入到listeners对象中,如果监听多于1个,会创立DefaultFutureListeners对象将回调办法保留在一个数组中。removeListener会将listeners设置为null(只有一个时)或从数组中移除(多个回调时)。

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {

    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
    }
}

private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {

    if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).remove(listener);
    } else if (listeners == listener) {
        listeners = null;
    }
}

notifyListeners()

在增加监听器的过程中,如果工作刚好执行结束done(),则立刻触发监听事件。触发监听通过notifyListeners()实现。次要逻辑为:如果以后addListener的线程(精确来说应该是调用notifyListeners的线程,因为addListener和setSuccess都会调用notifyListeners()和Promise内的线程池以后执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池去执行;例如,main线程中调用addListener时工作实现,notifyListeners()执行回调,会提交到线程池中执行;而如果是执行Future工作的线程池中setSuccess()时调用notifyListeners(),会放在以后线程中执行。
外部保护了notifyingListeners用来记录是否曾经触发过监听事件,只有未触发过且监听列表不为空,才会顺次便当并调用operationComplete

private static void notifyListener0(Future future, GenericFutureListener l) {

    try {
        l.operationComplete(future);
    } catch (Throwable t) {
        logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
    }
}

setSuccess()办法
Future工作在执行实现后调用setSuccess()或setFailure()告诉Future执行后果;次要逻辑是:批改result的值,若有期待线程则唤醒,告诉监听事件。

if (setSuccess0(result)) { // 设置胜利后唤醒期待线程

        notifyListeners(); // 告诉
        return this;

}
// 告诉胜利时将后果保留在变量result,告诉失败时,应用CauseHolder包装Throwable赋值给result
// RESULT_UPDATER 是一个应用CAS更新外部属性result的类,
// 如果result为null或UNCANCELLABLE,更新为胜利/失败后果;UNCANCELLABLE是不可勾销状态
private boolean setValue0(Object objResult) {

    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        checkNotifyWaiters();// 调用Object的notifyAll();告诉期待线程
        return true;
    }
    return false;
}

cancel()办法
cancel用来勾销工作,依据result判断,如果能够勾销,则唤醒期待线程,告诉监听事件。

public boolean cancel(boolean mayInterruptIfRunning) {

//如果result为null,阐明未setUncancellable()/setSuccess/setFailure
    if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
        checkNotifyWaiters(); // 唤醒期待线程
        notifyListeners(); // 触发监听事件
        return true;
    }
    return false;
}

通过下面的剖析,咱们能够看到DefaultPromise外部通过result记录Future工作的执行状态:

null – 未实现
CANCELLATION_CAUSE_HOLDER -被勾销
UNCANCELLABLE – 不可勾销
业务解决调用setSuccess时传入的后果
业务解决调用setFailure时包装Throws的CauseHolder
DefaultPromise外部保护了一个监听列表保留监听事件,在工作实现或勾销时告诉监听事件(提交到线程池中执行);工作的期待与唤醒通过Object的wait()和notifyAll()实现

DefaultChannelPromise实现
DefaultChannelPromise是DefaultPromise的子类,外部保护了一个通道变量Channel channel;Promise机制相干的办法都是调用父类办法。
除此之外,还实现了FlushCheckpoint接口,供ChannelFlushPromiseNotifier应用,咱们能够将ChannelFuture注册到ChannelFlushPromiseNotifier类,当有数据写入或达到checkpoint时应用。

interface FlushCheckpoint {
    long flushCheckpoint();
    void flushCheckpoint(long checkpoint);
    ChannelPromise promise();
}

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理