问题形容
最近须要用netty实现一个中间件通信,开始为了先疾速把客户端和服务端通信的demo实现,只是采纳了字符串的编解码形式(StringEncoder
,StringDecoder
)。客户端和服务端能够失常互发数据,所有运行失常。
然而字符串的编解码并不适宜业务实体类的传输,为了疾速实现实体类传输,所以决定采纳jboss-marshalling-serial
序列化形式先实现demo,然而在客户端发送数据时,服务端却无奈收到数据,客户端控制台也没有任何异样信息。
先看整个demo实现代码,再查找问题起因。(先提前阐明,示例代码是完全正确无逻辑bug的)
pom依赖
`<dependencies> <!--只是用到了外面的日志框架--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.56.Final</version> </dependency> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>2.0.10.Final</version> </dependency></dependencies>` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19
jboss-marshalling-serial
序列化工具类
- netty提供的Marshalling编解码器采纳音讯头和音讯体的形式
- JBoss Marshalling是一个Java对象序列化包,对jdk默认的序列化框架进行优化,但又放弃跟Serializable接口的兼容,同时减少了一些可调用的参数和附加的个性
- 通过测试发现序列化后的流较protostuff,MessagePack还是比拟大的,
- 序列化和反序列化的类必须是同一个类,否则抛出异样: io.netty.handler.codec.DecoderException: java.lang.ClassNotFoundException: com.bruce.netty.rpc.entity.UserInfo
`public final class MarshallingCodeFactory { private static final InternalLogger log = InternalLoggerFactory.getInstance(MarshallingCodeFactory.class); /** 创立Jboss marshalling 解码器 */ public static MyMarshallingDecoder buildMarshallingDecoder() { //参数serial示意创立的是Java序列化工厂对象,由jboss-marshalling-serial提供 MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial"); MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration); return new MyMarshallingDecoder(provider, 1024); } /** 创立Jboss marshalling 编码器 */ public static MarshallingEncoder buildMarshallingEncoder() { MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial"); MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration); return new MarshallingEncoder(provider); } public static class MyMarshallingDecoder extends MarshallingDecoder { public MyMarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) { super(provider, maxObjectSize); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { log.info("读取数据长度:{}", in.readableBytes()); return super.decode(ctx, in); } }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24* 25* 26* 27* 28* 29* 30* 31* 32* 33* 34
服务端代码实现
服务端业务处理器:(实在场景中不要在io线程执行耗时业务逻辑解决)
`@ChannelHandler.Sharablepublic class SimpleServerHandler extends ChannelInboundHandlerAdapter { private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded" + this.hashCode()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("server channelRead:{}", msg); ctx.channel().writeAndFlush("hello netty"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof java.io.IOException) { log.warn("client close"); } else { cause.printStackTrace(); } }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24
服务端启动类
`public class NettyServer { private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyServer.class); public static void main(String[] args) throws Exception { EventLoopGroup acceptGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); Class<? extends ServerSocketChannel> serverSocketChannelClass = NioServerSocketChannel.class; ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(acceptGroup, workerGroup) .channel(serverSocketChannelClass) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, false) //默认为false .handler(new LoggingHandler()) .childHandler(new CustomCodecChannelInitializer()); try { //sync() 将异步变为同步,绑定到8088端口 ChannelFuture channelFuture = bootstrap.bind(8088).sync(); log.info("server 启动胜利"); } catch (Exception e) { e.printStackTrace(); } Thread serverShutdown = new Thread(() -> { log.info("执行jvm ShutdownHook, server shutdown"); acceptGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }); //注册jvm ShutdownHook,jvm退出之前敞开服务资源 Runtime.getRuntime().addShutdownHook(serverShutdown); } static class CustomCodecChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder()); pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder()); pipeline.addLast(new SimpleServerHandler()); } }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24* 25* 26* 27* 28* 29* 30* 31* 32* 33* 34* 35* 36* 37* 38* 39* 40* 41* 42* 43
客户端代码实现
客户端业务处理器
`public class SimpleClientHandler extends ChannelInboundHandlerAdapter { private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("client receive:{}", msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13
客户端启动类
`public class NettyClient { private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class); public static void main(String[] args) { EventLoopGroup workerGroup = new NioEventLoopGroup(); Class<? extends SocketChannel> socketChannelClass = NioSocketChannel.class; Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(socketChannelClass) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000) .handler(new CustomCodecChannelInitializer()); Channel clientChannel; try { ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088); //同步期待连贯建设胜利, 这里示例代码, 能够认为是肯定会连贯胜利 boolean b = channelFuture.awaitUninterruptibly(10, TimeUnit.SECONDS); clientChannel = channelFuture.channel(); for (int i = 0; i < 10; i++) { Thread.sleep(1000); UserInfo userInfo = new UserInfo("bruce", 18); log.info("send user info"); //连贯胜利后发送数据 send(clientChannel, userInfo); } //实际上这个中央会永远阻塞期待 clientChannel.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); } } static void send(Channel channel, UserInfo data) { //连贯胜利后发送数据 ChannelFuture channelFuture1 = channel.writeAndFlush(data); } static class CustomCodecChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder()); pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder()); pipeline.addLast(new SimpleClientHandler()); } }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17* 18* 19* 20* 21* 22* 23* 24* 25* 26* 27* 28* 29* 30* 31* 32* 33* 34* 35* 36* 37* 38* 39* 40* 41* 42* 43* 44* 45* 46* 47* 48* 49* 50* 51* 52* 53* 54
实体类UserInfo,
`public class UserInfo { private String username; private int age; public UserInfo() { } public UserInfo(String username, int age) { this.username = username; this.age = age; } //getter / setter 省略}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13
先启动服务端,再启动客户端能够在idea控制台发现:
服务端和客户端建设了连贯,客户端在发送数据,但 是 服 务 端 却 没 有 收 到 , 并 且 控 制 台 没 有 任 何 异 常 信 息 color{#FF3030}{然而服务端却没有收到,并且控制台没有任何异样信息}然而服务端却没有收到,并且控制台没有任何异样信息
既然没有异样,只能先在客户端断点,确认客户端是否失常,依据教训间接查看MarshallingEncoder
的编码方法MarshallingEncoder#encode
,debug执行先确认UserInfo对象有没有被正确序列化。
在执行到marshaller.writeObject(msg)时呈现了异样。
持续跟进断点会进入catch中,显示java.io.NotSerializableException
,(脑中呈现一句话:我粗心了,没有…)曾经能够晓得UserInfo类没有继承序列化接口java.io.Serializable
而抛出异样。UserInfo只须要继承java.io.Serializable就能够失常向客户端发送数据。
然而为什么控制台没有抛出异样呢 !?
持续跟进断点NotSerializableException被包装在io.netty.handler.codec.EncoderException
中抛出,序列化的buf也在finally中被开释。而EncoderException会被AbstractChannelHandlerContext#invokeWrite0
办法的catch语句中被解决。
`private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); }} private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) { // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return // false. PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12
最终会执行到io.netty.util.concurrent.DefaultPromise#setValue0
,次要目标就是为了记录这个异样信息,而后查看是否有GenericFutureListener
监听这次发送申请的后果。如果有Listener则在nio线程中回调监听器办法。
`private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { notifyListeners(); } return true; } return false;}private synchronized boolean checkNotifyWaiters() { if (waiters > 0) { notifyAll(); } return listeners != null;}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16* 17
然而笔者的示例中并没有设置GenericFutureListener,checkNotifyWaiters
办法返回的是false,不会执行notifyListeners();
办法,所以整个异样被淹没。而Promise#tryFailure
办法最终返回true。
再看办法io.netty.util.internal.PromiseNotificationUtil#tryFailure
,尽管也是会解决Throwable,然而只在Promise#tryFailure返回false并且logger不为null时执行。所以这里也不会打印出日志。
`public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) { if (!p.tryFailure(cause) && logger != null) { Throwable err = p.cause(); if (err == null) { logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause); } else if (logger.isWarnEnabled()) { logger.warn( "Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}", p, ThrowableUtil.stackTraceToString(err), cause); } }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12
如何打印出这些异样信息呢?
计划1 (异步解决)
在数据发送过后,给ChannelFuture
增加监听器,用于监听此次发送的后果,当出现异常时,对异样进行解决。
`static void send(Channel channel, UserInfo data) { //连贯胜利后发送数据 ChannelFuture channelFuture1 = channel.writeAndFlush(data); channelFuture1.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { cause.printStackTrace(); } } });}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13
计划2 (不举荐,依据业务决定)
在数据发送过后,同步期待发送后果,判断是否存在异样。
`static void send(Channel channel, UserInfo data) { //连贯胜利后发送数据 ChannelFuture channelFuture1 = channel.writeAndFlush(data); while (!channelFuture1.isDone()) { try { //超时工夫示例值,依据业务决定 boolean notTimeout = channelFuture1.await(50); } catch (Exception e) { log.warn(e.getMessage()); } } Throwable cause = channelFuture1.cause(); if (cause != null) { cause.printStackTrace(); }}` * 1* 2* 3* 4* 5* 6* 7* 8* 9* 10* 11* 12* 13* 14* 15* 16
没有监听ChannelFuture,异样就被暗藏是否正当呢?
这个问题见仁见智,对笔者有点代码洁癖来说,这里至多是可有优化一下的,不至于让开发者消耗工夫去查找失落的异样信息。优化逻辑也简略,在io.netty.util.concurrent.DefaultPromise#setFailure0
中,如果既没有listeners
也没有await
期待时,则打印异样信息。
修DefaultPromise
改代码如下:
private boolean setFailure0(Throwable cause) { if (listeners == null && waiters == 0) { logger.error("cause:", cause); } return setValue0(new CauseHolder(checkNotNull(cause, "cause")));}