一、Dubbo 分层整体设计概述
咱们先从下图开始简略介绍 Dubbo 分层设计概念:
(援用自 Duboo 开发指南 - 框架设计文档)
如图形容 Dubbo 实现的 RPC 整体分 10 层:service、config、proxy、registry、cluster、monitor、protocol、exchange、transport、serialize。
service:应用方定义的接口和实现类;
config:负责解析 Dubbo 定义的配置,比方注解和 xml 配置,各种参数;
proxy:次要负责生成消费者和提供者的代理对象,加载框架性能,比方提供者过滤器链,扩大点;
registry:负责注册服务的定义和实现类的装载;
cluster:只有消费者有这么一层,负责包装多个服务提供者成一个‘大提供者’,加载负载平衡、路有等扩大点;
monitor:定义监控服务,加载监控实现提供者;
protocol:封装 RPC 调用接口,治理调用实体的生命周期;
exchange:封装申请响应模式,同步转异步;
transport:形象传输层模型,兼容 netty、mina、grizzly 等通信框架;
serialize:形象序列化模型,兼容多种序列化框架,包含:fastjson、fst、hessian2、kryo、kryo2、protobuf 等,通过序列化反对跨语言的形式,反对跨语言的 rpc 调用;
Dubbo 这么分层的目标在于实现层与层之间的解耦,每一层都定义了接口标准,也能够依据不同的业务需要定制、加载不同的实现,具备极高的扩展性。
1.1. RPC 调用过程
接下来联合上图简略形容一次残缺的 rpc 调用过程:
从 Dubbo 分层的角度看,具体时序图如下,蓝色局部是服务生产端,浅绿色局部是服务提供端,时序图从生产端一次 Dubbo 办法调用开始,到服务端本地办法执行完结。
从 Dubbo 外围畛域对象的角度看,咱们援用 Dubbo 官网文档阐明,如下图所示。Dubbo 外围畛域对象是 Invoker,生产端代理对象是 proxy,包装了 Invoker 的调用;服务端代理对象是一个 Invoker,他通过 exporter 包装,当服务端接管到调用申请后,通过 exporter 找到 Invoker,Invoker 去理论执行用户的业务逻辑。
(援用自 Dubbo 官网文档)
1.2 Dubbo 服务的注册和发现流程
下图出自开发指南 - 框架设计 - 援用服务时序,次要流程是:从注册核心订阅服务提供者,而后启动 tcp 服务连贯远端提供者,将多个服务提供者合并成一个 Invoker,用这个 Invoker 创立代理对象。
下图出自开发指南 - 框架设计 - 裸露服务时序,次要流程是:创立本地服务的代理 Invoker,启动 tcp 服务裸露服务,而后将服务注册到注册核心。
接下来咱们联合 Dubbo 服务的注册和发现,从配置层开始解释每一层的作用和原理。
示例服务接口定义如下:
public interface CouponServiceViewFacade {
/**
* 查问单张优惠券
*/
CouponViewDTO query(String code);
}
二、配置层
2.1. 做什么
配置层提供配置解决工具类,在容器启动的时候,通过 ServiceConfig.export 实例化服务提供者,ReferenceConfig.get 实例化服务消费者对象。
Dubbo 利用应用 spring 容器启动时,Dubbo 服务提供者配置处理器通过 ServiceConfig.export 启动 Dubbo 近程服务裸露本地服务。Dubbo 服务消费者配置处理器通过 ReferenceConfig.get 实例化一个代理对象,并通过注册核心服务发现,连贯远端服务提供者。
Dubbo 配置能够应用注解和 xml 两种模式,本文采纳注解的模式进行阐明。
2.2. 怎么做
2.2.1 服务生产端的解析
Spring 容器启动过程中,填充 bean 属性时,对含有 Dubbo 援用注解的属性应用 org.apache.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor 进行初始化。如下是 ReferenceAnnotationBeanPostProcessor 的构造方法,Dubbo 服务消费者注解处理器解决以下三个注解:DubboReference.class、Reference.class、com.alibaba.dubbo.config.annotation.Reference.class 润饰的类。
ReferenceAnnotationBeanPostProcessor 类定义:
public class ReferenceAnnotationBeanPostProcessor extends AbstractAnnotationBeanPostProcessor implements
ApplicationContextAware {public ReferenceAnnotationBeanPostProcessor() {super(DubboReference.class, Reference.class, com.alibaba.dubbo.config.annotation.Reference.class);
}
}
Dubbo 服务发现到这一层,Dubbo 行将开始构建服务消费者的代理对象,CouponServiceViewFacade 接口的代理实现类。
2.2.2 服务提供端的解析
Spring 容器启动的时候,加载注解 @org.apache.dubbo.config.spring.context.annotation.DubboComponentScan 指定范畴的类,并初始化;初始化应用 dubbo 实现的扩大点 org.apache.dubbo.config.spring.beans.factory.annotation.ServiceClassPostProcessor。
ServiceClassPostProcessor 解决的注解类有 DubboService.class,Service.class,com.alibaba.dubbo.config.annotation.Service.class。
如下是 ServiceClassPostProcessor 类定义:
public class ServiceClassPostProcessor implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,
ResourceLoaderAware, BeanClassLoaderAware {
private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList(DubboService.class,Service.class,com.alibaba.dubbo.config.annotation.Service.class);。。。}
期待 Spring 容器 ContextRefreshedEvent 事件,启动 Dubbo 应用服务监听端口,裸露本地服务。
Dubbo 服务注册到这一层,Dubbo 行将开始构建服务提供者的代理对象,CouponServiceViewFacade 实现类的反射代理类。
三、代理层
3.1 做什么
为服务消费者生成代理实现实例,为服务提供者生成反射代理实例。
CouponServiceViewFacade 的代理实现实例,生产端在调用 query 办法的时候,实际上是调用代理实现实例的 query 办法,通过他调用近程服务。
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.dubbo.common.bytecode;
public class proxy1 implements DC, Destroyable, CouponServiceViewFacade, EchoService {public static Method[] methods;
private InvocationHandler handler;
public proxy1(InvocationHandler var1) {this.handler = var1;}
public proxy1() {}
public CouponViewDTO query(String var1) {Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[0], var2);
return (CouponViewDTO)var3;
}
}
CouponServiceViewFacade 的反射代理实例,服务端接管到申请后,通过该实例的 Invoke 办法最终执行本地办法 query。
/**
* InvokerWrapper
*/
public class AbstractProxyInvoker<CouponServiceViewFacade> implements Invoker<CouponServiceViewFacade> {
//。。。public AbstractProxyInvoker(CouponServiceViewFacade proxy, Class<CouponServiceViewFacade> type, URL url) {
//。。。this.proxy = proxy;
this.type = type;
this.url = url;
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
//。。。Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
//。。。}
protected Object doInvoke(CouponServiceViewFacade proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable{
//。。。return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
}
3.2 怎么做
Dubbo 代理工厂接口定义如下,定义了服务提供者和服务消费者的代理对象工厂办法。服务提供者代理对象和服务消费者代理对象都是通过工厂办法创立,工厂实现类能够通过 SPI 自定义扩大。
@SPI("javassist")
public interface ProxyFactory {
// 生成服务消费者代理对象
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
// 生成服务消费者代理对象
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
// 生成服务提供者代理对象
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
3.2.1 服务消费者
3.2.1.1 创立服务消费者代理类
默认采纳 Javaassist 代理工厂实现,Proxy.getProxy(interfaces)创立代理工厂类,newInstance 创立具体代理对象。
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}。。。}
3.2.1.2 服务消费者代理
Dubbo 为每个服务消费者生成两个代理类:代理工厂类,接口代理类。
CouponServiceViewFacade 代理工厂类:
public class Proxy1 extends Proxy implements DC {public Proxy1() { }
public Object newInstance(InvocationHandler var1) {return new proxy1(var1);
}
}
最终生成的 CouponServiceViewFacade 的代理对象 如下,其中 handler 的实现类是 InvokerInvocationHandler,this.handler.invoke 办法发动 Dubbo 调用。
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.dubbo.common.bytecode;
public class proxy1 implements DC, Destroyable, CouponServiceViewFacade, EchoService {public static Method[] methods;
private InvocationHandler handler;
public proxy1(InvocationHandler var1) {this.handler = var1;}
public proxy1() {}
public CouponViewDTO query(String var1) {Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[0], var2);
return (CouponViewDTO)var3;
}
}
3.2.2 服务提供者
3.2.2.1 创立服务提供者代理类
默认 Javaassist 代理工厂实现,应用 Wrapper 包装本地服务提供者。proxy 是理论的服务提供者实例,即 CouponServiceViewFacade 的本地实现类,type 是接口类定义,URL 是 injvm 协定 URL。
public class JavassistProxyFactory extends AbstractProxyFactory {。。。@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 代理包装类,包装了本地的服务提供者
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 代理类入口
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
3.2.2.2 Wrapper 包装类
Dubbo 为每个服务提供者的本地实现生成一个 Wrapper 代理类,形象 Wrapper 类定义如下:
public abstract class Wrapper {。。。abstract public Object invokeMethod(Object instance, String mn, Class<?>[] types, Object[] args) throws NoSuchMethodException, InvocationTargetException;
}
具体 Wrapper 代理类应用字节码技术动静生成,本地服务 CouponServiceViewFacade 的代理包装类举例:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.dubbo.common.bytecode;
import com.xxx.CouponServiceViewFacade;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import org.apache.dubbo.common.bytecode.ClassGenerator.DC;
public class Wrapper25 extends Wrapper implements DC {。。。public Wrapper25() {}
public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
CouponServiceViewFacade var5;
try {var5 = (CouponServiceViewFacade)var1;
} catch (Throwable var8) {throw new IllegalArgumentException(var8);
}
try {if ("query".equals(var2) && var3.length == 1) {return var5.query((String)var4[0]);
}
} catch (Throwable var9) {throw new InvocationTargetException(var9);
}
throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.xxx.CouponServiceViewFacade.");
}。。。}
在服务初始化流程中,服务消费者代理对象生成后初始化就实现了,服务生产端的初始化程序:ReferenceConfig.get-> 从注册核心订阅服务 -> 启动客户端 -> 创立 DubboInvoker-> 构建 ClusterInvoker→创立服务代理对象;
而服务提供端的初始化才刚开始,服务提供端的初始化程序:ServiceConfig.export-> 创立 AbstractProxyInvoker,通过 Injvm 协定关联本地服务 -> 启动服务端→注册服务到注册核心。
接下来咱们讲注册层。
四、注册层
4.1 做什么
封装服务地址的注册与发现,以服务 URL 为配置核心。服务提供者本地服务启动胜利后,监听 Dubbo 端口胜利后,通过注册协定公布到注册核心;服务消费者通过注册协定订阅服务,启动本地利用连贯近程服务。
注册协定 URL 举例:
zookeeper://xxx/org.apache.dubbo.registry.RegistryService?application=xxx&…
4.2 怎么做
注册服务工厂接口定义如下,注册服务实现通过 SPI 扩大,默认是 zk 作为注册核心。
@SPI("dubbo")
public interface RegistryFactory {@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
注册服务接口定义;
public interface RegistryService {void register(URL url);
void unregister(URL url);
void subscribe(URL url, NotifyListener listener);
void unsubscribe(URL url, NotifyListener listener);
List<URL> lookup(URL url);
}
五、集群层
5.1 做什么
服务生产方从注册核心订阅服务提供者后,将多个提供者包装成一个提供者,并且封装路由及负载平衡策略;并桥接注册核心,以 Invoker 为核心,扩大接口为 Cluster, Directory, Router, LoadBalance;
服务提供端不存在集群层。
5.2 怎么做
5.2.1 Cluster
集群畛域次要负责将多个服务提供者包装成一个 ClusterInvoker,注入路由处理器链和负载平衡策略。次要策略有:failover、failfast、failsafe、failback、forking、available、mergeable、broadcast、zone-aware。
集群接口定义如下,只有一个办法:从服务目录中的多个服务提供者构建一个 ClusterInvoker。
作用是对下层 - 代理层屏蔽集群层的逻辑;代理层调用服务办法只需执行 Invoker.invoke,而后通过 ClusterInvoker 外部的路由策略和负载平衡策略计算具体执行哪个远端服务提供者。
@SPI(Cluster.DEFAULT)
public interface Cluster {
String DEFAULT = FailoverCluster.NAME;
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;。。。}
ClusterInvoker 执行逻辑,先路由策略过滤,而后负载平衡策略抉择最终的远端服务提供者。示例代理如下:
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {。。。@Override
public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// 集群 invoker 执行时,先应用路由链过滤服务提供者
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}。。。}
5.2.2 Directory
服务目录接口定义如下,Dubbo 办法接口调用时,将办法信息包装成 invocation,通过 Directory.list 过滤可执行的远端服务。
通过 org.apache.dubbo.registry.integration.RegistryDirectory 桥接注册核心,监听注册核心的路由配置批改、服务治理等事件。
public interface Directory<T> extends Node {Class<T> getInterface();
List<Invoker<T>> list(Invocation invocation) throws RpcException;
List<Invoker<T>> getAllInvokers();
URL getConsumerUrl();}
5.2.3 Router
从已知的所有服务提供者中依据路由规定刷选服务提供者。
服务订阅的时候初始化路由处理器链,调用近程服务的时候先应用路由链过滤服务提供者,再通过负载平衡抉择具体的服务节点。
路由处理器链工具类,提供路由筛选服务,监听更新服务提供者。
public class RouterChain<T> {。。。public List<Invoker<T>> route(URL url, Invocation invocation) {
List<Invoker<T>> finalInvokers = invokers;
for (Router router : routers) {finalInvokers = router.route(finalInvokers, url, invocation);
}
return finalInvokers;
}
/**
* Notify router chain of the initial addresses from registry at the first time.
* Notify whenever addresses in registry change.
*/
public void setInvokers(List<Invoker<T>> invokers) {
// 路由链监听更新服务提供者
this.invokers = (invokers == null ? Collections.emptyList() : invokers);
routers.forEach(router -> router.notify(this.invokers));
}
}
订阅服务的时候,将路由链注入到 RegistryDirectory 中;
public class RegistryProtocol implements Protocol {。。。private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {。。。// 服务目录初始化路由链
directory.buildRouterChain(subscribeUrl);
directory.subscribe(toSubscribeUrl(subscribeUrl));。。。return registryInvokerWrapper;
}。。。}
5.2.4 LoadBalance
依据不同的负载平衡策略从可应用的远端服务实例中抉择一个,负责平衡接口定义如下:
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
六、监控层
6.1 做什么
监控 RPC 调用次数和调用工夫,以 Statistics 为核心,扩大接口为 MonitorFactory, Monitor, MonitorService。
6.2 怎么做
监控工厂接口定义,通过 SPI 形式进行扩大;
@SPI("dubbo")
public interface MonitorFactory {@Adaptive("protocol")
Monitor getMonitor(URL url);
}
@Adaptive("protocol")
Monitor getMonitor(URL url);
监控服务接口定义如下,定义了一些默认的监控维度和指标项;
public interface MonitorService {
// 监控维度
String APPLICATION = "application";
String INTERFACE = "interface";
String METHOD = "method";
String GROUP = "group";
String VERSION = "version";
String CONSUMER = "consumer";
String PROVIDER = "provider";
String TIMESTAMP = "timestamp";
// 监控指标项
String SUCCESS = "success";
String FAILURE = "failure";
String INPUT = INPUT_KEY;
String OUTPUT = OUTPUT_KEY;
String ELAPSED = "elapsed";
String CONCURRENT = "concurrent";
String MAX_INPUT = "max.input";
String MAX_OUTPUT = "max.output";
String MAX_ELAPSED = "max.elapsed";
String MAX_CONCURRENT = "max.concurrent";
void collect(URL statistics);
List<URL> lookup(URL query);
}
6.2.1 MonitorFilter
通过过滤器的形式收集服务的调用次数和调用工夫,默认实现:
org.apache.dubbo.monitor.dubbo.DubboMonitor。
七、协定层
7.1 做什么
封装 RPC 调用,以 Invocation, Result 为核心,扩大接口为 Protocol, Invoker, Exporter。
接下来介绍 Dubbo RPC 过程中的罕用概念:
1)Invocation 是申请会话畛域模型,每次申请有相应的 Invocation 实例,负责包装 dubbo 办法信息为申请参数;
2)Result 是申请后果畛域模型,每次申请都有相应的 Result 实例,负责包装 dubbo 办法响应;
3)Invoker 是实体域,代表一个可执行实体,有本地、近程、集群三类;
4)Exporter 服务提供者 Invoker 治理实体;
5)Protocol 是服务域,治理 Invoker 的生命周期,提供服务的裸露和援用入口;
服务初始化流程中,从这一层开始进行近程服务的裸露和连贯援用。
对于 CouponServiceViewFacade 服务来说,服务提供端会监听 Dubbo 端口启动 tcp 服务;服务生产端通过注册核心发现服务提供者信息,启动 tcp 服务连贯远端提供者。
7.2 怎么做
协定接口定义如下,对立形象了不同协定的服务裸露和援用模型,比方 InjvmProtocol 只需将 Exporter,Invoker 关联本地实现。DubboProtocol 裸露服务的时候,须要监控本地端口启动服务;援用服务的时候,须要连贯远端服务。
@SPI("dubbo")
public interface Protocol {int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
default List<ProtocolServer> getServers() {return Collections.emptyList();
}
}
Invoker 接口定义
Invocation 是 RPC 调用的会话对象,负责包装申请参数;Result 是 RPC 调用的后果对象,负责包装 RPC 调用的后果对象,包含异样类信息;
public interface Invoker<T> extends Node {Class<T> getInterface();
Result invoke(Invocation invocation) throws RpcException;
}
7.2.1 服务的裸露和援用
服务裸露的时候,开启 RPC 服务端;援用服务的时候,开启 RPC 客户端。
public class DubboProtocol extends AbstractProtocol {。。。@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {。。。// 开启 rpc 服务端
openServer(url);
optimizeSerialization(url);
return exporter;
}
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {optimizeSerialization(url);
// 创立 dubbo invoker, 开启 rpc 客户端
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}。。。}
7.2.2 服务端响应申请
接管响应申请;
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {。。。Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
// 调用本地服务
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}。。。};
7.2.3 客户端发送申请
调用近程服务;
public class DubboInvoker<T> extends AbstractInvoker<T> {。。。@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {。。。boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
}
}
八、替换层
8.1 做什么
封装申请响应模式,同步转异步,以 Request, Response 为核心,扩大接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer。
应用 request 包装 Invocation 作为残缺的申请对象,应用 response 包装 result 作为残缺的响应对象;Request、Response 相比 Invocation、Result 增加了 Dubbo 的协定头。
8.2 怎么做
交换器对象接口定义,定义了近程服务的绑定和连贯,应用 SPI 形式进行扩大;
@SPI(HeaderExchanger.NAME)
public interface Exchanger {@Adaptive({Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
@Adaptive({Constants.EXCHANGER_KEY})
ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
替换层模型类图:
8.2.1 服务提供者
服务提供端接管到申请后,本地执行,发送响应后果;
public class HeaderExchangeHandler implements ChannelHandlerDelegate {。。。void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
// 封装响应
Response res = new Response(req.getId(), req.getVersion());。。。Object msg = req.getData();
try {CompletionStage<Object> future = handler.reply(channel, msg);
future.whenComplete((appResult, t) -> {
try {if (t == null) {res.setStatus(Response.OK);
res.setResult(appResult);
} else {res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {logger.warn("Send result to consumer failed, channel is" + channel + ", msg is" + e);
}
});
} catch (Throwable e) {res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}。。。}
8.2.2 服务消费者
服务生产端发动申请的封装,办法执行胜利后,返回一个 future;
final class HeaderExchangeChannel implements ExchangeChannel {。。。// 封装申请实体
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {。。。// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
//RpcInvocation
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {channel.send(req);
} catch (RemotingException e) {future.cancel();
throw e;
}
return future;
}。。。}
九、传输层
9.1 做什么
形象传输层模型,兼容 netty、mina、grizzly 等通信框架。
9.2 怎么做
传输器接口定义如下, 它与交换器 Exchanger 接口定义类似,区别在于 Exchanger 是围绕 Dubbo 的 Request 和 Response 封装的操作门面接口,而 Transporter 更加的底层,Exchanger 用于隔离 Dubbo 协定层和通信层。
@SPI("netty")
public interface Transporter {@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
自定义传输层模型
通过 SPI 的形式,动静抉择具体的传输框架,默认是 netty;
public class Transporters {。。。public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {。。。return getTransporter().bind(url, handler);
}
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {。。。return getTransporter().connect(url, handler);
}
public static Transporter getTransporter() {return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();}
}
netty 框架的 channel 适配如下,采纳装璜模式,应用 netty 框架的 channel 作为 Dubbo 自定义的 channel 做实现;
final class NettyChannel extends AbstractChannel {private NettyChannel(Channel channel, URL url, ChannelHandler handler) {super(url, handler);
if (channel == null) {throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}
}
十、序列化
10.1 做什么
形象序列化模型,兼容多种序列化框架,包含:fastjson、fst、hessian2、kryo、kryo2、protobuf 等,通过序列化反对跨语言的形式,反对跨语言的 RPC 调用。
10.2 怎么做
定义 Serialization 扩大点,默认 hessian2,反对跨语言。Serialization 接口理论是一个工厂接口,通过 SPI 扩大;理论序列化和反序列化工作由 ObjectOutput,ObjectInput 实现,通过装璜模式让 hessian2 实现理论工作。
@SPI("hessian2")
public interface Serialization {byte getContentTypeId();
String getContentType();
@Adaptive
ObjectOutput serialize(URL url, OutputStream output) throws IOException;
@Adaptive
ObjectInput deserialize(URL url, InputStream input) throws IOException;
}
10.2.1 通信协定设计
下图出自开发指南 - 实现细节 - 近程通信细节,形容 Dubbo 协定头设计;
- 0-15bit 示意 Dubbo 协定魔法数字,值:0xdabb;
- 16bit 申请响应标记,Request – 1; Response – 0;
- 17bit 申请模式标记,只有申请音讯才会有,1 示意须要服务端返回响应;
- 18bit 是事件音讯标记,1 示意该音讯是事件音讯,比方心跳音讯;
- 19-23bit 是序列化类型标记,hessian 序列化 id 是 2,fastjson 是 6,详见 org.apache.dubbo.common.serialize.Constants;
- 24-31bit 示意状态,只有响应音讯才有用;
- 32-64bit 是 RPC 申请 ID;
- 96-128bit 是会话数据长度;
- 128 是音讯体字节序列;
十一、总结
Dubbo 将 RPC 整个过程分成外围的代理层、注册层、集群层、协定层、传输层等,层与层之间的职责边界明确;核心层都通过接口定义,不依赖具体实现,这些接口串联起来造成了 Dubbo 的骨架;这个骨架也能够看作是 Dubbo 的内核,内核应用 SPI 机制加载插件(扩大点),达到高度可扩大。
vivo 互联网服务器团队 -Wang Genfu