乐趣区

关于java:Java8已经发布7年了不会还有人没用过CompletableFuture吧

日常开发中,咱们都会用到线程池,个别会用 execute() 和 submit() 办法提交工作。然而当你用过 CompletableFuture 之后,就会发现以前的线程池解决工作有多难用,性能有多简陋,CompletableFuture 又是如许简洁优雅。

要晓得 CompletableFuture 曾经随着 Java8 公布 7 年了,还没有过它就有点说不过去了。
明天 5 分钟带你深入浅出 CompletableFuture 实用教程。

1. 应用线程池解决工作

/**
 * @author yideng
 * @apiNote 线程池应用示例
 */
public class ThreadDemo {public static void main(String[] args) {
        // 1. 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        
        List<Integer> list = Arrays.asList(1, 2, 3);
        List<Future<String>> futures = new ArrayList<>();
        for (Integer key : list) {
            // 2. 提交工作
            Future<String> future = executorService.submit(() -> {
                // 睡眠一秒,模拟处理过程
                Thread.sleep(1000L);
                return "后果" + key;
            });
            futures.add(future);
        }

        // 3. 获取后果
        for (Future<String> future : futures) {
            try {String result = future.get();
                System.out.println(result);
            } catch (Exception e) {e.printStackTrace();
            }
        }
        executorService.shutdown();}

}

输入后果:

 后果 1
后果 2
后果 3 

个别大家都会这样应用线程池,然而有没有思考过这样应用有没有什么问题?
反正我发现两个比较严重的问题:

  1. 获取后果时,调用的 future.get() 办法,会阻塞以后线程,直到返回后果,大大降低性能
  2. 有一半的代码在写怎么应用线程,其实咱们不应该关怀怎么应用线程,更应该关注工作的解决

有没有具体的优化计划呢?当然有了,请进去咱们明天的配角 CompletableFuture

2. 应用 CompletableFuture 重构工作解决

看一下应用 CompletableFuture 革新后代码:

/**
 * @author yideng
 * @apiNote CompletableFuture 应用示例
 */
public class ThreadDemo {public static void main(String[] args) {
        // 1. 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        for (Integer key : list) {
            // 2. 提交工作
            CompletableFuture.supplyAsync(() -> {
                // 睡眠一秒,模拟处理过程
                try {Thread.sleep(1000L);
                } catch (InterruptedException e) { }
                return "后果" + key;
            }, executorService).whenCompleteAsync((result, exception) -> {
                // 3. 获取后果
                System.out.println(result);
            });;
        }

        executorService.shutdown();
        // 因为 whenCompleteAsync 获取后果的办法是异步的,所以要阻塞以后线程能力输入后果
        try {Thread.sleep(2000L);
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }

}

输入后果:

 后果 1
后果 2
后果 3 

代码中应用了 CompletableFuture 的两个办法,
supplyAsync() 办法作用是提交异步工作,有两个传参,工作和自定义线程池。
whenCompleteAsync() 办法作用是异步获取后果,也有两个传参,后果和异样信息。

代码通过 CompletableFuture 革新后,是如许的简洁优雅。
提交工作也不必再关怀线程池是怎么应用了,获取后果也不必再阻塞以后线程了。

如果你比拟倔强,还想同步获取后果,能够应用 whenComplete() 办法,或者独自调用 join() 办法。
join() 办法配合 Stream 流是这样用的:

/**
 * @author yideng
 * @apiNote CompletableFuture 应用示例
 */
public class ThreadDemo {public static void main(String[] args) {
        // 1. 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        // 2. 提交工作
        List<String> results = list.stream().map(key ->
                CompletableFuture.supplyAsync(() -> {
                    // 睡眠一秒,模拟处理过程
                    try {Thread.sleep(1000L);
                    } catch (InterruptedException e) { }
                    return "后果" + key;
                }, executorService))
                .map(CompletableFuture::join).collect(Collectors.toList());

        executorService.shutdown();
        // 3. 获取后果
        System.out.println(results);

    }

}

输入后果:

[后果 1, 后果 2, 后果 3]

如许的简洁优雅啊!原来 executorService.submit() 这种应用线程池的形式,能够彻底丢掉了。

3. CompletableFuture 更多妙用

3.1 期待所有工作执行实现

如果让你实现期待所有工作线程执行实现,再进行下一步操作,你会怎么做?
我猜你肯定会应用 线程池 +CountDownLatch,像上面这样:

/**
 * @author yideng
 * @apiNote 线程池和 CountDownLatch 应用示例
 */
public class ThreadDemo {public static void main(String[] args) {
        // 1. 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (Integer key : list) {
            // 2. 提交工作
            executorService.execute(() -> {
                // 睡眠一秒,模拟处理过程
                try {Thread.sleep(1000L);
                } catch (InterruptedException e) { }
                System.out.println("后果" + key);
                countDownLatch.countDown();});
        }

        executorService.shutdown();
        // 3. 阻塞期待所有工作执行实现
        try {countDownLatch.await();
        } catch (InterruptedException e) {}}

}

输入后果:

 后果 2
后果 3
后果 1 

Low 不 Low?十年前能够这样写,Java8 都曾经公布 7 年了,你还不会用 Java8 的写法?看一下应用 CompletableFuture 是怎么重构的:

/**
 * @author yideng
 * @apiNote CompletableFuture.allOf() 办法应用示例
 */
public class ThreadDemo {public static void main(String[] args) {
        // 1. 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        // 2. 提交工作,并调用 join() 阻塞期待所有工作执行实现
        CompletableFuture
                .allOf(list.stream().map(key ->
                                CompletableFuture.runAsync(() -> {
                                    // 睡眠一秒,模拟处理过程
                                    try {Thread.sleep(1000L);
                                    } catch (InterruptedException e) { }
                                    System.out.println("后果" + key);
                                }, executorService))
                                .toArray(CompletableFuture[]::new))
                .join();
        executorService.shutdown();}

}

输入后果:

 后果 3
后果 1
后果 2 

代码看着有点乱,其实逻辑很清晰。

  1. 遍历 list 汇合,提交 CompletableFuture 工作,把后果转换成数组
  2. 再把数组放到 CompletableFuture 的 allOf() 办法外面
  3. 最初调用 join() 办法阻塞期待所有工作执行实现

CompletableFuture 的 allOf() 办法的作用就是,期待所有工作解决实现。
这样写是不是简洁优雅了许多?

3.2 任何一个工作解决实现就返回

如果要实现这样一个需要,往线程池提交一批工作,只有有其中一个工作解决实现就返回。
该怎么做?如果你手动实现这个逻辑的话,代码必定简单且低效,有了 CompletableFuture 就非常简单了,只需调用 anyOf() 办法就行了。

/**
 * @author yideng
 * @apiNote CompletableFuture.anyOf() 办法应用示例
 */
public class ThreadDemo {public static void main(String[] args) {
        // 1. 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        List<Integer> list = Arrays.asList(1, 2, 3);
        long start = System.currentTimeMillis();
        // 2. 提交工作
        CompletableFuture<Object> completableFuture = CompletableFuture
                .anyOf(list.stream().map(key ->
                                CompletableFuture.supplyAsync(() -> {
                                    // 睡眠一秒,模拟处理过程
                                    try {Thread.sleep(1000L);
                                    } catch (InterruptedException e) { }
                                    return "后果" + key;
                                }, executorService))
                                .toArray(CompletableFuture[]::new));
        executorService.shutdown();

        // 3. 获取后果
        System.out.println(completableFuture.join());
    }

}

输入后果:

 后果 3 

一切都是那么简略优雅。

3.3 一个线程执行实现,交给另一个线程接着执行

有这么一个需要:
一个线程解决实现,把解决的后果交给另一个线程持续解决,怎么实现?

你是不是想到了一堆工具,线程池、CountDownLatch、Semaphore、ReentrantLock、Synchronized,该怎么进行组合应用呢?AB 组合还是 BC 组合?

别瞎想了,你写的必定没有 CompletableFuture 好用,看一下 CompletableFuture 是怎么用的:

/**
 * @author yideng
 * @apiNote CompletableFuture 线程接力解决示例
 */
public class ThreadDemo {public static void main(String[] args) {
        // 1. 创立线程池
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        // 2. 提交工作,并调用 join() 阻塞期待工作执行实现
        String result2 = CompletableFuture.supplyAsync(() -> {
            // 睡眠一秒,模拟处理过程
            try {Thread.sleep(1000L);
            } catch (InterruptedException e) { }
            return "后果 1";
        }, executorService).thenApplyAsync(result1 -> {
            // 睡眠一秒,模拟处理过程
            try {Thread.sleep(1000L);
            } catch (InterruptedException e) { }
            return result1 + "后果 2";
        }, executorService).join();

        executorService.shutdown();
        // 3. 获取后果
        System.out.println(result2);
    }

}

输入后果:

 后果 1 后果 2 

代码次要用到了 CompletableFuture 的 thenApplyAsync() 办法,作用就是异步解决上一个线程的后果。

是不是太不便了?

这么好用的 CompletableFuture 还有没有其余性能?当然有。

4. CompletableFuture 罕用 API

4.1 CompletableFuture 罕用 API 阐明

  1. 提交工作
    supplyAsync
    runAsync
  2. 接力解决

    thenRun thenRunAsync
    thenAccept thenAcceptAsync
    thenApply thenApplyAsync
    handle handleAsync
    applyToEither applyToEitherAsync
    acceptEither acceptEitherAsync
    runAfterEither runAfterEitherAsync
    thenCombine thenCombineAsync
    thenAcceptBoth thenAcceptBothAsync

API 太多,有点目迷五色,很容易分类。
带 run 的办法,无入参,无返回值。
带 accept 的办法,有入参,无返回值。
带 supply 的办法,无入参,有返回值。
带 apply 的办法,有入参,有返回值。
带 handle 的办法,有入参,有返回值,并且带异样解决。
以 Async 结尾的办法,都是异步的,否则是同步的。
以 Either 结尾的办法,只需实现任意一个。
以 Both/Combine 结尾的办法,必须所有都实现。

  1. 获取后果
    join 阻塞期待,不会抛异样
    get 阻塞期待,会抛异样
    complete(T value) 不阻塞,如果工作已实现,返回处理结果。如果没实现,则返回传参 value。
    completeExceptionally(Throwable ex) 不阻塞,如果工作已实现,返回处理结果。如果没实现,抛异样。

4. CompletableFuture 罕用 API 应用示例

用最常见的煮饭来举例:

4.1 then、handle 办法应用示例

/**
 * @author yideng
 * @apiNote then、handle 办法应用示例
 */
public class ThreadDemo {public static void main(String[] args) {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("1. 开始淘米");
            return "2. 淘米实现";
        }).thenApplyAsync(result -> {System.out.println(result);
            System.out.println("3. 开始煮饭");
            // 生成一个 1~10 的随机数
            if (RandomUtils.nextInt(1, 10) > 5) {throw new RuntimeException("4. 电饭煲坏了,煮不了");
            }
            return "4. 煮饭实现";
        }).handleAsync((result, exception) -> {if (exception != null) {System.out.println(exception.getMessage());
                return "5. 明天没饭吃";
            } else {System.out.println(result);
                return "5. 开始吃饭";
            }
        });

        try {String result = completableFuture.get();
            System.out.println(result);
        } catch (Exception e) {e.printStackTrace();
        }
    }

}

输入后果可能是:

1. 开始淘米
2. 淘米实现
3. 开始煮饭
4. 煮饭实现
5. 开始吃饭 

也可能是:

1. 开始淘米
2. 淘米实现
3. 开始煮饭
java.lang.RuntimeException: 4. 电饭煲坏了,煮不了
5. 明天没饭吃 

4.2 complete 办法应用示例

/**
 * @author yideng
 * @apiNote complete 应用示例
 */
public class ThreadDemo {public static void main(String[] args) {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {return "饭做好了";});
        
        //try {//    Thread.sleep(1L);
        //} catch (InterruptedException e) {//}

        completableFuture.complete("饭还没做好,我点外卖了");
        System.out.println(completableFuture.join());
    }

}

输入后果:

 饭还没做好,我点外卖了 

如果把正文的 sleep() 办法放开,输入后果就是:

 饭做好了 

4.3 either 办法应用示例

/**
 * @author yideng
 * @apiNote either 办法应用示例
 */
public class ThreadDemo {public static void main(String[] args) {CompletableFuture<String> meal = CompletableFuture.supplyAsync(() -> {return "饭做好了";});
        CompletableFuture<String> outMeal = CompletableFuture.supplyAsync(() -> {return "外卖到了";});

        // 饭先做好,就吃饭。外卖先到,就吃外卖。就是这么任性。CompletableFuture<String> completableFuture = meal.applyToEither(outMeal, myMeal -> {return myMeal;});

        System.out.println(completableFuture.join());
    }

}

输入后果可能是:

 饭做好了 

也可能是:

 外卖到了 

学会了吗?开发中赶快用起来吧!

退出移动版