关于juc:JUC包之Future模式

41次阅读

共计 8953 个字符,预计需要花费 23 分钟才能阅读完成。

Future 模式

Future 模式是多线程开发中的一种常见的设计模式,核心思想异步调用,让串行化的问题变得并行处理节省时间。

当程序执行一个工作时,这个工作可能执行的很慢,它不可能立刻返回后果,但能够返回一个契约,因而咱们能够在该工作执行的时候,再去执行其它工作,最终用该契约获取后果。

举个栗子:

在网上买了一部手机,手机三天后才会到货,但会马上产生一个订单,这个订单就是上述所提到的契约,而后咱们不必始终干等手机的到来,齐全能够去忙别的事,当快递到来的时候,订单核查一下,而后就取得了最终后果。

JDK 中的 Future 模式

Future 接口相似于之前的契约,依据 Future 对象调用 get 办法最终获取到后果。

FutureTask 接口实现 Callable 接口对象到 Runnable 接口对象的过渡,最终会交由 Callable 接口实现,Callable 接口的 call 办法返回最终后果。

FutureTask 类阐明

英文有肯定阻碍的看中文正文

/**
一、订正阐明:

1、这与该类以前依赖 AbstractQueuedSynchronizer 的版本不同,
次要是为了防止用户在勾销竞争期间意外地保留中断状态。

2、以后设计中的同步控制依赖于通过 CAS 更新的“state”字段来跟踪实现状况,
以及一个用于保留期待线程的简略 Treiber 堆栈。

二、阐明:

1、与平常一样,咱们绕过了应用 atomicxfielddupdater 的开销,而是间接应用不平安的外部函数。

工作状态:
一、状态阐明
1、此工作的运行状态,最后为新建。
2、运行状态仅在办法 set、setException 和 cancel 中转换为终端状态。
3、在实现过程中,状态可能会出现实现(在设置后果时)或中断(仅在中断转轮以满足勾销(true))的瞬态值。
4、从这些中间状态到最终状态的转换应用更便宜的程序 / 提早写入,因为值是惟一的,无奈进一步批改。

二、可能的状态转换:

1、新建 -> 实现 -> 失常
2、新建 -> 实现 -> 异样
3、新建 -> 勾销
4、新建 -> 中断 -> 中断
**/

/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     */

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    // 外部类 WaitNode 
    // 在 Treiber 堆栈中记录期待线程的简略链表节点。// 无关更具体的阐明,请参见其余类,如 Phaser 和 SynchronousQueue。/**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

简略应用

get 办法

package com.github.excelent01;

import java.util.concurrent.*;

/**
 * @auther plg
 * @date 2019/5/17 16:54
 */
public class TestFuture {public static void main(String[] args) {ExecutorService service = Executors.newSingleThreadExecutor();
        Future<Integer> future =  service.submit(()->{TimeUnit.SECONDS.sleep(10); // 模仿延时
            return 10;
        });
        //==============================
        System.out.println("do other works.");
        //==============================

        try {System.out.println(future.get());
        } catch (InterruptedException e) {e.printStackTrace();
        } catch (ExecutionException e) {e.printStackTrace();
        }
        service.shutdown();}
}

Future 接口 API

public interface Future<V> {

    /**
     * 用来勾销工作,勾销胜利则返回 true,勾销失败则返回 false。* mayInterruptIfRunning 参数示意是否容许勾销正在执行却没有执行结束的工作,设为 true,则示意能够勾销正在执行过程中的工作。* 如果工作已实现,则无论 mayInterruptIfRunning 为 true 还是 false,此办法都返回 false,即如果勾销曾经实现的工作会返回 false;* 如果工作正在执行,若 mayInterruptIfRunning 设置为 true,则返回 true,若 mayInterruptIfRunning 设置为 false,则返回 false;* 如果工作还没有执行,则无论 mayInterruptIfRunning 为 true 还是 false,必定返回 true。*/
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 示意工作是否被勾销胜利,如果在工作失常实现前被勾销胜利,则返回 true
     */
    boolean isCancelled();

    /**
     * 示意工作是否曾经实现,若工作实现,则返回 true
     */
    boolean isDone();

    /**
     * 获取执行后果,如果最终后果还没得出该办法会产生阻塞,直到工作执行结束返回后果
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 获取执行后果,如果在指定工夫内,还没获取到后果,则抛出 TimeoutException
     */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Future 就是对于 Runnable 或 Callable 工作的执行进行查问、中断工作、获取后果。

因为烧水要花费 15min,因而没有必要白白干等,浪费时间,在这个时间段内,能够去实现筹备茶具的一些工作,等茶具筹备好之后,就静等水烧开了。因而这是一个典型的依附 Future 模式来解决的问题。

代码实现:

package Future;

import java.util.Scanner;
import java.util.concurrent.*;

/**
 * @auther plg
 * @date 2019/5/17 17:54
 */
public class TeaTest2 {public static void main(String[] args) throws InterruptedException, ExecutionException {BoilWater boilWater = new BoilWater();
        FutureTask<String> futureTask1 = new FutureTask<>(boilWater);
        ReadyTeaSet readyTeaSet = new ReadyTeaSet(futureTask1);
        FutureTask<String> futureTask2 = new FutureTask<>(readyTeaSet);
        new Thread(futureTask1).start();
        Thread.sleep(2000);
        new Thread(futureTask2).start();
        System.out.println(futureTask2.get());

    }
}
// T1 线程
class  BoilWater implements Callable<String> {
    @Override
    public String call() throws Exception {System.out.println("T1: 洗水壶");
        Thread.sleep(1000);
        System.out.println("T1: 烧水");
        Thread.sleep(10000);
        return "T1: 水烧开了。";
    }
}
// T2 线程 ReadyTeaSet
class ReadyTeaSet implements Callable<String>{
    private FutureTask<String> futureTask = null;
    public ReadyTeaSet(FutureTask<String> futureTask) {this.futureTask = futureTask;}

    @Override
    public String call() throws Exception {System.out.println("T2: 洗水杯");
        Thread.sleep(1000);
        System.out.println("T2: 洗茶壶");
        Thread.sleep(2000);
        System.out.println("T2: 取茶叶");
        Thread.sleep(1000);
        System.out.println("T2: 等着水烧开。");
        System.out.println(futureTask.get());
        return "一壶好茶.";
    }
}

运行后果:
T1: 洗水壶
T1: 烧水
T2: 洗水杯
T2: 洗茶壶
T2: 取茶叶
T2: 等着水烧开。
T1: 水烧开了。
一壶好茶.

Process finished with exit code 0

一般模式与 Future 模式的简略比照:

  1. 一般模式在解决多任务时是串行的,在遇到耗时操作的时候只能期待,直到阻塞被解除,才会继续执行下一个工作
  2. Future 模式,只是发动了耗时操作,函数立马就返回了,真正执行具体操作由另外一个工作线程去实现,并不会阻塞客户端线程。

所以在工作线程执行耗时操作的时候客户端无需期待,能够持续做其余事件,等到须要的时候再向工作线程获取后果。

Future 模式详解:

1、Future 模式是多线程设计罕用的一种设计模式。

2、它的核心思想是异步调用。

对于 Future 模式来说,它无奈立刻返回你须要的数据,然而它会返回一个契约,未来你能够凭借这个契约去获取你须要的信息。

Future 模式能够简略了解成:我有一个工作,它比拟耗时,然而我又不想始终空等,而且有时候工作的后果并不立即须要,于是我把这工作提交给了 Future,Future 替我实现这个工作,同时 Future 将这个工作订单的信息返回给我。

那么我就能够不必等了,本人能够去做任何想做的事件。

当我须要这个工作后果的时候,我能够依据返回的订单信息,尝试从 Future 那里去取出该工作的后果(当然如果此时还未实现,则会阻塞)。

当然,思考到此时工作可能还未实现,Future 也反对工作是否实现检测,由此,咱们能够依据是否实现设计不同的该当逻辑。

FutureTask

说完 Future,Future 因为是接口不能间接用来创建对象,就有了上面的 FutureTask。

先看看 FutureTask 的实现:

能够看到 FutureTask 类实现了 RunnableFuture 接口,接着看 RunnableFuture 接口源码:


public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();}

能够看到 RunnableFuture 接口继承了 Runnable 接口和 Future 接口,也就是说其实 FutureTask 既能够作为 Runnable 被线程执行,也能够作为 Future 失去 Callable 的返回值。

手动实现 Future 模式:

上面的 DataFuture 类只是一个包装类,创立它时无需阻塞期待。

在工作线程筹备好数据后应用 setRealData 办法将数据传入。

客户端只有在真正须要数据时调用 getRealData 办法即可,如果此时数据已筹备好则立刻返回,否则 getRealData 办法就会期待,直到获取数据实现。

DataFuture

public class DataFuture<T> {

    private T realData;
    private boolean isOK = false;

    public synchronized T getRealData() {while (!isOK) {
            try {
                // 数据未筹备好则期待
                wait();} catch (Exception e) {e.printStackTrace();
            }
        }
        return realData;
    }

    public synchronized void setRealData(T data) {
        isOK = true;
        realData = data;
        notifyAll();}

}

上面实现一服务端,客户端向服务端申请数据时,服务端并不会立即去加载真正数据,只是创立一个 DataFuture,创立子线程去加载真正数据,服务端间接返回 DataFuture 即可。

Server

import java.util.concurrent.Executors;

public class Server {public DataFuture<String> getData() {final DataFuture<String> data = new DataFuture<>();

        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            public void run() {
                try {Thread.sleep(5000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
                data.setRealData("最终数据");
            }
        });
        return data;
    }
}

测试代码

客户端调用 代码如下:

TestDataFuture



public class TestDataFuture {public static void main(String[] args) {long start = System.currentTimeMillis();
        Server server = new Server();
        DataFuture<String> dataFuture = server.getData();

        try {
            // 先执行其余操作
            Thread.sleep(5000);
            // 模仿耗时...
        } catch (InterruptedException e) {e.printStackTrace();
        }

        System.out.println("后果数据:" + dataFuture.getRealData());
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    }
}

测试后果

后果数据:最终数据
耗时: 5006

Process finished with exit code 0

Future 不足之处

下面说了一堆 Future 的益处,那么就没有毛病吗?

下面例子能够看到应用 Future 模式比传统模式效率明显提高了,应用 Future 肯定水平上能够让一个线程池内的工作异步执行;

但同时也有个显著的毛病:

就是回调无奈放到与工作不同的线程中执行,传统回调最大的问题就是不能将控制流拆散到不同的事件处理器中。

比方主线程要等各个异步执行线程返回的后果来做下一步操作,就必须阻塞在 future.get()办法期待后果返回,这时其实又是同步了,如果遇到某个线程执行工夫太长时,那状况就更糟了。

到 Java8 时引入了一个新的实现类 CompletableFuture,补救了下面的毛病,在下篇会解说 CompletableFuture 的应用。

关键词:java 培训

正文完
 0