关于netty:netty系列之使用POJO替代buf

39次阅读

共计 3808 个字符,预计需要花费 10 分钟才能阅读完成。

简介

在之前的文章中咱们提到了,对于 NioSocketChannel 来说,它不接管最根本的 string 音讯,只接管 ByteBuf 和 FileRegion。然而 ByteBuf 是以二进制的模式进行解决的,对于程序员来说太不直观了,解决起来也比拟麻烦,有没有可能间接解决 java 简略对象呢?本文将会探讨一下这个问题。

decode 和 encode

比方咱们须要间接向 channel 中写入一个字符串,在之前的文章中,咱们晓得这是不能够的,会报上面的谬误:

DefaultChannelPromise@57f5c075(failure: java.lang.UnsupportedOperationException: unsupported message type: String (expected: ByteBuf, FileRegion))

也就说 ChannelPromise 只承受 ByteBuf 和 FileRegion,那么怎么做呢?

既然 ChannelPromise 只承受 ByteBuf 和 FileRegion,那么咱们就须要把 String 对象转换成 ByteBuf 即可。

也就是说在写入 String 之前把 String 转换成 ByteBuf,当要读取数据的时候,再把 ByteBuf 转换成 String。

咱们晓得 ChannelPipeline 中能够增加多个 handler,并且管制这些 handler 的程序。

那么咱们的思路就进去了,在 ChannelPipeline 中增加一个 encode,用于数据写入的是对数据进行编码成 ByteBuf,而后再增加一个 decode, 用于在数据写出的时候对数据进行解码成对应的对象。

encode,decode 是不是很相熟?对了,这就是对象的序列化。

对象序列化

netty 中对象序列化是要把传输的对象和 ByteBuf 间接相互转换,当然咱们能够本人实现这个转换对象。然而 netty 曾经为咱们提供了不便的两个转换类:ObjectEncoder 和 ObjectDecoder。

先看 ObjectEncoder,他的作用就是将对象转换成为 ByteBuf。

这个类很简略,咱们对其剖析一下:

public class ObjectEncoder extends MessageToByteEncoder<Serializable> {private static final byte[] LENGTH_PLACEHOLDER = new byte[4];

    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {int startIdx = out.writerIndex();

        ByteBufOutputStream bout = new ByteBufOutputStream(out);
        ObjectOutputStream oout = null;
        try {bout.write(LENGTH_PLACEHOLDER);
            oout = new CompactObjectOutputStream(bout);
            oout.writeObject(msg);
            oout.flush();} finally {if (oout != null) {oout.close();
            } else {bout.close();
            }
        }

        int endIdx = out.writerIndex();

        out.setInt(startIdx, endIdx - startIdx - 4);
    }
}

ObjectEncoder 继承了 MessageToByteEncoder,而 MessageToByteEncoder 又继承了 ChannelOutboundHandlerAdapter。为什么是 OutBound 呢?这是因为咱们是要对写入的对象进行转换,所以是 outbound。

首先应用 ByteBufOutputStream 对 out ByteBuf 进行封装,在 bout 中,首先写入了一个 LENGTH_PLACEHOLDER 字段,用来示意 stream 中中 Byte 的长度。而后用一个 CompactObjectOutputStream 对 bout 进行封装,最初就能够用 CompactObjectOutputStream 写入对象了。

对应的,netty 还有一个 ObjectDecoder 对象,用于将 ByteBuf 转换成对应的对象,ObjectDecoder 继承自 LengthFieldBasedFrameDecoder,实际上他是一个 ByteToMessageDecoder,也是一个 ChannelInboundHandlerAdapter,用来对数据读取进行解决。

咱们看下 ObjectDecoder 中最重要的 decode 办法:

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {return null;}

        ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver);
        try {return ois.readObject();
        } finally {ois.close();
        }
    }

下面的代码能够看到,将输出的 ByteBuf 转换为 ByteBufInputStream,最初转换成为 CompactObjectInputStream,就能够间接读取对象了。

应用编码和解码器

有了下面两个编码解码器,间接须要将其增加到 client 和 server 端的 ChannelPipeline 中就能够了。

对于 server 端,其外围代码如下:

// 定义 bossGroup 和 workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
                    p.addLast(
                            // 增加 encoder 和 decoder
                            new ObjectEncoder(),
                            new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                            new PojoServerHandler());
                }
             });

            // 绑定端口,并筹备承受连贯
            b.bind(PORT).sync().channel().closeFuture().sync();

同样的,对于 client 端,咱们其外围代码如下:

EventLoopGroup group = new NioEventLoopGroup();
        try {Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
                    p.addLast(
                            // 增加 encoder 和 decoder
                            new ObjectEncoder(),
                            new ObjectDecoder(ClassResolvers.cacheDisabled(null)),
                            new PojoClientHandler());
                }
             });

            // 建设连贯
            b.connect(HOST, PORT).sync().channel().closeFuture().sync();

能够看到下面的逻辑就是将 ObjectEncoder 和 ObjectDecoder 增加到 ChannelPipeline 中即可。

最初,就能够在客户端和浏览器端通过调用:

ctx.write("加油!");

间接写入字符串对象了。

总结

有了 ObjectEncoder 和 ObjectDecoder,咱们就能够不必受限于 ByteBuf 了,程序的灵便水平失去了大幅晋升。

本文的例子能够参考:learn-netty4

本文已收录于 http://www.flydean.com/08-netty-pojo-buf/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

正文完
 0