通用调用

java 从零开始手写 RPC (01) 基于 socket 实现

java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

java 从零开始手写 RPC (03) 如何实现客户端调用服务端?

java 从零开始手写 RPC (04) -序列化

后面咱们的例子是一个固定的出参和入参,固定的办法实现。

本节将实现通用的调用,让框架具备更宽泛的实用性。

基本思路

所有的办法调用,基于反射进行相干解决实现。

服务端

外围类

  • RpcServer

调整如下:

serverBootstrap.group(workerGroup, bossGroup)    .channel(NioServerSocketChannel.class)    // 打印日志    .handler(new LoggingHandler(LogLevel.INFO))    .childHandler(new ChannelInitializer<Channel>() {        @Override        protected void initChannel(Channel ch) throws Exception {            ch.pipeline()            // 解码 bytes=>resp            .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))             // request=>bytes             .addLast(new ObjectEncoder())             .addLast(new RpcServerHandler());        }    })    // 这个参数影响的是还没有被accept 取出的连贯    .option(ChannelOption.SO_BACKLOG, 128)    // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。    .childOption(ChannelOption.SO_KEEPALIVE, true);

其中 ObjectDecoder 和 ObjectEncoder 都是 netty 内置的实现。

RpcServerHandler

package com.github.houbb.rpc.server.handler;import com.github.houbb.log.integration.core.Log;import com.github.houbb.log.integration.core.LogFactory;import com.github.houbb.rpc.common.rpc.domain.RpcRequest;import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;/** * @author binbin.hou * @since 0.0.1 */public class RpcServerHandler extends SimpleChannelInboundHandler {    private static final Log log = LogFactory.getLog(RpcServerHandler.class);    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        final String id = ctx.channel().id().asLongText();        log.info("[Server] channel {} connected " + id);    }    @Override    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {        final String id = ctx.channel().id().asLongText();        log.info("[Server] channel read start: {}", id);        // 承受客户端申请        RpcRequest rpcRequest = (RpcRequest)msg;        log.info("[Server] receive channel {} request: {}", id, rpcRequest);        // 回写到 client 端        DefaultRpcResponse rpcResponse = handleRpcRequest(rpcRequest);        ctx.writeAndFlush(rpcResponse);        log.info("[Server] channel {} response {}", id, rpcResponse);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }    /**     * 解决申请信息     * @param rpcRequest 申请信息     * @return 后果信息     * @since 0.0.6     */    private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) {        DefaultRpcResponse rpcResponse = new DefaultRpcResponse();        rpcResponse.seqId(rpcRequest.seqId());        try {            // 获取对应的 service 实现类            // rpcRequest=>invocationRequest            // 执行 invoke            Object result = DefaultServiceFactory.getInstance()                    .invoke(rpcRequest.serviceId(),                            rpcRequest.methodName(),                            rpcRequest.paramTypeNames(),                            rpcRequest.paramValues());            rpcResponse.result(result);        } catch (Exception e) {            rpcResponse.error(e);            log.error("[Server] execute meet ex for request", rpcRequest, e);        }        // 构建后果值        return rpcResponse;    }}

和以前相似,不过 handleRpcRequest 要略微麻烦一点。

这里须要依据发射,调用对应的办法。

pojo

其中应用的出参、入参实现如下:

RpcRequest

package com.github.houbb.rpc.common.rpc.domain;import java.util.List;/** * 序列化相干解决 * (1)调用创立工夫-createTime * (2)调用形式 callType * (3)超时工夫 timeOut * * 额定信息: * (1)上下文信息 * * @author binbin.hou * @since 0.0.6 */public interface RpcRequest extends BaseRpc {    /**     * 创立工夫     * @return 创立工夫     * @since 0.0.6     */    long createTime();    /**     * 服务惟一标识     * @return 服务惟一标识     * @since 0.0.6     */    String serviceId();    /**     * 办法名称     * @return 办法名称     * @since 0.0.6     */    String methodName();    /**     * 办法类型名称列表     * @return 名称列表     * @since 0.0.6     */    List<String> paramTypeNames();    // 调用参数信息列表    /**     * 调用参数值     * @return 参数值数组     * @since 0.0.6     */    Object[] paramValues();}

RpcResponse

package com.github.houbb.rpc.common.rpc.domain;/** * 序列化相干解决 * @author binbin.hou * @since 0.0.6 */public interface RpcResponse extends BaseRpc {    /**     * 异样信息     * @return 异样信息     * @since 0.0.6     */    Throwable error();    /**     * 申请后果     * @return 申请后果     * @since 0.0.6     */    Object result();}

BaseRpc

package com.github.houbb.rpc.common.rpc.domain;import java.io.Serializable;/** * 序列化相干解决 * @author binbin.hou * @since 0.0.6 */public interface BaseRpc extends Serializable {    /**     * 获取惟一标识号     * (1)用来惟一标识一次调用,便于获取该调用对应的响应信息。     * @return 惟一标识号     */    String seqId();    /**     * 设置惟一标识号     * @param traceId 惟一标识号     * @return this     */    BaseRpc seqId(final String traceId);}

ServiceFactory-服务工厂

为了便于对所有的 service 实现类对立治理,这里定义 service 工厂类。

ServiceFactory

package com.github.houbb.rpc.server.service;import com.github.houbb.rpc.server.config.service.ServiceConfig;import com.github.houbb.rpc.server.registry.ServiceRegistry;import java.util.List;/** * 服务办法类仓库治理类-接口 * * * (1)对外裸露的办法,应该尽可能的少。 * (2)对于内部的调用,前期比方 telnet 治理,能够应用比方有哪些服务列表? * 单个服务有哪些办法名称? * * 等等根底信息的查问,本期临时全副暗藏掉。 * * (3)后期尽可能的少裸露办法。 * @author binbin.hou * @since 0.0.6 * @see ServiceRegistry 服务注册,将服务信息放在这个类中,进行对立的治理。 * @see ServiceMethod 办法信息 */public interface ServiceFactory {    /**     * 注册服务列表信息     * @param serviceConfigList 服务配置列表     * @return this     * @since 0.0.6     */    ServiceFactory registerServices(final List<ServiceConfig> serviceConfigList);    /**     * 间接反射调用     * (1)此处对于办法反射,为了晋升性能,所有的 class.getFullName() 进行拼接而后放进 key 中。     *     * @param serviceId 服务名称     * @param methodName 办法名称     * @param paramTypeNames 参数类型名称列表     * @param paramValues 参数值     * @return 办法调用返回值     * @since 0.0.6     */    Object invoke(final String serviceId, final String methodName,                  List<String> paramTypeNames, final Object[] paramValues);}

DefaultServiceFactory

作为默认实现,如下:

package com.github.houbb.rpc.server.service.impl;import com.github.houbb.heaven.constant.PunctuationConst;import com.github.houbb.heaven.util.common.ArgUtil;import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;import com.github.houbb.heaven.util.util.CollectionUtil;import com.github.houbb.rpc.common.exception.RpcRuntimeException;import com.github.houbb.rpc.server.config.service.ServiceConfig;import com.github.houbb.rpc.server.service.ServiceFactory;import java.lang.reflect.InvocationTargetException;import java.lang.reflect.Method;import java.util.HashMap;import java.util.List;import java.util.Map;/** * 默认服务仓库实现 * @author binbin.hou * @since 0.0.6 */public class DefaultServiceFactory implements ServiceFactory {    /**     * 服务 map     * @since 0.0.6     */    private Map<String, Object> serviceMap;    /**     * 间接获取对应的 method 信息     * (1)key: serviceId:methodName:param1@param2@param3     * (2)value: 对应的 method 信息     */    private Map<String, Method> methodMap;    private static final DefaultServiceFactory INSTANCE = new DefaultServiceFactory();    private DefaultServiceFactory(){}    public static DefaultServiceFactory getInstance() {        return INSTANCE;    }    /**     * 服务注册个别在我的项目启动的时候,进行解决。     * 属于比拟重的操作,而且一个服务按理说只应该初始化一次。     * 此处加锁为了保障线程平安。     * @param serviceConfigList 服务配置列表     * @return this     */    @Override    public synchronized ServiceFactory registerServices(List<ServiceConfig> serviceConfigList) {        ArgUtil.notEmpty(serviceConfigList, "serviceConfigList");        // 汇合初始化        serviceMap = new HashMap<>(serviceConfigList.size());        // 这里只是预估,个别为2个服务。        methodMap = new HashMap<>(serviceConfigList.size()*2);        for(ServiceConfig serviceConfig : serviceConfigList) {            serviceMap.put(serviceConfig.id(), serviceConfig.reference());        }        // 寄存办法名称        for(Map.Entry<String, Object> entry : serviceMap.entrySet()) {            String serviceId = entry.getKey();            Object reference = entry.getValue();            //获取所有办法列表            Method[] methods = reference.getClass().getMethods();            for(Method method : methods) {                String methodName = method.getName();                if(ReflectMethodUtil.isIgnoreMethod(methodName)) {                    continue;                }                List<String> paramTypeNames = ReflectMethodUtil.getParamTypeNames(method);                String key = buildMethodKey(serviceId, methodName, paramTypeNames);                methodMap.put(key, method);            }        }        return this;    }    @Override    public Object invoke(String serviceId, String methodName, List<String> paramTypeNames, Object[] paramValues) {        //参数校验        ArgUtil.notEmpty(serviceId, "serviceId");        ArgUtil.notEmpty(methodName, "methodName");        // 提供 cache,能够依据前三个值疾速定位对应的 method        // 依据 method 进行反射解决。        // 对于 paramTypes 进行 string 连贯解决。        final Object reference = serviceMap.get(serviceId);        final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames);        final Method method = methodMap.get(methodKey);        try {            return method.invoke(reference, paramValues);        } catch (IllegalAccessException | InvocationTargetException e) {            throw new RpcRuntimeException(e);        }    }    /**     * (1)多个之间才用 : 分隔     * (2)参数之间采纳 @ 分隔     * @param serviceId 服务标识     * @param methodName 办法名称     * @param paramTypeNames 参数类型名称     * @return 构建残缺的 key     * @since 0.0.6     */    private String buildMethodKey(String serviceId, String methodName, List<String> paramTypeNames) {        String param = CollectionUtil.join(paramTypeNames, PunctuationConst.AT);        return serviceId+PunctuationConst.COLON+methodName+PunctuationConst.COLON                +param;    }}

ServiceRegistry-服务注册类

接口

package com.github.houbb.rpc.server.registry;/** * 服务注册类 * (1)每个利用惟一 * (2)每个服务的裸露协定应该保持一致 * 临时不提供单个服务的非凡解决,前期能够思考增加 * * @author binbin.hou * @since 0.0.6 */public interface ServiceRegistry {    /**     * 裸露的 rpc 服务端口信息     * @param port 端口信息     * @return this     * @since 0.0.6     */    ServiceRegistry port(final int port);    /**     * 注册服务实现     * @param serviceId 服务标识     * @param serviceImpl 服务实现     * @return this     * @since 0.0.6     */    ServiceRegistry register(final String serviceId, final Object serviceImpl);    /**     * 裸露所有服务信息     * (1)启动服务端     * @return this     * @since 0.0.6     */    ServiceRegistry expose();}

实现

package com.github.houbb.rpc.server.registry.impl;import com.github.houbb.heaven.util.common.ArgUtil;import com.github.houbb.rpc.common.config.protocol.ProtocolConfig;import com.github.houbb.rpc.server.config.service.DefaultServiceConfig;import com.github.houbb.rpc.server.config.service.ServiceConfig;import com.github.houbb.rpc.server.core.RpcServer;import com.github.houbb.rpc.server.registry.ServiceRegistry;import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory;import java.util.ArrayList;import java.util.List;/** * 默认服务端注册类 * @author binbin.hou * @since 0.0.6 */public class DefaultServiceRegistry implements ServiceRegistry {    /**     * 单例信息     * @since 0.0.6     */    private static final DefaultServiceRegistry INSTANCE = new DefaultServiceRegistry();    /**     * rpc 服务端端口号     * @since 0.0.6     */    private int rpcPort;    /**     * 协定配置     * (1)默认只实现 tcp     * (2)前期能够拓展实现 web-service/http/https 等等。     * @since 0.0.6     */    private ProtocolConfig protocolConfig;    /**     * 服务配置列表     * @since 0.0.6     */    private List<ServiceConfig> serviceConfigList;    private DefaultServiceRegistry(){        // 初始化默认参数        this.serviceConfigList = new ArrayList<>();        this.rpcPort = 9527;    }    public static DefaultServiceRegistry getInstance() {        return INSTANCE;    }    @Override    public ServiceRegistry port(int port) {        ArgUtil.positive(port, "port");        this.rpcPort = port;        return this;    }    /**     * 注册服务实现     * (1)次要用于前期服务调用     * (2)如何依据 id 获取实现?非常简单,id 是惟一的。     * 有就是有,没有就抛出异样,间接返回。     * (3)如果依据 {@link com.github.houbb.rpc.common.rpc.domain.RpcRequest} 获取对应的办法。     *     * 3.1 依据 serviceId 获取惟一的实现     * 3.2 依据 {@link Class#getMethod(String, Class[])} 办法名称+参数类型惟一获取办法     * 3.3 依据 {@link java.lang.reflect.Method#invoke(Object, Object...)} 执行办法     *     * @param serviceId 服务标识     * @param serviceImpl 服务实现     * @return this     * @since 0.0.6     */    @Override    @SuppressWarnings("unchecked")    public synchronized DefaultServiceRegistry register(final String serviceId, final Object serviceImpl) {        ArgUtil.notEmpty(serviceId, "serviceId");        ArgUtil.notNull(serviceImpl, "serviceImpl");        // 构建对应的其余信息        ServiceConfig serviceConfig = new DefaultServiceConfig();        serviceConfig.id(serviceId).reference(serviceImpl);        serviceConfigList.add(serviceConfig);        return this;    }    @Override    public ServiceRegistry expose() {        // 注册所有服务信息        DefaultServiceFactory.getInstance()                .registerServices(serviceConfigList);        // 裸露 netty server 信息        new RpcServer(rpcPort).start();        return this;    }}

ServiceConfig 是一些服务的配置信息,接口定义如下:

package com.github.houbb.rpc.server.config.service;/** * 单个服务配置类 * * 简化用户应用: * 在用户应用的时候,这个类应该是不可见的。 * 间接提供对应的服务注册类即可。 * * 后续拓展 * (1)版本信息 * (2)服务端超时工夫 * * @author binbin.hou * @since 0.0.6 * @param <T> 实现类泛型 */public interface ServiceConfig<T> {    /**     * 获取惟一标识     * @return 获取惟一标识     * @since 0.0.6     */    String id();    /**     * 设置惟一标识     * @param id 标识信息     * @return this     * @since 0.0.6     */    ServiceConfig<T> id(String id);    /**     * 获取援用实体实现     * @return 实体实现     * @since 0.0.6     */    T reference();    /**     * 设置援用实体实现     * @param reference 援用实现     * @return this     * @since 0.0.6     */    ServiceConfig<T> reference(T reference);}

测试

maven 引入

引入服务端的对应 maven 包:

<dependency>    <groupId>com.github.houbb</groupId>    <artifactId>rpc-server</artifactId>    <version>0.0.6</version></dependency>

服务端启动

// 启动服务DefaultServiceRegistry.getInstance()        .register(ServiceIdConst.CALC, new CalculatorServiceImpl())        .expose();

这里注册了一个计算服务,并且设置对应的实现。

和以前实现相似,此处不再赘述。

启动日志:

[DEBUG] [2021-10-05 13:39:42.638] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.[INFO] [2021-10-05 13:39:42.645] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务开始启动服务端十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelRegistered信息: [id: 0xec4dc74f] REGISTERED十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler bind信息: [id: 0xec4dc74f] BIND: 0.0.0.0/0.0.0.0:9527十月 05, 2021 1:39:43 下午 io.netty.handler.logging.LoggingHandler channelActive信息: [id: 0xec4dc74f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE[INFO] [2021-10-05 13:39:43.893] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC 服务端启动实现,监听【9527】端口

ps: 写到这里突然发现遗记增加对应的 register 日志了,这里能够增加对应的 registerListener 拓展。

小结

为了便于大家学习,以上源码曾经开源:

https://github.com/houbb/rpc

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。