共计 17958 个字符,预计需要花费 45 分钟才能阅读完成。
作者:京东物流 王奕龙
Netty 是一个异步基于 事件驱动 的高性能网络通信 框架,能够看做是对 NIO 和 BIO 的封装,并提供了简略易用的 API、Handler 和工具类等,用以疾速开发高性能、高可靠性的网络服务端和客户端程序。
1. 创立服务端
服务端启动须要创立 ServerBootstrap
对象,并实现 初始化线程模型 , 配置 IO 模型 和增加业务解决逻辑(Handler)。在增加业务解决逻辑时,调用的是 childHandler()
办法增加了一个ChannelInitializer
,代码示例如下
// 负责服务端的启动
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 以下两个对象能够看做是两个线程组
// boss 线程组负责监听端口,承受新的连贯
NioEventLoopGroup boss = new NioEventLoopGroup();
// worker 线程组负责读取数据
NioEventLoopGroup worker = new NioEventLoopGroup();
// 配置线程组并指定 NIO 模型
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
// 定义后续每个 新连贯 的读写业务逻辑
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline()
// 增加业务解决逻辑
.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println(msg);
}
});
}
});
// 绑定端口号
serverBootstrap.bind(2002);
通过调用 .channel(NioServerSocketChannel.class)
办法指定 Channel
类型为 NIO 类型,如果要指定为 BIO 类型,参数改成 OioServerSocketChannel.class
即可。
其中 nioSocketChannel.pipeline()
用来获取 PipeLine
对象,调用办法 addLast()
增加必要的业务解决逻辑,这里采纳的是 责任链模式,会将每个 Handler 作为一个节点进行解决。
1.1 创立客户端
客户端与服务端启动相似,不同的是,客户端须要创立 Bootstrap
对象来启动,并指定一个客户端线程组,雷同的是都须要实现 初始化线程模型 , 配置 IO 模型 和增加业务解决逻辑(Handler),代码示例如下
// 负责客户端的启动
Bootstrap bootstrap = new Bootstrap();
// 客户端的线程模型
NioEventLoopGroup group = new NioEventLoopGroup();
// 指定线程组和 NIO 模型
bootstrap.group(group).channel(NioSocketChannel.class)
// handler() 办法封装业务解决逻辑
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {channel.pipeline()
// 增加业务解决逻辑
.addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println(msg);
}
});
}
});
// 连贯服务端 IP 和端口
bootstrap.connect("127.0.0.1", 2002);
(留神:下文中内容均以服务端代码示例为准)
2. 编码和解码
客户端与服务端进行通信,通信的音讯是以 二进制字节流 的模式通过 Channel
进行传递的,所以当咱们在客户端封装好 Java 业务对象 后,须要将其依照协定转换成 字节数组 ,并且当服务端承受到该 二进制字节流 时,须要将其依据协定再次解码成 Java 业务对象 进行逻辑解决,这就是 编码和解码 的过程。Netty 为咱们提供了MessageToByteEncoder
用于编码,ByteToMessageDecoder
用于解码。
2.1 MessageToByteEncoder
用于将 Java 对象编码成字节数组并写入 ByteBuf
,代码示例如下
public class TcpEncoder extends MessageToByteEncoder<Message> {
/**
* 序列化器
*/
private final Serializer serializer;
public TcpEncoder(Serializer serializer) {this.serializer = serializer;}
/**
* 编码的执行逻辑
*
* @param message 须要被编码的音讯对象
* @param byteBuf 将字节数组写入 ByteBuf
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
// 通过自定义的序列化器将对象转换成字节数组
byte[] bytes = serializer.serialize(message);
// 将字节数组写入 ByteBuf 便实现了对象的编码流程
byteBuf.writeBytes(bytes);
}
}
2.2 ByteToMessageDecoder
它用于将接管到的二进制数据流解码成 Java 对象,与上述代码相似,只不过是将该过程反过来了而已,代码示例如下
public class TcpDecoder extends ByteToMessageDecoder {
/**
* 序列化器
*/
private final Serializer serializer;
public TcpDecoder(Serializer serializer) {this.serializer = serializer;}
/**
* 解码的执行逻辑
*
* @param byteBuf 接管到的 ByteBuf 对象
* @param list 任何实现解码的 Java 对象增加到该 List 中即可
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
// 依据协定自定义的解码逻辑将其解码成 Java 对象
Message message = serializer.deSerialize(byteBuf);
// 解码实现后增加到 List 中即可
list.add(message);
}
}
2.3 留神要点
ByteBuf 默认状况下应用的是 堆外内存,不进行内存开释会产生内存溢出。不过 ByteToMessageDecoder
和 MessageToByteEncoder
这两个解码和编码Handler
会主动帮咱们实现内存开释的操作,无需再次手动开释。因为咱们实现的 encode()
和 decode()
办法只是这两个 Handler
源码中执行的一个环节,最终会在 finally 代码块中实现对内存的开释,具体内容可浏览 MessageToByteEncoder
中第 99 行 write()
办法源码。
2.4 在服务端中增加编码解码 Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline()
// 接管到申请时进行解码
.addLast(new TcpDecoder(serializer))
// 发送申请时进行编码
.addLast(new TcpEncoder(serializer));
}
});
3. 增加业务解决 Handler
在 Netty 框架中,客户端与服务端的每个连贯都对应着一个 Channel
,而这个 Channel
的所有解决逻辑都封装在一个叫作 ChannelPipeline
的对象里。ChannelPipeline
是一个双向链表,它应用的是 责任链模式,每个链表节点都是一个 Handler
,能通它能获取 Channel
相干的上下文信息(ChannelHandlerContext)。
Netty 为咱们提供了多种读取 Channel
中数据的 Handler
,其中比拟罕用的是 ChannelInboundHandlerAdapter
和SimpleChannelInboundHandler
,下文中咱们以读取心跳音讯为例。
3.1 ChannelInboundHandlerAdapter
如下为解决心跳业务逻辑的 Handler
,具体执行逻辑参考代码和正文即可
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
/**
* channel 中有数据可读时,会回调该办法
*
* @param msg 如果在该 Handler 前没有解码 Handler 节点解决,该对象类型为 ByteBuf;否则为解码后的 Java 对象
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Message message = (Message) msg;
// 解决心跳音讯
processHeartBeatMessage(message);
// 初始化 Ack 音讯
Message ackMessage = initialAckMessage();
// 回写给客户端
ctx.channel().writeAndFlush(ackMessage);
}
}
3.2 SimpleChannelInboundHandler
SimpleChannelInboundHandler
是 ChannelInboundHandlerAdapter
的实现类,SimpleChannelInboundHandler
可能指定泛型,这样在解决业务逻辑时,便 无需再增加上文代码中对象强转的逻辑,这部分代码实现是在 SimpleChannelInboundHandler
的 channelRead()
办法中实现的,它是一个模版办法,咱们仅仅须要实现 channelRead0()
办法即可,代码示例如下
public class HeartBeatHandler extends SimpleChannelInboundHandler<Message> {
/**
* @param msg 留神这里的对象类型即为 Message
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
// 解决心跳音讯
processHeartBeatMessage(message);
// 初始化 Ack 音讯
Message ackMessage = initialAckMessage();
// 回写给客户端
ctx.channel().writeAndFlush(ackMessage);
}
}
3.3 在服务端中增加心跳解决 Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline()
// 接管到进行解码
.addLast(new TcpDecoder(serializer))
// 心跳业务解决 Handler
.addLast(new HeartBeatHandler())
// 发送申请时进行编码
.addLast(new TcpEncoder(serializer));
}
});
4. ChannelHandler 的生命周期
在 ChannelInboundHandlerAdapter
能够通过实现不同的办法来实现指定机会的办法回调,具体可参考如下代码
public class LifeCycleHandler extends ChannelInboundHandlerAdapter {
/**
* 当检测到新连贯之后,调用 ch.pipeline().addLast(...); 之后的回调
* 示意以后 channel 中胜利增加了 Handler
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.println("逻辑处理器被增加时回调:handlerAdded()");
super.handlerAdded(ctx);
}
/**
* 示意以后 channel 的所有逻辑解决曾经和某个 NIO 线程建设了绑定关系
* 这里的 NIO 线程通常指的是 NioEventLoop
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("channel 绑定到线程 (NioEventLoop) 时回调:channelRegistered()");
super.channelRegistered(ctx);
}
/**
* 当 Channel 的所有业务逻辑链筹备结束,连贯被激活时
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channel 准备就绪时回调:channelActive()");
super.channelActive(ctx);
}
/**
* 客户端向服务端发送数据,示意有数据可读时,就会回调该办法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("channel 有数据可读时回调:channelRead()");
super.channelRead(ctx, msg);
}
/**
* 服务端每残缺的读完一次数据,都会回调该办法
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("channel 某次数据读完时回调:channelReadComplete()");
super.channelReadComplete(ctx);
}
// --- 断开连接时 ---
/**
* 该客户端与服务端的连贯被敞开时回调
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("channel 被敞开时回调:channelInactive()");
super.channelInactive(ctx);
}
/**
* 对应的 NIO 线程移除了对这个连贯的解决
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("channel 勾销线程(NioEventLoop) 的绑定时回调: channelUnregistered()");
super.channelUnregistered(ctx);
}
/**
* 为该连贯增加的所有业务逻辑 Handler 被移除时
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("逻辑处理器被移除时回调:handlerRemoved()");
super.handlerRemoved(ctx);
}
}
5. 解决粘包和半包问题
即便咱们发送音讯的时候是以 ByteBuf
的模式发送的,然而到了底层操作系统,依然是以 字节流 的模式对数据进行发送的,而且服务端也以 字节流 的模式读取,因而在服务端对字节流进行拼接时,可能就会造成发送时 ByteBuf
与读取时的 ByteBuf
不对等的状况,这就是所谓的粘包或半包景象。
以如下状况为例,当客户端频繁的向服务端发送心跳音讯时,读取到的 ByteBuf 信息如下,其中一个心跳申请是用红框圈出的局部
能够发现多个心跳申请 ” 粘 ” 在了一起,那么咱们须要对它进行拆包解决,否则只会读取第一条心跳申请,之后的申请会全副生效
Netty 为咱们提供了基于长度的拆包器LengthFieldBasedFrameDecoder
来进行拆包工作,它能对超过所需数据量的包进行拆分,也能在数据有余的时候期待读取,直到数据足够时,形成一个残缺的数据包并进行业务解决。
5.1 LengthFieldBasedFrameDecoder
以标准接口文档中的协定(图示)为准,代码示例如下,其中的四个参数比拟重要,详细信息可见正文形容
public class SplitHandler extends LengthFieldBasedFrameDecoder {
/**
* 在协定中示意数据长度的字段在字节流首尾中的偏移量
*/
private static final Integer LENGTH_FIELD_OFFSET = 10;
/**
* 示意数据长度的字节长度
*/
private static final Integer LENGTH_FIELD_LENGTH = 4;
/**
* 数据长度后边的头信息中的字节偏移量
*/
private static final Integer LENGTH_ADJUSTMENT = 10;
/**
* 示意从第一个字节开始须要舍去的字节数,在咱们的协定中,不须要进行舍去
*/
private static final Integer INITIAL_BYTES_TO_STRIP = 0;
public SplitHandler() {super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
}
}
之后将其增加到 Handler 中即可,如果遇到其余协定,更改其中参数或查看 LengthFieldBasedFrameDecoder
的 JavaDoc 中详细描述。
5.2 在服务端中增加拆包 Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline()
// 拆包 Handler
.addLast(new SplitHandler())
// 接管到进行解码
.addLast(new TcpDecoder(serializer))
// 心跳业务解决 Handler
.addLast(new HeartBeatHandler())
// 发送申请时进行编码
.addLast(new TcpEncoder(serializer));
}
});
6. Netty 性能优化
6.1 Handler 对单例模式的利用
Netty 在每次有新连贯到来的时候,都会调用 ChannelInitializer
的 initChannel()
办法,会将其中相干的 Handler
都创立一次,
如果其中的 Handler
是无状态且可能通用的,能够将其改成单例,这样就可能在每次连贯建设时,防止屡次创立雷同的对象。
以如下服务端代码为例,蕴含如下 Handler,能够将编码解码、以及业务解决 Handler 都定义成 Spring 单例 bean 的模式注入进来,这样就可能实现对象的复用而无需每次建设连贯都创立雷同的对象了
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
// 拆包 Handler
.addLast(new SplitHandler())
// 日志 Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解码 Handler
.addLast(new TcpDecoder(serializer))
// 心跳、格口状态、设施状态、RFID 上报、扫码上报和分拣后果上报 Handler
.addLast(new HeartBeatHandler(), new ChuteStatusHandler())
.addLast(new DeviceStatusReceiveHandler(), new RfidBindReceiveHandler())
.addLast(new ScanReceiveHandler(), new SortResultHandler())
// 编码 Handler
.addLast(new TcpEncoder(serializer));
}
});
革新实现后如下
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
// 拆包 Handler
.addLast(new SplitHandler())
// 日志 Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解码 Handler
.addLast(tcpDecoder)
// 心跳、格口状态、设施状态、RFID 上报、扫码上报和分拣后果上报 Handler
.addLast(heartBeatHandler, chuteStatusHandler)
.addLast(deviceStatusReceiveHandler, rfidBindReceiveHandler)
.addLast(scanReceiveHandler, sortResultHandler)
// 编码 Handler
.addLast(tcpEncoder);
}
});
不过须要留神在每个单例 Handler 的类上标注 @ChannelHandler.Sharable
注解,否则会抛出如下异样
io.netty.channel.ChannelPipelineException: netty.book.practice.handler.server.LoginHandler is not a @Sharable handler, so can't be added or removed multiple times
另外,SplitHanlder
不能进行单例解决,因为它的外部实现与每个 Channel
都无关,每个 SplitHandler
都须要维持每个 Channel
读到的数据,即它是 有状态的。
6.2 缩短责任链调用
对服务端来说,每次解码进去的 Java 对象在多个业务解决 Handler
中只会通过一个其中 Handler
实现业务解决,那么咱们将所有业务相干的 Handler
封装起来到一个 Map 中,每次只让它通过必要的 Handler 而不是通过整个责任链,那么便能够进步 Netty 解决申请的性能。
定义如下 ServerHandlers
单例 bean,并应用 策略模式 将对应的 Handler
治理起来,每次解决时依据音讯类型获取对应的 Handler
来实现业务逻辑
@ChannelHandler.Sharable
public class ServerHandlers extends SimpleChannelInboundHandler<Message> {
@Resourse
private HeartBeatHandler heartBeatHandler;
/**
* 策略模式封装 Handler,这样就能在回调 ServerHandler 的 channelRead0 办法时
* 找到具体的 Handler,而不须要通过责任链的每个 Handler 节点,以此来进步性能
*/
private final Map<Command, SimpleChannelInboundHandler<Message>> map;
public ServerHandler() {map = new HashMap();
// key: 音讯类型枚举 value: 对应的 Handler
map.put(MessageType.HEART_BEAT, heartBeatHandler);
// ...
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {// 调用 channelRead() 办法实现业务逻辑解决
map.get(msg.getMessageType()).channelRead(ctx, msg);
}
}
革新实现后,服务端代码如下,因为咱们封装了平行的业务解决Handler
,所以代码很清新
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
// 拆包 Handler
.addLast(new SplitHandler())
// 日志 Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解码 Handler
.addLast(tcpDecoder)
// serverHandlers 封装了 心跳、格口状态、设施状态、RFID 上报、扫码上报和分拣后果上报 Handler
.addLast(serverHandlers)
// 编码 Handler
.addLast(tcpEncoder);
}
});
6.3 合并编码、解码 Handler
Netty 对编码解码提供了对立解决 Handler 是MessageToMessageCodec
,这样咱们就能将编码和解码的 Handler 合并成一个增加接口,代码示例如下
@ChannelHandler.Sharable
public class MessageCodecHandler extends MessageToMessageCodec<ByteBuf, Message> {
/**
* 序列化器
*/
@Resourse
private Serializer serializer;
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
// 将字节数组写入 ByteBuf
ByteBuf byteBuf = ctx.alloc().ioBuffer();
serializer.serialize(byteBuf, msg);
// 这个编码也须要增加到 List 中
out.add(byteBuf);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
// 依据协定自定义的解码逻辑将其解码成 Java 对象,并增加到 List 中
out.add(serializer.deSerialize(msg));
}
}
革新实现后,服务端代码如下,将其放在业务解决 Handler 前即可,调用完业务 Handler 逻辑,会执行编码逻辑
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
// 拆包 Handler
.addLast(new SplitHandler())
// 日志 Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解码、编码 Handler
.addLast(messageCodecHandler)
// serverHandlers 封装了 心跳、格口状态、设施状态、RFID 上报、扫码上报和分拣后果上报 Handler
.addLast(serverHandlers);
}
});
6.4 缩小 NIO 线程阻塞
对于耗时的业务操作,须要将它们都丢到 业务线程池中去解决,因为单个 NIO 线程会治理很多 Channel
,只有有一个 Channel
中的 Handler
的 channelRead()
办法被业务逻辑阻塞,那么它就会拖慢绑定在该 NIO 线程上的其余所有 Channel
。
为了防止上述情况,能够在蕴含长时间业务解决逻辑的 Handler 中创立一个线程池,并将其丢入线程池中进行执行,伪代码如下
protected void channelRead(ChannelHandlerContext ctx, Object message) {threadPool.submit(new Runnable() {
// 耗时的业务解决逻辑
doSomethingSependTooMuchTime();
writeAndFlush();});
}
6.5 闲暇 ” 假死 ” 检测 Handler
如果底层的 TCP 连贯曾经断开,然而另一端服务并没有捕捉到,在某一端(客户端或服务端)看来会认为这条连贯依然存在,这就是 连贯 ” 假死 ” 景象 。这造成的问题就是,对于服务端来说,每个连贯连贯都会消耗 CPU 和内存资源,过多的假死连贯会造成性能降落和服务解体;对客户端来说,
连贯假死会使得发往服务端的申请都会超时,所以须要尽可能防止假死景象的产生。
造成假死的起因可能是公网丢包、客户端或服务端网络故障等,Netty 为咱们提供了 IdleStateHandler
来解决超时假死问题,示例代码如下
public class MyIdleStateHandler extends IdleStateHandler {
private static final int READER_IDLE_TIME = 15;
public MyIdleStateHandler() {
// 读超时工夫、写超时工夫、读写超时工夫 指定 0 值不判断超时
super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {System.out.println(READER_IDLE_TIME + "秒内没有读到数据,敞开连贯");
ctx.channel().close();
}
}
其构造方法中有三个参数来别离指定读、写和读写超时工夫,当指定 0 时不判断超时,除此之外 Netty 也有专门用来解决读和写超时的 Handler,别离为 ReadTimeoutHandler
, WriteTimeoutHandler
。
将其增加到服务端 Handler
的首位即可
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
// 超时判断 Handler
.addLast(new MyIdleStateHandler())
// 拆包 Handler
.addLast(new SplitHandler())
// 日志 Handler
.addLast(new LoggingHandler(LogLevel.INFO))
// 解码、编码 Handler
.addLast(messageCodecHandler)
// serverHandlers 封装了 心跳、格口状态、设施状态、RFID 上报、扫码上报和分拣后果上报 Handler
.addLast(serverHandlers);
}
});
7. ChannelPipeline
ChannelPipeline
与 Channel
密切相关,它能够看做是一条流水线,数据以字节流的模式进来,通过不同 Handler
的 ” 加工解决 ”,
最终以字节流的模式输入。ChannelPipeline
在每条新连贯建设的时候被创立,是一条双向链表,其中每一个节点都是ChannelHadnlerContext
对象,可能通过它拿到相干的上下文信息,默认它有头节点 HeadContext
和尾结点 TailContext
。
7.1 InboundHandler 和 OutboundHandler
定义在 ChannelPipeline
中的 Handler 是 可插拔 的,可能实现动静编织,调用 ctx.pipeline().remove()
办法可移除,调用 ctx.pipeline().addXxx()
办法可进行增加。
InboundHandler
与 OutboundHandler
解决的事件不同,前者解决 Inbound 事件
,典型的就是读取数据流并加工解决;后者会对调用 writeAndFlush()
办法的 Outbound 事件
进行解决。
此外,两者的流传机制也是不同的:
InboundHandler
会从链表头一一向下调用,头节点只是简略的将该事件流传上来(ctx.fireChannelRead(mug)
),执行过程中调用findContextInbound()
办法来寻找 InboundHandler
节点,直到 TailContext
节点执行办法结束,完结调用。
个别自定义的 ChannelInboundHandler
都继承自ChannelInboundHandlerAdapter
,如果没有笼罩channelXxx()
相干办法,那么该事件失常会遍历双向链表始终流传到尾结点,否则就会在以后节点执行完完结;当然也能够调用 fireXxx()
办法让事件从以后节点持续向下流传。
OutboundHandler
是 从链表尾向链表头 调用,相当于反向遍历 ChannelPipeline
双向链表,Outbound 事件
会先通过TailContext
尾节点,并在执行过程中一直寻找OutboundHandler
节点加工解决,直到头节点 HeadContext
调用 Unsafe.write()
办法完结。
7.2 异样流传
异样的流传机制和 Inbound 事件
的流传机制相似,在 任何节点产生的异样都会向下一个节点传递。如果自定义的 Handler 没有解决异样也没有实现 exceptionCaught()
办法,最终则会落到 TailContext
节点,控制台打印异样未解决的正告信息。
通常异样解决,咱们会定义一个异样处理器,继承自 ChannelDuplexHandler
,放在自定义 链表节点的开端,这样就可能肯定捕捉和解决异样。
8. Reactor 线程模型
8.1 NioEventLoopGroup
创立 new NioEventLoopGroup()
它的默认线程数是以后 CPU 线程数的 2 倍,最终会调用到如下源码
// 这里计算的线程数量
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
跟进到构造方法的最终实现,会执行如下业务逻辑
其中在第 2 步创立 NioEventLoop
时,值得关注的是创立了一个 Selector
,以此来实现 IO 多路复用;另外它还创立了高性能 MPSC
(多生产者单消费者)队列,借助它来协调工作的异步执行,如此单条线程(NioEventLoop)、Selector 和 MPSC 它们三者是 一对一 的关系。而每条连贯都对应一个 Channel
,每个 Channel
都绑定惟一一个 NioEventLoop
,因而单个连贯的所有操作都是在一个线程中执行,是线程平安的。
第 3 步骤创立 线程选择器,它的作用是为连贯在NioEventLoopGroup
中抉择一个 NioEventLoop
,并将该连贯与 NioEventLoop
中的 Selector
实现绑定。
在底层有两种选择器的实现,别离是 PowerOfTowEventExecutorChooser
和GenericEventExecutorChooser
,它们的原理都是从线程池里循环抉择线程,不同的是前者计算循环的索引采纳的是 位运算 而后者采纳的是 取余运算。
8.2 Reactor 线程 select 操作
源码地位 NioEventLoop
的 run()
办法,select
操作会 一直轮询是否有 IO 事件产生,并且在轮询过程中一直查看是否有工作须要执行,保障 Netty 工作队列中的工作可能及时执行,轮询过程应用一个计数器避开了 JDK 的空轮询 Bug
8.3 解决产生 IO 事件的 Channel
在 Netty 的 Channel
中,有两大类型的 Channel
,一个是 NioServerSocketChannel
,由 boss NioEventLoop 解决;另一个是 NioSocketChannel
,由 worker NioEventLoop 解决,所以
- 对于 boss NioEventLoop 来说,轮询到的是连贯事件,后续通过 NioServerSocketChannel 的 Pipeline 将连贯交给一个 work NioEventLoop 解决
- 对于 work NioEventLoop 来说,轮询到的是读写事件,后续通过 NioSocketChannel 的 Pipeline 将读取到的数据传递给每个 ChannelHandler 解决
留神工作的执行都是 异步 的。
8.4 工作的收集和执行
上文中提到了咱们创立了高性能的 MPSC
队列,它是用来 汇集 非 Reactor 线程创立的工作的,NioEventLoop
会在执行的过程中一直检测是否有事件产生,如果有事件产生就解决,解决完事件之后再解决非 Reactor 线程创立的工作。在检测是否有事件产生的时候,为了保障异步工作的及时处理,只有有工作要解决,就会进行工作检测,去解决工作,解决工作时是 Reactor 单线程执行。
8.5 注册连贯的流程
当 boss Reactor 线程检测到 ACCEPT 事件之后,创立一个 NioSocketChannel
,并把用户设置的 ChannelOption(Option 参数配置)、ChannelAttr(Channel 参数)、ChannelHandler(ChannelInitializer)封装到 NioSocketChannel
中。接着,应用线程选择器在NioEventLoopGroup
中抉择一条 NioEventLoop
(线程),把 NioSocketChannel
中包装的 JDK Channel 当做 Key,本身(NioSocketChannel)作为 attachment,注册 NioEventLoop 对应的 Selector 上。这样,后续有读写事件产生,就能够间接获取 attachment 来解决读写数据的逻辑。
8.6 如何了解 IO 多路复用
简略地说:IO 多路复用是指 能够在一个线程内解决多个连贯的 IO 事件申请。以 Java 中的 IO 多路复用为例,服务端创立 Selector
对象一直的调用 select()
办法来解决各个连贯上的 IO 事件,之后将这些 IO 事件交给工作线程异步去执行,这就达到了在一个线程内同时解决多个连贯的 IO 申请事件的目标。