问题形容
最近须要用 netty 实现一个中间件通信,开始为了先疾速把客户端和服务端通信的 demo 实现,只是采纳了字符串的编解码形式 (StringEncoder
,StringDecoder
)。客户端和服务端能够失常互发数据,所有运行失常。
然而字符串的编解码并不适宜业务实体类的传输,为了疾速实现实体类传输,所以决定采纳jboss-marshalling-serial
序列化形式先实现 demo,然而在客户端发送数据时,服务端却无奈收到数据,客户端控制台也没有任何异样信息。
先看整个 demo 实现代码,再查找问题起因。(先提前阐明,示例代码是完全正确无逻辑 bug 的)
pom 依赖
`<dependencies>
<!-- 只是用到了外面的日志框架 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.56.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.10.Final</version>
</dependency>
</dependencies>`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
jboss-marshalling-serial
序列化工具类
- netty 提供的 Marshalling 编解码器采纳音讯头和音讯体的形式
- JBoss Marshalling 是一个 Java 对象序列化包, 对 jdk 默认的序列化框架进行优化,但又放弃跟 Serializable 接口的兼容, 同时减少了一些可调用的参数和附加的个性
- 通过测试发现序列化后的流较 protostuff,MessagePack 还是比拟大的,
- 序列化和反序列化的类必须是同一个类, 否则抛出异样: io.netty.handler.codec.DecoderException: java.lang.ClassNotFoundException: com.bruce.netty.rpc.entity.UserInfo
`public final class MarshallingCodeFactory {private static final InternalLogger log = InternalLoggerFactory.getInstance(MarshallingCodeFactory.class);
/** 创立 Jboss marshalling 解码器 */
public static MyMarshallingDecoder buildMarshallingDecoder() {
// 参数 serial 示意创立的是 Java 序列化工厂对象, 由 jboss-marshalling-serial 提供
MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
DefaultUnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
return new MyMarshallingDecoder(provider, 1024);
}
/** 创立 Jboss marshalling 编码器 */
public static MarshallingEncoder buildMarshallingEncoder() {MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
DefaultMarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
return new MarshallingEncoder(provider);
}
public static class MyMarshallingDecoder extends MarshallingDecoder {public MyMarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize) {super(provider, maxObjectSize);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {log.info("读取数据长度:{}", in.readableBytes());
return super.decode(ctx, in);
}
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
服务端代码实现
服务端业务处理器:(实在场景中不要在 io 线程执行耗时业务逻辑解决)
`@ChannelHandler.Sharable
public class SimpleServerHandler extends ChannelInboundHandlerAdapter {private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleServerHandler.class);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("handlerAdded" + this.hashCode());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("server channelRead:{}", msg);
ctx.channel().writeAndFlush("hello netty");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof java.io.IOException) {log.warn("client close");
} else {cause.printStackTrace();
}
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
服务端启动类
`public class NettyServer {private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyServer.class);
public static void main(String[] args) throws Exception {EventLoopGroup acceptGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
Class<? extends ServerSocketChannel> serverSocketChannelClass = NioServerSocketChannel.class;
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(acceptGroup, workerGroup)
.channel(serverSocketChannelClass)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, false) // 默认为 false
.handler(new LoggingHandler())
.childHandler(new CustomCodecChannelInitializer());
try {//sync() 将异步变为同步, 绑定到 8088 端口
ChannelFuture channelFuture = bootstrap.bind(8088).sync();
log.info("server 启动胜利");
} catch (Exception e) {e.printStackTrace();
}
Thread serverShutdown = new Thread(() -> {log.info("执行 jvm ShutdownHook, server shutdown");
acceptGroup.shutdownGracefully();
workerGroup.shutdownGracefully();});
// 注册 jvm ShutdownHook,jvm 退出之前敞开服务资源
Runtime.getRuntime().addShutdownHook(serverShutdown);
}
static class CustomCodecChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
pipeline.addLast(new SimpleServerHandler());
}
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
客户端代码实现
客户端业务处理器
`public class SimpleClientHandler extends ChannelInboundHandlerAdapter {private static final InternalLogger log = InternalLoggerFactory.getInstance(SimpleClientHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("client receive:{}", msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
客户端启动类
`public class NettyClient {private static final InternalLogger log = InternalLoggerFactory.getInstance(NettyClient.class);
public static void main(String[] args) {EventLoopGroup workerGroup = new NioEventLoopGroup();
Class<? extends SocketChannel> socketChannelClass = NioSocketChannel.class;
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(socketChannelClass)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)
.handler(new CustomCodecChannelInitializer());
Channel clientChannel;
try {ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8088);
// 同步期待连贯建设胜利, 这里示例代码, 能够认为是肯定会连贯胜利
boolean b = channelFuture.awaitUninterruptibly(10, TimeUnit.SECONDS);
clientChannel = channelFuture.channel();
for (int i = 0; i < 10; i++) {Thread.sleep(1000);
UserInfo userInfo = new UserInfo("bruce", 18);
log.info("send user info");
// 连贯胜利后发送数据
send(clientChannel, userInfo);
}
// 实际上这个中央会永远阻塞期待
clientChannel.closeFuture().sync();
} catch (InterruptedException e) {e.printStackTrace();
} finally {workerGroup.shutdownGracefully();
}
}
static void send(Channel channel, UserInfo data) {
// 连贯胜利后发送数据
ChannelFuture channelFuture1 = channel.writeAndFlush(data);
}
static class CustomCodecChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
pipeline.addLast(new SimpleClientHandler());
}
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
* 18
* 19
* 20
* 21
* 22
* 23
* 24
* 25
* 26
* 27
* 28
* 29
* 30
* 31
* 32
* 33
* 34
* 35
* 36
* 37
* 38
* 39
* 40
* 41
* 42
* 43
* 44
* 45
* 46
* 47
* 48
* 49
* 50
* 51
* 52
* 53
* 54
实体类 UserInfo,
`public class UserInfo {
private String username;
private int age;
public UserInfo() {}
public UserInfo(String username, int age) {
this.username = username;
this.age = age;
}
//getter / setter 省略
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
先启动服务端,再启动客户端能够在 idea 控制台发现:
服务端和客户端建设了连贯,客户端在发送数据,但 是 服 务 端 却 没 有 收 到,并 且 控 制 台 没 有 任 何 异 常 信 息 color{#FF3030}{然而服务端却没有收到,并且控制台没有任何异样信息}然而服务端却没有收到,并且控制台没有任何异样信息
既然没有异样,只能先在客户端断点,确认客户端是否失常,依据教训间接查看 MarshallingEncoder
的编码方法 MarshallingEncoder#encode
,debug 执行先确认 UserInfo 对象有没有被正确序列化。
在执行到 marshaller.writeObject(msg)时呈现了异样。
持续跟进断点会进入 catch 中,显示 java.io.NotSerializableException
,(脑中呈现一句话:我粗心了,没有…)曾经能够晓得 UserInfo 类没有继承序列化接口java.io.Serializable
而抛出异样。UserInfo 只须要继承 java.io.Serializable 就能够失常向客户端发送数据。
然而为什么控制台没有抛出异样呢!?
持续跟进断点 NotSerializableException 被包装在 io.netty.handler.codec.EncoderException
中抛出,序列化的 buf 也在 finally 中被开释。而 EncoderException 会被 AbstractChannelHandlerContext#invokeWrite0
办法的 catch 语句中被解决。
`private void invokeWrite0(Object msg, ChannelPromise promise) {
try {((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {notifyOutboundHandlerException(t, promise);
}
}
private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {// Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
// false.
PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
最终会执行到 io.netty.util.concurrent.DefaultPromise#setValue0
,次要目标就是为了记录这个异样信息,而后查看是否有GenericFutureListener
监听这次发送申请的后果。如果有 Listener 则在 nio 线程中回调监听器办法。
`private boolean setValue0(Object objResult) {if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {if (checkNotifyWaiters()) {notifyListeners();
}
return true;
}
return false;
}
private synchronized boolean checkNotifyWaiters() {if (waiters > 0) {notifyAll();
}
return listeners != null;
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
* 17
然而笔者的示例中并没有设置 GenericFutureListener,checkNotifyWaiters
办法返回的是 false,不会执行 notifyListeners();
办法,所以整个异样被淹没 。而Promise#tryFailure
办法最终返回 true。
再看办法 io.netty.util.internal.PromiseNotificationUtil#tryFailure
,尽管也是会解决 Throwable,然而只在 Promise#tryFailure 返回 false 并且 logger 不为 null 时执行。 所以这里也不会打印出日志。
`public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) {if (!p.tryFailure(cause) && logger != null) {Throwable err = p.cause();
if (err == null) {logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause);
} else if (logger.isWarnEnabled()) {
logger.warn("Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}",
p, ThrowableUtil.stackTraceToString(err), cause);
}
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
如何打印出这些异样信息呢?
计划 1 (异步解决)
在数据发送过后,给 ChannelFuture
增加监听器,用于监听此次发送的后果,当出现异常时,对异样进行解决。
`static void send(Channel channel, UserInfo data) {
// 连贯胜利后发送数据
ChannelFuture channelFuture1 = channel.writeAndFlush(data);
channelFuture1.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();
if (cause != null) {cause.printStackTrace();
}
}
});
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
计划 2(不举荐,依据业务决定)
在数据发送过后,同步期待 发送后果,判断是否存在异样。
`static void send(Channel channel, UserInfo data) {
// 连贯胜利后发送数据
ChannelFuture channelFuture1 = channel.writeAndFlush(data);
while (!channelFuture1.isDone()) {
try {
// 超时工夫示例值,依据业务决定
boolean notTimeout = channelFuture1.await(50);
} catch (Exception e) {log.warn(e.getMessage());
}
}
Throwable cause = channelFuture1.cause();
if (cause != null) {cause.printStackTrace();
}
}`
* 1
* 2
* 3
* 4
* 5
* 6
* 7
* 8
* 9
* 10
* 11
* 12
* 13
* 14
* 15
* 16
没有监听 ChannelFuture,异样就被暗藏是否正当呢?
这个问题见仁见智,对笔者有点代码洁癖来说,这里至多是可有优化一下的,不至于让开发者消耗工夫去查找失落的异样信息。优化逻辑也简略,在 io.netty.util.concurrent.DefaultPromise#setFailure0
中,如果既没有 listeners
也没有 await
期待时,则打印异样信息。
修 DefaultPromise
改代码如下:
private boolean setFailure0(Throwable cause) {if (listeners == null && waiters == 0) {logger.error("cause:", cause);
}
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}