乐趣区

关于java:CompletionService和CompletableFuture

咱们在 jdk1.8 之前,都是用 FutureTask 的 get 办法来获取异步执行的后果。

在演示之前,先贴一下共用的代码。

ConcurrentSupport:

public class ConcurrentSupport {public static String processOne() {
        try {TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return getNow() + "#one";}

    public static String processTwo() {
        try {TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return getNow() + "#two";}

    public static String processThree() {
        try {TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return getNow() + "#three";}

    public static String getNow() {return LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss"));
    }
}

TaskA:

class TaskA implements Callable {
    @Override
    public Object call() throws Exception {return ConcurrentSupport.processOne();
    }
}

TaskB:

class TaskB implements Callable {
    @Override
    public Object call() throws Exception {return ConcurrentSupport.processTwo();
    }
}

TaskC:

class TaskC implements Callable {
    @Override
    public Object call() throws Exception {return ConcurrentSupport.processThree();
    }
}

一般状况

比方有 3 个工作,别离耗时 1s、2s、3s,在同步执行的时候,整个耗时就是 6s。

public class NormalDemo {public static void main(String[] args) {long start = System.currentTimeMillis();
        System.out.println(ConcurrentSupport.processOne());
        System.out.println(ConcurrentSupport.processTwo());
        System.out.println(ConcurrentSupport.processThree());
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    }
}

FutureTask

用 FutureTask 的状况,就是取最长工夫的那个,所以最终工夫是 3s。

public class FutureTaskDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创立线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        long start = System.currentTimeMillis();
        // 创立 FutureTask
        FutureTask taskA = new FutureTask(new TaskA());
        FutureTask taskB = new FutureTask(new TaskB());
        FutureTask taskC = new FutureTask(new TaskC());
        executor.submit(taskA);
        executor.submit(taskB);
        executor.submit(taskC);
        System.out.println(taskC.get());
        System.out.println(taskA.get());
        System.out.println(taskB.get());
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    }
}

CompletionService

然而 FutureTask 也有一个小瑕疵,比方下面的 TaskC 执行的工夫最长,间接把 TaskA 和 TaskB 的打印工作给阻塞了,打印的后果是 three、one、two。

有木有方法是哪个工作先执行胜利,就先打印(或者对这个后果其余解决)这个后果呢?

CompletionService 做的就是这个事件。从后果能够看出打印 one、two、three。

public class CompletionServiceDemo {public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 创立线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        long start = System.currentTimeMillis();
        // 创立 CompletionService
        CompletionService<String> cs = new ExecutorCompletionService<>(executor);
        // 用于保留 Future 对象
        List<Future<String>> futures = new ArrayList<>(3);
        // 提交 FutureTask
        futures.add(cs.submit(new TaskC()));
        futures.add(cs.submit(new TaskA()));
        futures.add(cs.submit(new TaskB()));
        for (int i = 0; i < 3; i++) {String result = cs.take().get();
            System.out.println(result);
        }
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    }
}

CompletableFuture

如果有一个工作是这样的,A1 执行完执行 A2,B1 执行完执行 B2,A2 和 B2 执行完,再执行 C。

JDK1.8 提供了 CompletableFuture 这个优雅的解决方案。

比方上面的例子,就是 f1 和 f2 执行完后,才执行 f3。

CompletableFuture 的办法中,runAsync 是没有返回值的,supplyAsync 是有返回值的。

public class CompletableFutureDemo {public static void main(String[] args) {CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {String one = ConcurrentSupport.processOne();
                    System.out.println(one);
                }
        );
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {String two = ConcurrentSupport.processTwo();
                    System.out.println(two);
                    return two;
                }
        );
        CompletableFuture<String> f3 =
                f1.thenCombine(f2, (__, tf) -> {System.out.println("f3#" + tf);
                            return "f3";
                        }
                );
        f3.join();}
}
退出移动版