共计 16332 个字符,预计需要花费 41 分钟才能阅读完成。
1、暴露服务到远程
上一篇文章分析了暴露服务到本地,6、Dubbo 的服务导出 1 之导出到本地。接下来分析暴露服务到远程。
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (registryURLs != null && !registryURLs.isEmpty()) {for (URL registryURL : registryURLs) {
// 添加动态参数, 此动态参数是决定 Zookeeper 创建临时节点还是持久节点
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY,
registryURL.getParameter(Constants.DYNAMIC_KEY));
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// 步骤 1) 创建 Invoker, 这里创建 Invoker 逻辑和上面一样
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker =
new DelegateProviderMetaDataInvoker(invoker, this);
// 步骤 2) 暴露服务
Exporter<?> exporter = ServiceConfig.protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
/**
* 下面分析步骤 2, 该方法两大核心逻辑, 导出服务和注册服务, 服务注册下篇文章分析
*/
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 1. 导出服务,export invoker, 本篇文章仅分析第一步导出服务到远程
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// zookeeper://10.101.99.127:2181/com.alibaba.dubbo.registry.RegistryService
// ?application=demo-provider&dubbo=2.0.2
URL registryUrl = getRegistryUrl(originInvoker);
// registry provider, 默认返回 ZookeeperRegistry 实例
final Registry registry = getRegistry(originInvoker);
// dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true
// &application=demo-provider&default.server=netty4&dubbo=2.0.2&generic=false
// &interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=8140&side=provider
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
// 不配置的话默认返回 true
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
// 2. 注册服务, 这篇文章已经比较长了, 决定将步骤 2 和步骤 3 新起一篇文章分析, 服务暴露之后需要注册到注册中心
if (register) {register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// 3. 数据更新订阅
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener =
new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
return new DestroyableExporter<T>(exporter,
originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {synchronized (bounds) {exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker,
getProviderUrl(originInvoker));
// 调用 protocol 的 export 方法导出服务, 默认是采用 Dubbo 协议, 对应 DubboProtocol 的 export 方法
// 但是这里 protocol.export() 并不是先走 DubboProtocol 的 export 方法, 而是先走
// ProtocolListenerWrapper 的 wrapper 方法
// 因为 ProtocolListenerWrapper 对 DubboProtocol 做了一层包装, 具体参考
// https://segmentfault.com/a/1190000020387196, 核心方法 protocal.export()
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
/**
* 上述核心方法 protocol.export() 会先走到 ProtocolListenerWrapper 的 export 方法, 该方法是在服务暴露上做了
监听器功能的增强, 也就是加上了监听器
*/
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 如果是注册中心, 则暴露该 invoker
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);
}
// 创建一个暴露者监听器包装类对象, 暴露服务时这里的 protocol 是 ProtocolFilterWrapper, 这里用到了
// Wrapper 包装原有的 DubboProtocol, 可以参考 https://segmentfault.com/a/1190000020387196
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
/**
* ProtocolFilterWrapper 的 export 方法, 该方法是在服务暴露上做了过滤器链的增强, 也就是加上了过滤器
*/
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 如果是注册中心, 则直接暴露服务
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {return protocol.export(invoker);
}
// 服务提供侧暴露服务, 这里通过 buildInvokerChain 形成了过滤器链
return protocol.export(buildInvokerChain(invoker,
Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
/**
* 该方法就是创建带 Filter 链的 Invoker 对象, 倒序的把每一个过滤器串连起来, 形成一个 invoker
*/
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 获得过滤器的所有扩展实现类实例集合
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).
getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
// 从最后一个过滤器开始循环, 创建一个带有过滤器链的 invoker 对象
for (int i = filters.size() - 1; i >= 0; i--) {final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {return invoker.getInterface();
}
@Override
public URL getUrl() {return invoker.getUrl();
}
@Override
public boolean isAvailable() {return invoker.isAvailable();
}
// 关键在这里, 调用下一个 filter 代表的 invoker, 把每一个过滤器串起来
@Override
public Result invoke(Invocation invocation) throws RpcException {return filter.invoke(next, invocation);
}
@Override
public void destroy() {invoker.destroy();
}
@Override
public String toString() {return invoker.toString();
}
};
}
}
return last;
}
// 经过两个 Wrapper 的 export 方法包装之后, 走到 DubboProtocol 的 export 方法, 这里是核心方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// url 形如 dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true
// &application=demo-provider&bind.ip=172.22.213.93&bind.port=20880&dubbo=2.0.2&generic=false
// /&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=648&qos.port=22222
// &side=provider×tamp=1569585915258
URL url = invoker.getUrl();
// 获取服务标识, 理解成服务坐标也行, 由服务组名, 服务名, 服务版本号以及端口组成,key 形如
// com.alibaba.dubbo.demo.DemoService:20880
String key = serviceKey(url);
// 创建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 将 <key, exporter> 键值对放入缓存中
exporterMap.put(key, exporter);
// 本地存根相关代码, export an stub service for dispatching event
// 删除, 暂时还没有分析本地存根相关
// 启动服务器, 重点关注这里
openServer(url);
optimizeSerialization(url);
return exporter;
}
// 根据 URL 值可以猜测,openServer 方法就是启动 Netty 服务器, 在 172.22.213.93:20880 端口上监听调用请求
openServer(url);
/**
* 在同一台机器上 (单网卡), 同一个端口上仅允许启动一个服务器实例, 若某个端口上已有服务器实例, 此时则调用 reset 方法
重置服务器的一些配置
*/
private void openServer(URL url) {
// 获取 host:port, 并将其作为服务器实例的 key, 用于标识当前的服务器实例,key 形如 172.22.213.93:20880
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {ExchangeServer server = serverMap.get(key);
if (server == null) {
// 创建服务器实例,put 之后 serverMap 形如 <172.22.213.93:20880, HeaderExchangeServer>
serverMap.put(key, createServer(url));
} else {
// 服务器已创建, 则根据 url 中的配置重置服务器
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
Boolean.TRUE.toString());
// 添加心跳检测配置到 URL 中,enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,
String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 获取 server 参数, 默认为 netty, 这里配置成了 netty4,str 就为 netty4
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展, 不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.
getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type:" + str + ", url:" + url);
// 添加编码解码器参数
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// 创建 ExchangeServer, 核心方法
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {throw new RpcException("Fail to start server(url:" + url + ")" + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.
getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type:" + str);
}
}
return server;
}
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
// 默认 type 就是 header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
// 创建 HeadExchanger
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 用传输层绑定返回的 server 创建对应的信息交换服务端
// 这里也是分成两步, 下面先分析 bind 方法, 该方法就是开启 Netty4 服务器监听请求
// 1) bind 方法
// 2) new HeaderExchangeServer(Server server)
return new HeaderExchangeServer(Transporters.bind(url,
new DecodeHandler(new HeaderExchangeHandler(handler))));
}
// 步骤 1)bind 方法
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
ChannelHandler handler;
if (handlers.length == 1) {handler = handlers[0];
} else {handler = new ChannelHandlerDispatcher(handlers);
}
// 获取自适应 Transporter 实例, 并调用实例方法. getTransporter() 方法获取的 Transporter 是在运行时动态创建的,
// 类名为 TransporterAdaptive, 也就是自适应拓展类.TransporterAdaptive 会在运行时根据传入的 URL 参数决定加载
// 什么类型的 Transporter, 这里我们配置成了 netty4 的 NettyTransporter
// String string = url.getParameter("server", url.getParameter("transporter", "netty"));
// transporter = ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(string);
return getTransporter().bind(url, handler);
}
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
// 创建一个 NettyServer
return new NettyServer(url, listener);
}
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// 调用父类构造方法, 这里的 wrap 方法返回的是
// MultiMessageHandler->HeartbeatHandler->AllDispatcherHandler->DecodeHandler->HeaderExchangeHandler
// -> 表示前一个 handler 里面包装了下一个 handler
super(url, ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
// 包装了 MultiMessageHandler 功能, 增加了多消息处理的功能, 以及对心跳消息做了功能增强
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// 调用了多消息处理器, 对心跳消息进行了功能加强
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
// url 形如 dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true
// &application=demo-provider&bind.ip=172.22.213.93&bind.port=20880&channel.readonly.sent=true
// &codec=dubbo&default.server=netty4&dubbo=2.0.2&generic=false&heartbeat=60000
// &interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=6900&qos.port=22222
// &side=provider×tamp=1569633535398
// handler 是 MultiMessageHandler 实例
super(url, handler);
// 从 url 中获得本地地址,/172.22.213.93:20880
localAddress = getUrl().toInetSocketAddress();
// 从 url 配置中获得绑定的 ip, 本机 IP 地址 172.22.213.93
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
// 从 url 配置中获得绑定的端口号,20880
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
// 判断 url 中配置 anyhost 是否为 true 或者判断 host 是否为不可用的本地 Host,url 中配置了 anyhost 为 true
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {bindIp = NetUtils.ANYHOST;}
// /0.0.0.0:20880
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 从 url 中获取配置, 默认值为 0
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
// 从 url 中获取配置, 默认 600s
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
// 开启服务器
doOpen();}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY,
Integer.toString(url.getPort()));
}
/**
* 该类是端点的抽象类, 其中封装了编解码器以及两个超时时间.
* 基于 dubbo 的 SPI 机制, 获得相应的编解码器实现对象, 编解码器优先从 Codec2 的扩展类中寻找
*/
public AbstractEndpoint(URL url, ChannelHandler handler) {super(url, handler);
this.codec = getChannelCodec(url);
// 优先从 url 配置中取, 如果没有, 默认为 1s
this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 优先从 url 配置中取, 如果没有, 默认为 3s
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY,
Constants.DEFAULT_CONNECT_TIMEOUT);
}
// 该方法是创建服务器, 并且开启
@Override
protected void doOpen() throws Throwable {
// 创建服务引导类
bootstrap = new ServerBootstrap();
// 创建 boss 线程组
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
// 创建 worker 线程组
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
// 创建服务器处理器
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
// 获得通道集合
channels = nettyServerHandler.getChannels();
// 设置 eventLoopGroup 还有可选项
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 编解码器
NettyCodecAdapter adapter =
new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
// 增加责任链
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
// 加入了 NettyServerHandler, 后续消息处理应该就是通过这个来处理, 猜测,TODO
.addLast("handler", nettyServerHandler);
}
});
// bind 绑定, 这里 bind 完成之后 Netty 服务器就启动了, 监听 20880 端口上的请求, 有兴趣可以研究下 Netty 的源码
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
// 等待绑定完成
channelFuture.syncUninterruptibly();
// 设置通道
channel = channelFuture.channel();}
// 步骤 2)new HeaderExchangeServer(Server server)
// 构造函数就是对属性的设置, 心跳的机制以及默认值都跟 HeaderExchangeClient 中的一模一样
public HeaderExchangeServer(Server server) {if (server == null) {throw new IllegalArgumentException("server == null");
}
this.server = server;
// 获得心跳周期配置, 如果没有配置, 默认设置为 0
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
// 获得心跳超时配置, 默认是心跳周期的三倍
this.heartbeatTimeout =
server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
// 如果心跳超时时间小于心跳周期的两倍, 则抛出异常
if (heartbeatTimeout < heartbeat * 2) {throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
// 开始心跳
startHeartbeatTimer();}
/**
* 该方法是开始心跳, 跟 HeaderExchangeClient 类中的开始心跳方法唯一区别是获得的通道不一样,
* 客户端跟通道是一一对应的, 所有只要对一个通道进行心跳检测, 而服务端跟通道是一对多的关系,
* 所有需要对该服务器连接的所有通道进行心跳检测
*/
private void startHeartbeatTimer() {
// 先停止现有的心跳检测
stopHeartbeatTimer();
if (heartbeat > 0) {
// 创建心跳定时器
heartbeatTimer = scheduled.scheduleWithFixedDelay(new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
// 返回一个不可修改的连接该服务器的信息交换通道集合
return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
/**
* 该类实现了 Runnable 接口, 实现的是心跳任务, 里面包含了核心的心跳策略
*/
final class HeartBeatTask implements Runnable {
// 通道管理
private ChannelProvider channelProvider;
// 心跳间隔, 单位:ms
private int heartbeat;
// 心跳超时时间, 单位:ms
private int heartbeatTimeout;
HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
this.channelProvider = provider;
this.heartbeat = heartbeat;
this.heartbeatTimeout = heartbeatTimeout;
}
/**
* 该方法中是心跳机制的核心逻辑, 注意以下几个点:*
* 如果需要心跳的通道本身如果关闭了, 那么跳过, 不添加心跳机制.
* 无论是接收消息还是发送消息, 只要超过了设置的心跳间隔, 就发送心跳消息来测试是否断开
* 如果最后一次接收到消息到到现在已经超过了心跳超时时间, 那就认定对方的确断开, 分两种情况来处理对方断开的情况.
* 分别是服务端断开, 客户端重连以及客户端断开, 服务端断开这个客户端的连接. 这里要好好品味一下谁是发送方,
* 谁在等谁的响应, 苦苦没有等到.
*/
@Override
public void run() {
try {long now = System.currentTimeMillis();
// 遍历所有通道
for (Channel channel : channelProvider.getChannels()) {
// 如果通道关闭了, 则跳过
if (channel.isClosed()) {continue;}
try {
// 最后一次接收到消息的时间戳
Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
// 最后一次发送消息的时间戳
Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
// 如果最后一次接收或者发送消息到时间到现在的时间间隔超过了心跳间隔时间
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
// 创建一个 request, 设置版本号, 设置需要得到响应
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
// 设置事件类型, 为心跳事件
req.setEvent(Request.HEARTBEAT_EVENT);
// 发送心跳请求
channel.send(req);
}
// 如果最后一次接收消息的时间到现在已经超过了超时时间
if (lastRead != null && now - lastRead > heartbeatTimeout) {
// 如果该通道是客户端, 也就是请求的服务器挂掉了, 客户端尝试重连服务器
if (channel instanceof Client) {
try {
// 重新连接服务器
((Client) channel).reconnect();} catch (Exception e) {//do nothing}
} else {
// 如果不是客户端, 也就是是服务端返回响应给客户端, 但是客户端挂掉了,
// 则服务端关闭客户端连接
channel.close();}
}
}
}
}
}
interface ChannelProvider {
// 获得所有的通道集合, 需要心跳的通道数组
Collection<Channel> getChannels();}
}
正文完