共计 19026 个字符,预计需要花费 48 分钟才能阅读完成。
通用调用
java 从零开始手写 RPC (01) 基于 socket 实现
java 从零开始手写 RPC (02)-netty4 实现客户端和服务端
java 从零开始手写 RPC (03) 如何实现客户端调用服务端?
java 从零开始手写 RPC (04) - 序列化
上一篇咱们介绍了,如何实现基于反射的通用服务端。
这一节咱们来一起学习下如何实现通用客户端。
因为内容较多,所以拆分为 2 个局部。
基本思路
所有的办法调用,基于反射进行相干解决实现。
外围类
为了便于拓展,咱们把外围类调整如下:
package com.github.houbb.rpc.client.core;
import com.github.houbb.heaven.annotation.ThreadSafe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.context.RpcClientContext;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import com.github.houbb.rpc.common.constant.RpcConstant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* <p> rpc 客户端 </p>
*
* <pre> Created: 2019/10/16 11:21 下午 </pre>
* <pre> Project: rpc </pre>
*
* @author houbinbin
* @since 0.0.2
*/
@ThreadSafe
public class RpcClient {private static final Log log = LogFactory.getLog(RpcClient.class);
/**
* 地址信息
* @since 0.0.6
*/
private final String address;
/**
* 监听端口号
* @since 0.0.6
*/
private final int port;
/**
* 客户端解决 handler
* 作用:用于获取申请信息
* @since 0.0.4
*/
private final ChannelHandler channelHandler;
public RpcClient(final RpcClientContext clientContext) {this.address = clientContext.address();
this.port = clientContext.port();
this.channelHandler = clientContext.channelHandler();}
/**
* 进行连贯
* @since 0.0.6
*/
public ChannelFuture connect() {
// 启动服务端
log.info("RPC 服务开始启动客户端");
EventLoopGroup workerGroup = new NioEventLoopGroup();
/**
* channel future 信息
* 作用:用于写入申请信息
* @since 0.0.6
*/
ChannelFuture channelFuture;
try {Bootstrap bootstrap = new Bootstrap();
channelFuture = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception {ch.pipeline()
// 解码 bytes=>resp
.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
// request=>bytes
.addLast(new ObjectEncoder())
// 日志输入
.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(channelHandler);
}
})
.connect(address, port)
.syncUninterruptibly();
log.info("RPC 服务启动客户端实现,监听地址 {}:{}", address, port);
} catch (Exception e) {log.error("RPC 客户端遇到异样", e);
throw new RuntimeException(e);
}
// 不要敞开线程池!!!return channelFuture;
}
}
能够灵便指定对应的服务端地址、端口信息。
ChannelHandler 作为解决参数传入。
ObjectDecoder、ObjectEncoder、LoggingHandler 都和服务端相似,是 netty 的内置实现。
RpcClientHandler
客户端的 handler 实现如下:
/*
* Copyright (c) 2019. houbinbin Inc.
* rpc All rights reserved.
*/
package com.github.houbb.rpc.client.handler;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* <p> 客户端解决类 </p>
*
* <pre> Created: 2019/10/16 11:30 下午 </pre>
* <pre> Project: rpc </pre>
*
* @author houbinbin
* @since 0.0.2
*/
public class RpcClientHandler extends SimpleChannelInboundHandler {private static final Log log = LogFactory.getLog(RpcClient.class);
/**
* 调用服务治理类
*
* @since 0.0.6
*/
private final InvokeService invokeService;
public RpcClientHandler(InvokeService invokeService) {this.invokeService = invokeService;}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {RpcResponse rpcResponse = (RpcResponse)msg;
invokeService.addResponse(rpcResponse.seqId(), rpcResponse);
log.info("[Client] response is :{}", rpcResponse);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 每次用完要敞开,不然拿不到 response,我也不晓得为啥(目测得理解 netty 才行)// 集体了解:如果不敞开,则永远会被阻塞。ctx.flush();
ctx.close();}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();
ctx.close();}
}
只有 channelRead0 做了调整,基于 InvokeService 对后果进行解决。
InvokeService
接口
package com.github.houbb.rpc.client.invoke;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
/**
* 调用服务接口
* @author binbin.hou
* @since 0.0.6
*/
public interface InvokeService {
/**
* 增加申请信息
* @param seqId 序列号
* @return this
* @since 0.0.6
*/
InvokeService addRequest(final String seqId);
/**
* 放入后果
* @param seqId 惟一标识
* @param rpcResponse 响应后果
* @return this
* @since 0.0.6
*/
InvokeService addResponse(final String seqId, final RpcResponse rpcResponse);
/**
* 获取标记信息对应的后果
* @param seqId 序列号
* @return 后果
* @since 0.0.6
*/
RpcResponse getResponse(final String seqId);
}
次要是对入参、出参的设置,以及出参的获取。
实现
package com.github.houbb.rpc.client.invoke.impl;
import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.common.exception.RpcRuntimeException;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 调用服务接口
* @author binbin.hou
* @since 0.0.6
*/
public class DefaultInvokeService implements InvokeService {private static final Log LOG = LogFactory.getLog(DefaultInvokeService.class);
/**
* 申请序列号汇合
*(1)这里前期如果要增加超时检测,能够增加对应的超时工夫。* 能够把这里调整为 map
* @since 0.0.6
*/
private final Set<String> requestSet;
/**
* 响应后果
* @since 0.0.6
*/
private final ConcurrentHashMap<String, RpcResponse> responseMap;
public DefaultInvokeService() {requestSet = Guavas.newHashSet();
responseMap = new ConcurrentHashMap<>();}
@Override
public InvokeService addRequest(String seqId) {LOG.info("[Client] start add request for seqId: {}", seqId);
requestSet.add(seqId);
return this;
}
@Override
public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {
// 这里放入之前,能够增加判断。// 如果 seqId 必须解决申请汇合中,才容许放入。或者间接疏忽抛弃。LOG.info("[Client] 获取后果信息,seq: {}, rpcResponse: {}", seqId, rpcResponse);
responseMap.putIfAbsent(seqId, rpcResponse);
// 告诉所有期待方
LOG.info("[Client] seq 信息曾经放入,告诉所有期待方", seqId);
synchronized (this) {this.notifyAll();
}
return this;
}
@Override
public RpcResponse getResponse(String seqId) {
try {RpcResponse rpcResponse = this.responseMap.get(seqId);
if(ObjectUtil.isNotNull(rpcResponse)) {LOG.info("[Client] seq {} 对应后果曾经获取: {}", seqId, rpcResponse);
return rpcResponse;
}
// 进入期待
while (rpcResponse == null) {LOG.info("[Client] seq {} 对应后果为空,进入期待", seqId);
// 同步期待锁
synchronized (this) {this.wait();
}
rpcResponse = this.responseMap.get(seqId);
LOG.info("[Client] seq {} 对应后果曾经获取: {}", seqId, rpcResponse);
}
return rpcResponse;
} catch (InterruptedException e) {throw new RpcRuntimeException(e);
}
}
}
应用 requestSet 存储对应的申请入参。
应用 responseMap 存储对应的申请出参,在获取的时候通过同步 while 循环期待,获取后果。
此处,通过 notifyAll() 和 wait() 进行期待和唤醒。
ReferenceConfig- 服务端配置
阐明
咱们想调用服务端,首先必定要定义好要调用的对象。
ReferenceConfig 就是要通知 rpc 框架,调用的服务端信息。
接口
package com.github.houbb.rpc.client.config.reference;
import com.github.houbb.rpc.common.config.component.RpcAddress;
import java.util.List;
/**
* 援用配置类
*
* 前期配置:*(1)timeout 调用超时工夫
*(2)version 服务版本解决
*(3)callType 调用形式 oneWay/sync/async
*(4)check 是否必须要求服务启动。*
* spi:
*(1)codec 序列化形式
*(2)netty 网络通讯架构
*(3)load-balance 负载平衡
*(4)失败策略 fail-over/fail-fast
*
* filter:
*(1)路由
*(2)耗时统计 monitor 服务治理
*
* 优化思考:*(1)对于惟一的 serviceId,其实其 interface 是固定的,是否能够省去?* @author binbin.hou
* @since 0.0.6
* @param <T> 接口泛型
*/
public interface ReferenceConfig<T> {
/**
* 设置服务标识
* @param serviceId 服务标识
* @return this
* @since 0.0.6
*/
ReferenceConfig<T> serviceId(final String serviceId);
/**
* 服务惟一标识
* @since 0.0.6
*/
String serviceId();
/**
* 服务接口
* @since 0.0.6
* @return 接口信息
*/
Class<T> serviceInterface();
/**
* 设置服务接口信息
* @param serviceInterface 服务接口信息
* @return this
* @since 0.0.6
*/
ReferenceConfig<T> serviceInterface(final Class<T> serviceInterface);
/**
* 设置服务地址信息
*(1)单个写法:ip:port:weight
*(2)集群写法:ip1:port1:weight1,ip2:port2:weight2
*
* 其中 weight 权重能够不写,默认为 1.
*
* @param addresses 地址列表信息
* @return this
* @since 0.0.6
*/
ReferenceConfig<T> addresses(final String addresses);
/**
* 获取对应的援用实现
* @return 援用代理类
* @since 0.0.6
*/
T reference();}
实现
package com.github.houbb.rpc.client.config.reference.impl;
import com.github.houbb.heaven.constant.PunctuationConst;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.heaven.util.guava.Guavas;
import com.github.houbb.heaven.util.lang.NumUtil;
import com.github.houbb.rpc.client.config.reference.ReferenceConfig;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.client.core.context.impl.DefaultRpcClientContext;
import com.github.houbb.rpc.client.handler.RpcClientHandler;
import com.github.houbb.rpc.client.invoke.InvokeService;
import com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService;
import com.github.houbb.rpc.client.proxy.ReferenceProxy;
import com.github.houbb.rpc.client.proxy.context.ProxyContext;
import com.github.houbb.rpc.client.proxy.context.impl.DefaultProxyContext;
import com.github.houbb.rpc.common.config.component.RpcAddress;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import java.util.List;
/**
* 援用配置类默认实现
*
* @author binbin.hou
* @since 0.0.6
* @param <T> 接口泛型
*/
public class DefaultReferenceConfig<T> implements ReferenceConfig<T> {
/**
* 服务惟一标识
* @since 0.0.6
*/
private String serviceId;
/**
* 服务接口
* @since 0.0.6
*/
private Class<T> serviceInterface;
/**
* 服务地址信息
*(1)如果不为空,则间接依据地址获取
*(2)如果为空,则采纳主动发现的形式
*
* TODO: 这里调整为 set 更加正当。*
* 如果为 subscribe 能够主动发现,而后填充这个字段信息。* @since 0.0.6
*/
private List<RpcAddress> rpcAddresses;
/**
* 用于写入信息
*(1)client 连贯 server 端的 channel future
*(2)前期进行 Load-balance 路由等操作。能够放在这里执行。* @since 0.0.6
*/
private List<ChannelFuture> channelFutures;
/**
* 客户端解决信息
* @since 0.0.6
*/
@Deprecated
private RpcClientHandler channelHandler;
/**
* 调用服务治理类
* @since 0.0.6
*/
private InvokeService invokeService;
public DefaultReferenceConfig() {
// 初始化信息
this.rpcAddresses = Guavas.newArrayList();
this.channelFutures = Guavas.newArrayList();
this.invokeService = new DefaultInvokeService();}
@Override
public String serviceId() {return serviceId;}
@Override
public DefaultReferenceConfig<T> serviceId(String serviceId) {
this.serviceId = serviceId;
return this;
}
@Override
public Class<T> serviceInterface() {return serviceInterface;}
@Override
public DefaultReferenceConfig<T> serviceInterface(Class<T> serviceInterface) {
this.serviceInterface = serviceInterface;
return this;
}
@Override
public ReferenceConfig<T> addresses(String addresses) {ArgUtil.notEmpty(addresses, "addresses");
String[] addressArray = addresses.split(PunctuationConst.COMMA);
ArgUtil.notEmpty(addressArray, "addresses");
for(String address : addressArray) {String[] addressSplits = address.split(PunctuationConst.COLON);
if(addressSplits.length < 2) {throw new IllegalArgumentException("Address must be has ip and port, like 127.0.0.1:9527");
}
String ip = addressSplits[0];
int port = NumUtil.toIntegerThrows(addressSplits[1]);
// 蕴含权重信息
int weight = 1;
if(addressSplits.length >= 3) {weight = NumUtil.toInteger(addressSplits[2], 1);
}
RpcAddress rpcAddress = new RpcAddress(ip, port, weight);
this.rpcAddresses.add(rpcAddress);
}
return this;
}
/**
* 获取对应的援用实现
*(1)解决所有的反射代理信息 - 办法能够抽离,启动各自独立即可。*(2)启动对应的长连贯
* @return 援用代理类
* @since 0.0.6
*/
@Override
public T reference() {
// 1. 启动 client 端到 server 端的连贯信息
// 1.1 为了晋升性能,能够将所有的 client=>server 的连贯都调整为一个 thread。// 1.2 初期为了简略,间接应用同步循环的形式。// 创立 handler
// 循环连贯
for(RpcAddress rpcAddress : rpcAddresses) {final ChannelHandler channelHandler = new RpcClientHandler(invokeService);
final DefaultRpcClientContext context = new DefaultRpcClientContext();
context.address(rpcAddress.address()).port(rpcAddress.port()).channelHandler(channelHandler);
ChannelFuture channelFuture = new RpcClient(context).connect();
// 循环同步期待
// 如果出现异常,间接中断?捕捉异样持续进行??channelFutures.add(channelFuture);
}
// 2. 接口动静代理
ProxyContext<T> proxyContext = buildReferenceProxyContext();
return ReferenceProxy.newProxyInstance(proxyContext);
}
/**
* 构建调用上下文
* @return 援用代理上下文
* @since 0.0.6
*/
private ProxyContext<T> buildReferenceProxyContext() {DefaultProxyContext<T> proxyContext = new DefaultProxyContext<>();
proxyContext.serviceId(this.serviceId);
proxyContext.serviceInterface(this.serviceInterface);
proxyContext.channelFutures(this.channelFutures);
proxyContext.invokeService(this.invokeService);
return proxyContext;
}
}
这里次要依据指定的服务端信息,初始化对应的代理实现。
这里还能够拓展指定权重,便于前期负载平衡拓展,本期临时不做实现。
ReferenceProxy
阐明
所有的 rpc 调用,客户端只有服务端的接口。
那么,怎么能力和调用本地办法一样调用近程办法呢?
答案就是动静代理。
实现
实现如下:
package com.github.houbb.rpc.client.proxy;
import com.github.houbb.heaven.util.lang.ObjectUtil;
import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.proxy.context.ProxyContext;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcRequest;
import com.github.houbb.rpc.common.support.id.impl.Uuid;
import com.github.houbb.rpc.common.support.time.impl.DefaultSystemTime;
import io.netty.channel.Channel;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
* 参考:https://blog.csdn.net/u012240455/article/details/79210250
*
*(1)办法执行并不需要肯定要有实现类。*(2)间接依据反射即可解决相干信息。*(3)rpc 是一种强制依据接口进行编程的实现形式。* @author binbin.hou
* @since 0.0.6
*/
public class ReferenceProxy<T> implements InvocationHandler {private static final Log LOG = LogFactory.getLog(ReferenceProxy.class);
/**
* 服务标识
* @since 0.0.6
*/
private final ProxyContext<T> proxyContext;
/**
* 临时私有化该结构器
* @param proxyContext 代理上下文
* @since 0.0.6
*/
private ReferenceProxy(ProxyContext<T> proxyContext) {this.proxyContext = proxyContext;}
/**
* 反射调用
* @param proxy 代理
* @param method 办法
* @param args 参数
* @return 后果
* @throws Throwable 异样
* @since 0.0.6
* @see Method#getGenericSignature() 通用标识,能够依据这个来优化代码。*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 反射信息处理成为 rpcRequest
final String seqId = Uuid.getInstance().id();
final long createTime = DefaultSystemTime.getInstance().time();
DefaultRpcRequest rpcRequest = new DefaultRpcRequest();
rpcRequest.serviceId(proxyContext.serviceId());
rpcRequest.seqId(seqId);
rpcRequest.createTime(createTime);
rpcRequest.paramValues(args);
rpcRequest.paramTypeNames(ReflectMethodUtil.getParamTypeNames(method));
rpcRequest.methodName(method.getName());
// 调用近程
LOG.info("[Client] start call remote with request: {}", rpcRequest);
proxyContext.invokeService().addRequest(seqId);
// 这里应用 load-balance 进行抉择 channel 写入。final Channel channel = getChannel();
LOG.info("[Client] start call channel id: {}", channel.id().asLongText());
// 对于信息的写入,实际上有着严格的要求。// writeAndFlush 理论是一个异步的操作,间接应用 sync() 能够看到异样信息。// 反对的必须是 ByteBuf
channel.writeAndFlush(rpcRequest).sync();
// 循环获取后果
// 通过 Loop+match wait/notifyAll 来获取
// 分布式依据 redis+queue+loop
LOG.info("[Client] start get resp for seqId: {}", seqId);
RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId);
LOG.info("[Client] start get resp for seqId: {}", seqId);
Throwable error = rpcResponse.error();
if(ObjectUtil.isNotNull(error)) {throw error;}
return rpcResponse.result();}
/**
* 获取对应的 channel
*(1)临时应用写死的第一个
*(2)前期这里须要调整,ChannelFuture 加上权重信息。* @return 对应的 channel 信息。* @since 0.0.6
*/
private Channel getChannel() {return proxyContext.channelFutures().get(0).channel();}
/**
* 获取代理实例
*(1)接口只是为了代理。*(2)理论调用中更加关怀 的是 serviceId
* @param proxyContext 代理上下文
* @param <T> 泛型
* @return 代理实例
* @since 0.0.6
*/
@SuppressWarnings("unchecked")
public static <T> T newProxyInstance(ProxyContext<T> proxyContext) {final Class<T> interfaceClass = proxyContext.serviceInterface();
ClassLoader classLoader = interfaceClass.getClassLoader();
Class<?>[] interfaces = new Class[]{interfaceClass};
ReferenceProxy proxy = new ReferenceProxy(proxyContext);
return (T) Proxy.newProxyInstance(classLoader, interfaces, proxy);
}
}
客户端初始化 newProxyInstance 的就是创立的代理的过程。
客户端调用近程办法,实际上是调用 invoke 的过程。
(1)构建反射 invoke 申请信息,增加 reqId
(2)netty 近程调用服务端
(3)同步获取响应信息
测试
引入 maven
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>rpc-client</artifactId>
<version>0.0.6</version>
</dependency>
测试代码
public static void main(String[] args) {
// 服务配置信息
ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<CalculatorService>();
config.serviceId(ServiceIdConst.CALC);
config.serviceInterface(CalculatorService.class);
config.addresses("localhost:9527");
CalculatorService calculatorService = config.reference();
CalculateRequest request = new CalculateRequest();
request.setOne(10);
request.setTwo(20);
CalculateResponse response = calculatorService.sum(request);
System.out.println(response);
}
测试日志:
[DEBUG] [2021-10-05 14:16:17.534] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 14:16:17.625] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务开始启动客户端
...
[INFO] [2021-10-05 14:16:19.328] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务启动客户端实现,监听地址 localhost:9527
[INFO] [2021-10-05 14:16:19.346] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', createTime=1633414579339, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2021-10-05 14:16:19.347] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: a525c5a6196545f5a5241b2cdc2ec2c2
[INFO] [2021-10-05 14:16:19.348] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: 00e04cfffe360988-000017bc-00000000-399b9d7e1b88839d-5ccc4a29
十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler write
信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] WRITE: DefaultRpcRequest{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', createTime=1633414579339, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler flush
信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] FLUSH
[INFO] [2021-10-05 14:16:19.412] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: a525c5a6196545f5a5241b2cdc2ec2c2
[INFO] [2021-10-05 14:16:19.413] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq a525c5a6196545f5a5241b2cdc2ec2c2 对应后果为空,进入期待
十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] READ: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
...
[INFO] [2021-10-05 14:16:19.505] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取后果信息,seq: a525c5a6196545f5a5241b2cdc2ec2c2, rpcResponse: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 14:16:19.505] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seq 信息曾经放入,告诉所有期待方
[INFO] [2021-10-05 14:16:19.506] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 14:16:19.506] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq a525c5a6196545f5a5241b2cdc2ec2c2 对应后果曾经获取: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 14:16:19.507] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: a525c5a6196545f5a5241b2cdc2ec2c2
CalculateResponse{success=true, sum=30}
小结
当初看来有一个小问题,要求服务端必须指定 port,这有点不太正当,比方代理域名,后续须要优化。
这里的启动申明形式也比拟根底,后续能够思考和 spring 进行整合。
为了便于大家学习,以上源码曾经开源:
https://github.com/houbb/rpc
心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。
我是老马,期待与你的下次重逢。