本篇次要介绍一下 Dubbo 的异步编程,包含客户端如何发动异步调用、服务端如何异步执行,以及其实现原理。

客户端异步调用

先看下整体流程:

再来看下怎么调用,有两种形式,定义 CompletableFuture 返回类型的接口以及应用 RpcContext。
第一种:定义一个 CompletableFuture 返回类型的接口及实现

public interface CityService {    CompletableFuture<String> getCityId();}
@Servicepublic class CityServiceImpl implements CityService {    @Override    public CompletableFuture<String> getCityId() {        CompletableFuture<String> completableFuture = new CompletableFuture<>();        completableFuture.complete("ni hao");        return completableFuture;    }}

客户端引入该接口并发动调用

    @Reference    CityService cityService;    ...    CompletableFuture<String> future = cityService.getCityId();        future.whenComplete((o, t) -> {            System.out.println(o);        });

第二种:应用async配置,能够用在 Reference 或者 Method 级别
还是定义一个服务接口

public interface CityService {    String getCityName();}
@Servicepublic class CityServiceImpl implements CityService {    @Override    public String getCityName() {        return "hangzhou";    }}

客户端引入该接口并发动调用

    @Reference(async = true)    CityService cityService;    ...    //此处调用后果会返回null(2.7.1版本)    cityService.getCityName();    FutureAdapter<Object> futureAdapter = (FutureAdapter)RpcContext.getContext().getFuture();    futureAdapter.getResultFuture().whenComplete((o, t) -> {            System.out.println(o.getValue());    });    

对于第二种形式调用,须要留神的是单线程下每次调完办法之后,要立即获取对应的FutureAdapter,上面的原理局部会阐明起因。如果你要确保音讯发送胜利的,则须要设置sent为 true。
对于第一种形式咱们能够看到并没有定义async的值,这是因为对于返回类型为 CompletableFuture 的函数,dubbo外面判断为须要异步了。

服务端异步执行

服务端异步执行实际上只是为了进步吞吐,其实就是咱们本人开新线程去解决业务,以便开释dubbo线程去解决其余申请。

服务端的异步执行也有两种形式能够实现,定义 CompletableFuture 返回类型的接口以及应用 AsyncContext。
第一种:定义一个 CompletableFuture 返回类型的接口及实现,其实和下面的客户端异步的接口定义一样,只不过服务端实现不一样。
服务端实现

@Servicepublic class CityServiceImpl implements CityService {    @Override    public CompletableFuture<String> getCityId() {        return CompletableFuture.supplyAsync(() -> "ni hao");    }}

下面用了 CompletableFuture 自带的线程池做异步执行,当然你也能够自定义,比方

@Servicepublic class CityServiceImpl implements CityService {    @Override    public CompletableFuture<String> getCityId() {        CompletableFuture<String> completableFuture = new CompletableFuture<>();        new Thread(() -> {            completableFuture.complete("ni hao");        }).start();        return completableFuture;    }}

这种形式对于接口的定义有肯定要求,如果你接口还是想返回自定义的类型,则须要应用AsyncContext
第二种:应用AsyncContext

@Servicepublic class CityServiceImpl implements CityService {    @Override    public String getCityId() {        AsyncContext asyncContext = RpcContext.startAsync();        new Thread(() -> {            //转储调用线程的RpcContext信息到以后线程,不必则能够正文掉            //asyncContext.signalContextSwitch();            asyncContext.write("zhejiang");        }).start();        return "";    }}

其实AsyncContext外部也是用了 CompletableFuture,Dubbo做了非凡解决来兼容异步操作,前面原理局部会讲到。

原理

基于下面的应用,这里对Dubbo的异步编程阐明一下是如何实现的。也从客户端异步调用和服务端异步执行两个方面来别离介绍。

客户端异步调用

  • 定义 CompletableFuture 返回类型形式
    Dubbo客户端发动调用的时候,实际上是对于接口做了一层代理,最终调用的逻辑在org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke

    从上图能够看到在调用的时候会通过函数 createInvocation 创立一个RpcInvocation

    在创立的时候会判断接口的返回类型是否是 CompletableFuture,如果是的话就是设置异步调用标记

    最终在org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke会进行判断是否须要异步调用,需要的话会返回 CompletableFuture,而后调用端期待监听回调
  • 设置async值为true
    咱们在 Reference 或者 Method 配置中将async设置为 true 之后,Dubbo 在启动的时候会将配置注入到对应的 ReferenceBean 中,在客户端发动调用的时候会进行传递,具体设置逻辑在
    org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke

    如果 Method 级别没有配置则取 Reference 级别,前面的逻辑和第一种形式一样,不再赘述。在应用的时候咱们有说到 "每次调完办法之后,要立即获取对应的FutureAdapter",这里阐明一下起因

    能够看到在发申请之后Dubbo会设置对应的FutureAdapter到 RpcContext 中,而 RpcContext.getContext()是 ThreadLocal 级别的,也就是如果一个线程调用了两次,第二次就会把第一次的异步后果给笼罩了,所以说咱们每次调用之后都要先把Future取回来而后再发动下一次调用。

    能够看到是ThreadLocal级别的存储

    服务端异步执行

    说完客户端异步调用的原理之后,这里再讲一下服务端是如何做异步执行的,还是以下面介绍的两种形式别离阐明。

  • 定义一个 CompletableFuture 返回类型的接口
    服务端哪里接管申请,如何解决申请不在这里介绍,前面会有文章剖析,这里间接贴要害代码,看看服务端如何实现异步执行的,看这里
    org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest

    从这段逻辑咱们能够看出服务端在解决申请之后会返回一个 CompletableFuture,而后期待异步执行结束。咱们再看一下图中的 handler.reply 函数

    能够看到对于 AsyncRpcResult 的返回类型会做异步期待,其余的则间接返回值来完结 CompletableFuture,持续跟进这个 invoker.invoke 函数,理论就是org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke

    能够看到这里有对于返回类型的判断,对于 CompletableFuture 类型的返回,对应 Invocation 中的future_returntype 值则为 true,最终返回一个对 future 包装的 AsyncRpcResult 后果类,进入到 AsyncRpcResult 类看下

    内部拿到的是新建的 resultFuture,valueFuture 则是服务端业务逻辑异步接口返回的 CompletableFuture,这里能够看到如果咱们业务逻辑异步执行结束,则会触发内部拿到 resultFuture的设值并完结,以此来完结内部期待
  • 应用AsyncContext
    再来说一下第二种形式的原理,这种形式据dubbo官网介绍是参考的Servlet 3.0 的异步接口AsyncContext,咱们来看下怎么实现的,同样看一下第一种形式实现的
    org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke

    这里判断了 rpcContext.isAsyncStarted(),如果启动了则会返回异步的后果,咱们在下面应用中介绍过,在业务代码中首先要调用AsyncContext asyncContext = RpcContext.startAsync()来拿到 AsyncContext,先看下这个 startAsync 逻辑

    能够看到这里会获取到以后线程的 RpcContext,而后新建一个 AsyncContext(如果没有的话),而后启动 AsyncContext,看下启动逻辑

    其实就是把状态设置为已启动并且新建一个 CompletableFuture,而这个 Future 就是内部拿到期待业务逻辑异步后果的,在业务逻辑中执行完后失去后果后,咱们须要调用asyncContext.write(xxx)对后果进行设置,看下write函数

    能够看到用完之后就会把Context关掉并且完结 Future 的期待。所以实质上这种形式是利用了ThreadLocal的个性来存储Context进行传递 Future,再利用 Future 来做线程间的期待唤醒。

    总结

    本篇具体介绍了Dubbo客户端异步调用以及服务端异步执行的应用和原理,能够看到Dubbo在很多细节上的确实现得很奇妙,这里是基于2.7.1版本的源码解析,2.6.x版本是不反对 CompletableFuture类型的异步的,对于之后的版本,大略逻辑是差不多的,看起来也就瓜熟蒂落了。