共计 3086 个字符,预计需要花费 8 分钟才能阅读完成。
前段时间工作上比较忙,这篇文章一直没来得及写,本文是阅读《Java8 实战》的时候,了解到 Java 8 里已经提供了一个异步非阻塞的接口(CompletableFuture),可以实现简单的响应式编程的模式,因此用这篇文章做个梳理。我是带着下面这几个问题去学习 CompletableFuture 这个接口的,
- CompletableFuture 是为了解决什么问题而设计的?
- 它的使用场景是什么?开源软件中有实战使用案例吗?
- CompletableFuture 的常用 API 都有哪些?如何使用?
- CompletableFuture 和 RxJava 有什么不同?
这篇文章梳理下来,基本上可以回答前面四个问题,OK,我们进入正文。
基本概念
RPC(远程方法调用)的四种方式有:oneway、sync、future 和 callback,在 dubbo 或 bolt 这类通信框架中,默认使用的是 sync 模式(同步 + 阻塞),future 和 callback 都属于异步模式,不过 future 模式在 get 的时候会阻塞,callback 模式则不需要等待结果,有结果后服务端会回调请求方。
异步调用这类模式,比较适合的场景是 IO 密集型场景,要执行很多远程调用的任务,并且这些调用耗时可能比较久。以 openwrite 中的一个 case 为例:我发布一篇文章,需要给几个不同的写作平台创建文章,这时候我不希望这个过程是顺序的,就比较适合用异步调用模式。
Future 模式除了在 get() 调用的时候会阻塞外,还有其他的局限性,例如:没有使用 Java Lambda 表达式的优势,对一连串的异步调用可以支持,但是写出来的代码会比较复杂。
CompletableFuture 的常用 API
阅读 CompletableFuture 的 API 的时候,我有一个体会——CompletableFuture 之于 Future,除了增加了回调这个最重要的特性,其他的特性有点像 Stream 对于集合迭代的增强。
使用 CompletableFuture,我们可以像 Stream 一样使用一部调用,可以处理一些级联的异步调用(类似于 Stream 里的 flatMap)、可以过滤一些无用的异步调用(anyOf、allOf)。
下面这张图是我按照自己的理解,梳理除了 CompletableFuture 常见的 API,阅读的时候需要注意下面几个点:
- 把握几个大的分类:创建 CompletableFuture、获取 CompletableFuture 的执行结果、主动结束 CompletableFuture、异步调用任务的组合处理;
- 看着方法多,但是有规律可循,例如 apply 字样的接口,传入的方法参数都是有返回值的;
- 带 either 字样的,都是多个异步任务有一个满足条件即可的;
- 带 executor 方法的,都表示该方法可以用自定义的线程池来优化性能。
Dubbo 项目中的使用案例
Dubbo 对于异步化的支持起始在 2.6.x 中就有提供,是在发布 bean 的时候加个属性配置——async=true,然后利用上下文将异步标识一层层传递下去。在之前的公司中有一次排查 dubbo(当时我们用的是 dubbox)异步调用的问题,最后查到的原因就是多个异步调用,上下文里的信息串了。
Dubbo 2.7 中使用了 JDK1.8 提供的 CompletableFuture 原生接口对自身的异步化做了改进。CompletableFuture 可以支持 future 和 callback 两种调用方式。在 Dubbo 最新的 master 代码中,我知道了 Dubbo 的异步结果的定义,它的类图如下,可以看出 AsyncRpcResult 是一个 CompletableFuture 接口的实现。
实战 Demo
通过下面的例子,可以看出 CompletableFuture 的最大好处——callback 特性。首先定义一个接口,其中包括同步接口和该接口的异步版本。
public interface AsyncInterfaceExample {String computeSomeThine();
CompletableFuture<String> computeSomeThingAsync();}
然后定义该接口的实现类,可以看出,如果要讲现有的同步接口异步化,是比较容易的;
public class AsyncInterfaceExampleImpl implements AsyncInterfaceExample {
@Override
public String computeSomeThine() {
try {Thread.sleep(2000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
return "hello, world";
}
@Override
public CompletableFuture<String> computeSomeThingAsync() {return CompletableFuture.supplyAsync(this::computeSomeThine);
}
}
然后看下我们的测试 case,如下:
public class AsyncInterfaceExampleTest {private static String getOtherThing() {
try {Thread.sleep(2000);
} catch (InterruptedException e) {e.printStackTrace();
}
return "other";
}
public static void main(String[] args) {AsyncInterfaceExample asyncInterfaceExample = new AsyncInterfaceExampleImpl();
//case1 同步调用
long start = System.currentTimeMillis();
String someThing = asyncInterfaceExample.computeSomeThine();
String other = getOtherThing();
System.out.println("cost:" + (System.currentTimeMillis() - start) + "result:" + someThing + other);
//case2 异步调用,使用回调
start = System.currentTimeMillis();
CompletableFuture<String> someThingFuture = asyncInterfaceExample.computeSomeThingAsync();
other = getOtherThing();
long finalStart = start;
String finalOther = other;
someThingFuture.whenComplete((returnValue, exception) -> {if (exception == null) {
System.out.println("cost:" + (System.currentTimeMillis() - finalStart) + "result:" + returnValue + finalOther);
} else {exception.printStackTrace();
}
});
}
}
上面这个案例的执行结果如下图所示:
*
本号专注于后端技术、JVM 问题排查和优化、Java 面试题、个人成长和自我管理等主题,为读者提供一线开发者的工作和成长经验,期待你能在这里有所收获。