共计 4248 个字符,预计需要花费 11 分钟才能阅读完成。
AbstractExecutorService 抽象类,实现了 ExecutorService 的接口。
newTaskFor
将任务封装成 FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value); | |
} | |
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable); | |
} |
submit
提交任务
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException(); | |
RunnableFuture<Void> ftask = newTaskFor(task, null); | |
execute(ftask); | |
return ftask; | |
} | |
public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException(); | |
RunnableFuture<T> ftask = newTaskFor(task, result); | |
execute(ftask); | |
return ftask; | |
} | |
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException(); | |
RunnableFuture<T> ftask = newTaskFor(task); | |
execute(ftask); | |
return ftask; | |
} |
invokeAny
主要方法在 doInvokeAny
//tasks 任务 | |
//timed 是否超时 | |
//nanos 超时时间 | |
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, | |
boolean timed, long nanos) | |
throws InterruptedException, ExecutionException, TimeoutException {if (tasks == null) | |
throw new NullPointerException(); | |
int ntasks = tasks.size();// 任务书 | |
if (ntasks == 0) | |
throw new IllegalArgumentException(); | |
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); | |
// 用于存放结果的,先完成的放前面。所以第一个任务没完成的时候,会继续提交后续任务 | |
ExecutorCompletionService<T> ecs = | |
new ExecutorCompletionService<T>(this); | |
try { | |
// 异常信息 | |
ExecutionException ee = null; | |
// 过期时间 | |
final long deadline = timed ? System.nanoTime() + nanos : 0L; | |
Iterator<? extends Callable<T>> it = tasks.iterator();// 获取第一个任务 | |
提交任务 | |
futures.add(ecs.submit(it.next())); | |
--ntasks;// 因为提交了一个,任务数 -1 | |
int active = 1;// 正在执行的任务 | |
for (;;) {Future<T> f = ecs.poll(); | |
if (f == null) {// 第一个没完成 | |
if (ntasks > 0) {// 还有没提交的任务 | |
--ntasks;// 任务数 -1 | |
futures.add(ecs.submit(it.next()));// 提交任务 | |
++active;// 正在执行的任务 +1 | |
} | |
else if (active == 0)// 当前没任务了,但是都失败了,异常被捕获了 | |
break; | |
else if (timed) {f = ecs.poll(nanos, TimeUnit.NANOSECONDS);// 等待 | |
if (f == null)// 返回空,超时抛出异常,结束 | |
throw new TimeoutException(); | |
nanos = deadline - System.nanoTime();// 剩余时间} | |
else | |
f = ecs.take();// 阻塞等待获取} | |
if (f != null) {// 说明已经执行完 | |
--active;// 任务数 -1 | |
try {return f.get();// 返回执行结果 | |
} catch (ExecutionException eex) {ee = eex;} catch (RuntimeException rex) {ee = new ExecutionException(rex); | |
} | |
} | |
} | |
if (ee == null) | |
ee = new ExecutionException(); | |
throw ee; | |
} finally { | |
// 取消其他任务,毕竟第一个结果已经返回了 | |
for (int i = 0, size = futures.size(); i < size; i++) | |
futures.get(i).cancel(true); | |
} | |
} | |
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) | |
throws InterruptedException, ExecutionException { | |
try {return doInvokeAny(tasks, false, 0); | |
} catch (TimeoutException cannotHappen) { | |
assert false; | |
return null; | |
} | |
} | |
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, | |
long timeout, TimeUnit unit) | |
throws InterruptedException, ExecutionException, TimeoutException {return doInvokeAny(tasks, true, unit.toNanos(timeout)); | |
} |
invokeAll
返回所有任务的结果
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) | |
throws InterruptedException {if (tasks == null) | |
throw new NullPointerException(); | |
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());// | |
boolean done = false; | |
try {for (Callable<T> t : tasks) {// 封装任务,并提交 | |
RunnableFuture<T> f = newTaskFor(t); | |
futures.add(f); | |
execute(f); | |
} | |
for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i); | |
if (!f.isDone()) { | |
try {f.get();// 阻塞,等待结果 | |
} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}} | |
} | |
done = true; | |
return futures; | |
} finally {if (!done)// 有异常,取消 | |
for (int i = 0, size = futures.size(); i < size; i++) | |
futures.get(i).cancel(true); | |
} | |
} | |
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, | |
long timeout, TimeUnit unit) | |
throws InterruptedException {if (tasks == null) | |
throw new NullPointerException(); | |
long nanos = unit.toNanos(timeout); | |
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); | |
boolean done = false; | |
try {for (Callable<T> t : tasks) | |
futures.add(newTaskFor(t)); | |
final long deadline = System.nanoTime() + nanos; | |
final int size = futures.size(); | |
// Interleave time checks and calls to execute in case | |
// executor doesn't have any/much parallelism. | |
for (int i = 0; i < size; i++) {execute((Runnable)futures.get(i)); | |
nanos = deadline - System.nanoTime(); | |
if (nanos <= 0L) | |
return futures;// 每个提交都要判断,超时了返回 Future | |
} | |
for (int i = 0; i < size; i++) {Future<T> f = futures.get(i); | |
if (!f.isDone()) {if (nanos <= 0L) | |
return futures; | |
try {f.get(nanos, TimeUnit.NANOSECONDS); | |
} catch (CancellationException ignore) {} catch (ExecutionException ignore) {} catch (TimeoutException toe) {return futures;} | |
nanos = deadline - System.nanoTime();} | |
} | |
done = true; | |
return futures; | |
} finally {if (!done) | |
for (int i = 0, size = futures.size(); i < size; i++) | |
futures.get(i).cancel(true); | |
} | |
} |
正文完