关于程序员:如何编写优雅的异步代码-CompletableFuture

41次阅读

共计 9772 个字符,预计需要花费 25 分钟才能阅读完成。

前言

在咱们的意识里,同步执行的程序都比拟合乎人们的思维形式,而异步的货色通常都不好解决。在异步计算的状况下,以回调示意的动作往往会扩散在代码中,也可能互相嵌套在外部,如果须要解决其中一个步骤中可能产生的谬误时,状况变得更加蹩脚。Java 8 引入了很多的新个性,其中就蕴含了 CompletableFuture 类的引入,这让咱们编写清晰可读的异步代码变得更加容易,该类性能十分弱小,蕴含了超过 50 多个办法。。。

什么是 CompletableFuture

CompletableFuture 类的设计灵感来自于 Google Guava 的 ListenableFuture 类,它实现了 Future 和 CompletionStage 接口并且新增了许多办法,它反对 lambda,通过回调利用非阻塞办法,晋升了异步编程模型。它容许咱们通过在与主应用程序线程不同的线程上(也就是异步)运行工作,并向主线程告诉工作的进度、实现或失败,来编写非阻塞代码。

为什么要引入 CompletableFuture

Java 的 1.5 版本引入了 Future,你能够把它简略的了解为运算后果的占位符,它提供了两个办法来获取运算后果。

get():调用该办法线程将会无限期期待运算后果。

get(long timeout, TimeUnit unit):调用该办法线程将仅在指定工夫 timeout 内期待后果,如果期待超时就会抛出 TimeoutException 异样。

Future 能够应用 Runnable 或 Callable 实例来实现提交的工作,通过其源码能够看出,它存在如下几个问题:

阻塞 调用 get() 办法会始终阻塞,直到期待直到计算实现,它没有提供任何办法能够在实现时告诉,同时也不具备附加回调函数的性能。

链式调用和后果聚合解决 在很多时候咱们想链接多个 Future 来实现耗时较长的计算,此时须要合并后果并将后果发送到另一个工作中,该接口很难实现这种解决。

异样解决 Future 没有提供任何异样解决的形式。

以上这些问题在 CompletableFuture 中都曾经解决了,接下来让咱们看看如何去应用 CompletableFuture。

如何创立 CompletableFuture

最简略的创立形式就是调用 CompletableFuture.completedFuture(U value) 办法来获取一个曾经实现的 CompletableFuture 对象。

@Test

public void testSimpleCompletableFuture() {

CompletableFuture<String> completableFuture = CompletableFuture.completedFuture(“Hello mghio”);

assertTrue(completableFuture.isDone());

try {

assertEquals(“Hello mghio”, completableFuture.get());

} catch (InterruptedException | ExecutionException e) {

e.printStackTrace();

}

}

须要留神的是当咱们对不残缺的 CompleteableFuture 调用 get 办法的话,会因为 Future 未实现,因而 get 调用将永远阻塞,此时能够应用 CompletableFuture.complete 办法手动实现 Future。

工作异步解决

当咱们想让程序在后盾异步执行工作而不关怀工作的处理结果时,能够应用 runAsync 办法,该办法接管一个 Runnable 类型的参数返回 CompletableFuture。

@Test

public void testCompletableFutureRunAsync() {

AtomicInteger variable = new AtomicInteger(0);

CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> process(variable));

runAsync.join();

assertEquals(100, variable.get());

}

public void process(AtomicInteger variable) {

System.out.println(Thread.currentThread() + ” Process…”);

variable.set(100);

}

如果咱们想让工作在后盾异步执行而且须要获取工作的处理结果时,能够应用 supplyAsync 办法,该办法接管一个 Supplier 类型的参数返回一个 CompletableFuture。

@Test

public void testCompletableFutureSupplyAsync() {

CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process);

try {

assertEquals(“Hello mghio”, supplyAsync.get()); // Blocking

} catch (ExecutionException | InterruptedException e) {

e.printStackTrace();

}

}

public String process() {

return “Hello mghio”;

}

看到这里你可能会有个问题,下面执行 runAsync 和 supplyAsync 工作的线程是从哪里来的、谁创立的呢?实际上它和 Java 8 中的 parallelStream 相似,CompletableFuture 也是从全局 ForkJoinPool.commonPool() 取得的线程中执行这些工作的。同时,下面的两个办法也提供了自定义线程池去执行工作,其实你如果去理解过 CompletableFuture 的源码的话,你会发现其 API 中的所有办法都有个重载的版本,有或没有自定义 Executor 执行器。

@Test

public void testCompletableFutureSupplyAsyncWithExecutor() {

ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);

CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(this::process, newFixedThreadPool);

try {

assertEquals(“Hello mghio”, supplyAsync.get()); // Blocking

} catch (ExecutionException | InterruptedException e) {

e.printStackTrace();

}

}

public String process() {

return “Hello mghio”;

}

链式调用和后果聚合解决

咱们晓得 CompletableFuture 的 get() 办法会始终阻塞直到获取到后果,CompletableFuture 提供了 thenApply、thenAccept 和 thenRun 等办法来防止这种状况,而且咱们还能够增加工作实现后的回调告诉。这几个办法的应用场景如下:

thenApply 当咱们如果要在从 Future 接管值后工作之前运行自定义的业务代码,而后要为此工作返回一些值时,则能够应用该办法

thenAccept 如果咱们心愿在从 Future 接管到一些值后执行工作之前运行自定义的业务代码而不关怀返回后果值时,则能够应用该办法

thenRun 如果咱们想在 Future 实现后运行自定义的站长交易业务代码,并且不想为此返回任何值时,则能够应用该办法

@Test

public void testCompletableFutureThenApply() {

Integer notificationId = CompletableFuture.supplyAsync(this::thenApplyProcess)

.thenApply(this::thenApplyNotify) // Non Blocking

.join();

assertEquals(new Integer(1), notificationId);

}

@Test

public void testCompletableFutureThenAccept() {

CompletableFuture.supplyAsync(this::processVariable)

.thenAccept(this::thenAcceptNotify) // Non Blocking

.join();

assertEquals(100, variable.get());

}

@Test

public void testCompletableFutureThenRun() {

CompletableFuture.supplyAsync(this::processVariable)

.thenRun(this::thenRunNotify)

.join();

assertEquals(100, variable.get());

}

private String processVariable() {

variable.set(100);

return “success”;

}

private void thenRunNotify() {

System.out.println(“thenRun completed notify ….”);

}

private Integer thenApplyNotify(Integer integer) {

return integer;

}

private void thenAcceptNotify(String s) {

System.out.println(

String.format(“Thread %s completed notify ….”, Thread.currentThread().getName()));

}

public Integer thenApplyProcess() {

return 1;

}

如果有大量的异步计算,那么咱们能够持续将值从一个回调传递到另一个回调中去,也就是应用链式调用形式,应用形式很简略。

@Test

public void testCompletableFutureThenApplyAccept() {

CompletableFuture.supplyAsync(this::findAccountNumber)

.thenApply(this::calculateBalance)

.thenApply(this::notifyBalance)

.thenAccept((i) -> notifyByEmail()).join();

}

private void notifyByEmail() {

// business code

System.out.println(“send notify by email …”);

}

private Double notifyBalance(Double d) {

// business code

System.out.println(String.format(“your balance is $%s”, d));

return 9527D;

}

private Double calculateBalance(Object o) {

// business code

return 9527D;

}

private Double findAccountNumber() {

// business code

return 9527D;

}

比拟仔细的敌人可能留神到在所有后面的几个办法示例中,所有办法都是在同一线程上执行的。如果咱们心愿这些工作在独自的线程上运行时,那么咱们能够应用这些办法对应的异步版本。

@Test

public void testCompletableFutureApplyAsync() {

ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);

ScheduledExecutorService newSingleThreadScheduledExecutor = Executors

.newSingleThreadScheduledExecutor();

CompletableFuture<Double> completableFuture =

CompletableFuture

.supplyAsync(this::findAccountNumber,

newFixedThreadPool) // _从线程池_ newFixedThreadPool _获取线程执行工作_

.thenApplyAsync(this::calculateBalance,

newSingleThreadScheduledExecutor)

.thenApplyAsync(this::notifyBalance);

Double balance = completableFuture.join();

assertEquals(9527D, balance);

}

执行后果解决

thenCompose 办法适宜有依赖性的工作解决,比方一个计算账户余额的业务:首先咱们要先找到帐号,而后为该帐户计算余额,而后计算实现后再发送告诉。所有这些工作都是依赖前一个工作的返回 CompletableFuture 后果,此时咱们须要应用 thenCompose 办法,其实有点相似于 Java 8 流的 flatMap 操作。

@Test

public void testCompletableFutureThenCompose() {

Double balance = this.doFindAccountNumber()

.thenCompose(this::doCalculateBalance)

.thenCompose(this::doSendNotifyBalance).join();

assertEquals(9527D, balance);

}

private CompletableFuture<Double> doSendNotifyBalance(Double aDouble) {

sleepSeconds(2);

// business code

System.out.println(String.format(“%s doSendNotifyBalance ….”, Thread.currentThread().getName()));

return CompletableFuture.completedFuture(9527D);

}

private CompletableFuture<Double> doCalculateBalance(Double d) {

sleepSeconds(2);

// business code

System.out.println(String.format(“%s doCalculateBalance ….”, Thread.currentThread().getName()));

return CompletableFuture.completedFuture(9527D);

}

private CompletableFuture<Double> doFindAccountNumber() {

sleepSeconds(2);

// business code

System.out.println(String.format(“%s doFindAccountNumber ….”, Thread.currentThread().getName()));

return CompletableFuture.completedFuture(9527D);

}

private void sleepSeconds(int timeout) {

try {

TimeUnit.SECONDS.sleep(timeout);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

thenCombine 办法次要是用于合并多个独立工作的处理结果。假如咱们须要查找一个人的姓名和住址,则能够应用不同的工作来别离获取,而后要取得这个人的残缺信息(姓名 + 住址),则须要合并这两种办法的后果,那么咱们能够应用 thenCombine 办法。

@Test

public void testCompletableFutureThenCombine() {

CompletableFuture<String> thenCombine = this.findName().thenCombine(this.findAddress(), (name, address) -> name + address);

String personInfo = thenCombine.join();

assertEquals(“mghio Shanghai, China”, personInfo);

}

private CompletableFuture<String> findAddress() {

return CompletableFuture.supplyAsync(() -> {

sleepSeconds(2);

// business code

return “Shanghai, China”;

});

}

private CompletableFuture<String> findName() {

return CompletableFuture.supplyAsync(() -> {

sleepSeconds(2);

// business code

return “mghio “;

});

}

期待多个工作执行实现

在许多状况下,咱们心愿并行运行多个工作,并在所有工作实现后再进行一些解决。假如咱们要查找 3 个不同用户的姓名并将后果合并。此时就能够应用 CompletableFuture 的静态方法 allOf,该办法会期待所有工作实现,须要留神的是该办法它不会返回所有工作的合并后果,因而咱们必须手动组合工作的执行后果。

@Test

public void testCompletableFutureAllof() {

List<CompletableFuture<String>> list = Lists.newArrayListWithCapacity(4);

IntStream.range(0, 3).forEach(num -> list.add(findName(num)));

CompletableFuture<Void> allFuture = CompletableFuture

.allOf(list.toArray(new CompletableFuture[0]));

CompletableFuture<List<String>> allFutureList = allFuture

.thenApply(val -> list.stream().map(CompletableFuture::join).collect(Collectors.toList()));

CompletableFuture<String> futureHavingAllValues = allFutureList

.thenApply(fn -> String.join(“”, fn));

String result = futureHavingAllValues.join();

assertEquals(“mghio0mghio1mghio2”, result);

}

private CompletableFuture<String> findName(int num) {

return CompletableFuture.supplyAsync(() -> {

sleepSeconds(2);

// business code

return “mghio” + num;

});

}

异样解决

在多线程中程序异样其实不太好解决,然而侥幸的是在 CompletableFuture 中给咱们提供了很不便的异样解决形式,在咱们下面的例子代码中:

@Test

public void testCompletableFutureThenCompose() {

Double balance = this.doFindAccountNumber()

.thenCompose(this::doCalculateBalance)

.thenCompose(this::doSendNotifyBalance).join();

}

在下面的代码中,三个办法 doFindAccountNumber、doCalculateBalance 和 doSendNotifyBalance 只有任意一个产生异样了,则之后调用的办法将不会运行。CompletableFuture 提供了三种解决异样的形式,别离是 exceptionally、handle 和 whenComplete 办法。

第一种形式是应用 exceptionally 办法解决异样,如果后面的办法失败并产生异样,则会调用异样回调。

@Test

public void testCompletableFutureExceptionally() {

CompletableFuture<Double> thenApply = CompletableFuture.supplyAsync(this::findAccountNumber)

.thenApply(this::calculateBalance)

.thenApply(this::notifyBalance)

.exceptionally(ex -> {

System.out.println(“Exception ” + ex.getMessage());

return 0D;

});

Double join = thenApply.join();

assertEquals(9527D, join);

}

第二种形式是应用 handle 办法解决异样,应用该形式解决异样比下面的 exceptionally 形式更为灵便,咱们能够同时获取到异样对象和以后的处理结果。

@Test

public void testCompletableFutureHandle() {

CompletableFuture.supplyAsync(this::findAccountNumber)

.thenApply(this::calculateBalance)

.thenApply(this::notifyBalance)

.handle((ok, ex) -> {

System.out.println(“ 最终要运行的代码 …”);

if (ok != null) {

System.out.println(“No Exception !!”);

} else {

System.out.println(“Exception ” + ex.getMessage());

return -1D;

}

return ok;

});

}

第三种是应用 whenComplete 办法解决异样。

@Test

public void testCompletableFutureWhenComplete() {

CompletableFuture.supplyAsync(this::findAccountNumber)

.thenApply(this::calculateBalance)

.thenApply(this::notifyBalance)

.whenComplete((result, ex) -> {

System.out.println(“result = ” + result + “, ex = ” + ex);

System.out.println(“ 最终要运行的代码 …”);

});

}

总结

在本文中,简要的介绍了 CompletableFuture 类的局部办法和应用形式,这个类的办法很多同时提供的性能也十分弱小,在异步编程中应用的比拟多,相熟了根本的应用办法之后要深刻理解还是要深刻源码剖析其实现原理。

正文完
 0