问题形容

    最近须要用netty实现一个中间件通信,开始为了先疾速把客户端和服务端通信的demo实现,只是采纳了字符串的编解码形式(StringEncoder,StringDecoder)。客户端和服务端能够失常互发数据,所有运行失常。
    然而字符串的编解码并不适宜业务实体类的传输,为了疾速实现实体类传输,所以决定采纳jboss-marshalling-serial序列化形式先实现demo,然而在客户端发送数据时,服务端却无奈收到数据,客户端控制台也没有任何异样信息。
    先看整个demo实现代码,再查找问题起因。(先提前阐明,示例代码是完全正确无逻辑bug的)

pom依赖

`<dependencies>    <!--只是用到了外面的日志框架-->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter</artifactId>    </dependency>    <dependency>        <groupId>io.netty</groupId>        <artifactId>netty-all</artifactId>        <version>4.1.56.Final</version>    </dependency>    <dependency>        <groupId>org.jboss.marshalling</groupId>        <artifactId>jboss-marshalling-serial</artifactId>        <version>2.0.10.Final</version>    </dependency></dependencies>` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12*   13*   14*   15*   16*   17*   18*   19

jboss-marshalling-serial序列化工具类

  • netty提供的Marshalling编解码器采纳音讯头和音讯体的形式
  • JBoss Marshalling是一个Java对象序列化包,对jdk默认的序列化框架进行优化,但又放弃跟Serializable接口的兼容,同时减少了一些可调用的参数和附加的个性
  • 通过测试发现序列化后的流较protostuff,MessagePack还是比拟大的,
  • 序列化和反序列化的类必须是同一个类,否则抛出异样: io.netty.handler.codec.DecoderException: java.lang.ClassNotFoundException: com.bruce.netty.rpc.entity.UserInfo
`public final class MarshallingCodeFactory {    private static final InternalLogger log = InternalLoggerFactory.getInstance(MarshallingCodeFactory.class);    /** 创立Jboss marshalling 解码器 */    public static MyMarshallingDecoder buildMarshallingDecoder() {        //参数serial示意创立的是Java序列化工厂对象,由jboss-marshalling-serial提供        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");        MarshallingConfiguration configuration = new MarshallingConfiguration();        configuration.setVersion(5);        DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);        return new MyMarshallingDecoder(provider, 1024);    }    /** 创立Jboss marshalling 编码器 */    public static MarshallingEncoder buildMarshallingEncoder() {        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");        MarshallingConfiguration configuration = new MarshallingConfiguration();        configuration.setVersion(5);        DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);        return new MarshallingEncoder(provider);    }    public static class MyMarshallingDecoder extends MarshallingDecoder {        public MyMarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) {            super(provider, maxObjectSize);        }        @Override        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {            log.info("读取数据长度:{}", in.readableBytes());            return super.decode(ctx, in);        }    }}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12*   13*   14*   15*   16*   17*   18*   19*   20*   21*   22*   23*   24*   25*   26*   27*   28*   29*   30*   31*   32*   33*   34

服务端代码实现

服务端业务处理器:(实在场景中不要在io线程执行耗时业务逻辑解决)

`@ChannelHandler.Sharablepublic class SimpleServerHandler extends ChannelInboundHandlerAdapter {    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class);    @Override    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        log.info("handlerAdded" + this.hashCode());    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        log.info("server channelRead:{}", msg);        ctx.channel().writeAndFlush("hello netty");    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        if (cause instanceof java.io.IOException) {            log.warn("client close");        } else {            cause.printStackTrace();        }    }}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12*   13*   14*   15*   16*   17*   18*   19*   20*   21*   22*   23*   24

服务端启动类

`public class NettyServer {    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyServer.class);    public static void main(String[] args) throws Exception {        EventLoopGroup acceptGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup =  new NioEventLoopGroup();        Class<? extends ServerSocketChannel> serverSocketChannelClass = NioServerSocketChannel.class;        ServerBootstrap bootstrap = new ServerBootstrap();        bootstrap.group(acceptGroup, workerGroup)                .channel(serverSocketChannelClass)                .option(ChannelOption.SO_BACKLOG, 128)                .option(ChannelOption.SO_REUSEADDR, true)                .childOption(ChannelOption.SO_KEEPALIVE, false) //默认为false                .handler(new LoggingHandler())                .childHandler(new CustomCodecChannelInitializer());        try {            //sync() 将异步变为同步,绑定到8088端口            ChannelFuture channelFuture = bootstrap.bind(8088).sync();            log.info("server 启动胜利");        } catch (Exception e) {            e.printStackTrace();        }                Thread serverShutdown = new Thread(() -> {            log.info("执行jvm ShutdownHook, server shutdown");            acceptGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        });        //注册jvm ShutdownHook,jvm退出之前敞开服务资源        Runtime.getRuntime().addShutdownHook(serverShutdown);    }    static class CustomCodecChannelInitializer extends ChannelInitializer<SocketChannel> {        @Override        protected void initChannel(SocketChannel ch) throws Exception {            ChannelPipeline pipeline = ch.pipeline();            pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());            pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());            pipeline.addLast(new SimpleServerHandler());        }    }}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12*   13*   14*   15*   16*   17*   18*   19*   20*   21*   22*   23*   24*   25*   26*   27*   28*   29*   30*   31*   32*   33*   34*   35*   36*   37*   38*   39*   40*   41*   42*   43

客户端代码实现

客户端业务处理器

`public class SimpleClientHandler extends ChannelInboundHandlerAdapter {    private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        log.info("client receive:{}", msg);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();    }}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12*   13

客户端启动类

`public class NettyClient {    private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);    public static void main(String[] args) {        EventLoopGroup workerGroup = new NioEventLoopGroup();        Class<? extends SocketChannel> socketChannelClass = NioSocketChannel.class;        Bootstrap bootstrap = new Bootstrap();        bootstrap.group(workerGroup)                .channel(socketChannelClass)                .option(ChannelOption.TCP_NODELAY, true)                .option(ChannelOption.SO_KEEPALIVE, false)                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)                .handler(new CustomCodecChannelInitializer());        Channel clientChannel;        try {            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);            //同步期待连贯建设胜利, 这里示例代码, 能够认为是肯定会连贯胜利            boolean b = channelFuture.awaitUninterruptibly(10, TimeUnit.SECONDS);            clientChannel = channelFuture.channel();            for (int i = 0; i < 10; i++) {                Thread.sleep(1000);                UserInfo userInfo = new UserInfo("bruce", 18);                log.info("send user info");                //连贯胜利后发送数据                send(clientChannel, userInfo);            }            //实际上这个中央会永远阻塞期待            clientChannel.closeFuture().sync();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            workerGroup.shutdownGracefully();        }    }    static void send(Channel channel, UserInfo data) {        //连贯胜利后发送数据        ChannelFuture channelFuture1 = channel.writeAndFlush(data);    }    static class CustomCodecChannelInitializer extends ChannelInitializer<SocketChannel> {        @Override        protected void initChannel(SocketChannel ch) throws Exception {            ChannelPipeline pipeline = ch.pipeline();            pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());            pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());            pipeline.addLast(new SimpleClientHandler());        }    }}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12*   13*   14*   15*   16*   17*   18*   19*   20*   21*   22*   23*   24*   25*   26*   27*   28*   29*   30*   31*   32*   33*   34*   35*   36*   37*   38*   39*   40*   41*   42*   43*   44*   45*   46*   47*   48*   49*   50*   51*   52*   53*   54

实体类UserInfo,

`public class UserInfo {    private String username;    private int age;    public UserInfo() {    }    public UserInfo(String username, int age) {        this.username = username;        this.age = age;    }    //getter / setter 省略}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12*   13

先启动服务端,再启动客户端能够在idea控制台发现:

服务端和客户端建设了连贯,客户端在发送数据,但 是 服 务 端 却 没 有 收 到 , 并 且 控 制 台 没 有 任 何 异 常 信 息 color{#FF3030}{然而服务端却没有收到,并且控制台没有任何异样信息}然而服务端却没有收到,并且控制台没有任何异样信息


既然没有异样,只能先在客户端断点,确认客户端是否失常,依据教训间接查看MarshallingEncoder的编码方法MarshallingEncoder#encode,debug执行先确认UserInfo对象有没有被正确序列化。

在执行到marshaller.writeObject(msg)时呈现了异样。

持续跟进断点会进入catch中,显示java.io.NotSerializableException,(脑中呈现一句话:我粗心了,没有…)曾经能够晓得UserInfo类没有继承序列化接口java.io.Serializable而抛出异样。UserInfo只须要继承java.io.Serializable就能够失常向客户端发送数据。

然而为什么控制台没有抛出异样呢 !?

持续跟进断点NotSerializableException被包装在io.netty.handler.codec.EncoderException中抛出,序列化的buf也在finally中被开释。而EncoderException会被AbstractChannelHandlerContext#invokeWrite0办法的catch语句中被解决。

`private void invokeWrite0(Object msg, ChannelPromise promise) {    try {        ((ChannelOutboundHandler) handler()).write(this, msg, promise);    } catch (Throwable t) {        notifyOutboundHandlerException(t, promise);    }} private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {    // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return    // false.    PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12

最终会执行到io.netty.util.concurrent.DefaultPromise#setValue0,次要目标就是为了记录这个异样信息,而后查看是否有GenericFutureListener监听这次发送申请的后果。如果有Listener则在nio线程中回调监听器办法。

`private boolean setValue0(Object objResult) {    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {        if (checkNotifyWaiters()) {            notifyListeners();        }        return true;    }    return false;}private synchronized boolean checkNotifyWaiters() {    if (waiters > 0) {        notifyAll();    }    return listeners != null;}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12*   13*   14*   15*   16*   17

然而笔者的示例中并没有设置GenericFutureListener,checkNotifyWaiters办法返回的是false,不会执行notifyListeners();办法,所以整个异样被淹没。而Promise#tryFailure办法最终返回true。
再看办法io.netty.util.internal.PromiseNotificationUtil#tryFailure,尽管也是会解决Throwable,然而只在Promise#tryFailure返回false并且logger不为null时执行。所以这里也不会打印出日志

`public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) {    if (!p.tryFailure(cause) && logger != null) {        Throwable err = p.cause();        if (err == null) {            logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause);        } else if (logger.isWarnEnabled()) {            logger.warn(                    "Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}",                    p, ThrowableUtil.stackTraceToString(err), cause);        }    }}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12

如何打印出这些异样信息呢?

计划1 (异步解决)
在数据发送过后,给ChannelFuture增加监听器,用于监听此次发送的后果,当出现异常时,对异样进行解决。

`static void send(Channel channel, UserInfo data) {    //连贯胜利后发送数据    ChannelFuture channelFuture1 = channel.writeAndFlush(data);    channelFuture1.addListener(new ChannelFutureListener() {        @Override        public void operationComplete(ChannelFuture future) throws Exception {            Throwable cause = future.cause();            if (cause != null) {                cause.printStackTrace();            }        }    });}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12*   13

计划2 (不举荐,依据业务决定)
在数据发送过后,同步期待发送后果,判断是否存在异样。

`static void send(Channel channel, UserInfo data) {    //连贯胜利后发送数据    ChannelFuture channelFuture1 = channel.writeAndFlush(data);    while (!channelFuture1.isDone()) {        try {            //超时工夫示例值,依据业务决定            boolean notTimeout = channelFuture1.await(50);        } catch (Exception e) {            log.warn(e.getMessage());        }    }    Throwable cause = channelFuture1.cause();    if (cause != null) {        cause.printStackTrace();    }}` *   1*   2*   3*   4*   5*   6*   7*   8*   9*   10*   11*   12*   13*   14*   15*   16

没有监听ChannelFuture,异样就被暗藏是否正当呢?

这个问题见仁见智,对笔者有点代码洁癖来说,这里至多是可有优化一下的,不至于让开发者消耗工夫去查找失落的异样信息。优化逻辑也简略,在io.netty.util.concurrent.DefaultPromise#setFailure0中,如果既没有listeners也没有await期待时,则打印异样信息。
DefaultPromise改代码如下:

private boolean setFailure0(Throwable cause) {    if (listeners == null && waiters == 0) {        logger.error("cause:", cause);    }    return setValue0(new CauseHolder(checkNotNull(cause, "cause")));}