关于netty:netty系列之自定义编码解码器

35次阅读

共计 3560 个字符,预计需要花费 9 分钟才能阅读完成。

简介

在之前的 netty 系列文章中,咱们讲到了如何将对象或者 String 转换成为 ByteBuf,通过应用 netty 自带的 encoder 和 decoder 能够实现十分不便的对象和 ByteBuf 之间的转换,而后就能够向 channel 中随便写入对象和字符串了。

应用 netty 自带的编码器当然很好,然而如果你有些非凡的需要,比方心愿在编码的过程中对数据进行变换,或者对对象的字段进行抉择,那么可能就须要自定义编码解码器了。

自定义编码器

自定义编码器须要继承 MessageToByteEncoder 类,并实现 encode 办法,在该办法中写入具体的编码逻辑。

本例咱们心愿计算 2 的 N 次方,据说将一张纸折叠 100 次能够达到地球到月亮的高度,这么大的数据一般的 number 必定是装不下的,咱们将会应用 BigInteger 来对这个微小的数字进行保留。

那么对于被编码器来说,则须要将这个 BigInteger 转换成为 byte 数组。同时在 byte 数组读取的过程中,咱们须要界定到底哪些 byte 数据是属于同一个 BigInteger 的,这就须要对写入的数据格式做一个约定。

这里咱们应用三局部的数据结构来示意一个 BigInteger。第一局部是一个 magic word 也就是魔法词,这里咱们应用魔法词“N”,当读取到这个魔法词就示意接下来的数字是 BigInteger。第二局部是示意 bigInteger 数字的 byte 数组的长度,获取到这个长度值,就能够读取到所有的 byte 数组值,最初将其转换成为 BigInteger。

因为 BigInteger 是 Number 的子类,为了更加泛化编码器,咱们应用 Number 作为 MessageToByteEncoder 的泛型,外围编码代码如下:

 protected void encode(ChannelHandlerContext ctx, Number msg, ByteBuf out) {
        // 将 number 编码成为 ByteBuf
        BigInteger v;
        if (msg instanceof BigInteger) {v = (BigInteger) msg;
        } else {v = new BigInteger(String.valueOf(msg));
        }

        // 将 BigInteger 转换成为 byte[] 数组
        byte[] data = v.toByteArray();
        int dataLength = data.length;

        // 将 Number 进行编码
        out.writeByte((byte) 'N'); // 魔法词
        out.writeInt(dataLength);  // 数组长度
        out.writeBytes(data);      // 最终的数据
    }

自定义解码器

有了编码之后的 byte 数组,就能够在解码器中对其解码了。

上一节介绍了,编码过后的数据格式是魔法词 N + 数组长度 + 真正的数据。

其中魔法词长度是一个字节,数组长度是四个字节,后面局部总共是 5 个字节。所以在解码的时候,首先判断 ByteBuf 中可读字节的长度是否小于 5,如果小于 5 阐明数据是有效的,能够间接 return。

如果可读字节的长度大于 5,则示意数据是无效的,能够进行数据的解码了。

解码过程中须要留神的是,并不是所有的数据都是咱们所心愿的格局,如果在读取的过程中读到了咱们不意识的格局,那么阐明这个数据并不是咱们想要的,则能够交由其余的 handler 进行解决。

然而对于 ByteBuf 来说,一旦调用 read 办法,就会导致 reader index 挪动地位,所以在真正的读取数据之前须要调用 ByteBuf 的 markReaderIndex 办法,对 readerIndex 进行记录。而后别离读取魔法词、数组长度和残余的数据,最初将数据转换成为 BigInteger,如下所示:

 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 保障魔法词和数组长度无效
        if (in.readableBytes() < 5) {return;}
        in.markReaderIndex();
        // 查看魔法词
        int magicNumber = in.readUnsignedByte();
        if (magicNumber != 'N') {in.resetReaderIndex();
            throw new CorruptedFrameException("有效的魔法词:" + magicNumber);
        }
        // 读取所有的数据
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {in.resetReaderIndex();
            return;
        }
        // 将剩下的数据转换成为 BigInteger
        byte[] decoded = new byte[dataLength];
        in.readBytes(decoded);
        out.add(new BigInteger(decoded));
    }

增加编码解码器到 pipeline

有了两个编码解码器,还须要将其增加到 pipeline 中进行调用。

在实现 ChannelInitializer 中的 initChannel 中,能够对 ChannelPipeline 进行初始化,本例中的初始化代码如下:

// 对流进行压缩
        pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
        pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));

        // 增加 number 编码解码器
        pipeline.addLast(new NumberDecoder());
        pipeline.addLast(new NumberEncoder());

        // 增加业务解决逻辑
        pipeline.addLast(new CustomProtocolServerHandler());

其中最初一行是真正的业务解决逻辑,NumberDecoder 和 NumberEncoder 是编码和解码器。这里咱们还应用了一个 ZlibEncoder 用于对流数据进行压缩,这里应用的压缩形式是 GZIP。

压缩的益处就是能够缩小数据传输的数量,晋升传输效率。其本质也是一个编码解码器。

计算 2 的 N 次方

计算 2 的 N 次方的逻辑是这样的,首先客户端发送 2 给服务器端,服务器端接管到该音讯和后果 1 相乘,并将后果写回给客户端,客户端收到音讯之后再发送 2 给服务器端,服务器端将上次的计算结果乘以 2,再发送给客户端,以此类推直到执行 N 次。

首先看下客户端的发送逻辑:

// 最大计算 2 的 1000 次方
        ChannelFuture future = null;
        for (int i = 0; i < 1000 && next <= CustomProtocolClient.COUNT; i++) {future = ctx.write(2);
            next++;
        }

当 next 小于等于要计算的 COUNT 时,就将 2 写入到 channel 中。

对于服务器来说,在 channelRead0 办法中,读取音讯,并将其和后果相乘,再把后果写回给客户端。

    public void channelRead0(ChannelHandlerContext ctx, BigInteger msg) throws Exception {
        // 将接管到的 msg 乘以 2,而后返回给客户端
        count++;
        result = result.multiply(msg);
        ctx.writeAndFlush(result);
    }

客户端统计读取到的音讯个数,如果音讯个数 =COUNT,阐明计算结束,就能够将后果保存起来供后续应用,其外围代码如下:

    public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
        receivedMessages ++;
        if (receivedMessages == CustomProtocolClient.COUNT) {
            // 计算结束,将后果放入 answer 中
            ctx.channel().close().addListener(future -> {boolean offered = answer.offer(msg);
                assert offered;
            });
        }
    }

总结

本文实现了一个 Number 的编码解码器,事实上你能够自定义实现任何对象的编码解码器。

本文的例子能够参考:learn-netty4

本文已收录于 http://www.flydean.com/13-netty-customprotocol/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

正文完
 0