介绍

java.util.concurrent.CompletionService 是对 ExecutorService 的一个性能加强封装,优化了获取异步操作后果的接口。

应用场景

假如咱们要向线程池提交一批工作,并获取工作后果。个别的形式是提交工作后,从线程池失去一批 Future 对象汇合,而后顺次调用其 get() 办法。

这里有个问题:因为咱们会要按固定的程序来遍历 Future 元素,而 get() 办法又是阻塞的,因而如果某个 Future 对象执行工夫太长,会使得咱们的遍历过程阻塞在该元素上,无奈及时从前面早已实现的 Future 当中获得后果。

CompletionService 解决了这个问题。它自身不蕴含线程池,创立一个 CompletionService 须要先创立一个 Executor。上面是一个例子:

ExecutorService executor = Executors.newFixedThreadPool(4);CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

CompletionService 提交工作的形式与 ExecutorService 一样:

completionService.submit(() -> "Hello");

当你须要取得后果的时候,就不同了。有了 CompletionService,你不须要再持有 Future 汇合。如果要失去最早的执行后果,只须要像上面这样:

String result = completionService.take().get();

这个 take() 办法返回的是最早实现的工作的后果,这个就解决了一个工作被另一个工作阻塞的问题。上面是一个残缺的例子:

示例

public static void main(String[] args) throws Exception {    ExecutorService executor;    CompletionService<String> completionService;    // 创立一个指定执行时长的工作的办法    BiFunction<Integer, Integer, Callable<String>> createTask = (id, duration) -> () -> {        log("Task " + id + " started, duration=" + duration);        Thread.sleep(duration);        log("Task " + id + " completed.");        return "Result of task " + id;    };    ///////////////////////////////////////////////////////////////////    System.out.println("// 示例1:像应用 ExecutorService 一样应用 CompletionService");    // 初始化 executor 和 completionService    executor = Executors.newFixedThreadPool(4);    completionService = new ExecutorCompletionService<>(executor);    // 提交工作    List<Future<String>> results = Arrays.asList(            completionService.submit(createTask.apply(1, 1000)),            completionService.submit(createTask.apply(2, 800)),            completionService.submit(createTask.apply(3, 600)),            completionService.submit(createTask.apply(4, 400))    );    // 取后果    for (Future<String> result : results) {        log(result.get());    }    executor.shutdown();    ///////////////////////////////////////////////////////////////////    System.out.println("// 示例2:按规范形式应用 CompletionService");    // 初始化 executor 和 completionService    executor = Executors.newFixedThreadPool(4);    completionService = new ExecutorCompletionService<>(executor);    // 提交工作    completionService.submit(createTask.apply(5, 1000));    completionService.submit(createTask.apply(6, 800));    completionService.submit(createTask.apply(7, 600));    completionService.submit(createTask.apply(8, 400));    // 取后果    for (int i = 0; i < 4; i++) {        log(completionService.take().get());    }    ///////////////////////////////////////////////////////////////////    executor.shutdown();}

这个例子的执行后果如下所示:

// 示例1:像应用 ExecutorService 一样应用 CompletionService10:22:32:271 - Task 4 started, duration=40010:22:32:271 - Task 3 started, duration=60010:22:32:271 - Task 2 started, duration=80010:22:32:271 - Task 1 started, duration=100010:22:32:687 - Task 4 completed.10:22:32:888 - Task 3 completed.10:22:33:089 - Task 2 completed.10:22:33:303 - Task 1 completed.10:22:33:303 - Result of task 110:22:33:303 - Result of task 210:22:33:303 - Result of task 310:22:33:303 - Result of task 4// 示例2:按规范形式应用 CompletionService10:22:33:305 - Task 5 started, duration=100010:22:33:305 - Task 7 started, duration=60010:22:33:305 - Task 6 started, duration=80010:22:33:305 - Task 8 started, duration=40010:22:33:718 - Task 8 completed.10:22:33:718 - Result of task 810:22:33:918 - Task 7 completed.10:22:33:918 - Result of task 710:22:34:119 - Task 6 completed.10:22:34:119 - Result of task 610:22:34:320 - Task 5 completed.10:22:34:320 - Result of task 5

能够看出,在示例 1 中,尽管 Task 4 执行工夫只有 400ms,但因为咱们是依照 1-2-3-4 的程序顺次取后果,因而 Task 4 实现后并没有马上打印出后果来。而在示例 2 中,对每个 Task 都是在实现时立即就将后果打印进去了。这就是 CompletionService 的劣势所在。

原理解释

CompletionService 之所以可能做到这点,是因为它没有采取顺次遍历 Future 的形式,而是在两头加上了一个后果队列,工作实现后马上将后果放入队列,那么从队列中取到的就是最早实现的后果。

如果队列为空,那么 take() 办法会阻塞直到队列中呈现后果为止。此外 CompletionService 还提供一个 poll() 办法,返回值与 take() 办法一样,不同之处在于它不会阻塞,如果队列为空则立即返回 null。这算是给用户多一种抉择。