在Java多线程学习笔记的(一) (二)(三)篇, 咱们曾经根本对多线程有根本的理解了, 然而Runnable接口依然是不完满的,如果你心愿获取线程的执行后果的话,那可能要绕下路了。为了补救的毛病,Java引入了Callable接口,Callable相当于一个增强版的Runnable接口. 然而如果说我想勾销工作呢,Java就引入了Future类。然而Future还是不够完满,那就再加强:CompletionService.
Future和Callable
Callable作为Runnable的增强版,能够应用ThreadPoolExecutor的另外一个submit办法来提交工作, 该submit办法的申明如下:
public <T> Future<T> submit(Callable<T> task)
Callable办法只有一个办法:
V call() throws Exception;
call办法的返回值代表相应工作的处理结果,其类型V是通过Callable接口的类型参数来指定的; call办法代表的工作在其执行过程中能够抛出异样。而Runnable接口中的run办法既无返回值也不能抛出异样。Executors.callable(Runnable task , T result)可能将Runnable接口转换为Callable接口实例。
接下来咱们来将眼光转移到返回值Future接口上, Future接口实例可被看作是提交给线程池执行的工作的句柄,凭借Future实例咱们就能够取得线程执行的返回后果。Future.get()办法能够用来获取task参数所制订工作的处理结果。该办法的申明如下:
V get() throws InterruptedException, ExecutionException;
Future.get()办法被调用时,如果相应的工作尚未执行结束,那么Future.get()会使以后线程暂停,直到相应的工作执行完结(包含失常完结和抛出异样而终止)。因而Future.get()办法是一个阻塞办法,该办法可能抛出InterruptedException阐明它能够响应中断。另外假如响应工作的执行过程中抛出了一个任意的异样。调用这个异样(ExecutionException)的getCause办法可返回工作执行中所抛出的异样。因而客户端代码能够通过捕捉Future.get()调用抛出的异样来通晓执行过程中抛出的异样。因为工作在执行未结束的时候调用Future.get()办法来获取该工作的处理结果会导致期待上下文切换,因而如果须要获取执行后果的工作该当尽早的向线程池提交,并且尽可能晚地调用Future.get()办法来获取工作的处理结果,而线程池则正好利用这段时间来执行已提交的工作.
Future接口还反对工作的勾销:
boolean cancel(boolean mayInterruptIfRunning);
如果工作曾经执行结束或者曾经被勾销过了,或者因为其余起因不能被勾销,该办法将会返回false. 如果胜利勾销,那么工作将不会被执行,工作的执行状态分为两种,一种是还未执行,一种是曾经处于执行中。对于执行中的工作, 参数mayInterruptIfRunning代表是否通过中断去终止线程的执行。
在这个办法执行结束之后,返回true的话,与之相关联的调用, isDone和isCancelled将总会返回true。
Future.isDone()办法能够检测响应的工作是否执行结束。工作执行结束、执行过程中抛出异样以及勾销都会导致该办法返回true。Future.get()会使其执行线程无限度期待,直到相应的工作执行完结。然而有的时候处理不当就可能导致无限度的期待,为了防止这种无限度期待状况的产生,此时咱们能够应用get办法另一个版本:
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
指定等待时间,如果等待时间之内还没实现,该办法就会抛出TimeoutException。留神该办法参数指定的超时工夫仅仅用于管制期待相应工作的处理结果最多期待多长时间,而非限度工作自身的执行工夫。所以举荐的做法是客户端通常须要在捕捉TimeoutException之后,勾销该工作的执行。
上面是一个Callable 和 Future的示例:
public class FutureDemo { public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); Future<String> future = threadPool.submit(() -> { TimeUnit.SECONDS.sleep(1); int i = 1 / 0; return "hello world"; }); String result = null; try { result = future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { // 打印进去的就是除0异样 System.out.println(e.getCause()); } System.out.println(result); // 直到Future工作执行完结,线程返回执行后果, 这行才会被执行 threadPool.shutdownNow(); }}
异步执行框架Executor
Runnable和Callable接口是对工作执行逻辑的形象,咱们在对应的办法编写对应的工作执行逻辑就能够了.而java.util.concurretn.Executor接口则是对工作的执行进行的形象,工作的提交方只须要调用Executor的execute办法便能够被执行,而无需关怀具体的执行细节。
比方在工作执行之前做些什么,工作是采纳一个专门的线程来执行的,还是采纳线程池来执行的,多个工作是以何种程序来执行的。可见Executor可能实现工作的提交与工作执行的具体细节解耦(Decoupling)。和对工作解决逻辑的形象相似,对工作执行的形象也能带给咱们信息暗藏和关注点拆散的益处。
那这个解耦所带来的益处是什么呢? 一个不言而喻的例子就是在肯定水平上可能屏蔽同步执行和异步执行的差别。对于同一个工作(Runnable或Callable的实例),如果咱们把这个工作提交给ThreadPoolExecutor(它实现了Executor)来执行,那它就是异步执行; 如果把它交给如下所示的Executor来执行:
public class SynchronousExecutor implements Executor { @Override public void execute(Runnable command) { }}
那么该工作就是同步执行。那么该工作无论是同步执行还是异步执行对于调用方来说就没有太大的差别。
Executor 接口比较简单,性能也比拟无限:
- 它不能返回工作的执行后果给提交工作方
- Executor 的实现类经常会保护一定量的工作者线程。当咱们不再须要Executor实例的时候,往往须要将其外部保护的工作者线程停掉以开释相应的资源,然而Executor 接口没有定义相应的办法。当初的须要就是咱们须要增强Executor接口,但又不能间接改变Executor,那这里的做法就是做子接口,也就是ExecutorService,ExecutorService继承了Executor,外部也定义了几个submit办法,可能返回工作的执行后果。ExecutorService接口还定义了shutdown办法和shutdownNow来敞开对应的服务开释资源。
再加强: CompletionService
只管Future接口使得咱们可能不便地获取异步工作的处理结果,然而如果咱们须要一次性的提交一批工作的处理结果的话,那么仅应用Future接口写进去的代码将颇为繁琐。CompletionService接口就是为解决这个问题而生。咱们来简略的看下CompletionService提供的办法:
submit办法和ThreadPoolExecutor的一个submit办法雷同:
Future<V> submit(Callable<V> task);
如果要获取批量提交异步工作的执行后果,那么咱们能够应用CompletionService接口专门为此定义的办法:
Future<V> take() throws InterruptedException;
该办法和BlockingQueue的take()类似,它是一个阻塞办法,其返回值是批量提交异步工作中一个已执行结束任务的Future实例。咱们能够通过Future实例来获取异步工作的执行后果。如果调用take办法时没有曾经执行结束的工作,那么调用take办法的线程会被阻塞,直到有异步工作执行完结,调用take办法的线程才会被复原执行。
如果你想获取异步工作的后果,然而如果没有执行结束的异步工作,那么CompletionService同样提供了非阻塞式的获取异步工作处理结果的办法:
- Future<V> poll(); // 如果没有已实现的工作, 则返回null
- Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; // 在指定的工夫内没有已实现的异步工作, 返回null。
Java规范库提供的CompletionService的实现类是ExecutorCompletionService,分割上文提到的Executor,咱们能够大抵推断这个类是Executor和CompletionService的结合体。咱们随便找出一个ExecutorCompletionService的构造函数来大抵看一下: public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)
executor 是工作的执行者,工作执行完结后将执行后果Future实例放入BlockingQueue中
Callable的补充: FutureTask概述
无论是Runnable实例还是Callable实例所示意的工作,只有咱们将其提交给线程池执行,那么这个工作我就能够认为是异步工作。采纳Runnable实例来示意异步工作的长处是工作既能够交给一个专门的工作者线程来执行,也能够提交给线程池来执行,毛病是咱们无奈间接获取工作的执行后果。应用Callable实例来示意异步工作,其长处是咱们能够通过ThreadPoolExecutor的返回值来获取工作的执行后果,但无奈间接交给一个专门的工作者线程来执行。因而,应用Callable来示意异步工作会使工作的执行形式大大受限。
而java.util.concurrent的FutureTask类则交融了Runnable和Callable接口的长处: FutureTask是RunnableFuture的实现类,
Runnable则是Runable和Future的子接口。因而FutureTask既是Runable的实现类也是Future接口的实现。FutureTask提供了一个构造函数能够将Callable实例转换为Runnable实例,该结构器的申明如下:
public FutureTask(Callable<V> callable)
如此就补救了Callable实例只能交由线程池执行不能交给指定的工作者线程执行的毛病,而FutureTask实例自身也代表了咱们要执行的工作,咱们能够通过FutureTask实例来获取以前Runnable实例无奈获取的线程执行后果。像是Executor.execute(Runnable task)这样只认Runnable实例来执行工作的状况下仍然能够获取执行后果。
上面是一个示例:
public class FutureTaskDemo { public static void main(String[] args) throws Exception{ FutureTask<String> futureTask = new FutureTask(()->{ try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("hello world"); return "hello world"; }); new Thread(futureTask).start(); while (futureTask.isDone()){ System.out.println(futureTask.get()); } }}
FutureTask还反对以回调的形式解决工作的执行后果。当FutureTask实例所代表的工作执行结束之后,Future.done就会被执行,FutureTask.done的执行线程与FutureTask的run的执行线程是同一个线程,在FutureTask是空的protected办法,咱们能够在子类中重写该办法以实现后续的回调解决,因为是在工作执行结束之后done办法被触发,所以在done办法中调用get办法获取工作执行后果不会被阻塞。然而,因为工作的执行完结即包含失常完结,也包含异样终止以及工作被勾销而导致的终止,因而FutureTask.done办法中的代码可能须要在调用FutureTask.get()前调用FutureTask.isCancelled来判断工作是否被勾销,免得FutureTask.get调用抛出CancellationException异样。
FutureTask的简略示例:
public class FutureTaskDemo extends FutureTask<String>{ public FutureTaskDemo(Callable<String> callable) { super(callable); } public FutureTaskDemo(Runnable runnable, String result) { super(runnable, result); } /** * 工作执行完结该办法会主动被调用 * 咱们通过get办法获取线程的执行后果, * 因为该办法在工作执行完结被调用,所以该办法调用get不会被阻塞 */ @Override protected void done() { try { String threadResult = get(); System.out.println(threadResult); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } public static void main(String[] args) { FutureTaskDemo futureTaskDemo = new FutureTaskDemo(()->{ System.out.println("hello world"); return "hello world"; }); new Thread(futureTaskDemo).start(); }}
FutureTask依然有一些不足之处,它基本上是被设计用来示意一次性执行的工作,其外部保护了一个示意工作运行状态(包含未开始运行,曾经运行完结等)的状态变量,FutureTask.run在执行run的时候会先判断相应工作的执行状态:
如果曾经执行过,那么会间接返回,并不会抛出异样。因而FutureTask实例所代表的工作是无奈被反复执行的。这意味着同一个FutureTask实例不能屡次提交执行,只管不会有异样的抛出。FutureTask.runAndReset可能突破这种限度,使得一个FutureTask实例所代表的工作被屡次执行,然而不记录工作的执行后果。因而,如果对同一个对象所示意的工作须要屡次被执行,并且咱们须要对工作的执行后果进行解决,那么FutureTask就不实用了,不过咱们能够仿照FutureTask设计一个满足咱们所须要的,这也是咱们看源码的意义,重在领会和学习背地的设计思维,由此咱们引出AsyncTask。示例:
public class FutureTaskDemo extends FutureTask<String>{ public FutureTaskDemo(Callable<String> callable) { super(callable); } public FutureTaskDemo(Runnable runnable, String result) { super(runnable, result); } @Override public void run() { // 启动工作用runAndReset启用 super.runAndReset(); } /** * 工作执行完结该办法会主动被调用 * 咱们通过get办法获取线程的执行后果, * 因为该办法在工作执行完结被调用,所以该办法调用get不会被阻塞 */ @Override protected void done() { } public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTaskDemo futureTaskDemo = new FutureTaskDemo(()->{ System.out.println("hello world"); return "hello world"; }); new Thread(futureTaskDemo).start(); new Thread(futureTaskDemo).start(); }}
可反复执行的异步工作: AsyncTask
public abstract class AsyncTask<V> implements Runnable, Callable<V> { protected final Executor executor; public AsyncTask(Executor executor) { this.executor = executor; } public AsyncTask() { this(new Executor() { @Override public void execute(Runnable command) { command.run(); } }); } @Override public void run() { Exception exp = null; V r = null; try { //call的逻辑能够从传递也能够由本人来实现 r = call(); }catch (Exception e){ exp = e; } final V result = r; if (null == exp){ executor.execute(new Runnable() { @Override public void run() { onResult(result); } }); }else { executor.execute(new Runnable() { @Override public void run() { onError(result); } }); } } /** *留给子类实现工作执行后果的解决逻辑 * @param result */ protected abstract void onError(V result); /** * 留给子类实现工作执行后果的解决逻辑 * @param result */ protected abstract void onResult(V result);}
参考资料
- 《Java多线程编程实战指南》 黄文海