简介
FutureTask可用于异步获取执行后果或者勾销工作。通过实现Runnable或者Callable接口定义工作给FutureTask,而后间接调用其run()办法或者把工作放入线程池执行。在内部能够通过FutureTask的get()办法异步获取执行后果。因而FutureTask非常适合用于耗时的计算,主线程能够实现本人的工作之后再去获取后果。另外,FutureTask能够确定即便屡次调用了run()办法,它只执行一次Runnable或者Callable工作,获取通过cancel勾销FutureTask工作。
应用场景
利用FutureTask和ExecutorService,能够用多线程的形式提交计算工作,主线程继续执行其余工作,当主线程须要子线程的计算结果是,再异步获取现成的执行后果
FutureTask在高并发环境下确保工作只执行一次
应用案例
public class FutureTaskTest {
public static void main(String[] args) {
FutureTaskTest futureTaskTest = new FutureTaskTest();
ArrayList<FutureTask<Integer>> taskList = new ArrayList<>();
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
// 传入Callable对象创立FutureTask对象
FutureTask<Integer> ft = new FutureTask<Integer>(futureTaskTest.new ComputeTask(i, "" + i));
taskList.add(ft);
// 提交给线程池执行工作,也能够通过exec.invokeAll(taskList)一次性提交所有工作;
service.submit(ft);
}
System.out.println("所有计算工作提交结束, 主线程接着干其余事件!");
// 开始统计各计算线程计算结果
Integer totalResult = 0;
for (FutureTask<Integer> ft : taskList) {
try {
//FutureTask的get办法会主动阻塞,直到获取计算结果为止
totalResult = totalResult + ft.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
// 敞开线程池
service.shutdown();
System.out.println("多任务计算后的总后果是:" + totalResult);
}
private class ComputeTask implements Callable<Integer>{
private Integer result = 0;
private String taskName = "";
public ComputeTask(Integer iniResult, String taskName) {
result = iniResult;
this.taskName = taskName;
System.out.println("生成子线程计算工作: " + taskName);
}
@Override
public Integer call() throws Exception {
for (int i = 0; i < 100; i++) {
result = +i;
}
// 休眠5秒钟,察看主线程行为,预期的后果是主线程会继续执行,到要获得FutureTask的后果是期待直至实现。
Thread.sleep(5000);
System.out.println("子线程计算工作: " + taskName + " 执行实现!");
return result;
}
}
}
在下面的案例中,咱们自定义ComputeTask实现了Callable接口,自定义工作内容。并创立了一个定长为5的线程池和一个ArrayList类型的FutureTask汇合,而后通过线程池的subbmit办法把自定义工作提交给线程池。
而submit办法如下
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
咱们能够看到submit有三个重载办法,案例里用的是传参为Callable的submit办法,最终通过newTaskFor办法创立了一个FutureTask办法提交给了线程池
开启了一个for循环,在for循环外面向线程池提交了10次task工作。
最初遍历list汇合外面的FutureTask,通过get()获取执行后果,对所有后果进行累加。
因为咱们通过newFixedThreadPool(5)创立了一个定长为5的线程池,最大并发数为5,多余的工作会在队列外面进行期待。所以会先执行五个task工作,而后再去执行队列外面的五个task工作。最初在输入后果的时候两头会有一个短暂的进展,这个进展是因为开始提交的五个task工作执行实现,get()办法获取到了执行后果,然而前面的五个还在队列外面没有执行,所以get办法阻塞导致呈现进展。
在很多高并发的环境下,往往咱们只须要某些工作只执行一次。这种应用情景FutureTask的个性恰能胜任。
源码解析
数据结构和常量剖析
WaitNode 节点用来存储在前面get()办法获取工作执行后果呈现阻塞时寄存线程应用。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
state示意当前任务执行状态
private volatile int state;
// 当前任务尚未执行
private static final int NEW = 0;
// 当前任务正在完结,然而没有齐全完结
private static final int COMPLETING = 1;
// 当前任务失常完结
private static final int NORMAL = 2;
// 当前任务执行过程中产生了异样 外部封装的Callable.run()向上抛出异样
private static final int EXCEPTIONAL = 3;
// 当前任务被勾销
private static final int CANCELLED = 4;
// 当前任务中断中
private static final int INTERRUPTING = 5;
// 当前任务曾经中断 中断不是线程进行了。中断只是线程的一个标记,如果submit提交的工作齐全没有响应中断
private static final int INTERRUPTED = 6;
// submit(runnable/callable) runnable应用装璜者模式伪装成callable
private Callable<V> callable;
/**
* 失常状况下工作失常执行完结,outcome保留执行后果
* 非正常状况,outcome保留异样*/
private Object outcome; // non-volatile, protected by state reads/writes
/**
* 当前任务被线程执行期间,保留以后执行工作的线程对象援用*/
private volatile Thread runner;
/**
* 因为会有很多线程去get当前任务的后果,这里是用栈保留来多个线程 */
private volatile WaitNode waiters;
次要办法
run()办法
run()办法是工作执行的入口
public void run() {
/**
* state不为new,阐明以后task曾经被其余线程解决过,以后线程不解决
* 通过CAS把task的runner设置为以后线程
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
// 执行到这里,以后task肯定是NEW状态,而且以后线程抢占task胜利
try {
// callable就是程序员本人封装的逻辑,就是下面案例中自定义的ComputeTask类
Callable<V> c = callable;
// 判断避免提交task为null,内部线程把以后task给cancel掉
if (c != null && state == NEW) {
V result;
// 工作执行状态
boolean ran;
try {
// 执行自定义工作内容,就是下面ComputeTask中的for循环对result累加操作
result = c.call();
// 设置task工作执行状态为true,示意当前任务曾经执行实现,然而FutureTask不是执行实现的状态
ran = true;
} catch (Throwable ex) {
// 如果执行自定义工作过程中出现异常,就把result设置为null,同时把FutureTask返回后果设置为执行过程中抛出的异样
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// 设置以后执行线程为nul
runner = null;
// 获取以后FutureTask状态
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
get()办法
get办法就是内部线程获取task工作执行后果的办法。如果当前任务状态为COMPLETING和NEW,则把以后线程封装成一个waitNode节点进入队列期待,否则就通过report办法获取执行后果或者执行后果中的异样,变量outcome就是保留工作执行后果或者执行过程中对外抛出的异样。
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 以后task未执行或者正在完结,调用get办法的内部线程会被阻塞
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
awaitDone() 办法
首先把waitNode置为null,而后进入for循环的自旋
CASE 1:
第一次自旋,因为以后waitNode,且以后内部线程第一次获取执行后果,没有属于本人的node节点,所以命中else if (q == null)这个判断条件,创立属于以后内部线程的node节点。
CASE 2:
因为以后线程有了本人的node节点,如果当前任务还是没有实现,就会命中else if (!queued)这个判断条件,通过CAS以头插法的模式把以后线程的node节点进入期待队列。入队胜利,queued,开始下轮自旋,查看工作状态,如果工作状态不为NEW和COMPLETING,则返回工作执行后果或者异样。如果入队失败,依然获取最新的工作状态,不是NEW和COMPLETING返回后果或者异样,如果是NEW和COMPLETING则开始新一次的CAS把以后线程的node节点入队。如果入队胜利并且当前任务还是NEW和COMPLETING,则以后线程会被park,晓得被唤醒或者被中断。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
/**
*因为get办法中,传入的timed为false,如果这个中央deadline为0
*
*/
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//因为以后获取后果的线程第一次进入,所以线程对应的waitNode节点为null
WaitNode q = null;
// 是否入栈标记
boolean queued = false;
// 自旋
for (;;) {
//如果以后线程是被中断唤醒,interrupted()会返回true,同时革除中断标记,前面interrupted()返回的都是false
if (Thread.interrupted()) {
// 栈顶元素出栈
removeWaiter(q);
// 向外抛出中断异样
throw new InterruptedException();
}
//获取当前任务的最新状态
int s = state;
// 当前任务曾经解决并有后果,可能是失常后果,也可能是异样后果
if (s > COMPLETING) {
// 阐明以后线程曾经创立了node对象,此时将q.thread设置为null,帮忙GC
if (q != null)
q.thread = null;
// 返回执行后果
return s;
}
// 阐明当前任务曾经进入实现中,让出CPU执行权,开始下一轮的抢占
else if (s == COMPLETING) // cannot time out yet
/**
* java通过start启动线程,让线程变成就绪状态期待CPU调度执行,而yield()办法就是让线程由执行状态变成就绪状态,让出CPU工夫片
* 在下一个线程执行的时候,以后线程可能会被执行,也可能不会被执行
*/
Thread.yield();
// 第一次自旋,因为后面q曾经为null,所以这里为以后线程创立一个waitNode
else if (q == null)
q = new WaitNode();
// 第二次自旋,以后线程曾经有了WaitNode对象,然而node对象还没有入队
else if (!queued)
/**
* q.next = waiters把以后节点的next节点设置为原来的头结点waiter,而后把waiter指向q
* waiter始终指向队列头结点,把以后线程node节点的next指向队列头结点,
* 听过CAS设置waiter指向以后线程node,设置失败可能其余线程领先设置胜利
*/
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;9
}
LockSupport.parkNanos(this, nanos);
}
else
/**
* 以后线程就会被park,变为WAITING
* 除非有其余线程将你唤醒或者以后线程被中断
*/
LockSupport.park(this);
}
}
这里,FutureTask的次要办法的源码根本解析实现,咱们能够看到,在FutrueTask外面,在并发多线程的场景下,更改值都是通过unsafe类的CAS形式实现,并且每次操作之前都会获取最新的state值来获取当前任务的状态,因为在并发场景下,每次更改之前不获取最新值就间接进行操作的话就可能笼罩掉最新的执行后果。这个思维咱们在当前的高并发场景下能够采纳。
发表回复