关于java:基于Netty实现自定义消息通信协议协议设计及解析应用实战

3次阅读

共计 10245 个字符,预计需要花费 26 分钟才能阅读完成。

所谓的协定,是由语法、语义、时序这三个因素组成的一种标准,通信单方依照该协定标准来实现网络数据传输,这样通信单方能力实现数据失常通信和解析。

因为不同的中间件在性能方面有肯定差别,所以其实应该是没有一种标准化协定来满足不同差异化需要,因而很多中间件都会定义本人的通信协议,另外通信协议能够解决粘包和拆包问题。

在本篇文章中,咱们来实现一个自定义音讯协定。

自定义协定的因素

自定义协定,那这个协定必须要有组成的元素,

  • 魔数:用来判断数据包的有效性
  • 版本号:能够反对协定降级
  • 序列化算法:音讯注释采纳什么样的序列化和反序列化形式,比方 json、protobuf、hessian 等
  • 指令类型:也就是以后发送的是一个什么类型的音讯,像 zookeeper 中,它传递了一个 Type
  • 申请序号:基于双工协定,提供异步能力,也就是收到的异步音讯须要找到后面的通信申请进行响应解决
  • 音讯长度
  • 音讯注释

协定定义

sessionId | reqType | Content-Length | Content |

其中 Version,Content-Length,SessionId 就是 Header 信息,Content就是交互的主体。

定义我的项目构造以及引入包

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

我的项目构造如图 4 - 1 所示:

  • netty-message-mic : 示意协定模块。
  • netty-message-server:示意 nettyserver。

图 4 -1

  • 引入 log4j.properties

在 nettyMessage-mic 中,包的构造如下。

定义 Header

示意音讯头

@Data
public class Header{
    private long sessionId; // 会话 id  : 占 8 个字节
    private byte type; // 音讯类型:占 1 个字节

    private int length;     // 音讯长度 : 占 4 个字节
}

定义 MessageRecord

示意音讯体

@Data
public class MessageRecord{

    private Header header;
    private Object body;
}

OpCode

定义操作类型

public enum OpCode {BUSI_REQ((byte)0),
    BUSI_RESP((byte)1),
    PING((byte)3),
    PONG((byte)4);

    private byte code;

    private OpCode(byte code) {this.code=code;}

    public byte code(){return this.code;}
}

定义编解码器

别离定义对该音讯协定的编解码器

MessageRecordEncoder

@Slf4j
public class MessageRecordEncoder extends MessageToByteEncoder<MessageRecord> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MessageRecord record, ByteBuf byteBuf) throws Exception {log.info("=========== 开始编码 Header 局部 ===========");
        Header header=record.getHeader();
        byteBuf.writeLong(header.getSessionId()); // 保留 8 个字节的 sessionId
        byteBuf.writeByte(header.getType());  // 写入 1 个字节的申请类型

        log.info("=========== 开始编码 Body 局部 ===========");
        Object body=record.getBody();
        if(body!=null){ByteArrayOutputStream bos=new ByteArrayOutputStream();
            ObjectOutputStream oos=new ObjectOutputStream(bos);
            oos.writeObject(body);
            byte[] bytes=bos.toByteArray();
            byteBuf.writeInt(bytes.length); // 写入音讯体长度: 占 4 个字节
            byteBuf.writeBytes(bytes); // 写入音讯体内容
        }else{byteBuf.writeInt(0); // 写入音讯长度占 4 个字节,长度为 0
        }
    }
}

MessageRecordDecode

@Slf4j
public class MessageRecordDecode extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {MessageRecord record=new MessageRecord();
        Header header=new Header();
        header.setSessionId(byteBuf.readLong());  // 读取 8 个字节的 sessionid
        header.setType(byteBuf.readByte()); // 读取一个字节的操作类型
        record.setHeader(header);
        // 如果 byteBuf 剩下的长度还有大于 4 个字节,阐明 body 不为空
        if(byteBuf.readableBytes()>4){int length=byteBuf.readInt(); // 读取四个字节的长度
            header.setLength(length);
            byte[] contents=new byte[length];
            byteBuf.readBytes(contents,0,length);
            ByteArrayInputStream bis=new ByteArrayInputStream(contents);
            ObjectInputStream ois=new ObjectInputStream(bis);
            record.setBody(ois.readObject());
            list.add(record);
            log.info("序列化进去的后果:"+record);
        }else{log.error("音讯内容为空");
        }
    }
}

测试协定的解析和编码

EmbeddedChannel 是 netty 专门改良针对 ChannelHandler 的单元测试而提供的

public class CodesMainTest {public static void main( String[] args ) throws Exception {
        EmbeddedChannel channel=new EmbeddedChannel(new LoggingHandler(),
            new MessageRecordEncoder(),
            new MessageRecordDecode());
        Header header=new Header();
        header.setSessionId(123456);
        header.setType(OpCode.PING.code());
        MessageRecord record=new MessageRecord();
        record.setBody("Hello World");
        record.setHeader(header);
        channel.writeOutbound(record);

        ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
        new MessageRecordEncoder().encode(null,record,buf);
        channel.writeInbound(buf);
    }
}

编码包剖析

运行上述代码后,会失去上面的一个信息

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 01 e2 40 03 00 00 00 12 ac ed 00 |.......@........|
|00000010| 05 74 00 0b 48 65 6c 6c 6f 20 57 6f 72 6c 64    |.t..Hello World |
+--------+-------------------------------------------------+----------------+

依照协定标准:

  • 后面 8 个字节示意 sessionId
  • 一个字节示意申请类型
  • 4 个字节示意长度
  • 前面局部内容示意音讯体

测试粘包和半包问题

通过 slice 办法进行拆分,失去两个包。

ByteBuf 中提供了一个 slice 办法,这个办法能够在不做数据拷贝的状况下对原始 ByteBuf 进行拆分。

public class CodesMainTest {public static void main( String[] args ) throws Exception {
        //EmbeddedChannel 是 netty 专门针对 ChannelHandler 的单元测试而提供的类。能够通过这个类来测试 channel 输出入站和出站的实现
        EmbeddedChannel channel=new EmbeddedChannel(
                // 解决粘包和半包问题
//                new LengthFieldBasedFrameDecoder(2048,10,4,0,0),
                new LoggingHandler(),
                new MessageRecordEncoder(),
                new MessageRecordDecode());
        Header header=new Header();
        header.setSessionId(123456);
        header.setType(OpCode.PING.code());
        MessageRecord record=new MessageRecord();
        record.setBody("Hello World");
        record.setHeader(header);
        channel.writeOutbound(record);

        ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
        new MessageRecordEncoder().encode(null,record,buf);

       //********* 模仿半包和粘包问题 ************//
        // 把一个包通过 slice 拆分成两个局部
        ByteBuf bb1=buf.slice(0,7); // 获取后面 7 个字节
        ByteBuf bb2=buf.slice(7,buf.readableBytes()-7); // 获取前面的字节
        bb1.retain();

        channel.writeInbound(bb1);
        channel.writeInbound(bb2);
    }
}

运行上述代码会失去如下异样,readerIndex(0) +length(8)示意要读取 8 个字节,然而只收到 7 个字节,所以间接报错。

2021-08-31 15:53:01,385 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ: 7B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 01 e2                            |.......         |
+--------+-------------------------------------------------+----------------+
2021-08-31 15:53:01,397 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
Exception in thread "main" io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8) exceeds writerIndex(7): UnpooledSlicedByteBuf(ridx: 0, widx: 7, cap: 7/7, unwrapped: PooledUnsafeDirectByteBuf(ridx: 0, widx: 31, cap: 256))

解决拆包问题

LengthFieldBasedFrameDecoder 是长度域解码器,它是解决拆包粘包最罕用的解码器,基本上能笼罩大部分基于长度拆包的场景。其中开源的消息中间件 RocketMQ 就是应用该解码器进行解码的。

首先来阐明一下该解码器的外围参数

  • lengthFieldOffset,长度字段的偏移量,也就是寄存长度数据的起始地位
  • lengthFieldLength,长度字段锁占用的字节数
  • lengthAdjustment,在一些较为简单的协定设计中,长度域不仅仅蕴含音讯的长度,还蕴含其余数据比方版本号、数据类型、数据状态等,这个时候咱们能够应用 lengthAdjustment 进行修改,它的值 = 包体的长度值 - 长度域的值
  • initialBytesToStrip,解码后须要跳过的初始字节数,也就是音讯内容字段的起始地位
  • lengthFieldEndOffset,长度字段完结的偏移量,该属性的值 =lengthFieldOffset+lengthFieldLength
public class CodesMainTest {public static void main( String[] args ) throws Exception {
        EmbeddedChannel channel=new EmbeddedChannel(
                // 解决粘包和半包问题
                new LengthFieldBasedFrameDecoder(1024,
                        9,4,0,0),
                new LoggingHandler(),
                new MessageRecordEncoder(),
                new MessageRecordDecode());
        Header header=new Header();
        header.setSessionId(123456);
        header.setType(OpCode.PING.code());
        MessageRecord record=new MessageRecord();
        record.setBody("Hello World");
        record.setHeader(header);
        channel.writeOutbound(record);

        ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
        new MessageRecordEncoder().encode(null,record,buf);

       //********* 模仿半包和粘包问题 ************//
        // 把一个包通过 slice 拆分成两个局部
        ByteBuf bb1=buf.slice(0,7);
        ByteBuf bb2=buf.slice(7,buf.readableBytes()-7);
        bb1.retain();

        channel.writeInbound(bb1);
        channel.writeInbound(bb2);
    }
}

增加一个长度解码器,就解决了拆包带来的问题。运行后果如下

2021-08-31 16:09:35,115 [com.netty.example.codec.MessageRecordDecode]-[INFO] 序列化进去的后果:MessageRecord(header=Header(sessionId=123456, type=3, length=18), body=Hello World)
2021-08-31 16:09:35,116 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE

基于自定义音讯协定通信

上面咱们把整个通信过程编写残缺,代码构造如图 4 - 2 所示.

图 4 -2

服务端开发

@Slf4j
public class ProtocolServer {public static void main(String[] args){EventLoopGroup boss = new NioEventLoopGroup();
        //2 用于对承受客户端连贯读写操作的线程工作组
        EventLoopGroup work = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(boss, work)    // 绑定两个工作线程组
            .channel(NioServerSocketChannel.class)    // 设置 NIO 的模式
            // 初始化绑定服务通道
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {sc.pipeline()
                        .addLast(
                        new LengthFieldBasedFrameDecoder(1024,
                                                         9,4,0,0))
                        .addLast(new MessageRecordEncoder())
                        .addLast(new MessageRecordDecode())
                        .addLast(new ServerHandler());
                }
            });
        ChannelFuture cf= null;
        try {cf = b.bind(8080).sync();
            log.info("ProtocolServer start success");
            cf.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        }finally {work.shutdownGracefully();
            boss.shutdownGracefully();}
    }
}

ServerHandler

@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {MessageRecord messageRecord=(MessageRecord)msg;
        log.info("server receive message:"+messageRecord);
        MessageRecord res=new MessageRecord();
        Header header=new Header();
        header.setSessionId(messageRecord.getHeader().getSessionId());
        header.setType(OpCode.BUSI_RESP.code());
        String message="Server Response Message!";
        res.setBody(message);
        header.setLength(message.length());
        ctx.writeAndFlush(res);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("服务器读取数据异样");
        super.exceptionCaught(ctx, cause);
        ctx.close();}
}

客户端开发

public class ProtocolClient {public static void main(String[] args) {
        // 创立工作线程组
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,
                                                                           9,4,0,0))
                        .addLast(new MessageRecordEncoder())
                        .addLast(new MessageRecordDecode())
                        .addLast(new ClientHandler());

                }
            });
        // 发动异步连贯操作
        try {ChannelFuture future = b.connect(new InetSocketAddress("localhost", 8080)).sync();
            Channel c = future.channel();
            for (int i = 0; i < 500; i++) {MessageRecord message = new MessageRecord();
                Header header = new Header();
                header.setSessionId(10001);
                header.setType((byte) OpCode.BUSI_REQ.code());
                message.setHeader(header);
                String context="我是申请数据"+i;
                header.setLength(context.length());
                message.setBody(context);
                c.writeAndFlush(message);
            }
            //closeFuture().sync()就是让以后线程 (即主线程) 同步期待 Netty server 的 close 事件,Netty server 的 channel close 后,主线程才会持续往下执行。closeFuture()在 channel close 的时候会告诉以后线程。future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        }finally {group.shutdownGracefully();
        }
    }
}

ClientHandler

@Slf4j
public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {MessageRecord record=(MessageRecord)msg;
        log.info("Client Receive message:"+record);
        super.channelRead(ctx, msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);
        ctx.close();}
}

爆文举荐浏览:

正文完
 0