Netty Handler 执行顺序

56次阅读

共计 4863 个字符,预计需要花费 13 分钟才能阅读完成。

构建简单的服务端 / 客户端程序,当服务端收到消息后,经过 Handler 的处理再将新的消息发送给客户端,通过处理的过程和消息的内容来验证执行的顺序。
客户端就一个 Handler,实现功能就是当连接成功后发送消息,接收消息并打印到控制台。
@Sharable
public class FirstClientHandler extends ChannelHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer(“Hello netty “, CharsetUtil.UTF_8));
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(“Client received: ” + ((ByteBuf)msg).toString(CharsetUtil.UTF_8));
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.close();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
客户端启动程序
public class Client {
private final String host;
private final int port;

public Client(String host, int port) {
this.host = host;
this.port = port;
}

public void start()
throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new FirstClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args)
throws Exception {
new Client(“127.0.0.1”, 8080).start();
}
}
服务端一个 Handler 肯定时不行的,这样就没有什么顺序不顺序的了,这里添加两个 Handler,FirstServerHandler 和 SecondServerHandler。这两个 Handler 在接到消息后会在后面添加一个字符串,最后把消息发送给客户端。当然它们顺序就如同它们的名字一样了。
FirstServerHandler
@Sharable
public class FirstServerHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
in.writeBytes(“–>FirstServerHandler read–>”.getBytes());
ctx.fireChannelRead(msg);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
ByteBuf in = (ByteBuf) msg;
in.writeBytes(“FirstServerHandler write–>”.getBytes());
super.write(ctx, msg, promise);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
SecondServerHandler
@Sharable
public class SecondServerHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
in.writeBytes(“SecondServerHandler read–>”.getBytes());
ctx.writeAndFlush(msg);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
ByteBuf in = (ByteBuf) msg;
in.writeBytes(“SecondServerHandler write –>”.getBytes());
super.write(ctx, msg, promise);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
Server
public class Server {

public static void main(String[] args) {
final FirstServerHandler firstServerHandler = new FirstServerHandler();
final SecondServerHandler secondServerHandler = new SecondServerHandler();
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(8080))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(firstServerHandler);
ch.pipeline().addLast(secondServerHandler);
}
});
ChannelFuture f = b.bind().sync();
System.out.println(Server.class.getName() +
” started and listening for connections on ” + f.channel().localAddress());
f.channel().closeFuture().sync();
} catch (Exception e) {

} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
先启动 Server 中的 main 方法,然后运行 Client 客户端程序,控制台输出如下:
Client received: Hello netty –> FirstServerHandler read –> SecondServerHandler read –> FirstServerHandler write –>
从客户端接收到的消息来看,服务端在接收到客户端发来的消息 Hello netty 后,先经过了 FirstServerHandler 的处理,在消息后天添加了 –> FirstServerHandler read –>,最后调用 ctx.fireChannelRead(msg) 把消息传递到下一个 Handler,也就是 SecondServerHandler,在消息后面添加了 SecondServerHandler read –>,然后调用 ctx.writeAndFlush(msg) 把消息发送给客户端。
消息经过了 FirstServerHandler 的 write 方法,在后面添加了 FirstServerHandler write –> 最后发送给了客户端,这和客户端输出的内容时一致的。这里有一个疑问就是在调用 ctx.writeAndFlush(msg) 发送的消息没有经过 SecondServerHandler 的 write 方法,而是直接进入了 FirstServerHandler 的 wirte 方法。
那是因为在调用 ChannelHandlerContext 中写消息的方法时只会传递给位于当前 Handler 的小一个能够处理该事件的 Handler。如果调用的是 Channel 上的写消息的方法将会沿着整个管道传播。
我们把 SecondServerHandler 中的 ctx.writeAndFlush(msg) 修改为 ctx.channel().writeAndFlush(msg),客户端输出消息为
Client received: Hello netty –> FirstServerHandler read –> SecondServerHandler read –> SecondServerHandler write –> FirstServerHandler write –>
这样就经过了 SecondServerHandler 的 write。
再修改一下 FirstServerHandler 中的 channelRead 方法,去掉 ctx.fireChannelRead(msg) 添加 ctx.channel().writeAndFlush(msg),这样在读的时候就经过 SecondServerHandler 了,直接写。这样客户端输出为
Client received: Hello netty –> FirstServerHandler read –> SecondServerHandler write –> FirstServerHandler write –>
同样经过了 SecondServerHandler 和 FirstServerHandler 的 write。
通过上面的实验应该能清晰的了解 Handler 的一个执行顺序了。

正文完
 0