乐趣区

关于dubbo:java-从零开始手写-RPC-04-序列化

序列化

java 从零开始手写 RPC (01) 基于 socket 实现

java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

java 从零开始手写 RPC (03) 如何实现客户端调用服务端?

后面几节咱们实现了最根底的客户端调用服务端,这一节来学习一下通信中的对象序列化。

为什么须要序列化

netty 底层都是基于 ByteBuf 进行通信的。

后面咱们通过编码器 / 解码器专门为计算的入参 / 出参进行解决,这样不便咱们间接应用 pojo。

然而有一个问题,如果想把咱们的我的项目形象为框架,那就须要为所有的对象编写编码器 / 解码器。

显然,间接通过每一个对象写一对的形式是不事实的,而且用户如何应用,也是未知的。

序列化的形式

基于字节的实现,性能好,可读性不高。

基于字符串的实现,比方 json 序列化,可读性好,性能绝对较差。

ps: 能够依据集体还好抉择,相干序列化可参考下文,此处不做开展。

json 序列化框架简介

实现思路

能够将咱们的 Pojo 全副转化为 byte,而后 Byte 转换为 ByteBuf 即可。

反之亦然。

代码实现

maven

引入序列化包:

<dependency>
    <groupId>com.github.houbb</groupId>
    <artifactId>json</artifactId>
    <version>0.1.1</version>
</dependency>

服务端

外围

服务端的代码能够大大简化:

serverBootstrap.group(workerGroup, bossGroup)
    .channel(NioServerSocketChannel.class)
    // 打印日志
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {ch.pipeline()
                    .addLast(new RpcServerHandler());
        }
    })
    // 这个参数影响的是还没有被 accept 取出的连贯
    .option(ChannelOption.SO_BACKLOG, 128)
    // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。.childOption(ChannelOption.SO_KEEPALIVE, true);

这里只须要一个实现类即可。

RpcServerHandler

服务端的序列化 / 反序列化调整为间接应用 JsonBs 实现。

package com.github.houbb.rpc.server.handler;

import com.github.houbb.json.bs.JsonBs;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import com.github.houbb.rpc.common.service.Calculator;
import com.github.houbb.rpc.server.service.CalculatorService;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @author binbin.hou
 * @since 0.0.1
 */
public class RpcServerHandler extends SimpleChannelInboundHandler {private static final Log log = LogFactory.getLog(RpcServerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {final String id = ctx.channel().id().asLongText();
        log.info("[Server] channel {} connected" + id);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {final String id = ctx.channel().id().asLongText();

        ByteBuf byteBuf = (ByteBuf)msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class);
        log.info("[Server] receive channel {} request: {} from", id, request);

        Calculator calculator = new CalculatorService();
        CalculateResponse response = calculator.sum(request);

        // 回写到 client 端
        byte[] responseBytes = JsonBs.serializeBytes(response);
        ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
        ctx.writeAndFlush(responseBuffer);
        log.info("[Server] channel {} response {}", id, response);
    }

}

客户端

外围

客户端能够简化如下:

channelFuture = bootstrap.group(workerGroup)
    .channel(NioSocketChannel.class)
    .option(ChannelOption.SO_KEEPALIVE, true)
    .handler(new ChannelInitializer<Channel>(){
        @Override
        protected void initChannel(Channel ch) throws Exception {channelHandler = new RpcClientHandler();
            ch.pipeline()
                    .addLast(new LoggingHandler(LogLevel.INFO))
                    .addLast(channelHandler);
        }
    })
    .connect(RpcConstant.ADDRESS, port)
    .syncUninterruptibly();

RpcClientHandler

客户端的序列化 / 反序列化调整为间接应用 JsonBs 实现。

package com.github.houbb.rpc.client.handler;

import com.github.houbb.json.bs.JsonBs;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateResponse;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * <p> 客户端解决类 </p>
 *
 * <pre> Created: 2019/10/16 11:30 下午  </pre>
 * <pre> Project: rpc  </pre>
 *
 * @author houbinbin
 * @since 0.0.2
 */
public class RpcClientHandler extends SimpleChannelInboundHandler {private static final Log log = LogFactory.getLog(RpcClient.class);

    /**
     * 响应信息
     * @since 0.0.4
     */
    private CalculateResponse response;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf)msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);

        this.response = JsonBs.deserializeBytes(bytes, CalculateResponse.class);
        log.info("[Client] response is :{}", response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 每次用完要敞开,不然拿不到 response,我也不晓得为啥(目测得理解 netty 才行)// 集体了解:如果不敞开,则永远会被阻塞。ctx.flush();
        ctx.close();}

    public CalculateResponse getResponse() {return response;}

}

小结

为了便于大家学习,以上源码曾经开源:

https://github.com/houbb/rpc

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次相遇。

退出移动版