通用调用

java 从零开始手写 RPC (01) 基于 socket 实现

java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

java 从零开始手写 RPC (03) 如何实现客户端调用服务端?

java 从零开始手写 RPC (04) -序列化

上一篇咱们介绍了,如何实现基于反射的通用服务端。

这一节咱们来一起学习下如何实现通用客户端。

因为内容较多,所以拆分为 2 个局部。

基本思路

所有的办法调用,基于反射进行相干解决实现。

外围类

为了便于拓展,咱们把外围类调整如下:

package com.github.houbb.rpc.client.core;import com.github.houbb.heaven.annotation.ThreadSafe;import com.github.houbb.log.integration.core.Log;import com.github.houbb.log.integration.core.LogFactory;import com.github.houbb.rpc.client.core.context.RpcClientContext;import com.github.houbb.rpc.client.handler.RpcClientHandler;import com.github.houbb.rpc.common.constant.RpcConstant;import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;/** * <p> rpc 客户端 </p> * * <pre> Created: 2019/10/16 11:21 下午  </pre> * <pre> Project: rpc  </pre> * * @author houbinbin * @since 0.0.2 */@ThreadSafepublic class RpcClient {    private static final Log log = LogFactory.getLog(RpcClient.class);    /**     * 地址信息     * @since 0.0.6     */    private final String address;    /**     * 监听端口号     * @since 0.0.6     */    private final int port;    /**     * 客户端解决 handler     * 作用:用于获取申请信息     * @since 0.0.4     */    private final ChannelHandler channelHandler;    public RpcClient(final RpcClientContext clientContext) {        this.address = clientContext.address();        this.port = clientContext.port();        this.channelHandler = clientContext.channelHandler();    }    /**     * 进行连贯     * @since 0.0.6     */    public ChannelFuture connect() {        // 启动服务端        log.info("RPC 服务开始启动客户端");        EventLoopGroup workerGroup = new NioEventLoopGroup();        /**         * channel future 信息         * 作用:用于写入申请信息         * @since 0.0.6         */        ChannelFuture channelFuture;        try {            Bootstrap bootstrap = new Bootstrap();            channelFuture = bootstrap.group(workerGroup)                    .channel(NioSocketChannel.class)                    .option(ChannelOption.SO_KEEPALIVE, true)                    .handler(new ChannelInitializer<Channel>(){                        @Override                        protected void initChannel(Channel ch) throws Exception {                            ch.pipeline()                                    // 解码 bytes=>resp                                    .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))                                    // request=>bytes                                    .addLast(new ObjectEncoder())                                    // 日志输入                                    .addLast(new LoggingHandler(LogLevel.INFO))                                    .addLast(channelHandler);                        }                    })                    .connect(address, port)                    .syncUninterruptibly();            log.info("RPC 服务启动客户端实现,监听地址 {}:{}", address, port);        } catch (Exception e) {            log.error("RPC 客户端遇到异样", e);            throw new RuntimeException(e);        }        // 不要敞开线程池!!!        return channelFuture;    }}

能够灵便指定对应的服务端地址、端口信息。

ChannelHandler 作为解决参数传入。

ObjectDecoder、ObjectEncoder、LoggingHandler 都和服务端相似,是 netty 的内置实现。

RpcClientHandler

客户端的 handler 实现如下:

/* * Copyright (c)  2019. houbinbin Inc. * rpc All rights reserved. */package com.github.houbb.rpc.client.handler;import com.github.houbb.log.integration.core.Log;import com.github.houbb.log.integration.core.LogFactory;import com.github.houbb.rpc.client.core.RpcClient;import com.github.houbb.rpc.client.invoke.InvokeService;import com.github.houbb.rpc.common.rpc.domain.RpcResponse;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/** * <p> 客户端解决类 </p> * * <pre> Created: 2019/10/16 11:30 下午  </pre> * <pre> Project: rpc  </pre> * * @author houbinbin * @since 0.0.2 */public class RpcClientHandler extends SimpleChannelInboundHandler {    private static final Log log = LogFactory.getLog(RpcClient.class);    /**     * 调用服务治理类     *     * @since 0.0.6     */    private final InvokeService invokeService;    public RpcClientHandler(InvokeService invokeService) {        this.invokeService = invokeService;    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        RpcResponse rpcResponse = (RpcResponse)msg;        invokeService.addResponse(rpcResponse.seqId(), rpcResponse);        log.info("[Client] response is :{}", rpcResponse);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        // 每次用完要敞开,不然拿不到response,我也不晓得为啥(目测得理解netty才行)        // 集体了解:如果不敞开,则永远会被阻塞。        ctx.flush();        ctx.close();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}

只有 channelRead0 做了调整,基于 InvokeService 对后果进行解决。

InvokeService

接口

package com.github.houbb.rpc.client.invoke;import com.github.houbb.rpc.common.rpc.domain.RpcResponse;/** * 调用服务接口 * @author binbin.hou * @since 0.0.6 */public interface InvokeService {    /**     * 增加申请信息     * @param seqId 序列号     * @return this     * @since 0.0.6     */    InvokeService addRequest(final String seqId);    /**     * 放入后果     * @param seqId 惟一标识     * @param rpcResponse 响应后果     * @return this     * @since 0.0.6     */    InvokeService addResponse(final String seqId, final RpcResponse rpcResponse);    /**     * 获取标记信息对应的后果     * @param seqId 序列号     * @return 后果     * @since 0.0.6     */    RpcResponse getResponse(final String seqId);}

次要是对入参、出参的设置,以及出参的获取。

实现

package com.github.houbb.rpc.client.invoke.impl;import com.github.houbb.heaven.util.guava.Guavas;import com.github.houbb.heaven.util.lang.ObjectUtil;import com.github.houbb.log.integration.core.Log;import com.github.houbb.log.integration.core.LogFactory;import com.github.houbb.rpc.client.core.RpcClient;import com.github.houbb.rpc.client.invoke.InvokeService;import com.github.houbb.rpc.common.exception.RpcRuntimeException;import com.github.houbb.rpc.common.rpc.domain.RpcResponse;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;/** * 调用服务接口 * @author binbin.hou * @since 0.0.6 */public class DefaultInvokeService implements InvokeService {    private static final Log LOG = LogFactory.getLog(DefaultInvokeService.class);    /**     * 申请序列号汇合     * (1)这里前期如果要增加超时检测,能够增加对应的超时工夫。     * 能够把这里调整为 map     * @since 0.0.6     */    private final Set<String> requestSet;    /**     * 响应后果     * @since 0.0.6     */    private final ConcurrentHashMap<String, RpcResponse> responseMap;    public DefaultInvokeService() {        requestSet = Guavas.newHashSet();        responseMap = new ConcurrentHashMap<>();    }    @Override    public InvokeService addRequest(String seqId) {        LOG.info("[Client] start add request for seqId: {}", seqId);        requestSet.add(seqId);        return this;    }    @Override    public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {        // 这里放入之前,能够增加判断。        // 如果 seqId 必须解决申请汇合中,才容许放入。或者间接疏忽抛弃。        LOG.info("[Client] 获取后果信息,seq: {}, rpcResponse: {}", seqId, rpcResponse);        responseMap.putIfAbsent(seqId, rpcResponse);        // 告诉所有期待方        LOG.info("[Client] seq 信息曾经放入,告诉所有期待方", seqId);        synchronized (this) {            this.notifyAll();        }        return this;    }    @Override    public RpcResponse getResponse(String seqId) {        try {            RpcResponse rpcResponse = this.responseMap.get(seqId);            if(ObjectUtil.isNotNull(rpcResponse)) {                LOG.info("[Client] seq {} 对应后果曾经获取: {}", seqId, rpcResponse);                return rpcResponse;            }            // 进入期待            while (rpcResponse == null) {                LOG.info("[Client] seq {} 对应后果为空,进入期待", seqId);                // 同步期待锁                synchronized (this) {                    this.wait();                }                rpcResponse = this.responseMap.get(seqId);                LOG.info("[Client] seq {} 对应后果曾经获取: {}", seqId, rpcResponse);            }            return rpcResponse;        } catch (InterruptedException e) {            throw new RpcRuntimeException(e);        }    }}

应用 requestSet 存储对应的申请入参。

应用 responseMap 存储对应的申请出参,在获取的时候通过同步 while 循环期待,获取后果。

此处,通过 notifyAll() 和 wait() 进行期待和唤醒。

ReferenceConfig-服务端配置

阐明

咱们想调用服务端,首先必定要定义好要调用的对象。

ReferenceConfig 就是要通知 rpc 框架,调用的服务端信息。

接口

package com.github.houbb.rpc.client.config.reference;import com.github.houbb.rpc.common.config.component.RpcAddress;import java.util.List;/** * 援用配置类 * * 前期配置: * (1)timeout 调用超时工夫 * (2)version 服务版本解决 * (3)callType 调用形式 oneWay/sync/async * (4)check 是否必须要求服务启动。 * * spi: * (1)codec 序列化形式 * (2)netty 网络通讯架构 * (3)load-balance 负载平衡 * (4)失败策略 fail-over/fail-fast * * filter: * (1)路由 * (2)耗时统计 monitor 服务治理 * * 优化思考: * (1)对于惟一的 serviceId,其实其 interface 是固定的,是否能够省去? * @author binbin.hou * @since 0.0.6 * @param <T> 接口泛型 */public interface ReferenceConfig<T> {    /**     * 设置服务标识     * @param serviceId 服务标识     * @return this     * @since 0.0.6     */    ReferenceConfig<T> serviceId(final String serviceId);    /**     * 服务惟一标识     * @since 0.0.6     */    String serviceId();    /**     * 服务接口     * @since 0.0.6     * @return 接口信息     */    Class<T> serviceInterface();    /**     * 设置服务接口信息     * @param serviceInterface 服务接口信息     * @return this     * @since 0.0.6     */    ReferenceConfig<T> serviceInterface(final Class<T> serviceInterface);    /**     * 设置服务地址信息     * (1)单个写法:ip:port:weight     * (2)集群写法:ip1:port1:weight1,ip2:port2:weight2     *     * 其中 weight 权重能够不写,默认为1.     *     * @param addresses 地址列表信息     * @return this     * @since 0.0.6     */    ReferenceConfig<T> addresses(final String addresses);    /**     * 获取对应的援用实现     * @return 援用代理类     * @since 0.0.6     */    T reference();}

实现

package com.github.houbb.rpc.client.config.reference.impl;import com.github.houbb.heaven.constant.PunctuationConst;import com.github.houbb.heaven.util.common.ArgUtil;import com.github.houbb.heaven.util.guava.Guavas;import com.github.houbb.heaven.util.lang.NumUtil;import com.github.houbb.rpc.client.config.reference.ReferenceConfig;import com.github.houbb.rpc.client.core.RpcClient;import com.github.houbb.rpc.client.core.context.impl.DefaultRpcClientContext;import com.github.houbb.rpc.client.handler.RpcClientHandler;import com.github.houbb.rpc.client.invoke.InvokeService;import com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService;import com.github.houbb.rpc.client.proxy.ReferenceProxy;import com.github.houbb.rpc.client.proxy.context.ProxyContext;import com.github.houbb.rpc.client.proxy.context.impl.DefaultProxyContext;import com.github.houbb.rpc.common.config.component.RpcAddress;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelHandler;import java.util.List;/** * 援用配置类默认实现 * * @author binbin.hou * @since 0.0.6 * @param <T> 接口泛型 */public class DefaultReferenceConfig<T> implements ReferenceConfig<T> {    /**     * 服务惟一标识     * @since 0.0.6     */    private String serviceId;    /**     * 服务接口     * @since 0.0.6     */    private Class<T> serviceInterface;    /**     * 服务地址信息     * (1)如果不为空,则间接依据地址获取     * (2)如果为空,则采纳主动发现的形式     *     * TODO: 这里调整为 set 更加正当。     *     * 如果为 subscribe 能够主动发现,而后填充这个字段信息。     * @since 0.0.6     */    private List<RpcAddress> rpcAddresses;    /**     * 用于写入信息     * (1)client 连贯 server 端的 channel future     * (2)前期进行 Load-balance 路由等操作。能够放在这里执行。     * @since 0.0.6     */    private List<ChannelFuture> channelFutures;    /**     * 客户端解决信息     * @since 0.0.6     */    @Deprecated    private RpcClientHandler channelHandler;    /**     * 调用服务治理类     * @since 0.0.6     */    private InvokeService invokeService;    public DefaultReferenceConfig() {        // 初始化信息        this.rpcAddresses = Guavas.newArrayList();        this.channelFutures = Guavas.newArrayList();        this.invokeService = new DefaultInvokeService();    }    @Override    public String serviceId() {        return serviceId;    }    @Override    public DefaultReferenceConfig<T> serviceId(String serviceId) {        this.serviceId = serviceId;        return this;    }    @Override    public Class<T> serviceInterface() {        return serviceInterface;    }    @Override    public DefaultReferenceConfig<T> serviceInterface(Class<T> serviceInterface) {        this.serviceInterface = serviceInterface;        return this;    }    @Override    public ReferenceConfig<T> addresses(String addresses) {        ArgUtil.notEmpty(addresses, "addresses");        String[] addressArray = addresses.split(PunctuationConst.COMMA);        ArgUtil.notEmpty(addressArray, "addresses");        for(String address : addressArray) {            String[] addressSplits = address.split(PunctuationConst.COLON);            if(addressSplits.length < 2) {                throw new IllegalArgumentException("Address must be has ip and port, like 127.0.0.1:9527");            }            String ip = addressSplits[0];            int port = NumUtil.toIntegerThrows(addressSplits[1]);            // 蕴含权重信息            int weight = 1;            if(addressSplits.length >= 3) {                weight = NumUtil.toInteger(addressSplits[2], 1);            }            RpcAddress rpcAddress = new RpcAddress(ip, port, weight);            this.rpcAddresses.add(rpcAddress);        }        return this;    }    /**     * 获取对应的援用实现     * (1)解决所有的反射代理信息-办法能够抽离,启动各自独立即可。     * (2)启动对应的长连贯     * @return 援用代理类     * @since 0.0.6     */    @Override    public T reference() {        // 1. 启动 client 端到 server 端的连贯信息        // 1.1 为了晋升性能,能够将所有的 client=>server 的连贯都调整为一个 thread。        // 1.2 初期为了简略,间接应用同步循环的形式。        // 创立 handler        // 循环连贯        for(RpcAddress rpcAddress : rpcAddresses) {            final ChannelHandler channelHandler = new RpcClientHandler(invokeService);            final DefaultRpcClientContext context = new DefaultRpcClientContext();            context.address(rpcAddress.address()).port(rpcAddress.port()).channelHandler(channelHandler);            ChannelFuture channelFuture = new RpcClient(context).connect();            // 循环同步期待            // 如果出现异常,间接中断?捕捉异样持续进行??            channelFutures.add(channelFuture);        }        // 2. 接口动静代理        ProxyContext<T> proxyContext = buildReferenceProxyContext();        return ReferenceProxy.newProxyInstance(proxyContext);    }    /**     * 构建调用上下文     * @return 援用代理上下文     * @since 0.0.6     */    private ProxyContext<T> buildReferenceProxyContext() {        DefaultProxyContext<T> proxyContext = new DefaultProxyContext<>();        proxyContext.serviceId(this.serviceId);        proxyContext.serviceInterface(this.serviceInterface);        proxyContext.channelFutures(this.channelFutures);        proxyContext.invokeService(this.invokeService);        return proxyContext;    }}

这里次要依据指定的服务端信息,初始化对应的代理实现。

这里还能够拓展指定权重,便于前期负载平衡拓展,本期临时不做实现。

ReferenceProxy

阐明

所有的 rpc 调用,客户端只有服务端的接口。

那么,怎么能力和调用本地办法一样调用近程办法呢?

答案就是动静代理。

实现

实现如下:

package com.github.houbb.rpc.client.proxy;import com.github.houbb.heaven.util.lang.ObjectUtil;import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;import com.github.houbb.log.integration.core.Log;import com.github.houbb.log.integration.core.LogFactory;import com.github.houbb.rpc.client.proxy.context.ProxyContext;import com.github.houbb.rpc.common.rpc.domain.RpcResponse;import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcRequest;import com.github.houbb.rpc.common.support.id.impl.Uuid;import com.github.houbb.rpc.common.support.time.impl.DefaultSystemTime;import io.netty.channel.Channel;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;/** * 参考:https://blog.csdn.net/u012240455/article/details/79210250 * * (1)办法执行并不需要肯定要有实现类。 * (2)间接依据反射即可解决相干信息。 * (3)rpc 是一种强制依据接口进行编程的实现形式。 * @author binbin.hou * @since 0.0.6 */public class ReferenceProxy<T> implements InvocationHandler {    private static final Log LOG = LogFactory.getLog(ReferenceProxy.class);    /**     * 服务标识     * @since 0.0.6     */    private final ProxyContext<T> proxyContext;    /**     * 临时私有化该结构器     * @param proxyContext 代理上下文     * @since 0.0.6     */    private ReferenceProxy(ProxyContext<T> proxyContext) {        this.proxyContext = proxyContext;    }    /**     * 反射调用     * @param proxy 代理     * @param method 办法     * @param args 参数     * @return 后果     * @throws Throwable 异样     * @since 0.0.6     * @see Method#getGenericSignature() 通用标识,能够依据这个来优化代码。     */    @Override    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {        // 反射信息处理成为 rpcRequest        final String seqId = Uuid.getInstance().id();        final long createTime = DefaultSystemTime.getInstance().time();        DefaultRpcRequest rpcRequest = new DefaultRpcRequest();        rpcRequest.serviceId(proxyContext.serviceId());        rpcRequest.seqId(seqId);        rpcRequest.createTime(createTime);        rpcRequest.paramValues(args);        rpcRequest.paramTypeNames(ReflectMethodUtil.getParamTypeNames(method));        rpcRequest.methodName(method.getName());        // 调用近程        LOG.info("[Client] start call remote with request: {}", rpcRequest);        proxyContext.invokeService().addRequest(seqId);        // 这里应用 load-balance 进行抉择 channel 写入。        final Channel channel = getChannel();        LOG.info("[Client] start call channel id: {}", channel.id().asLongText());        // 对于信息的写入,实际上有着严格的要求。        // writeAndFlush 理论是一个异步的操作,间接应用 sync() 能够看到异样信息。        // 反对的必须是 ByteBuf        channel.writeAndFlush(rpcRequest).sync();        // 循环获取后果        // 通过 Loop+match  wait/notifyAll 来获取        // 分布式依据 redis+queue+loop        LOG.info("[Client] start get resp for seqId: {}", seqId);        RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId);        LOG.info("[Client] start get resp for seqId: {}", seqId);        Throwable error = rpcResponse.error();        if(ObjectUtil.isNotNull(error)) {            throw error;        }        return rpcResponse.result();    }    /**     * 获取对应的 channel     * (1)临时应用写死的第一个     * (2)前期这里须要调整,ChannelFuture 加上权重信息。     * @return 对应的 channel 信息。     * @since 0.0.6     */    private Channel getChannel() {        return proxyContext.channelFutures().get(0).channel();    }    /**     * 获取代理实例     * (1)接口只是为了代理。     * (2)理论调用中更加关怀 的是 serviceId     * @param proxyContext 代理上下文     * @param <T> 泛型     * @return 代理实例     * @since 0.0.6     */    @SuppressWarnings("unchecked")    public static <T> T newProxyInstance(ProxyContext<T> proxyContext) {        final Class<T> interfaceClass = proxyContext.serviceInterface();        ClassLoader classLoader = interfaceClass.getClassLoader();        Class<?>[] interfaces = new Class[]{interfaceClass};        ReferenceProxy proxy = new ReferenceProxy(proxyContext);        return (T) Proxy.newProxyInstance(classLoader, interfaces, proxy);    }}

客户端初始化 newProxyInstance 的就是创立的代理的过程。

客户端调用近程办法,实际上是调用 invoke 的过程。

(1)构建反射 invoke 申请信息,增加 reqId

(2)netty 近程调用服务端

(3)同步获取响应信息

测试

引入 maven

<dependency>    <groupId>com.github.houbb</groupId>    <artifactId>rpc-client</artifactId>    <version>0.0.6</version></dependency>

测试代码

public static void main(String[] args) {    // 服务配置信息    ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<CalculatorService>();    config.serviceId(ServiceIdConst.CALC);    config.serviceInterface(CalculatorService.class);    config.addresses("localhost:9527");    CalculatorService calculatorService = config.reference();    CalculateRequest request = new CalculateRequest();    request.setOne(10);    request.setTwo(20);    CalculateResponse response = calculatorService.sum(request);    System.out.println(response);}

测试日志:

[DEBUG] [2021-10-05 14:16:17.534] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.[INFO] [2021-10-05 14:16:17.625] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务开始启动客户端...[INFO] [2021-10-05 14:16:19.328] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务启动客户端实现,监听地址 localhost:9527[INFO] [2021-10-05 14:16:19.346] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', createTime=1633414579339, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}[INFO] [2021-10-05 14:16:19.347] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: a525c5a6196545f5a5241b2cdc2ec2c2[INFO] [2021-10-05 14:16:19.348] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: 00e04cfffe360988-000017bc-00000000-399b9d7e1b88839d-5ccc4a29十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler write信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] WRITE: DefaultRpcRequest{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', createTime=1633414579339, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler flush信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] FLUSH[INFO] [2021-10-05 14:16:19.412] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: a525c5a6196545f5a5241b2cdc2ec2c2[INFO] [2021-10-05 14:16:19.413] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq a525c5a6196545f5a5241b2cdc2ec2c2 对应后果为空,进入期待十月 05, 2021 2:16:19 下午 io.netty.handler.logging.LoggingHandler channelRead信息: [id: 0x5ccc4a29, L:/127.0.0.1:50596 - R:localhost/127.0.0.1:9527] READ: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}...[INFO] [2021-10-05 14:16:19.505] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取后果信息,seq: a525c5a6196545f5a5241b2cdc2ec2c2, rpcResponse: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}[INFO] [2021-10-05 14:16:19.505] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seq 信息曾经放入,告诉所有期待方[INFO] [2021-10-05 14:16:19.506] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}[INFO] [2021-10-05 14:16:19.506] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq a525c5a6196545f5a5241b2cdc2ec2c2 对应后果曾经获取: DefaultRpcResponse{seqId='a525c5a6196545f5a5241b2cdc2ec2c2', error=null, result=CalculateResponse{success=true, sum=30}}[INFO] [2021-10-05 14:16:19.507] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start get resp for seqId: a525c5a6196545f5a5241b2cdc2ec2c2CalculateResponse{success=true, sum=30}

小结

当初看来有一个小问题,要求服务端必须指定 port,这有点不太正当,比方代理域名,后续须要优化。

这里的启动申明形式也比拟根底,后续能够思考和 spring 进行整合。

为了便于大家学习,以上源码曾经开源:

https://github.com/houbb/rpc

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。