乐趣区

关于java:手把手教你基于Netty实现一个基础的RPC框架通俗易懂

浏览这篇文章之前,倡议先浏览和这篇文章关联的内容。

[[1]具体分析散布式微服务架构下网络通信的底层实现原理(图解)](https://mp.weixin.qq.com/s?__…)

[2 工作了 5 年,你真的了解 Netty 以及为什么要用吗?(深度干货)](https://mp.weixin.qq.com/s?__…)

[[3]深度解析 Netty 中的外围组件(图解 + 实例)](https://mp.weixin.qq.com/s?__…)

[[4]BAT 面试必问细节:对于 Netty 中的 ByteBuf 详解](https://mp.weixin.qq.com/s?__…)

[[5]通过大量实战案例合成 Netty 中是如何解决拆包黏包问题的?](https://mp.weixin.qq.com/s?__…)

[[6]基于 Netty 实现自定义音讯通信协议(协定设计及解析利用实战)](https://mp.weixin.qq.com/s?__…)

[[7]全网最具体最齐全的序列化技术及深度解析与利用实战](https://mp.weixin.qq.com/s?__…)

在后面的内容中,咱们曾经由浅入深的了解了 Netty 的基础知识和实现原理,置信大家曾经对 Netty 有了一个较为全面的了解。那么接下来,咱们通过一个手写 RPC 通信的实战案例来带大家理解 Netty 的理论利用。

为什么要抉择 RPC 来作为实战呢?因为 Netty 自身就是解决通信问题,而在理论利用中,RPC 协定框架是咱们接触得最多的一种,所以这个实战能让大家理解到 Netty 理论利用之外,还能了解 RPC 的底层原理。

什么是 RPC

RPC 全称为(Remote Procedure Call),是一种通过网络从近程计算机程序上申请服务,而不须要理解底层网络技术的协定,简略了解就是让开发者可能像调用本地服务一样调用近程服务。

既然是协定,那么它必然有协定的标准,如图 6 - 1 所示。

为了达到“让开发者可能像调用本地服务那样调用近程服务”的目标,RPC 协定需像图 6 - 1 那样实现近程交互。

  • 客户端调用近程服务时,必须要通过本地动静代理模块来屏蔽网络通信的细节,所以动静代理模块须要负责将申请参数、办法等数据组装成数据包发送到指标服务器
  • 这个数据包在发送时,还须要遵循约定的音讯协定以及序列化协定,最终转化为二进制数据流传输
  • 服务端收到数据包后,先依照约定的音讯协定解码,失去申请信息。
  • 服务端再依据申请信息路由调用到指标服务,取得后果并返回给客户端。

<center> 图 6 -1</center>

业内支流的 RPC 框架

但凡满足 RPC 协定的框架,咱们成为 RPC 框架,在理论开发中,咱们能够应用开源且绝对成熟的 RPC 框架解决微服务架构下的近程通信问题,常见的 rpc 框架:

  1. Thrift:thrift 是一个软件框架,用来进行可扩大且跨语言的服务的开发。它联合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 这些编程语言间无缝联合的、高效的服务。
  2. Dubbo:Dubbo 是一个分布式服务框架,以及 SOA 治理计划。其性能次要包含:高性能 NIO 通信及多协定集成,服务动静寻址与路由,软负载平衡与容错,依赖剖析与降级等。Dubbo 是阿里巴巴外部的 SOA 服务化治理计划的外围框架,Dubbo 自 2011 年开源后,已被许多非阿里系公司应用。

手写 RPC 留神要点

基于上文中对于 RPC 协定的了解,如果咱们本人去实现,须要思考哪些技术呢?其实基于图 6 - 1 的整个流程应该有一个大略的了解。

  • 通信协议,RPC 框架对性能的要求十分高,所以通信协议应该是越简略越好,这样能够缩小编解码带来的性能损耗,大部分支流的 RPC 框架会间接抉择 TCP、HTTP 协定。
  • 序列化和反序列化,数据要进行网络传输,须要对数据进行序列化和反序列化,后面咱们说过,所谓的序列化和反序列化是不把对象转化成二进制流以及将二进制流转化成对象的过程。在序列化框架抉择上,咱们个别会抉择高效且通用的算法,比方 FastJson、Protobuf、Hessian 等。这些序列化技术都要比原生的序列化操作更加高效,压缩比也较高。
  • 动静代理,客户端调用近程服务时,须要通过动静代理来屏蔽网络通信细节。而动静代理又是在运行过程中生成的,所以动静代理类的生成速度、字节码大小都会影响到 RPC 整体框架的性能和资源耗费。常见的动静代理技术:Javassist、Cglib、JDK 的动静代理等。

基于 Netty 手写实现 RPC

了解了 RPC 协定后,咱们基于 Netty 来实现一个 RPC 通信框架。

代码详见附件 netty-rpc-example

<center> 图 6 -2 我的项目模块组成 </center>

须要引入的 jar 包:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.72</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

模块依赖关系:

  • provider 依赖 netty-rpc-protocol 和 netty-rpc-api
  • cosumer 依赖 netty-rpc-protocol 和 netty-rpc-api

netty-rpc-api 模块

<center> 图 6 -3 netty-rpc-api 模块组成 </center>

IUserService

public interface IUserService {String saveUser(String name);
}

netty-rpc-provider 模块

<center> 图 6 -4 netty-rpc-provider 模块组成 </center>

UserServiceImpl

@Service
@Slf4j
public class UserServiceImpl implements IUserService {
    @Override
    public String saveUser(String name) {log.info("begin saveUser:"+name);
        return "Save User Success!";
    }
}

NettyRpcProviderMain

留神,在以后步骤中,形容了 case 的局部,临时先不必加,后续再加上

@ComponentScan(basePackages = {"com.example.spring","com.example.service"})  //case1(后续再加上)
@SpringBootApplication
public class NettyRpcProviderMain {public static void main(String[] args) throws Exception {SpringApplication.run(NettyRpcProviderMain.class, args);
        new NettyServer("127.0.0.1",8080).startNettyServer();   //case2(后续再加上)
    }
}

netty-rpc-protocol

开始写通信协议模块,这个模块次要做几个事件

  • 定义音讯协定
  • 定义序列化反序列化办法
  • 建设 netty 通信

<center> 图 6 -5 </center>

定义音讯协定

之前咱们讲过自定义音讯协定,咱们在这里能够依照上面这个协定格局来定义好。

    /*
    +----------------------------------------------+
    | 魔数 2byte | 序列化算法 1byte | 申请类型 1byte  |
    +----------------------------------------------+
    | 音讯 ID 8byte     |      数据长度 4byte       |
    +----------------------------------------------+
    */

Header

@AllArgsConstructor
@Data
public class Header implements Serializable {
    /*
    +----------------------------------------------+
    | 魔数 2byte | 序列化算法 1byte | 申请类型 1byte  |
    +----------------------------------------------+
    | 音讯 ID 8byte     |      数据长度 4byte       |
    +----------------------------------------------+
    */
    private short magic; // 魔数 - 用来验证报文的身份(2 个字节)private byte serialType; // 序列化类型(1 个字节)private byte reqType; // 操作类型(1 个字节)private long requestId; // 申请 id(8 个字节)private int length; // 数据长度(4 个字节)}

RpcRequest

@Data
public class RpcRequest implements Serializable {
    private String className;
    private String methodName;
    private Object[] params;
    private Class<?>[] parameterTypes;}

RpcResponse

@Data
public class RpcResponse implements Serializable {

    private Object data;
    private String msg;
}

RpcProtocol

@Data
public class RpcProtocol<T> implements Serializable {
    private Header header;
    private T content;
}

定义相干常量

上述音讯协定定义中,波及到几个枚举相干的类,定义如下

ReqType

音讯类型

public enum ReqType {REQUEST((byte)1),
    RESPONSE((byte)2),
    HEARTBEAT((byte)3);

    private byte code;

    private ReqType(byte code) {this.code=code;}

    public byte code(){return this.code;}
    public static ReqType findByCode(int code) {for (ReqType msgType : ReqType.values()) {if (msgType.code() == code) {return msgType;}
        }
        return null;
    }
}

SerialType

序列化类型

public enum SerialType {JSON_SERIAL((byte)0),
    JAVA_SERIAL((byte)1);

    private byte code;

    SerialType(byte code) {this.code=code;}

    public byte code(){return this.code;}
}

RpcConstant

public class RpcConstant {
    //header 局部的总字节数
    public final static int HEAD_TOTAL_LEN=16;
    // 魔数
    public final static short MAGIC=0xca;
}

定义序列化相干实现

这里演示两种,一种是 JSON 形式,另一种是 Java 原生的形式

ISerializer

public interface ISerializer {<T> byte[] serialize(T obj);

    <T> T deserialize(byte[] data,Class<T> clazz);

    byte getType();}

JavaSerializer

public class JavaSerializer implements ISerializer{

    @Override
    public <T> byte[] serialize(T obj) {
        ByteArrayOutputStream byteArrayOutputStream=
                new ByteArrayOutputStream();
        try {
            ObjectOutputStream outputStream=
                    new ObjectOutputStream(byteArrayOutputStream);

            outputStream.writeObject(obj);

            return  byteArrayOutputStream.toByteArray();} catch (IOException e) {e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public <T> T deserialize(byte[] data, Class<T> clazz) {ByteArrayInputStream byteArrayInputStream=new ByteArrayInputStream(data);
        try {
            ObjectInputStream objectInputStream=
                    new ObjectInputStream(byteArrayInputStream);

            return (T) objectInputStream.readObject();} catch (IOException e) {e.printStackTrace();
        } catch (ClassNotFoundException e) {e.printStackTrace();
        }
        return null;
    }

    @Override
    public byte getType() {return SerialType.JAVA_SERIAL.code();
    }
}

JsonSerializer

public class JsonSerializer implements ISerializer{
    @Override
    public <T> byte[] serialize(T obj) {return JSON.toJSONString(obj).getBytes();}

    @Override
    public <T> T deserialize(byte[] data, Class<T> clazz) {return JSON.parseObject(new String(data),clazz);
    }

    @Override
    public byte getType() {return SerialType.JSON_SERIAL.code();
    }
}

SerializerManager

实现对序列化机制的治理

public class SerializerManager {private final static ConcurrentHashMap<Byte, ISerializer> serializers=new ConcurrentHashMap<Byte, ISerializer>();

    static {ISerializer jsonSerializer=new JsonSerializer();
        ISerializer javaSerializer=new JavaSerializer();
        serializers.put(jsonSerializer.getType(),jsonSerializer);
        serializers.put(javaSerializer.getType(),javaSerializer);
    }

    public static ISerializer getSerializer(byte key){ISerializer serializer=serializers.get(key);
        if(serializer==null){return new JavaSerializer();
        }
        return serializer;
    }
}

定义编码和解码实现

因为自定义了音讯协定,所以 须要本人实现编码和解码,代码如下

RpcDecoder

@Slf4j
public class RpcDecoder extends ByteToMessageDecoder {


    /*
    +----------------------------------------------+
    | 魔数 2byte | 序列化算法 1byte | 申请类型 1byte  |
    +----------------------------------------------+
    | 音讯 ID 8byte     |      数据长度 4byte       |
    +----------------------------------------------+
    */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {log.info("==========begin RpcDecoder ==============");
        if(in.readableBytes()< RpcConstant.HEAD_TOTAL_LEN){
            // 音讯长度不够,不须要解析
            return;
        }
        in.markReaderIndex();// 标记一个读取数据的索引,后续用来重置。short magic=in.readShort(); // 读取 magic
        if(magic!=RpcConstant.MAGIC){throw new IllegalArgumentException("Illegal request parameter'magic',"+magic);
        }
        byte serialType=in.readByte(); // 读取序列化算法类型
        byte reqType=in.readByte(); // 申请类型
        long requestId=in.readLong(); // 申请音讯 id
        int dataLength=in.readInt(); // 申请数据长度
        // 可读区域的字节数小于理论数据长度
        if(in.readableBytes()<dataLength){in.resetReaderIndex();
            return;
        }
        // 读取音讯内容
        byte[] content=new byte[dataLength];
        in.readBytes(content);

        // 构建 header 头信息
        Header header=new Header(magic,serialType,reqType,requestId,dataLength);
        ISerializer serializer=SerializerManager.getSerializer(serialType);
        ReqType rt=ReqType.findByCode(reqType);
        switch(rt){
            case REQUEST:
                RpcRequest request=serializer.deserialize(content, RpcRequest.class);
                RpcProtocol<RpcRequest> reqProtocol=new RpcProtocol<>();
                reqProtocol.setHeader(header);
                reqProtocol.setContent(request);
                out.add(reqProtocol);
                break;
            case RESPONSE:
                RpcResponse response=serializer.deserialize(content,RpcResponse.class);
                RpcProtocol<RpcResponse> resProtocol=new RpcProtocol<>();
                resProtocol.setHeader(header);
                resProtocol.setContent(response);
                out.add(resProtocol);
                break;
            case HEARTBEAT:
                break;
            default:
                break;
        }

    }
}

RpcEncoder

@Slf4j
public class RpcEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {

    /*
    +----------------------------------------------+
    | 魔数 2byte | 序列化算法 1byte | 申请类型 1byte  |
    +----------------------------------------------+
    | 音讯 ID 8byte     |      数据长度 4byte       |
    +----------------------------------------------+
    */
    @Override
    protected void encode(ChannelHandlerContext ctx, RpcProtocol<Object> msg, ByteBuf out) throws Exception {log.info("=============begin RpcEncoder============");
        Header header=msg.getHeader();
        out.writeShort(header.getMagic()); // 写入魔数
        out.writeByte(header.getSerialType()); // 写入序列化类型
        out.writeByte(header.getReqType());// 写入申请类型
        out.writeLong(header.getRequestId()); // 写入申请 id
        ISerializer serializer= SerializerManager.getSerializer(header.getSerialType());
        byte[] data=serializer.serialize(msg.getContent()); // 序列化
        header.setLength(data.length);
        out.writeInt(data.length); // 写入音讯长度
        out.writeBytes(data);
    }
}

NettyServer

实现 NettyServer 构建。

@Slf4j
public class NettyServer{
    private String serverAddress; // 地址
    private int serverPort; // 端口

    public NettyServer(String serverAddress, int serverPort) {
        this.serverAddress = serverAddress;
        this.serverPort = serverPort;
    }

    public void startNettyServer() throws Exception {log.info("begin start Netty Server");
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try {ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new RpcServerInitializer());
            ChannelFuture channelFuture = bootstrap.bind(this.serverAddress, this.serverPort).sync();
            log.info("Server started Success on Port:{}", this.serverPort);
            channelFuture.channel().closeFuture().sync();}catch (Exception e){log.error("Rpc Server Exception",e);
        }finally {workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();}
    }
}

RpcServerInitializer

public class RpcServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
            .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0))
            .addLast(new RpcDecoder())
            .addLast(new RpcEncoder())
            .addLast(new RpcServerHandler());
    }
}

RpcServerHandler

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {RpcProtocol resProtocol=new RpcProtocol<>();
        Header header=msg.getHeader();
        header.setReqType(ReqType.RESPONSE.code());
        Object result=invoke(msg.getContent());
        resProtocol.setHeader(header);
        RpcResponse response=new RpcResponse();
        response.setData(result);
        response.setMsg("success");
        resProtocol.setContent(response);

        ctx.writeAndFlush(resProtocol);
    }

    private Object invoke(RpcRequest request){
        try {Class<?> clazz=Class.forName(request.getClassName());
            Object bean= SpringBeansManager.getBean(clazz); // 获取实例对象(CASE)
            Method declaredMethod=clazz.getDeclaredMethod(request.getMethodName(),request.getParameterTypes());
            return declaredMethod.invoke(bean,request.getParams());
        } catch (ClassNotFoundException | NoSuchMethodException e) {e.printStackTrace();
        } catch (IllegalAccessException e) {e.printStackTrace();
        } catch (InvocationTargetException e) {e.printStackTrace();
        }
        return null;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);
    }
}

SpringBeansManager

@Component
public class SpringBeansManager implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {SpringBeansManager.applicationContext=applicationContext;}

    public static <T> T getBean(Class<T> clazz){return applicationContext.getBean(clazz);
    }
}

须要留神,这个类的构建好之后,须要在 netty-rpc-provider 模块的 main 办法中减少 compone-scan 进行扫描

@ComponentScan(basePackages = {"com.example.spring","com.example.service"})  // 批改这里
@SpringBootApplication
public class NettyRpcProviderMain {public static void main(String[] args) throws Exception {SpringApplication.run(NettyRpcProviderMain.class, args);
        new NettyServer("127.0.0.1",8080).startNettyServer();  // 批改这里}
}

netty-rpc-consumer

接下来开始实现生产端

RpcClientProxy

public class RpcClientProxy {public <T> T clientProxy(final Class<T> interfaceCls,final String host,final int port){return (T) Proxy.newProxyInstance
                (interfaceCls.getClassLoader(),
                        new Class<?>[]{interfaceCls},
                        new RpcInvokerProxy(host,port));
    }
}

RpcInvokerProxy

@Slf4j
public class RpcInvokerProxy implements InvocationHandler {

    private String serviceAddress;
    private int servicePort;

    public RpcInvokerProxy(String serviceAddress, int servicePort) {
        this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {log.info("begin invoke target server");
        // 组装参数
        RpcProtocol<RpcRequest> protocol=new RpcProtocol<>();
        long requestId= RequestHolder.REQUEST_ID.incrementAndGet();
        Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0);
        protocol.setHeader(header);
        RpcRequest request=new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParams(args);
        protocol.setContent(request);
        // 发送申请
        NettyClient nettyClient=new NettyClient(serviceAddress,servicePort);
        // 构建异步数据处理
        RpcFuture<RpcResponse> future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
        RequestHolder.REQUEST_MAP.put(requestId,future);
        nettyClient.sendRequest(protocol);
        return future.getPromise().get().getData();}
}

定义客户端连贯

在 netty-rpc-protocol 这个模块的 protocol 包门路下,创立 NettyClient

@Slf4j
public class NettyClient {
    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
    private String serviceAddress;
    private int servicePort;
    public NettyClient(String serviceAddress,int servicePort){log.info("begin init NettyClient");
        bootstrap=new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new RpcClientInitializer());
        this.serviceAddress=serviceAddress;
        this.servicePort=servicePort;
    }

    public void sendRequest(RpcProtocol<RpcRequest> protocol) throws InterruptedException {ChannelFuture future=bootstrap.connect(this.serviceAddress,this.servicePort).sync();
        future.addListener(listener->{if(future.isSuccess()){log.info("connect rpc server {} success.",this.serviceAddress);
            }else{log.error("connect rpc server {} failed .",this.serviceAddress);
                future.cause().printStackTrace();
                eventLoopGroup.shutdownGracefully();}
        });
        log.info("begin transfer data");
        future.channel().writeAndFlush(protocol);
    }
}

RpcClientInitializer

@Slf4j
public class RpcClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {log.info("begin initChannel");
        ch.pipeline()
                .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,12,4,0,0))
                .addLast(new LoggingHandler())
                .addLast(new RpcEncoder())
                .addLast(new RpcDecoder())
                .addLast(new RpcClientHandler());
    }
}

RpcClientHandler

须要留神,Netty 的通信过程是基于入站出站拆散的,所以在获取后果时,咱们须要借助一个 Future 对象来实现。

@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {log.info("receive rpc server result");
        long requestId=msg.getHeader().getRequestId();
        RpcFuture<RpcResponse> future=RequestHolder.REQUEST_MAP.remove(requestId);
        future.getPromise().setSuccess(msg.getContent()); // 返回后果
    }
}

Future 的实现

在 netty-rpc-protocol 模块中增加 rpcFuture 实现

RpcFuture

@Data
public class RpcFuture<T> {
    //Promise 是可写的 Future, Future 本身并没有写操作相干的接口,
    // Netty 通过 Promise 对 Future 进行扩大, 用于设置 IO 操作的后果
    private Promise<T> promise;

    public RpcFuture(Promise<T> promise) {this.promise = promise;}
}

RequestHolder

保留 requestid 和 future 的对应后果

public class RequestHolder {public static final AtomicLong REQUEST_ID=new AtomicLong();

    public static final Map<Long,RpcFuture> REQUEST_MAP=new ConcurrentHashMap<>();}

须要源码的同学,请关注公众号[跟着 Mic 学架构],回复关键字[rpc],即可取得

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic 带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

退出移动版