日常开发中,咱们都会用到线程池,个别会用 execute() 和 submit() 办法提交工作。然而当你用过 CompletableFuture 之后,就会发现以前的线程池解决工作有多难用,性能有多简陋,CompletableFuture 又是如许简洁优雅。
要晓得 CompletableFuture 曾经随着 Java8 公布 7 年了,还没有过它就有点说不过去了。
明天 5 分钟带你深入浅出 CompletableFuture 实用教程。
1. 应用线程池解决工作
/**
* @author yideng
* @apiNote 线程池应用示例
*/
public class ThreadDemo {public static void main(String[] args) {
// 1. 创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
List<Future<String>> futures = new ArrayList<>();
for (Integer key : list) {
// 2. 提交工作
Future<String> future = executorService.submit(() -> {
// 睡眠一秒,模拟处理过程
Thread.sleep(1000L);
return "后果" + key;
});
futures.add(future);
}
// 3. 获取后果
for (Future<String> future : futures) {
try {String result = future.get();
System.out.println(result);
} catch (Exception e) {e.printStackTrace();
}
}
executorService.shutdown();}
}
输入后果:
后果 1
后果 2
后果 3
个别大家都会这样应用线程池,然而有没有思考过这样应用有没有什么问题?
反正我发现两个比较严重的问题:
- 获取后果时,调用的 future.get() 办法,会阻塞以后线程,直到返回后果,大大降低性能
- 有一半的代码在写怎么应用线程,其实咱们不应该关怀怎么应用线程,更应该关注工作的解决
有没有具体的优化计划呢?当然有了,请进去咱们明天的配角 CompletableFuture
2. 应用 CompletableFuture 重构工作解决
看一下应用 CompletableFuture 革新后代码:
/**
* @author yideng
* @apiNote CompletableFuture 应用示例
*/
public class ThreadDemo {public static void main(String[] args) {
// 1. 创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
for (Integer key : list) {
// 2. 提交工作
CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模拟处理过程
try {Thread.sleep(1000L);
} catch (InterruptedException e) { }
return "后果" + key;
}, executorService).whenCompleteAsync((result, exception) -> {
// 3. 获取后果
System.out.println(result);
});;
}
executorService.shutdown();
// 因为 whenCompleteAsync 获取后果的办法是异步的,所以要阻塞以后线程能力输入后果
try {Thread.sleep(2000L);
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
输入后果:
后果 1
后果 2
后果 3
代码中应用了 CompletableFuture 的两个办法,
supplyAsync() 办法作用是提交异步工作,有两个传参,工作和自定义线程池。
whenCompleteAsync() 办法作用是异步获取后果,也有两个传参,后果和异样信息。
代码通过 CompletableFuture 革新后,是如许的简洁优雅。
提交工作也不必再关怀线程池是怎么应用了,获取后果也不必再阻塞以后线程了。
如果你比拟倔强,还想同步获取后果,能够应用 whenComplete() 办法,或者独自调用 join() 办法。
join() 办法配合 Stream 流是这样用的:
/**
* @author yideng
* @apiNote CompletableFuture 应用示例
*/
public class ThreadDemo {public static void main(String[] args) {
// 1. 创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
// 2. 提交工作
List<String> results = list.stream().map(key ->
CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模拟处理过程
try {Thread.sleep(1000L);
} catch (InterruptedException e) { }
return "后果" + key;
}, executorService))
.map(CompletableFuture::join).collect(Collectors.toList());
executorService.shutdown();
// 3. 获取后果
System.out.println(results);
}
}
输入后果:
[后果 1, 后果 2, 后果 3]
如许的简洁优雅啊!原来 executorService.submit() 这种应用线程池的形式,能够彻底丢掉了。
3. CompletableFuture 更多妙用
3.1 期待所有工作执行实现
如果让你实现期待所有工作线程执行实现,再进行下一步操作,你会怎么做?
我猜你肯定会应用 线程池 +CountDownLatch,像上面这样:
/**
* @author yideng
* @apiNote 线程池和 CountDownLatch 应用示例
*/
public class ThreadDemo {public static void main(String[] args) {
// 1. 创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
CountDownLatch countDownLatch = new CountDownLatch(list.size());
for (Integer key : list) {
// 2. 提交工作
executorService.execute(() -> {
// 睡眠一秒,模拟处理过程
try {Thread.sleep(1000L);
} catch (InterruptedException e) { }
System.out.println("后果" + key);
countDownLatch.countDown();});
}
executorService.shutdown();
// 3. 阻塞期待所有工作执行实现
try {countDownLatch.await();
} catch (InterruptedException e) {}}
}
输入后果:
后果 2
后果 3
后果 1
Low 不 Low?十年前能够这样写,Java8 都曾经公布 7 年了,你还不会用 Java8 的写法?看一下应用 CompletableFuture 是怎么重构的:
/**
* @author yideng
* @apiNote CompletableFuture.allOf() 办法应用示例
*/
public class ThreadDemo {public static void main(String[] args) {
// 1. 创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
// 2. 提交工作,并调用 join() 阻塞期待所有工作执行实现
CompletableFuture
.allOf(list.stream().map(key ->
CompletableFuture.runAsync(() -> {
// 睡眠一秒,模拟处理过程
try {Thread.sleep(1000L);
} catch (InterruptedException e) { }
System.out.println("后果" + key);
}, executorService))
.toArray(CompletableFuture[]::new))
.join();
executorService.shutdown();}
}
输入后果:
后果 3
后果 1
后果 2
代码看着有点乱,其实逻辑很清晰。
- 遍历 list 汇合,提交 CompletableFuture 工作,把后果转换成数组
- 再把数组放到 CompletableFuture 的 allOf() 办法外面
- 最初调用 join() 办法阻塞期待所有工作执行实现
CompletableFuture 的 allOf() 办法的作用就是,期待所有工作解决实现。
这样写是不是简洁优雅了许多?
3.2 任何一个工作解决实现就返回
如果要实现这样一个需要,往线程池提交一批工作,只有有其中一个工作解决实现就返回。
该怎么做?如果你手动实现这个逻辑的话,代码必定简单且低效,有了 CompletableFuture 就非常简单了,只需调用 anyOf() 办法就行了。
/**
* @author yideng
* @apiNote CompletableFuture.anyOf() 办法应用示例
*/
public class ThreadDemo {public static void main(String[] args) {
// 1. 创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Integer> list = Arrays.asList(1, 2, 3);
long start = System.currentTimeMillis();
// 2. 提交工作
CompletableFuture<Object> completableFuture = CompletableFuture
.anyOf(list.stream().map(key ->
CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模拟处理过程
try {Thread.sleep(1000L);
} catch (InterruptedException e) { }
return "后果" + key;
}, executorService))
.toArray(CompletableFuture[]::new));
executorService.shutdown();
// 3. 获取后果
System.out.println(completableFuture.join());
}
}
输入后果:
后果 3
一切都是那么简略优雅。
3.3 一个线程执行实现,交给另一个线程接着执行
有这么一个需要:
一个线程解决实现,把解决的后果交给另一个线程持续解决,怎么实现?
你是不是想到了一堆工具,线程池、CountDownLatch、Semaphore、ReentrantLock、Synchronized,该怎么进行组合应用呢?AB 组合还是 BC 组合?
别瞎想了,你写的必定没有 CompletableFuture 好用,看一下 CompletableFuture 是怎么用的:
/**
* @author yideng
* @apiNote CompletableFuture 线程接力解决示例
*/
public class ThreadDemo {public static void main(String[] args) {
// 1. 创立线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 2. 提交工作,并调用 join() 阻塞期待工作执行实现
String result2 = CompletableFuture.supplyAsync(() -> {
// 睡眠一秒,模拟处理过程
try {Thread.sleep(1000L);
} catch (InterruptedException e) { }
return "后果 1";
}, executorService).thenApplyAsync(result1 -> {
// 睡眠一秒,模拟处理过程
try {Thread.sleep(1000L);
} catch (InterruptedException e) { }
return result1 + "后果 2";
}, executorService).join();
executorService.shutdown();
// 3. 获取后果
System.out.println(result2);
}
}
输入后果:
后果 1 后果 2
代码次要用到了 CompletableFuture 的 thenApplyAsync() 办法,作用就是异步解决上一个线程的后果。
是不是太不便了?
这么好用的 CompletableFuture 还有没有其余性能?当然有。
4. CompletableFuture 罕用 API
4.1 CompletableFuture 罕用 API 阐明
- 提交工作
supplyAsync
runAsync -
接力解决
thenRun thenRunAsync
thenAccept thenAcceptAsync
thenApply thenApplyAsync
handle handleAsync
applyToEither applyToEitherAsync
acceptEither acceptEitherAsync
runAfterEither runAfterEitherAsync
thenCombine thenCombineAsync
thenAcceptBoth thenAcceptBothAsync
API 太多,有点目迷五色,很容易分类。
带 run 的办法,无入参,无返回值。
带 accept 的办法,有入参,无返回值。
带 supply 的办法,无入参,有返回值。
带 apply 的办法,有入参,有返回值。
带 handle 的办法,有入参,有返回值,并且带异样解决。
以 Async 结尾的办法,都是异步的,否则是同步的。
以 Either 结尾的办法,只需实现任意一个。
以 Both/Combine 结尾的办法,必须所有都实现。
- 获取后果
join 阻塞期待,不会抛异样
get 阻塞期待,会抛异样
complete(T value) 不阻塞,如果工作已实现,返回处理结果。如果没实现,则返回传参 value。
completeExceptionally(Throwable ex) 不阻塞,如果工作已实现,返回处理结果。如果没实现,抛异样。
4. CompletableFuture 罕用 API 应用示例
用最常见的煮饭来举例:
4.1 then、handle 办法应用示例
/**
* @author yideng
* @apiNote then、handle 办法应用示例
*/
public class ThreadDemo {public static void main(String[] args) {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {System.out.println("1. 开始淘米");
return "2. 淘米实现";
}).thenApplyAsync(result -> {System.out.println(result);
System.out.println("3. 开始煮饭");
// 生成一个 1~10 的随机数
if (RandomUtils.nextInt(1, 10) > 5) {throw new RuntimeException("4. 电饭煲坏了,煮不了");
}
return "4. 煮饭实现";
}).handleAsync((result, exception) -> {if (exception != null) {System.out.println(exception.getMessage());
return "5. 明天没饭吃";
} else {System.out.println(result);
return "5. 开始吃饭";
}
});
try {String result = completableFuture.get();
System.out.println(result);
} catch (Exception e) {e.printStackTrace();
}
}
}
输入后果可能是:
1. 开始淘米
2. 淘米实现
3. 开始煮饭
4. 煮饭实现
5. 开始吃饭
也可能是:
1. 开始淘米
2. 淘米实现
3. 开始煮饭
java.lang.RuntimeException: 4. 电饭煲坏了,煮不了
5. 明天没饭吃
4.2 complete 办法应用示例
/**
* @author yideng
* @apiNote complete 应用示例
*/
public class ThreadDemo {public static void main(String[] args) {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {return "饭做好了";});
//try {// Thread.sleep(1L);
//} catch (InterruptedException e) {//}
completableFuture.complete("饭还没做好,我点外卖了");
System.out.println(completableFuture.join());
}
}
输入后果:
饭还没做好,我点外卖了
如果把正文的 sleep() 办法放开,输入后果就是:
饭做好了
4.3 either 办法应用示例
/**
* @author yideng
* @apiNote either 办法应用示例
*/
public class ThreadDemo {public static void main(String[] args) {CompletableFuture<String> meal = CompletableFuture.supplyAsync(() -> {return "饭做好了";});
CompletableFuture<String> outMeal = CompletableFuture.supplyAsync(() -> {return "外卖到了";});
// 饭先做好,就吃饭。外卖先到,就吃外卖。就是这么任性。CompletableFuture<String> completableFuture = meal.applyToEither(outMeal, myMeal -> {return myMeal;});
System.out.println(completableFuture.join());
}
}
输入后果可能是:
饭做好了
也可能是:
外卖到了
学会了吗?开发中赶快用起来吧!