共计 4650 个字符,预计需要花费 12 分钟才能阅读完成。
在 JDK1.5 曾经提供了 Future 和 Callable 的实现, 能够用于阻塞式获取后果, 如果想要异步获取后果, 通常都会以轮询的形式去获取后果, 如下:
// 定义一个异步工作
Future<String> future = executor.submit(() -> {Thread.sleep(2000);
return "hello world";
});
// 轮询获取后果
while (true) {if (future.isDone()) {System.out.println(future.get());
break;
}
}
从下面的模式看来轮询的形式会消耗无谓的 CPU 资源,而且也不能及时地失去计算结果. 所以要实现真正的异步, 上述这样是齐全不够的, 在 Netty 中, 咱们随处可见异步编程
ChannelFuture f = serverBootstrap.bind(port).sync();
f.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {System.out.println("complete");
}
});
而 JDK1.8 中的 CompletableFuture 就为咱们提供了异步函数式编程,CompletableFuture 提供了十分弱小的 Future 的扩大性能,能够帮忙咱们简化异步编程的复杂性,提供了函数式编程的能力,能够通过回调的形式解决计算结果,并且提供了转换和组合 CompletableFuture 的办法。
- 创立 CompletableFuture 对象
CompletableFuture 提供了四个静态方法用来创立 CompletableFuture 对象:
Asynsc 示意异步, 而 supplyAsync 与 runAsync 不同在与前者异步返回一个后果, 后者是 void. 第二个函数第二个参数示意是用咱们本人创立的线程池, 否则采纳默认的 ForkJoinPool.commonPool()作为它的线程池. 其中 Supplier 是一个函数式接口, 代表是一个生成者的意思, 传入 0 个参数, 返回一个后果.(更具体的能够看我另一篇文章)
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{return "hello world";});
System.out.println(future.get()); // 阻塞的获取后果 ''helllo world"
- 被动计算
以下 4 个办法用于获取后果
getNow 有点非凡,如果后果曾经计算完则返回后果或者抛出异样,否则返回给定的 valueIfAbsent 值。join()与 get()区别在于 join()返回计算的后果或者抛出一个 unchecked 异样 (CompletionException),而 get() 返回一个具体的异样.
被动触发计算.
下面办法示意当调用 CompletableFuture.get()被阻塞的时候, 那么这个办法就是完结阻塞, 并且 get()获取设置的 value.
public static CompletableFuture<Integer> compute () {final CompletableFuture<Integer> future = new CompletableFuture<>();
return future;
}
public static void main (String[]args) throws Exception {final CompletableFuture<Integer> f = compute();
class Client extends Thread {
CompletableFuture<Integer> f;
Client(String threadName, CompletableFuture<Integer> f) {super(threadName);
this.f = f;
}
@Override
public void run() {
try {System.out.println(this.getName() + ":" + f.get());
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}
}
}
new Client("Client1", f).start();
new Client("Client2", f).start();
System.out.println("waiting");
// 设置 Future.get()获取到的值
f.complete(100);
// 以异样的模式触发计算
//f.completeExceptionally(new Exception());
Thread.sleep(1000);
}
- 计算结果实现时的解决
下面 4 个办法是当计算阶段完结的时候触发,BiConsumer 有两个入参, 别离代表计算返回值, 另外一个是异样. 无返回值. 办法不以 Async 结尾,意味着 Action 应用雷同的线程执行,而 Async 可能会应用其它的线程去执行(如果应用雷同的线程池,也可能会被同一个线程选中执行)。
future.whenCompleteAsync((v, e) -> {System.out.println("return value:" + v + "exception:" + e);
});
handle()
与 whenComplete()不同的是这个函数返回 CompletableFuture 并不是原始的 CompletableFuture 返回的值, 而是 BiFunction 返回的值.
- CompletableFuture 的组合
thenApply
当计算结算实现之后, 前面能够接持续一系列的 thenApply, 来实现值的转化.
它们与 handle 办法的区别在于 handle 办法会解决失常计算值和异样,因而它能够屏蔽异样,防止异样持续抛出。而 thenApply 办法只是用来解决正常值,因而一旦有异样就会抛出。
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{return "hello world";});
CompletableFuture<String> future3 = future.thenApply((element)->{return element+"addPart";}).thenApply((element)->{return element+"addTwoPart";});
System.out.println(future3.get());//hello world addPart addTwoPart
- CompletableFuture 的 Consumer
只对 CompletableFuture 的后果进行生产, 无返回值, 也就是最初的 CompletableFuture 是 void.
// 入参为原始的 CompletableFuture 的后果.
CompletableFuture future4 = future.thenAccept((e)->{System.out.println("without return value");
});
future4.get();
thenAcceptBoth
这个办法用来组合两个 CompletableFuture, 其中一个 CompletableFuture 期待另一个 CompletableFuture 的后果.
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {return "hello world";});
CompletableFuture future5 = future.thenAcceptBoth(CompletableFuture.completedFuture("compose"),
(x, y) -> System.out.println(x + y));//hello world compose
- Either 和 ALL
thenAcceptBoth 是当两个 CompletableFuture 都计算实现,而咱们上面要理解的办法 applyToEither 是当任意一个 CompletableFuture 计算实现的时候就会执行。
Random rand = new Random();
CompletableFuture<Integer> future9 = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(1000 + rand.nextInt(1000));
} catch (InterruptedException e) {e.printStackTrace();
}
return 100;
});
CompletableFuture<Integer> future10 = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(1000 + rand.nextInt(1000));
} catch (InterruptedException e) {e.printStackTrace();
}
return 200;
});
// 两个中任意一个计算实现, 那么触发 Runnable 的执行
CompletableFuture<String> f = future10.applyToEither(future9,i -> i.toString());
// 两个都计算实现, 那么触发 Runnable 的执行
CompletableFuture f1 = future10.acceptEither(future9,(e)->{System.out.println(e);
});
System.out.println(f.get());
如果想组合超过 2 个以上的 CompletableFuture,allOf 和 anyOf 可能会满足你的要求.allOf 办法是当所有的 CompletableFuture 都执行完后执行计算。anyOf 办法是当任意一个 CompletableFuture 执行完后就会执行计算,计算的后果雷同。
总结
有了 CompletableFuture 之后, 咱们本人实现异步编程变得轻松很多, 这个类也提供了许多办法来组合 CompletableFuture. 联合 Lambada 表达式来用, 变得很轻松.
f 的 whenComplete 的内容由哪个线程来执行,取决于哪个线程 X 执行了 f.complete()。然而当 X 线程执行了 f.complete()的时候,whenComplete 还没有被执行到的时候(就是事件还没有注册的时候),那么 X 线程就不会去同步执行 whenComplete 的回调了。这个时候哪个线程执行到了 whenComplete 的事件注册的时候,就由哪个线程本人来同步执行 whenComplete 的事件内容。
而 whenCompleteAsync 的场合,就简略很多。一句话就是线程池外面拿一个空的线程或者新启一个线程来执行回调。和执行 f.complete 的线程以及执行 whenCompleteAsync 的线程无关。