乐趣区

关于java:Java多线程学习笔记四-久处不厌

在 Java 多线程学习笔记的 (一) (二)(三) 篇, 咱们曾经根本对多线程有根本的理解了, 然而 Runnable 接口依然是不完满的,如果你心愿获取线程的执行后果的话,那可能要绕下路了。为了补救的毛病,Java 引入了 Callable 接口,Callable 相当于一个增强版的 Runnable 接口. 然而如果说我想勾销工作呢,Java 就引入了 Future 类。然而 Future 还是不够完满,那就再加强:CompletionService.

Future 和 Callable

Callable 作为 Runnable 的增强版,能够应用 ThreadPoolExecutor 的另外一个 submit 办法来提交工作, 该 submit 办法的申明如下:

 public <T> Future<T> submit(Callable<T> task)

Callable 办法只有一个办法:

  V call() throws Exception;

call 办法的返回值代表相应工作的处理结果,其类型 V 是通过 Callable 接口的类型参数来指定的; call 办法代表的工作在其执行过程中能够抛出异样。而 Runnable 接口中的 run 办法既无返回值也不能抛出异样。Executors.callable(Runnable task , T result)可能将 Runnable 接口转换为 Callable 接口实例。

接下来咱们来将眼光转移到返回值 Future 接口上, Future 接口实例可被看作是提交给线程池执行的工作的句柄,凭借 Future 实例咱们就能够取得线程执行的返回后果。Future.get()办法能够用来获取 task 参数所制订工作的处理结果。该办法的申明如下:

 V get() throws InterruptedException, ExecutionException;

Future.get()办法被调用时,如果相应的工作尚未执行结束,那么 Future.get()会使以后线程暂停,直到相应的工作执行完结 (包含失常完结和抛出异样而终止)。因而 Future.get() 办法是一个阻塞办法,该办法可能抛出 InterruptedException 阐明它能够响应中断。另外假如响应工作的执行过程中抛出了一个任意的异样。调用这个异样 (ExecutionException) 的 getCause 办法可返回工作执行中所抛出的异样。因而客户端代码能够通过捕捉 Future.get()调用抛出的异样来通晓执行过程中抛出的异样。因为工作在执行未结束的时候调用 Future.get()办法来获取该工作的处理结果会导致期待上下文切换,因而如果须要获取执行后果的工作该当尽早的向线程池提交,并且尽可能晚地调用 Future.get()办法来获取工作的处理结果,而线程池则正好利用这段时间来执行已提交的工作.

Future 接口还反对工作的勾销:

 boolean cancel(boolean mayInterruptIfRunning);

如果工作曾经执行结束或者曾经被勾销过了,或者因为其余起因不能被勾销,该办法将会返回 false. 如果胜利勾销,那么工作将不会被执行,工作的执行状态分为两种,一种是还未执行,一种是曾经处于执行中。对于执行中的工作, 参数 mayInterruptIfRunning 代表是否通过中断去终止线程的执行。
在这个办法执行结束之后,返回 true 的话,与之相关联的调用, isDone 和 isCancelled 将总会返回 true。
Future.isDone()办法能够检测响应的工作是否执行结束。工作执行结束、执行过程中抛出异样以及勾销都会导致该办法返回 true。Future.get()会使其执行线程无限度期待,直到相应的工作执行完结。然而有的时候处理不当就可能导致无限度的期待,为了防止这种无限度期待状况的产生,此时咱们能够应用 get 办法另一个版本:

V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

指定等待时间,如果等待时间之内还没实现,该办法就会抛出 TimeoutException。留神该办法参数指定的超时工夫仅仅用于管制期待相应工作的处理结果最多期待多长时间,而非限度工作自身的执行工夫。所以举荐的做法是客户端通常须要在捕捉 TimeoutException 之后,勾销该工作的执行。

上面是一个 Callable 和 Future 的示例:

public class FutureDemo {public static void main(String[] args) {ExecutorService threadPool = Executors.newCachedThreadPool();
        Future<String> future = threadPool.submit(() -> {TimeUnit.SECONDS.sleep(1);
            int i = 1 / 0;
            return "hello world";
        });
        String result = null;
        try {result = future.get();
        } catch (InterruptedException e) {e.printStackTrace();
        } catch (ExecutionException e) {
            // 打印进去的就是除 0 异样
            System.out.println(e.getCause());
        }
        System.out.println(result); // 直到 Future 工作执行完结, 线程返回执行后果, 这行才会被执行
        threadPool.shutdownNow();}
}

异步执行框架 Executor

Runnable 和 Callable 接口是对工作执行逻辑的形象,咱们在对应的办法编写对应的工作执行逻辑就能够了. 而 java.util.concurretn.Executor 接口则是对工作的执行进行的形象,工作的提交方只须要调用 Executor 的 execute 办法便能够被执行,而无需关怀具体的执行细节。

比方在工作执行之前做些什么,工作是采纳一个专门的线程来执行的,还是采纳线程池来执行的,多个工作是以何种程序来执行的。可见 Executor 可能实现工作的提交与工作执行的具体细节解耦(Decoupling)。和对工作解决逻辑的形象相似,对工作执行的形象也能带给咱们信息暗藏和关注点拆散的益处。

那这个解耦所带来的益处是什么呢? 一个不言而喻的例子就是在肯定水平上可能屏蔽同步执行和异步执行的差别。对于同一个工作 (Runnable 或 Callable 的实例),如果咱们把这个工作提交给 ThreadPoolExecutor(它实现了 Executor) 来执行,那它就是异步执行; 如果把它交给如下所示的 Executor 来执行:

public class SynchronousExecutor implements Executor {
    @Override
    public void execute(Runnable command) {}}

那么该工作就是同步执行。那么该工作无论是同步执行还是异步执行对于调用方来说就没有太大的差别。
Executor 接口比较简单,性能也比拟无限:

  • 它不能返回工作的执行后果给提交工作方
  • Executor 的实现类经常会保护一定量的工作者线程。当咱们不再须要 Executor 实例的时候,往往须要将其外部保护的工作者线程停掉以开释相应的资源,然而 Executor 接口没有定义相应的办法。当初的须要就是咱们须要增强 Executor 接口,但又不能间接改变 Executor,那这里的做法就是做子接口,也就是 ExecutorService,ExecutorService 继承了 Executor,外部也定义了几个 submit 办法,可能返回工作的执行后果。ExecutorService 接口还定义了 shutdown 办法和 shutdownNow 来敞开对应的服务开释资源。

再加强: CompletionService

只管 Future 接口使得咱们可能不便地获取异步工作的处理结果,然而如果咱们须要一次性的提交一批工作的处理结果的话,那么仅应用 Future 接口写进去的代码将颇为繁琐。CompletionService 接口就是为解决这个问题而生。咱们来简略的看下 CompletionService 提供的办法:

submit 办法和 ThreadPoolExecutor 的一个 submit 办法雷同:

Future<V> submit(Callable<V> task);

如果要获取批量提交异步工作的执行后果,那么咱们能够应用 CompletionService 接口专门为此定义的办法:

  Future<V> take() throws InterruptedException;

该办法和 BlockingQueue 的 take()类似,它是一个阻塞办法,其返回值是批量提交异步工作中一个已执行结束任务的 Future 实例。咱们能够通过 Future 实例来获取异步工作的执行后果。如果调用 take 办法时没有曾经执行结束的工作,那么调用 take 办法的线程会被阻塞,直到有异步工作执行完结,调用 take 办法的线程才会被复原执行。
如果你想获取异步工作的后果,然而如果没有执行结束的异步工作,那么 CompletionService 同样提供了非阻塞式的获取异步工作处理结果的办法:

  • Future<V> poll(); // 如果没有已实现的工作, 则返回 null
  • Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; // 在指定的工夫内没有已实现的异步工作, 返回 null。
    Java 规范库提供的 CompletionService 的实现类是 ExecutorCompletionService,分割上文提到的 Executor,咱们能够大抵推断这个类是 Executor 和 CompletionService 的结合体。咱们随便找出一个 ExecutorCompletionService 的构造函数来大抵看一下:
  • public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue)

    executor 是工作的执行者,工作执行完结后将执行后果 Future 实例放入 BlockingQueue 中

Callable 的补充: FutureTask 概述

无论是 Runnable 实例还是 Callable 实例所示意的工作,只有咱们将其提交给线程池执行,那么这个工作我就能够认为是异步工作。采纳 Runnable 实例来示意异步工作的长处是工作既能够交给一个专门的工作者线程来执行, 也能够提交给线程池来执行,毛病是咱们无奈间接获取工作的执行后果。应用 Callable 实例来示意异步工作,其长处是咱们能够通过 ThreadPoolExecutor 的返回值来获取工作的执行后果,但无奈间接交给一个专门的工作者线程来执行。因而,应用 Callable 来示意异步工作会使工作的执行形式大大受限。

而 java.util.concurrent 的 FutureTask 类则交融了 Runnable 和 Callable 接口的长处: FutureTask 是 RunnableFuture 的实现类,
Runnable 则是 Runable 和 Future 的子接口。因而 FutureTask 既是 Runable 的实现类也是 Future 接口的实现。FutureTask 提供了一个构造函数能够将 Callable 实例转换为 Runnable 实例,该结构器的申明如下:

public FutureTask(Callable<V> callable) 

如此就补救了 Callable 实例只能交由线程池执行不能交给指定的工作者线程执行的毛病,而 FutureTask 实例自身也代表了咱们要执行的工作,咱们能够通过 FutureTask 实例来获取以前 Runnable 实例无奈获取的线程执行后果。像是 Executor.execute(Runnable task)这样只认 Runnable 实例来执行工作的状况下仍然能够获取执行后果。
上面是一个示例:

public class FutureTaskDemo {public static void main(String[] args) throws Exception{FutureTask<String> futureTask = new FutureTask(()->{
            try {TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println("hello world");
            return "hello world";
        });
        new Thread(futureTask).start();
        while (futureTask.isDone()){System.out.println(futureTask.get());
        }
    }
}

FutureTask 还反对以回调的形式解决工作的执行后果。当 FutureTask 实例所代表的工作执行结束之后,Future.done 就会被执行,FutureTask.done 的执行线程与 FutureTask 的 run 的执行线程是同一个线程,在 FutureTask 是空的 protected 办法,咱们能够在子类中重写该办法以实现后续的回调解决,因为是在工作执行结束之后 done 办法被触发,所以在 done 办法中调用 get 办法获取工作执行后果不会被阻塞。然而,因为工作的执行完结即包含失常完结,也包含异样终止以及工作被勾销而导致的终止,因而 FutureTask.done 办法中的代码可能须要在调用 FutureTask.get()前调用 FutureTask.isCancelled 来判断工作是否被勾销,免得 FutureTask.get 调用抛出 CancellationException 异样。
FutureTask 的简略示例:

public class FutureTaskDemo extends FutureTask<String>{public FutureTaskDemo(Callable<String> callable) {super(callable);
    }

    public FutureTaskDemo(Runnable runnable, String result) {super(runnable, result);
    }

    /**
     * 工作执行完结该办法会主动被调用
     * 咱们通过 get 办法获取线程的执行后果,
     * 因为该办法在工作执行完结被调用, 所以该办法调用 get 不会被阻塞
     */
    @Override
    protected void done() {
        try {String threadResult = get();
            System.out.println(threadResult);
        } catch (InterruptedException e) {e.printStackTrace();
        } catch (ExecutionException e) {e.printStackTrace();
        }
    }

    public static void main(String[] args) {FutureTaskDemo futureTaskDemo = new  FutureTaskDemo(()->{System.out.println("hello world");
            return "hello world";
        });
        new Thread(futureTaskDemo).start();}
}

FutureTask 依然有一些不足之处,它基本上是被设计用来示意一次性执行的工作,其外部保护了一个示意工作运行状态 (包含未开始运行,曾经运行完结等) 的状态变量,FutureTask.run 在执行 run 的时候会先判断相应工作的执行状态:

如果曾经执行过,那么会间接返回,并不会抛出异样。因而 FutureTask 实例所代表的工作是无奈被反复执行的。这意味着同一个 FutureTask 实例不能屡次提交执行,只管不会有异样的抛出。FutureTask.runAndReset 可能突破这种限度,使得一个 FutureTask 实例所代表的工作被屡次执行,然而不记录工作的执行后果。因而,如果对同一个对象所示意的工作须要屡次被执行,并且咱们须要对工作的执行后果进行解决,那么 FutureTask 就不实用了,不过咱们能够仿照 FutureTask 设计一个满足咱们所须要的,这也是咱们看源码的意义,重在领会和学习背地的设计思维,由此咱们引出 AsyncTask。示例:

public class FutureTaskDemo extends FutureTask<String>{public FutureTaskDemo(Callable<String> callable) {super(callable);
    }

    public FutureTaskDemo(Runnable runnable, String result) {super(runnable, result);
    }

    @Override
    public void run() {
        // 启动工作用 runAndReset 启用
        super.runAndReset();}
    /**
     * 工作执行完结该办法会主动被调用
     * 咱们通过 get 办法获取线程的执行后果,
     * 因为该办法在工作执行完结被调用, 所以该办法调用 get 不会被阻塞
     */
    @Override
    protected void done() {}

    public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTaskDemo futureTaskDemo = new FutureTaskDemo(()->{System.out.println("hello world");
            return "hello world";
        });
        new Thread(futureTaskDemo).start();
        new Thread(futureTaskDemo).start();}
}

可反复执行的异步工作: AsyncTask

public abstract class AsyncTask<V> implements Runnable, Callable<V> {
    protected final Executor executor;

    public AsyncTask(Executor executor) {this.executor = executor;}

    public AsyncTask() {this(new Executor() {
            @Override
            public void execute(Runnable command) {command.run();
            }
        });
    }

    @Override
    public void run() {
        Exception exp = null;
        V r = null;
        try {
            //call 的逻辑能够从传递也能够由本人来实现
           r = call();}catch (Exception e){exp = e;}
        final V result  = r;
        if (null == exp){executor.execute(new Runnable() {
                @Override
                public void run() {onResult(result);
                }
            });
        }else {executor.execute(new Runnable() {
                @Override
                public void run() {onError(result);
                }
            });
        }
    }

    /**
     * 留给子类实现工作执行后果的解决逻辑
     * @param result
     */
    protected abstract void onError(V result);

    /**
     * 留给子类实现工作执行后果的解决逻辑
     * @param result
     */
    protected abstract void onResult(V result);
}

参考资料

  • 《Java 多线程编程实战指南》黄文海
退出移动版