概要

RPC(Remote Procedure Call)是指近程过程调用,也就是说两台服务器A,B,一个利用部署在A服务器上,想要调用B服务器上利用提供的函数/办法。在分布式系统中的零碎环境建设和利用程序设计中有着宽泛的利用。

常见的RPC框架有

  • Apache Dubbo
  • Google gRPC
  • Apache Thrift
  • Spring Cloud的Http实现

优良的开源框架有高性能,能够像调用本地办法一样调用近程服务,本文着重探讨以下流程的实现

  • 低侵入
  • 利用Netty自定义网络协议实现近程调用

本文的代码可在github上自取,
链接:https://github.com/tangbu/myrpc

RPC的流程


以上,咱们能够看到在实现RPC的过程中,咱们须要着重解决一下几点

  1. 低侵入(咱们应用动静代理来实现办法级别间接调用)
  2. 实现RpcRequest和RpcResponse的序列化和反序列化
  3. 基于TCP自定义报文,承载RpcRequest和RpcResponse
  4. 解决网络连接,网络传输

代码实现

动静代理实现低侵入(咱们应用jdk动静代理)

假如应用层存在这样一个接口

public interface HelloWorldService {    String helloWorld(String name);}

咱们在调用HelloWorldService#helloWorld的时候,心愿自定义外面的逻辑,应用RPC来调用,为此咱们就应用动静代理来实现

public class DynamicProxy implements InvocationHandler {    @Override    public Object invoke(Object proxy, Method method, Object[] args) throws ExecutionException, InterruptedException, JsonProcessingException {               System.out.println("在调用办法时走到了动静代理外面");        return null;    }}

在真正调用helloWorld办法的时候应用如下代码

        Class<?> helloWorldServiceClass = HelloWorldService.class;        //创立代理类对象        HelloWorldService so = (HelloWorldService) Proxy.newProxyInstance(helloWorldServiceClass.getClassLoader(),                new Class[]{HelloWorldService.class}, new DynamicProxy());        String result1 = so.helloWorld("zhangsan");

此时,原有接口的逻辑就调到了动静代理办法外面。之后,咱们会将RPC的实现封装在DynamicProxy这个办法里的实现中。

封装RpcRequest对象和RpcResponse对象

在执行近程调用的时候,必须通知近程服务,我须要调用那个类,那个办法,办法参数是哪些,入参是什么才能够让他返回后果给我,所以须要封装一下RpcRequest对象和RpcResponse对象

public class RpcRequest {    private String requestId;    private String className;    private String methodName;    private Class<?>[] parameterTypes;    private Object[] parameters;    private int version;// getter setter...}public class RpcResponse  {    private String requestId;    private boolean success;    private String message;    private Object result;// getter setter...}

定义网络协议,将RpcRequest和RpcResponse写成字节放在网络报文中传输

自定义报文构造

0----7----15---23---31|  1 |       2      |    ---------------------| 2  | 3  |  4 |.....        4前面的是音讯体  ---------------------......5.......--------------------- 序号1 0-7 version  1byte序号2 7-39 总报文长度 4byte序号3 39-47 type音讯类型 1byte序号4 47-77 priority音讯优先级 1byte序号5 依据报文总长度减掉1-4的长度就是5的长度

相应的依据这个报文构造,能够形象出咱们的TCP的报文Java类

/** * @author tangbu */public final class NettyMessage {    private byte version = 1;    private int length;// 音讯长度    private byte type;// 音讯类型    private byte priority;// 音讯优先级;    private JsonNode body; // 目前全副用json传递申请}

针对报文构造编写Netty的编码器和解码器

编码器的实现 由NettyMessage对象变成网络字节

/** * @author tangbu */public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {    private ObjectMapper mapper = new ObjectMapper();        @Override    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception {        out.writeByte(msg.getVersion());        out.writeInt(msg.getLength());        out.writeByte(msg.getType());        out.writeByte(msg.getPriority());        JsonNode body = msg.getBody();        try {            byte[] jsonBytes = mapper.writeValueAsBytes(body);            out.writeBytes(jsonBytes);        } catch (JsonProcessingException e) {            throw new RuntimeException(e);        }        // 最初填充报文长度        out.setInt(1, out.readableBytes());    }}

解码器的实现由网络字节变成NettyMessage对象

网络报文接收端须要做两件事件

  1. 依据报文的Length字段的长度从TCP流中读取一整个NettyMessage对象长度。
    发送到网络中的字节以流的模式传输,如果没有指定的拆包规定,报文就像没有标点符号一样字节发送到接收端,造成下层利用无奈辨认,所以须要拆包,netty提供了针对固定报文构造的拆包器,对于咱们的报文来说,长度占4个字节、报文首部偏移量为1,所以应用这个

    new LengthFieldBasedFrameDecoder(1460, 1, 4, -5, 0)

    这个来进行拆包, 通过这个解码器的报文就被拆成一整个NettyMessage的一段段字节了

  2. 读取到字节须要反序列化成一个NettyMessage对象。
    相应的字节须要转换成NettyMessage对象,咱们就应用一个对象解码器来进行解码

    public class NettyMessageDecoder extends ChannelInboundHandlerAdapter { ObjectMapper mapper = new ObjectMapper(); @Override public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {     ByteBuf byteBuf= (ByteBuf) obj;     if (byteBuf == null){         return ;     }     NettyMessage message = new NettyMessage();     message.setVersion(byteBuf.readByte());     message.setLength(byteBuf.readInt());     message.setType(byteBuf.readByte());     message.setPriority(byteBuf.readByte());     byte[] bodyBytes = new byte[message.getLength() - 7];     byteBuf.readBytes(bodyBytes);     JsonNode jsonNode = null;     try {         jsonNode = mapper.readValue(bodyBytes, JsonNode.class);     } catch (IOException e) {         throw new RuntimeException(e);     }     message.setBody(jsonNode);     ctx.fireChannelRead(message); }

    这样,网络申请接收端就能够把承受到的字节读取成NettyMessage对象了。

    解决RPC的客户端和服务端逻辑

    RPC客户端的申请

    解决逻辑封装在动静代理类中

    public class DynamicProxy implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws ExecutionException, InterruptedException, JsonProcessingException {     if ("toString".equals(method.getName())){         return proxy.toString();     }     RpcRequest request = new RpcRequest();     request.setRequestId(UUID.randomUUID().toString());     request.setClassName(method.getDeclaringClass().getName());     request.setMethodName(method.getName());     request.setParameterTypes(method.getParameterTypes());     request.setParameters(args);     request.setVersion(1);     System.out.println("动静代理封装的request"+ request);     System.out.println("----------------执行近程调用--------");     RpcResponse response = invokeRpc(request);     System.out.println("近程调用返回的后果"+ response);     return response.getResult(); } int count = 0; private RpcResponse invokeRpc(RpcRequest request) throws ExecutionException, InterruptedException, JsonProcessingException {     count++;     NettyClientHandler nettyClientHandler = ChannelHandlerManager.chooseHandler("127.0.0.1", count % 2 == 0 ? 8888 : 8889);     RpcReqResponseFuture rpcReqResponseFuture = nettyClientHandler.sendRpcRequest(request);     return rpcReqResponseFuture.get(); }}

    与此同时,在客户端记录连贯,和request编号和返回的response对应造成回调。

    public class NettyClientHandler extends ChannelInboundHandlerAdapter { private ObjectMapper mapper = new ObjectMapper(); private ChannelHandlerContext ctx; private Map<String, RpcReqResponseFuture> reqRespFutures = new HashMap<>(); private Executor executor = Executors.newFixedThreadPool(5); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {     System.out.println("连贯服务提供者" + ctx.channel().remoteAddress() + "胜利");     this.ctx = ctx;     ChannelHandlerManager.register(this); } public RpcReqResponseFuture sendRpcRequest(RpcRequest request) throws JsonProcessingException {     RpcReqResponseFuture future = new RpcReqResponseFuture(request, executor);     reqRespFutures.put(request.getRequestId(), future);     NettyMessage nettyMessage = new NettyMessage();     nettyMessage.setType((byte) 1);     nettyMessage.setPriority((byte) 2);     nettyMessage.setBody(mapper.readValue(mapper.writeValueAsString(request), JsonNode.class));     ctx.channel().writeAndFlush(nettyMessage);     return future; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {     NettyMessage message = (NettyMessage) msg;     System.out.println("服务器回复的Frame:" + message);     JsonNode body = message.getBody();     RpcResponse response = mapper.readValue(body.toString(), RpcResponse.class);     String requestId = response.getRequestId();     RpcReqResponseFuture future = reqRespFutures.get(requestId);     future.done(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {     cause.printStackTrace();     ctx.close();     ctx.fireExceptionCaught(cause); } public ChannelHandlerContext getCtx() {     return ctx; }}

在服务端注册好真正的HelloWorld实现类来执行后果,返回RpcResponse

/** * @author tangbu */public class NettyServerHandler extends ChannelInboundHandlerAdapter {    public static Map<String,Object> serviceMap = new HashMap<>();    private ObjectMapper mapper = new ObjectMapper();        @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        NettyMessage message = (NettyMessage) msg;        System.out.println("服务端收到的音讯是:" + message);        JsonNode rpcRequestBody = message.getBody();        RpcRequest rpcRequest = mapper.readValue(rpcRequestBody.toString(), RpcRequest.class);        RpcResponse rpcResponse = handleRpcRequest(rpcRequest);        NettyMessage response = new NettyMessage();        response.setType((byte) 1);        response.setPriority((byte) 2);        response.setBody(mapper.readValue(mapper.writeValueAsString(rpcResponse), JsonNode.class));        ctx.channel().writeAndFlush(response);    }    private RpcResponse handleRpcRequest(RpcRequest rpcRequest) {        String requestId = rpcRequest.getRequestId();        RpcResponse response = new RpcResponse();        response.setRequestId(requestId);        try {            String className = rpcRequest.getClassName();            String methodName = rpcRequest.getMethodName();            Class<?>[] parameterTypes = rpcRequest.getParameterTypes();            Object[] parameters = rpcRequest.getParameters();            Object o = serviceMap.get(className);            if (o == null){                throw new DkRuntimeException("服务不存在");            }            Class clazz = Class.forName(className);            Method method = clazz.getMethod(methodName, parameterTypes);            Object result = method.invoke(o, parameters);            response.setSuccess(true);            response.setResult(result);        } catch (Exception e) {            response.setSuccess(false);            response.setMessage(e.getMessage());        }        return response;    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();        ctx.fireExceptionCaught(cause);    }}

测试

启动两个NettyServer,

绑定8888和8889,别离注册HelloWorldImpl1和HelloWorldImpl2的实现

public class NettyServer1 {    public static void main(String[] args) throws InterruptedException {        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup(1);        Map<String, Object> serviceMap = new HashMap<>();// HelloWorldServiceImpl1        serviceMap.put(HelloWorldService.class.getName(), new HelloWorldServiceImpl1());        NettyServerHandler.serviceMap = serviceMap;        try {            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childOption(ChannelOption.SO_KEEPALIVE,true)                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1460, 1, 4, -5, 0));                            ch.pipeline().addLast(new NettyMessageDecoder());                            ch.pipeline().addLast(new NettyMessageEncoder());                            ch.pipeline().addLast(new NettyServerHandler());                        }                    });// 一个服务绑定8888,一个绑定8889端口             ChannelFuture cf = bootstrap.bind(8888).sync();            cf.addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture channelFuture) throws Exception {                    if (channelFuture.isSuccess()) {                        System.out.println("监听端口 " + 8888 + " 胜利");                    } else {                        System.out.println("监听端口 " + 8888 + " 失败");                    }                }            });            cf.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}

构建Netty客户端,测试时同时和两个NettyServer建设连贯

public class NettyClient implements Runnable {    private String ip;    private int port;    public NettyClient(String ip, int port) {        this.ip = ip;        this.port = port;    }    @Override    public void run() {        EventLoopGroup group = new NioEventLoopGroup(1);        try {            Bootstrap bootstrap = new Bootstrap();            bootstrap.group(group)                    .channel(NioSocketChannel.class)                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1460, 4, 4, -8, 0));                            ch.pipeline().addLast(new NettyMessageDecoder());                            ch.pipeline().addLast(new NettyMessageEncoder());                            ch.pipeline().addLast(new NettyClientHandler());                        }                    });            System.out.println("客户端 ok..");            ChannelFuture connect = bootstrap.connect(new InetSocketAddress(ip, port));            try {                connect.channel().closeFuture().sync();            } catch (InterruptedException e) {                e.printStackTrace();            }        } finally {            group.shutdownGracefully();        }    }}

执行程序

@Test    public void test2() throws InterruptedException {        new Thread(new NettyClient("127.0.0.1",8888)).start();        new Thread(new NettyClient("127.0.0.1",8889)).start();        Thread.sleep(3000);                Class<?> helloWorldServiceClass = HelloWorldService.class;        System.out.println();        System.out.println();        //创立代理类对象        HelloWorldService so = (HelloWorldService) Proxy.newProxyInstance(helloWorldServiceClass.getClassLoader(),                new Class[]{HelloWorldService.class}, new DynamicProxy());        String result1 = so.helloWorld("zhangsan");                Thread.sleep(1000000L);    }


如图,近程调用胜利,繁难的RPC代码失去了实现

瞻望

后续能够改良的计划

  1. 减少Netty的异步实现,缩小收发申请的阻塞
  2. 联合spring在BeanPostProcessor中对Bean进行加强,对立动静代理。
  3. 能够独自抽出RpcClient局部和RpcServer局部,不对利用裸露细节
  4. 减少注册核心,容错,负载平衡,实现高可用