关于future:Future和Promise

6次阅读

共计 16348 个字符,预计需要花费 41 分钟才能阅读完成。

第四章 Future 和 Promise

Netty 是一个异步网络解决框架,在实现中大量应用了 Future 机制,并在 Java 自带 Future 的根底上,减少了 Promise 机制。这两者的目标都是使异步编程更加方便使用。在浏览源码之前,咱们须要对 Future 的机制有很分明的意识。

4.1 异步编程模型
4.1.1 Future
应用 Future 机制时,咱们调用耗时工作会立即返回一个 Future 实例,应用该实例可能以阻塞的形式或者在将来某刻取得耗时工作的执行后果,还能够增加监听事件设置后续程序。

function Future asynchronousFunction(String arg){
Future future = new Future(new Callable(){

  public Object call(){return null;}

});
return future;
}
ReturnHandler handler = asynchronousFunction(); // 耗时函数,但会立刻返回一个句柄
handler.getResult(); // 通过句柄能够期待后果
handler.addListener(); // 通过句柄能够增加实现后执行的事件
handler.cancel(); // 通过句柄勾销耗时工作
4.1.2 Promise
在 Future 机制中,业务逻辑所在工作执行的状态(胜利或失败)是在 Future 中实现的,而在 Promise 中,能够在业务逻辑管制工作的执行后果,相比 Future,更加灵便。

// 异步的耗时工作接管一个 promise
function Promise asynchronousFunction(String arg){

Promise  promise = new PromiseImpl();
Object result = null;
result = search()  // 业务逻辑,
if(success){promise.setSuccess(result); // 告诉 promise 以后异步工作胜利了,并传入后果
}else if(failed){promise.setFailure(reason); //// 告诉 promise 以后异步工作失败了
 }else if(error){promise.setFailure(error); //// 告诉 promise 以后异步工作产生了异样
  }

}

// 调用异步的耗时工作
Promise promise = asynchronousFunction(promise);// 会立刻返回 promise
// 增加胜利解决 / 失败解决 / 异样解决等事件
promise.addListener();// 例如,能够增加胜利后执行的事件
doOtherThings() ; // 持续做其余事件,不须要理睬 asynchronousFunction 何时完结
在 Netty 中,Promise 继承了 Future,蕴含了这两者的性能。

Java 的 Future 机制

4.2.1 Java 的 Future 机制
Future 顾名思义,是一个将来实现的异步操作,能够取得将来返回的值。罕用的场景如:调用一个耗时的办法 search()(依据产品名称在全网查问价格,假如须要 3s 左右能力返回),该办法会立刻返回 Future 对象,调应用 Future.get() 能够同步期待耗时办法的返回,也能够调用 future 的 cancel()勾销 Future 工作。如上面的程序,search 办法逻辑会依据名字在全网查找价格,假如须要耗时 3s,该办法会立刻返回一个 Future 对象供用户线程应用;在主办法中能够应用 get()期待获取到价格,也能够应用 cancel()勾销查问。

public Future<String> search(String prodName) {

    FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
        @Override
        public String call()  {
            try {System.out.println(String.format(">>search price of %s from internet!",prodName));
            Thread.sleep(3000);
            return "$99.99";
            }catch(InterruptedException e){System.out.println("search function is Interrupted!");
            }
            return null;
        }
    });
    new Thread(future).start();// 交给线程去执行
    return future; // 立即返回 future 对象
}

JavaFuture jf = new JavaFuture();
Future<String> future = jf.search(“Netty 权威指南 ”);// 返回 future
System.out.println(“Begin search,get future!”);

// 测试 1 -【获取后果】期待 3s 后会返回
String prods = future.get();// 获取 prods
System.out.println(“get result:”+prods);

// 测试 2 -【勾销工作】1s 后勾销工作
Thread.sleep(1000);
future.cancel(false);//true 时会中断线程,false 不会
System.out.println(“Future is canceled? ” + (future.isCancelled()?”yes”:”no”));

Thread.sleep(4000); // 期待 4s 检查一下 future 所在线程是否还在执行
4.2.2 Future 的实现
如果咱们须要实现一个 Future,考虑一下须要实现哪些性能:

Future<String> future = jf.search(“Netty 权威指南 ”);

Future search(){
// 启动线程或者在线程池中执行业务逻辑
return future; // 立即返回 future
}
search 办法须要立刻返回一个 Future 对象,并且须要启动一个线程(或线程池)执行业务逻辑;
因为 Future 对象能够期待线程执行完结或者勾销线程,Future 外部须要可能治理业务逻辑的执行状态。
业务逻辑完结或异样时须要通知 Future 对象,有两种形式:在 Future 中启动线程执行业务逻辑;或者业务逻辑独自执行,通过创立的 Future 实例的办法如 setSuccess(result)办法告诉 Future。Java 的 FutureTask 采纳了第一种办法,其自身继承了 Runnable,在 run 办法中执行传入的业务逻辑。而 Netty 的 Promise 中采纳了第二种办法。
get()办法中,如果业务逻辑还未执行结束,须要期待,能够用锁机制实现。
Java 中的 Future 是一个接口,外部有如下办法:

boolean cancel(boolean mayInterruptIfRunning) 试图勾销对此工作的执行。
V get() 如有必要,期待计算实现,而后获取其后果。
V get(long timeout, TimeUnit unit) 如有必要,最多期待为使计算实现所给定的工夫之后,获取其后果(如果后果可用)。
boolean isCancelled() 如果在工作失常实现前将其勾销,则返回 true。
boolean isDone() 如果工作已实现,则返回 true。
上面,咱们本人实现一个 Future 加深了解,上面定义了一个继承 Future 的 MyFutureTask,初始化时传递一个 Callable 作为业务逻辑,实现 Future 接口是为了管制业务逻辑线程,实现 Runnable 接口是为了业务线程执行时可能批改 Future 的外部状态。

public class MyFutureTask<V> implements Future<V>,Runnable {

Callable<V> callable; // 业务逻辑
boolean running = false ,done = false,cancel = false;// 业务逻辑执行状态
ReentrantLock lock ;// 锁
V outcome;// 后果

public MyFutureTask(Callable<V> callable) {if(callable == null) {throw new NullPointerException("callable cannot be null!");
    }
    this.callable = callable;
    this.done = false;
    this.lock = new ReentrantLock();}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    callable = null;
    cancel = true;
    return true;
}

@Override
public boolean isCancelled() {return cancel;}

@Override
public boolean isDone() {return done;}

@Override
public V get() throws InterruptedException, ExecutionException {
    try {this.lock.lock();// 先获取锁,取得后阐明业务逻辑曾经执行结束
        return outcome;
    }finally{this.lock.unlock();
    }
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    try {this.lock.tryLock(timeout, unit);
        return outcome;
    }catch (InterruptedException e) {return null;}finally{this.lock.unlock();
    }
}
@Override
public void run() {
    try {this.lock.lock(); // 启动线程,先上锁,避免 get 时间接返回
        running = true;
        try {outcome = callable.call(); // 业务逻辑
        } catch (Exception e) {e.printStackTrace();
        }
        done = true;
        running = false;
    }finally {this.lock.unlock(); // 解锁后 get 可获取
    }
}

}

测试程序如下:

public Future<String> search(String prodName) {

    MyFutureTask<String> future = new MyFutureTask<String>(new Callable<String>() {
        @Override
        public String call()  {
            try {System.out.println(String.format(">>search price of %s from internet!",prodName));
            Thread.sleep(3000);
            return "$99.99";
            }catch(InterruptedException e){System.out.println("search function is Interrupted!");
            }
            return null;
        }
    });
    new Thread(future).start();// 或提交到线程池中
    return future;
}

4.2.3 Java 的 Future 实现
当然,下面是本人实现的 FutureTask,Java 自带的 FutureTask 要比下面的更加简单和强壮。上面咱们进行一些剖析。

FutureTask 外部保护了 state,示意运行状态,只能通过 set,setException, 和 cancel 来批改。

 private static final int NEW          = 0;  // 初始状态,private static final int COMPLETING   = 1; // 业务逻辑曾经完结
private static final int NORMAL       = 2;  // 失常完结
private static final int EXCEPTIONAL  = 3; // 异样完结
private static final int CANCELLED    = 4; // 曾经勾销
private static final int INTERRUPTING = 5; // 中断中
private static final int INTERRUPTED  = 6; // 曾经中断

private volatile WaitNode waiters; 保护了期待的线程,get()办法时,如果业务逻辑还未执行结束,则创立 WaitNode q,将其 q.next 设置为 waiters,waiters 设置为 q;这样组成了一个期待链表。在业务逻辑执行结束(失常或异样完结)时,
run 办法

run 办法用来执行业务逻辑,在此过程中须要保护好业务逻辑的运行状态

public void run() {

    // 1. 如果 state 不为初始状态或者 runner 不为 null,阐明曾经在运行了,间接返回
    // 如果为空,应用 CAS 将 runner 设置为以后线程,避免并发进入
    //runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) { // 2. 业务逻辑不为空并且 state 为 NEW 时才运行
            V result;
            boolean ran;
            try {result = c.call(); // 3. 执行业务逻辑
                ran = true;  // ran 为 true 示意失常返回
            } catch (Throwable ex) {
                result = null;  // 产生异样,后果为 null
                ran = false; // 非正常完结
                setException(ex); // 设置异样
            }
            if (ran)
                set(result); // 失常完结,设置后果
        }
    } finally {// 为例避免并发调用 run()办法,进入 run 时应用 cas 将 runner 设置为非空,完结时设为 null
        runner = null;
        int s = state;  // 以后状态为 INTERRUPTING 或者 INTERRUPTED 阐明要勾销
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);// 如果在中断进行中,则始终期待
    }
}

执行 run 办法时,要判断 Future 状态是否正确,必须为 NEW;应用 CAS 将 runner 对象设置为以后线程,若 runner 不为 null,阐明其余线程曾经执行了 run 办法,则间接 return;
状态为 NEW,执行传入的业务逻辑,失常完结时,将后果保留到 result,ran 设置为 true;若产生异样,设置 result 为空,ran 为 false,并设置异样 setException(ex);
失常完结,调用 set(result); 设置后果
业务逻辑执行完结,讲 runner 设置为 null,若线程在 INTERRUPTING 或者 INTERRUPTED 阐明要勾销;如果在中断进行中,则始终期待。
setException(ex); 业务逻辑异样时调用
protected void setException(Throwable t) {

     // 若状态为 NEW,将其设置为 COMPLETING- 实现
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t; // 后果为抛出的异样
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最终状态为 EXCEPTIONAL- 异样
        finishCompletion();}
}

set(V v) 业务逻辑失常完结时设置后果
protected void set(V v) {

    // 若状态为 NEW,将其设置为 COMPLETING- 实现
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态为 NORMAL- 失常完结
        finishCompletion();}
}

finishCompletion 做了一些收尾性工作,依据 waiters 链表,唤醒期待的线程。
private void finishCompletion() {

    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) { // 遍历链表
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); // 唤醒线程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}

get 办法

get 时,如果业务逻辑尚未完结,须要应用 LockSupport.park(this); 将休眠期待的线程,在业务逻辑实现后,finishCompletion()会唤醒线程,之后返回业务逻辑的处理结果。

public V get() throws InterruptedException, ExecutionException {

    int s = state;
    // 如果状态为 NEW 或者 COMPLETING,阐明还未完结,退出期待链表 waiters
    if (s <= COMPLETING)  
        s = awaitDone(false, 0L);
    return report(s); // 返回后果
}

cancel 办法

cancel 办法会勾销执行业务逻辑的,次要逻辑如下:

public boolean cancel(boolean mayInterruptIfRunning) {

    // mayInterruptIfRunning 示意以中断勾销
    // 如果状态为 NEW,阐明还未执行,无需勾销;讲状态设置为打断或勾销
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) { // 以中断勾销
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt(); // 执行线程的 interrupt 办法} finally { // 中断实现,批改状态为 INTERRUPTED- 已中断
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {finishCompletion(); // 唤醒期待线程
    }
    return true;
}

复制
通过剖析,能够看到,java 的 FutureTask 通过 state 记录业务逻辑的执行状态;多线程时应用 CAS 避免反复进入;业务逻辑未执行实现时,会将线程退出到 waiter 链表,应用 LockSupport.park()阻塞业务线程;业务逻辑执行结束或产生异样或被勾销时,唤醒期待列表的线程。
与咱们实现时应用的 ReentrantLock 在原理上是一样的,ReentrantLock 的 lock 在获取不到锁时,也会保护一个链表保留期待列表,开释锁时,唤醒期待列表上的线程。区别在与,Java 的实现会同时唤醒所有的期待线程,而 unlock 时等线程表会顺次取得锁。

Netty 的 Future 机制

4.3.1 Netty 的 Future
Netty 的 Future 在 concurrent 包的 Future 根底上,减少了更多的性能。在 Java 的 Future 中,次要是工作的运行 / 勾销,而 Netty 的 Future 减少了更多的性能。

public interface Future<V> extends java.util.concurrent.Future<V>

boolean isSuccess(); 只有 IO 操作实现时才返回 true
boolean isCancellable(); 只有当 cancel(boolean)胜利勾销时才返回 true
Throwable cause(); IO 操作产生异样时,返回导致 IO 操作以此的起因,如果没有异样,返回 null
// 向 Future 增加事件,future 实现时,会执行这些事件,如果 add 时 future 曾经实现,会立刻执行监听事件
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 移除监听事件,future 实现时,不会触发
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> sync() throws InterruptedException; // 期待 future done
Future<V> syncUninterruptibly(); // 期待 future done,不可打断
Future<V> await() throws InterruptedException; // 期待 future 实现
Future<V> awaitUninterruptibly(); // 期待 future 实现,不可打断
V getNow(); // 立即取得后果,如果没有实现,返回 null
boolean cancel(boolean mayInterruptIfRunning); // 如果胜利勾销,future 会失败,导致 CancellationException
Netty 为 Future 退出的性能次要是增加 / 删除监听事件,在 Promise 中会有实例演示。其余的办法是为 get() 办法服务的,get()办法能够通过调用 await/getNow 等办法实现。

4.3.2 Netty 的 Promise 机制
Netty 的 Future 与 Java 自带到 Future 略有不同,其引入了 Promise 机制。在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 call() 或 run()执行结束意味着业务逻辑的完结;而在 Promise 机制中,能够在业务逻辑中人工设置业务逻辑的胜利与失败。

Netty 中 Promise 接口的定义如下:

public interface Promise<V> extends Future<V> {

// 设置 future 执行后果为胜利
Promise<V> setSuccess(V result);
   // 尝试设置 future 执行后果为胜利, 返回是否设置胜利

boolean trySuccess(V result);
// 设置失败

Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
// 设置为不能取消
boolean setUncancellable();
// 一下省略了笼罩 Future 的一些办法

}
上面以一个例子来阐明 Promise 的应用办法,还是以 seach()查问产品报价为例:

// main 办法
NettyFuture4Promise test = new NettyFuture4Promise();
Promise<String> promise = test.search(“Netty In Action”);
String result = promise.get();
System.out.println(“price is ” + result);

//
private Promise<String> search(String prod) {

    NioEventLoopGroup loop = new NioEventLoopGroup();
    // 创立一个 DefaultPromise 并返回
    DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());
    loop.schedule(new Runnable() {
        @Override
        public void run() {
            try {System.out.println(String.format(">>search price of %s from internet!",prod));
                Thread.sleep(5000);
                promise.setSuccess("$99.99");// 期待 5S 后设置 future 为胜利,// promise.setFailure(new NullPointerException()); // 当然,也能够设置失败
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    },0,TimeUnit.SECONDS);
    
    return promise;
}

能够看到,Promise 可能在业务逻辑线程中告诉 Future 胜利或失败,因为 Promise 继承了 Netty 的 Future,因而能够退出监听事件。

// main 办法中,查问完结后获取 promise,退出两个监听事件,别离给小 Hong 发告诉和 Email
Promise<String> promise = test.search(“Netty In Action”);

    promise.addListener(new GenericFutureListener<Future<? super String>>() {
        @Override
        public void operationComplete(Future<? super String> future) throws Exception {System.out.println("Listener 1, make a notifice to Hong,price is" + future.get());
        }
        
    });
    promise.addListener(new GenericFutureListener<Future<? super String>>() {
        @Override
        public void operationComplete(Future<? super String> future) throws Exception {System.out.println("Listener 2, send a email to Hong,price is" + future.get());
        }
        
    });

Future 和 Promise 的益处在于,获取到 Promise 对象后能够为其设置异步调用实现后的操作,而后立刻持续去做其余工作。

4.3.3 Netty 罕用的 Promise 类
Netty 罕用的纯 Future 机制的类,有 SucceededFuture 和 FailedFuture,他们不须要设置业务逻辑代码,会立即实现,只须要设置胜利后的返回和抛出的异样。

Netty 的罕用 Promise 类有 DefalutPromise 类,这是 Promise 实现的根底,后续会对这个类的实现进行解读;DefaultChannelPromise 是 DefalutPromise 的子类,退出了 channel 这个属性。

上面对 DefaultChannelPromise 进行剖析,其类图如下:
NettyFuture 类图

DefaultPromise 的应用
Netty 中波及到异步操做的中央都应用了 promise,例如,上面是服务器 / 客户端启动时的注册工作,最终会调用 unsafe 的 register,调用过程中会传入一个 promise,unsafe 进行事件的注册时调用 promise 能够设置胜利 / 失败。

// SingleThreadEventLoop.java
public ChannelFuture register(final ChannelPromise promise) {

    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {

if (eventLoop == null) {throw new NullPointerException("eventLoop");
}
if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));
    return;
}
if (!isCompatible(eventLoop)) {
    promise.setFailure(new IllegalStateException("incompatible event loop type:" + eventLoop.getClass().getName()));
    return;
}

……
}
DefaultPromise 的实现
DefaultChannelPromise 提供的性能能够分为两个局部:一方面是为调用者提供 get() 和 addListener()用于获取 Future 工作执行后果和增加监听事件;另一方面是为业务解决工作提供 setSuccess()等办法设置工作的胜利或失败。

get 办法

DefaultPromise 的 get 办法有两个,无参数的 get 会阻塞期待;有参数的 get 会期待指定事件,若未完结抛出超时异样。这两个 get()是在其父类 AbstractFuture 中实现的,通过调用上面四个办法实现:

await(); // 期待 Future 工作完结
await(timeout, unit) // 期待 Future 工作完结,超过事件则抛出异样
cause(); // 返回 Future 工作的异样
getNow() // / 返回 Future 工作的执行后果

// 先期待,如果有异样则抛出,无异样返回 getNow()
public V get() throws InterruptedException, ExecutionException {

    await();

    Throwable cause = cause();
    if (cause == null) {return getNow();
    }
    if (cause instanceof CancellationException) {throw (CancellationException) cause;
    }
    throw new ExecutionException(cause);
}

await

await()办法判断 Future 工作是否完结,之后获取 this 锁,如果工作未实现,则调用 Object 的 wait()期待

public Promise<V> await() throws InterruptedException {

    // 判断 Future 工作是否完结,外部依据 result 是否为 null 判断,setSuccess 或 setFailure 时会通过 CAS 批改 result
    if (isDone()) {return this;}

    if (Thread.interrupted()) { // 线程是否被中断
        throw new InterruptedException(toString());
    }

    checkDeadLock(); // 查看以后线程是否与线程池运行的线程是一个

    synchronized (this) {while (!isDone()) {incWaiters(); // waiters 计数加 1
            try {wait();  // Object 的办法,让出 cpu,退出期待队列
            } finally {decWaiters(); //  waiters 计数减 1
            }
        }
    }
    return this;
}

await(long timeout, TimeUnit unit)与 awite 相似,只是调用了 Object 对象的 wait(long timeout, int nanos)办法
awaitUninterruptibly() 办法在外部 catch 住了期待线程的中断异样,因而不会抛出中断异样。

监听事件相干办法
add/remove 办法

addListener 办法被调用时,将传入的回调类传入到 listeners 对象中,如果监听多于 1 个,会创立 DefaultFutureListeners 对象将回调办法保留在一个数组中。removeListener 会将 listeners 设置为 null(只有一个时)或从数组中移除(多个回调时)。

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {

    if (listeners == null) {listeners = listener;} else if (listeners instanceof DefaultFutureListeners) {((DefaultFutureListeners) listeners).add(listener);
    } else {listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
    }
}

private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {

    if (listeners instanceof DefaultFutureListeners) {((DefaultFutureListeners) listeners).remove(listener);
    } else if (listeners == listener) {listeners = null;}
}

notifyListeners()

在增加监听器的过程中,如果工作刚好执行结束 done(), 则立刻触发监听事件。触发监听通过 notifyListeners()实现。次要逻辑为:如果以后 addListener 的线程(精确来说应该是调用 notifyListeners 的线程,因为 addListener 和 setSuccess 都会调用 notifyListeners()和 Promise 内的线程池以后执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池去执行;例如,main 线程中调用 addListener 时工作实现,notifyListeners()执行回调,会提交到线程池中执行;而如果是执行 Future 工作的线程池中 setSuccess()时调用 notifyListeners(),会放在以后线程中执行。
外部保护了 notifyingListeners 用来记录是否曾经触发过监听事件,只有未触发过且监听列表不为空,才会顺次便当并调用 operationComplete

private static void notifyListener0(Future future, GenericFutureListener l) {

    try {l.operationComplete(future);
    } catch (Throwable t) {logger.warn("An exception was thrown by" + l.getClass().getName() + ".operationComplete()", t);
    }
}

setSuccess()办法
Future 工作在执行实现后调用 setSuccess() 或 setFailure()告诉 Future 执行后果;次要逻辑是:批改 result 的值,若有期待线程则唤醒,告诉监听事件。

if (setSuccess0(result)) {// 设置胜利后唤醒期待线程

        notifyListeners(); // 告诉
        return this;

}
// 告诉胜利时将后果保留在变量 result,告诉失败时,应用 CauseHolder 包装 Throwable 赋值给 result
// RESULT_UPDATER 是一个应用 CAS 更新外部属性 result 的类,
// 如果 result 为 null 或 UNCANCELLABLE,更新为胜利 / 失败后果;UNCANCELLABLE 是不可勾销状态
private boolean setValue0(Object objResult) {

    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {checkNotifyWaiters();// 调用 Object 的 notifyAll(); 告诉期待线程
        return true;
    }
    return false;
}

cancel()办法
cancel 用来勾销工作,依据 result 判断,如果能够勾销,则唤醒期待线程,告诉监听事件。

public boolean cancel(boolean mayInterruptIfRunning) {

// 如果 result 为 null,阐明未 setUncancellable()/setSuccess/setFailure
    if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {checkNotifyWaiters(); // 唤醒期待线程
        notifyListeners(); // 触发监听事件
        return true;
    }
    return false;
}

通过下面的剖析,咱们能够看到 DefaultPromise 外部通过 result 记录 Future 工作的执行状态:

null – 未实现
CANCELLATION_CAUSE_HOLDER - 被勾销
UNCANCELLABLE – 不可勾销
业务解决调用 setSuccess 时传入的后果
业务解决调用 setFailure 时包装 Throws 的 CauseHolder
DefaultPromise 外部保护了一个监听列表保留监听事件,在工作实现或勾销时告诉监听事件(提交到线程池中执行);工作的期待与唤醒通过 Object 的 wait()和 notifyAll()实现

DefaultChannelPromise 实现
DefaultChannelPromise 是 DefaultPromise 的子类,外部保护了一个通道变量 Channel channel;Promise 机制相干的办法都是调用父类办法。
除此之外,还实现了 FlushCheckpoint 接口,供 ChannelFlushPromiseNotifier 应用,咱们能够将 ChannelFuture 注册到 ChannelFlushPromiseNotifier 类,当有数据写入或达到 checkpoint 时应用。

interface FlushCheckpoint {long flushCheckpoint();
    void flushCheckpoint(long checkpoint);
    ChannelPromise promise();}

正文完
 0