关于dubbo:dubbo-异步调用

42次阅读

共计 4661 个字符,预计需要花费 12 分钟才能阅读完成。

本篇次要介绍一下 Dubbo 的异步编程,包含客户端如何发动异步调用、服务端如何异步执行,以及其实现原理。

客户端异步调用

先看下整体流程:

再来看下怎么调用,有两种形式,定义 CompletableFuture 返回类型的接口以及应用 RpcContext。
第一种:定义一个 CompletableFuture 返回类型的接口及实现

public interface CityService {CompletableFuture<String> getCityId();
}
@Service
public class CityServiceImpl implements CityService {
    @Override
    public CompletableFuture<String> getCityId() {CompletableFuture<String> completableFuture = new CompletableFuture<>();
        completableFuture.complete("ni hao");
        return completableFuture;
    }
}

客户端引入该接口并发动调用

    @Reference
    CityService cityService;
    ...
    CompletableFuture<String> future = cityService.getCityId();
        future.whenComplete((o, t) -> {System.out.println(o);
        });

第二种 :应用async 配置,能够用在 Reference 或者 Method 级别
还是定义一个服务接口

public interface CityService {String getCityName();
}
@Service
public class CityServiceImpl implements CityService {
    @Override
    public String getCityName() {return "hangzhou";}
}

客户端引入该接口并发动调用

    @Reference(async = true)
    CityService cityService;
    ...
    // 此处调用后果会返回 null(2.7.1 版本)cityService.getCityName();
    FutureAdapter<Object> futureAdapter = (FutureAdapter)RpcContext.getContext().getFuture();
    futureAdapter.getResultFuture().whenComplete((o, t) -> {System.out.println(o.getValue());
    });
    

对于第二种形式调用,须要留神的是单线程下每次调完办法之后,要立即获取对应的 FutureAdapter, 上面的原理局部会阐明起因。如果你要确保音讯发送胜利的,则须要设置sent 为 true。
对于第一种形式咱们能够看到并没有定义 async 的值,这是因为对于返回类型为 CompletableFuture 的函数,dubbo 外面判断为须要异步了。

服务端异步执行

服务端异步执行实际上只是为了进步吞吐,其实就是咱们本人开新线程去解决业务,以便开释 dubbo 线程去解决其余申请。

服务端的异步执行也有两种形式能够实现,定义 CompletableFuture 返回类型的接口以及应用 AsyncContext。
第一种 :定义一个 CompletableFuture 返回类型的接口及实现,其实和下面的客户端异步的接口定义一样,只不过服务端实现不一样。
服务端实现

@Service
public class CityServiceImpl implements CityService {
    @Override
    public CompletableFuture<String> getCityId() {return CompletableFuture.supplyAsync(() -> "ni hao");
    }
}

下面用了 CompletableFuture 自带的线程池做异步执行,当然你也能够自定义,比方

@Service
public class CityServiceImpl implements CityService {
    @Override
    public CompletableFuture<String> getCityId() {CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {completableFuture.complete("ni hao");
        }).start();
        return completableFuture;
    }
}

这种形式对于接口的定义有肯定要求,如果你接口还是想返回自定义的类型,则须要应用 AsyncContext
第二种:应用AsyncContext

@Service
public class CityServiceImpl implements CityService {
    @Override
    public String getCityId() {AsyncContext asyncContext = RpcContext.startAsync();
        new Thread(() -> {
            // 转储调用线程的 RpcContext 信息到以后线程,不必则能够正文掉
            //asyncContext.signalContextSwitch();
            asyncContext.write("zhejiang");
        }).start();
        return "";
    }
}

其实 AsyncContext 外部也是用了 CompletableFuture,Dubbo 做了非凡解决来兼容异步操作,前面原理局部会讲到。

原理

基于下面的应用,这里对 Dubbo 的异步编程阐明一下是如何实现的。也从客户端异步调用和服务端异步执行两个方面来别离介绍。

客户端异步调用

  • 定义 CompletableFuture 返回类型形式
    Dubbo 客户端发动调用的时候,实际上是对于接口做了一层代理,最终调用的逻辑在org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke

    从上图能够看到在调用的时候会通过函数 createInvocation 创立一个 RpcInvocation

    在创立的时候会判断接口的返回类型是否是 CompletableFuture,如果是的话就是设置异步调用标记

    最终在 org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke 会进行判断是否须要异步调用,需要的话会返回 CompletableFuture,而后调用端期待监听回调
  • 设置 async 值为 true
    咱们在 Reference 或者 Method 配置中将 async 设置为 true 之后,Dubbo 在启动的时候会将配置注入到对应的 ReferenceBean 中,在客户端发动调用的时候会进行传递,具体设置逻辑在
    org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke

    如果 Method 级别没有配置则取 Reference 级别,前面的逻辑和第一种形式一样,不再赘述。在应用的时候咱们有说到 “ 每次调完办法之后,要立即获取对应的 FutureAdapter“,这里阐明一下起因

    能够看到在发申请之后 Dubbo 会设置对应的 FutureAdapter 到 RpcContext 中,而 RpcContext.getContext()是 ThreadLocal 级别的,也就是如果一个线程调用了两次,第二次就会把第一次的异步后果给笼罩了,所以说咱们每次调用之后都要先把 Future 取回来而后再发动下一次调用。

    能够看到是 ThreadLocal 级别的存储

    服务端异步执行

    说完客户端异步调用的原理之后,这里再讲一下服务端是如何做异步执行的,还是以下面介绍的两种形式别离阐明。

  • 定义一个 CompletableFuture 返回类型的接口
    服务端哪里接管申请,如何解决申请不在这里介绍,前面会有文章剖析,这里间接贴要害代码,看看服务端如何实现异步执行的,看这里
    org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest

    从这段逻辑咱们能够看出服务端在解决申请之后会返回一个 CompletableFuture,而后期待异步执行结束。咱们再看一下图中的 handler.reply 函数

    能够看到对于 AsyncRpcResult 的返回类型会做异步期待,其余的则间接返回值来完结 CompletableFuture,持续跟进这个 invoker.invoke 函数,理论就是 org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke

    能够看到这里有对于返回类型的判断,对于 CompletableFuture 类型的返回,对应 Invocation 中的 future_returntype 值则为 true,最终返回一个对 future 包装的 AsyncRpcResult 后果类,进入到 AsyncRpcResult 类看下

    内部拿到的是新建的 resultFuture,valueFuture 则是服务端业务逻辑异步接口返回的 CompletableFuture,这里能够看到如果咱们业务逻辑异步执行结束,则会触发内部拿到 resultFuture 的设值并完结,以此来完结内部期待
  • 应用 AsyncContext
    再来说一下第二种形式的原理,这种形式据 dubbo 官网介绍是参考的 Servlet 3.0 的异步接口 AsyncContext,咱们来看下怎么实现的,同样看一下第一种形式实现的
    org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke

    这里判断了 rpcContext.isAsyncStarted(),如果启动了则会返回异步的后果,咱们在下面应用中介绍过,在业务代码中首先要调用 AsyncContext asyncContext = RpcContext.startAsync() 来拿到 AsyncContext,先看下这个 startAsync 逻辑

    能够看到这里会获取到以后线程的 RpcContext,而后新建一个 AsyncContext(如果没有的话),而后启动 AsyncContext,看下启动逻辑

    其实就是把状态设置为已启动并且新建一个 CompletableFuture,而这个 Future 就是内部拿到期待业务逻辑异步后果的,在业务逻辑中执行完后失去后果后,咱们须要调用 asyncContext.write(xxx) 对后果进行设置,看下 write 函数

    能够看到用完之后就会把 Context 关掉并且完结 Future 的期待。所以实质上这种形式是利用了 ThreadLocal 的个性来存储 Context 进行传递 Future,再利用 Future 来做线程间的期待唤醒。

    总结

    本篇具体介绍了 Dubbo 客户端异步调用以及服务端异步执行的应用和原理,能够看到 Dubbo 在很多细节上的确实现得很奇妙,这里是基于 2.7.1 版本的源码解析,2.6.x 版本是不反对 CompletableFuture 类型的异步的,对于之后的版本,大略逻辑是差不多的,看起来也就瓜熟蒂落了。

正文完
 0