作者:京东物流 王奕龙
Netty是一个异步基于事件驱动的高性能网络通信框架,能够看做是对NIO和BIO的封装,并提供了简略易用的API、Handler和工具类等,用以疾速开发高性能、高可靠性的网络服务端和客户端程序。
1. 创立服务端
服务端启动须要创立 ServerBootstrap
对象,并实现初始化线程模型,配置IO模型和增加业务解决逻辑(Handler) 。在增加业务解决逻辑时,调用的是 childHandler()
办法增加了一个ChannelInitializer
,代码示例如下
// 负责服务端的启动ServerBootstrap serverBootstrap = new ServerBootstrap();// 以下两个对象能够看做是两个线程组// boss线程组负责监听端口,承受新的连贯NioEventLoopGroup boss = new NioEventLoopGroup();// worker线程组负责读取数据NioEventLoopGroup worker = new NioEventLoopGroup();// 配置线程组并指定NIO模型serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class) // 定义后续每个 新连贯 的读写业务逻辑 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline() // 增加业务解决逻辑 .addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg); } }); } });// 绑定端口号serverBootstrap.bind(2002);
通过调用 .channel(NioServerSocketChannel.class)
办法指定 Channel
类型为NIO类型,如果要指定为BIO类型,参数改成 OioServerSocketChannel.class
即可。
其中 nioSocketChannel.pipeline()
用来获取 PipeLine
对象,调用办法 addLast()
增加必要的业务解决逻辑,这里采纳的是责任链模式,会将每个Handler作为一个节点进行解决。
1.1 创立客户端
客户端与服务端启动相似,不同的是,客户端须要创立 Bootstrap
对象来启动,并指定一个客户端线程组,雷同的是都须要实现初始化线程模型,配置IO模型和增加业务解决逻辑(Handler) , 代码示例如下
// 负责客户端的启动Bootstrap bootstrap = new Bootstrap();// 客户端的线程模型NioEventLoopGroup group = new NioEventLoopGroup();// 指定线程组和NIO模型bootstrap.group(group).channel(NioSocketChannel.class) // handler() 办法封装业务解决逻辑 .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline() // 增加业务解决逻辑 .addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg); } }); } });// 连贯服务端IP和端口bootstrap.connect("127.0.0.1", 2002);
(留神:下文中内容均以服务端代码示例为准)
2. 编码和解码
客户端与服务端进行通信,通信的音讯是以二进制字节流的模式通过 Channel
进行传递的,所以当咱们在客户端封装好Java业务对象后,须要将其依照协定转换成字节数组,并且当服务端承受到该二进制字节流时,须要将其依据协定再次解码成Java业务对象进行逻辑解决,这就是编码和解码的过程。Netty 为咱们提供了MessageToByteEncoder
用于编码,ByteToMessageDecoder
用于解码。
2.1 MessageToByteEncoder
用于将Java对象编码成字节数组并写入 ByteBuf
,代码示例如下
public class TcpEncoder extends MessageToByteEncoder<Message> { /** * 序列化器 */ private final Serializer serializer; public TcpEncoder(Serializer serializer) { this.serializer = serializer; } /** * 编码的执行逻辑 * * @param message 须要被编码的音讯对象 * @param byteBuf 将字节数组写入ByteBuf */ @Override protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception { // 通过自定义的序列化器将对象转换成字节数组 byte[] bytes = serializer.serialize(message); // 将字节数组写入 ByteBuf 便实现了对象的编码流程 byteBuf.writeBytes(bytes); }}
2.2 ByteToMessageDecoder
它用于将接管到的二进制数据流解码成Java对象,与上述代码相似,只不过是将该过程反过来了而已,代码示例如下
public class TcpDecoder extends ByteToMessageDecoder { /** * 序列化器 */ private final Serializer serializer; public TcpDecoder(Serializer serializer) { this.serializer = serializer; } /** * 解码的执行逻辑 * * @param byteBuf 接管到的ByteBuf对象 * @param list 任何实现解码的Java对象增加到该List中即可 */ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception { // 依据协定自定义的解码逻辑将其解码成Java对象 Message message = serializer.deSerialize(byteBuf); // 解码实现后增加到List中即可 list.add(message); }}
2.3 留神要点
ByteBuf默认状况下应用的是堆外内存,不进行内存开释会产生内存溢出。不过 ByteToMessageDecoder
和 MessageToByteEncoder
这两个解码和编码Handler
会主动帮咱们实现内存开释的操作,无需再次手动开释。因为咱们实现的 encode()
和 decode()
办法只是这两个 Handler
源码中执行的一个环节,最终会在 finally 代码块中实现对内存的开释,具体内容可浏览 MessageToByteEncoder
中第99行 write()
办法源码。
2.4 在服务端中增加编码解码Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline() // 接管到申请时进行解码 .addLast(new TcpDecoder(serializer)) // 发送申请时进行编码 .addLast(new TcpEncoder(serializer)); } });
3. 增加业务解决Handler
在Netty框架中,客户端与服务端的每个连贯都对应着一个 Channel
,而这个 Channel
的所有解决逻辑都封装在一个叫作ChannelPipeline
的对象里。ChannelPipeline
是一个双向链表,它应用的是责任链模式,每个链表节点都是一个 Handler
,能通它能获取 Channel
相干的上下文信息(ChannelHandlerContext)。
Netty为咱们提供了多种读取 Channel
中数据的 Handler
,其中比拟罕用的是 ChannelInboundHandlerAdapter
和SimpleChannelInboundHandler
,下文中咱们以读取心跳音讯为例。
3.1 ChannelInboundHandlerAdapter
如下为解决心跳业务逻辑的 Handler
,具体执行逻辑参考代码和正文即可
public class HeartBeatHandler extends ChannelInboundHandlerAdapter { /** * channel中有数据可读时,会回调该办法 * * @param msg 如果在该Handler前没有解码Handler节点解决,该对象类型为ByteBuf;否则为解码后的Java对象 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Message message = (Message) msg; // 解决心跳音讯 processHeartBeatMessage(message); // 初始化Ack音讯 Message ackMessage = initialAckMessage(); // 回写给客户端 ctx.channel().writeAndFlush(ackMessage); }}
3.2 SimpleChannelInboundHandler
SimpleChannelInboundHandler
是ChannelInboundHandlerAdapter
的实现类,SimpleChannelInboundHandler
可能指定泛型,这样在解决业务逻辑时,便无需再增加上文代码中对象强转的逻辑,这部分代码实现是在 SimpleChannelInboundHandler
的 channelRead()
办法中实现的,它是一个模版办法,咱们仅仅须要实现 channelRead0()
办法即可,代码示例如下
public class HeartBeatHandler extends SimpleChannelInboundHandler<Message> { /** * @param msg 留神这里的对象类型即为 Message */ @Override protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception { // 解决心跳音讯 processHeartBeatMessage(message); // 初始化Ack音讯 Message ackMessage = initialAckMessage(); // 回写给客户端 ctx.channel().writeAndFlush(ackMessage); }}
3.3 在服务端中增加心跳解决Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline() // 接管到进行解码 .addLast(new TcpDecoder(serializer)) // 心跳业务解决Handler .addLast(new HeartBeatHandler()) // 发送申请时进行编码 .addLast(new TcpEncoder(serializer)); } });
4. ChannelHandler的生命周期
在 ChannelInboundHandlerAdapter
能够通过实现不同的办法来实现指定机会的办法回调,具体可参考如下代码
public class LifeCycleHandler extends ChannelInboundHandlerAdapter { /** * 当检测到新连贯之后,调用 ch.pipeline().addLast(...); 之后的回调 * 示意以后channel中胜利增加了 Handler */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("逻辑处理器被增加时回调:handlerAdded()"); super.handlerAdded(ctx); } /** * 示意以后channel的所有逻辑解决曾经和某个NIO线程建设了绑定关系 * 这里的NIO线程通常指的是 NioEventLoop */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel 绑定到线程(NioEventLoop)时回调:channelRegistered()"); super.channelRegistered(ctx); } /** * 当Channel的所有业务逻辑链筹备结束,连贯被激活时 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel 准备就绪时回调:channelActive()"); super.channelActive(ctx); } /** * 客户端向服务端发送数据,示意有数据可读时,就会回调该办法 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channel 有数据可读时回调:channelRead()"); super.channelRead(ctx, msg); } /** * 服务端每残缺的读完一次数据,都会回调该办法 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channel 某次数据读完时回调:channelReadComplete()"); super.channelReadComplete(ctx); } // ---断开连接时--- /** * 该客户端与服务端的连贯被敞开时回调 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel 被敞开时回调:channelInactive()"); super.channelInactive(ctx); } /** * 对应的NIO线程移除了对这个连贯的解决 */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel 勾销线程(NioEventLoop) 的绑定时回调: channelUnregistered()"); super.channelUnregistered(ctx); } /** * 为该连贯增加的所有业务逻辑Handler被移除时 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("逻辑处理器被移除时回调:handlerRemoved()"); super.handlerRemoved(ctx); }}
5. 解决粘包和半包问题
即便咱们发送音讯的时候是以 ByteBuf
的模式发送的,然而到了底层操作系统,依然是以字节流的模式对数据进行发送的,而且服务端也以字节流的模式读取,因而在服务端对字节流进行拼接时,可能就会造成发送时 ByteBuf
与读取时的 ByteBuf
不对等的状况,这就是所谓的粘包或半包景象。
以如下状况为例,当客户端频繁的向服务端发送心跳音讯时,读取到的ByteBuf信息如下,其中一个心跳申请是用红框圈出的局部
能够发现多个心跳申请"粘"在了一起,那么咱们须要对它进行拆包解决,否则只会读取第一条心跳申请,之后的申请会全副生效
Netty 为咱们提供了基于长度的拆包器LengthFieldBasedFrameDecoder
来进行拆包工作,它能对超过所需数据量的包进行拆分,也能在数据有余的时候期待读取,直到数据足够时,形成一个残缺的数据包并进行业务解决。
5.1 LengthFieldBasedFrameDecoder
以标准接口文档中的协定(图示)为准,代码示例如下,其中的四个参数比拟重要,详细信息可见正文形容
public class SplitHandler extends LengthFieldBasedFrameDecoder { /** * 在协定中示意数据长度的字段在字节流首尾中的偏移量 */ private static final Integer LENGTH_FIELD_OFFSET = 10; /** * 示意数据长度的字节长度 */ private static final Integer LENGTH_FIELD_LENGTH = 4; /** * 数据长度后边的头信息中的字节偏移量 */ private static final Integer LENGTH_ADJUSTMENT = 10; /** * 示意从第一个字节开始须要舍去的字节数,在咱们的协定中,不须要进行舍去 */ private static final Integer INITIAL_BYTES_TO_STRIP = 0; public SplitHandler() { super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP); }}
之后将其增加到Handler中即可,如果遇到其余协定,更改其中参数或查看 LengthFieldBasedFrameDecoder
的JavaDoc中详细描述。
5.2 在服务端中增加拆包Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline() // 拆包Handler .addLast(new SplitHandler()) // 接管到进行解码 .addLast(new TcpDecoder(serializer)) // 心跳业务解决Handler .addLast(new HeartBeatHandler()) // 发送申请时进行编码 .addLast(new TcpEncoder(serializer)); } });
6. Netty性能优化
6.1 Handler对单例模式的利用
Netty 在每次有新连贯到来的时候,都会调用 ChannelInitializer
的 initChannel()
办法,会将其中相干的 Handler
都创立一次,
如果其中的 Handler
是无状态且可能通用的,能够将其改成单例,这样就可能在每次连贯建设时,防止屡次创立雷同的对象。
以如下服务端代码为例,蕴含如下Handler,能够将编码解码、以及业务解决Handler都定义成Spring单例bean的模式注入进来,这样就可能实现对象的复用而无需每次建设连贯都创立雷同的对象了
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 拆包Handler .addLast(new SplitHandler()) // 日志Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 解码Handler .addLast(new TcpDecoder(serializer)) // 心跳、格口状态、设施状态、RFID上报、扫码上报和分拣后果上报Handler .addLast(new HeartBeatHandler(), new ChuteStatusHandler()) .addLast(new DeviceStatusReceiveHandler(), new RfidBindReceiveHandler()) .addLast(new ScanReceiveHandler(), new SortResultHandler()) // 编码Handler .addLast(new TcpEncoder(serializer)); } });
革新实现后如下
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 拆包Handler .addLast(new SplitHandler()) // 日志Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 解码Handler .addLast(tcpDecoder) // 心跳、格口状态、设施状态、RFID上报、扫码上报和分拣后果上报Handler .addLast(heartBeatHandler, chuteStatusHandler) .addLast(deviceStatusReceiveHandler, rfidBindReceiveHandler) .addLast(scanReceiveHandler, sortResultHandler) // 编码Handler .addLast(tcpEncoder); } });
不过须要留神在每个单例Handler的类上标注 @ChannelHandler.Sharable
注解,否则会抛出如下异样
io.netty.channel.ChannelPipelineException: netty.book.practice.handler.server.LoginHandler is not a @Sharable handler, so can't be added or removed multiple times
另外,SplitHanlder
不能进行单例解决,因为它的外部实现与每个 Channel
都无关,每个 SplitHandler
都须要维持每个Channel
读到的数据,即它是有状态的。
6.2 缩短责任链调用
对服务端来说,每次解码进去的Java对象在多个业务解决 Handler
中只会通过一个其中 Handler
实现业务解决,那么咱们将所有业务相干的 Handler
封装起来到一个Map中,每次只让它通过必要的Handler而不是通过整个责任链,那么便能够进步Netty解决申请的性能。
定义如下 ServerHandlers
单例bean,并应用 策略模式 将对应的 Handler
治理起来,每次解决时依据音讯类型获取对应的 Handler
来实现业务逻辑
@ChannelHandler.Sharablepublic class ServerHandlers extends SimpleChannelInboundHandler<Message> { @Resourse private HeartBeatHandler heartBeatHandler; /** * 策略模式封装Handler,这样就能在回调 ServerHandler 的 channelRead0 办法时 * 找到具体的Handler,而不须要通过责任链的每个 Handler 节点,以此来进步性能 */ private final Map<Command, SimpleChannelInboundHandler<Message>> map; public ServerHandler() { map = new HashMap(); // key: 音讯类型枚举 value: 对应的Handler map.put(MessageType.HEART_BEAT, heartBeatHandler); // ... } @Override protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception { // 调用 channelRead() 办法实现业务逻辑解决 map.get(msg.getMessageType()).channelRead(ctx, msg); }}
革新实现后,服务端代码如下,因为咱们封装了平行的业务解决Handler
,所以代码很清新
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 拆包Handler .addLast(new SplitHandler()) // 日志Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 解码Handler .addLast(tcpDecoder) // serverHandlers 封装了 心跳、格口状态、设施状态、RFID上报、扫码上报和分拣后果上报Handler .addLast(serverHandlers) // 编码Handler .addLast(tcpEncoder); } });
6.3 合并编码、解码Handler
Netty 对编码解码提供了对立解决Handler是MessageToMessageCodec
,这样咱们就能将编码和解码的Handler合并成一个增加接口,代码示例如下
@ChannelHandler.Sharablepublic class MessageCodecHandler extends MessageToMessageCodec<ByteBuf, Message> { /** * 序列化器 */ @Resourse private Serializer serializer; @Override protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception { // 将字节数组写入 ByteBuf ByteBuf byteBuf = ctx.alloc().ioBuffer(); serializer.serialize(byteBuf, msg); // 这个编码也须要增加到List中 out.add(byteBuf); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) { // 依据协定自定义的解码逻辑将其解码成Java对象,并增加到List中 out.add(serializer.deSerialize(msg)); }}
革新实现后,服务端代码如下,将其放在业务解决Handler前即可,调用完业务Handler逻辑,会执行编码逻辑
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 拆包Handler .addLast(new SplitHandler()) // 日志Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 解码、编码Handler .addLast(messageCodecHandler) // serverHandlers 封装了 心跳、格口状态、设施状态、RFID上报、扫码上报和分拣后果上报Handler .addLast(serverHandlers); } });
6.4 缩小NIO线程阻塞
对于耗时的业务操作,须要将它们都丢到业务线程池中去解决,因为单个NIO线程会治理很多 Channel
,只有有一个 Channel
中的 Handler
的 channelRead()
办法被业务逻辑阻塞,那么它就会拖慢绑定在该NIO线程上的其余所有 Channel
。
为了防止上述情况,能够在蕴含长时间业务解决逻辑的Handler中创立一个线程池,并将其丢入线程池中进行执行,伪代码如下
protected void channelRead(ChannelHandlerContext ctx, Object message) { threadPool.submit(new Runnable() { // 耗时的业务解决逻辑 doSomethingSependTooMuchTime(); writeAndFlush(); });}
6.5 闲暇"假死"检测Handler
如果底层的TCP连贯曾经断开,然而另一端服务并没有捕捉到,在某一端(客户端或服务端)看来会认为这条连贯依然存在,这就是连贯"假死"景象。这造成的问题就是,对于服务端来说,每个连贯连贯都会消耗CPU和内存资源,过多的假死连贯会造成性能降落和服务解体;对客户端来说,
连贯假死会使得发往服务端的申请都会超时,所以须要尽可能防止假死景象的产生。
造成假死的起因可能是公网丢包、客户端或服务端网络故障等,Netty为咱们提供了 IdleStateHandler
来解决超时假死问题,示例代码如下
public class MyIdleStateHandler extends IdleStateHandler { private static final int READER_IDLE_TIME = 15; public MyIdleStateHandler() { // 读超时工夫、写超时工夫、读写超时工夫 指定0值不判断超时 super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS); } @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) { System.out.println(READER_IDLE_TIME + "秒内没有读到数据,敞开连贯"); ctx.channel().close(); }}
其构造方法中有三个参数来别离指定读、写和读写超时工夫,当指定0时不判断超时,除此之外Netty也有专门用来解决读和写超时的Handler,别离为 ReadTimeoutHandler
, WriteTimeoutHandler
。
将其增加到服务端 Handler
的首位即可
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 超时判断Handler .addLast(new MyIdleStateHandler()) // 拆包Handler .addLast(new SplitHandler()) // 日志Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 解码、编码Handler .addLast(messageCodecHandler) // serverHandlers 封装了 心跳、格口状态、设施状态、RFID上报、扫码上报和分拣后果上报Handler .addLast(serverHandlers); } });
7. ChannelPipeline
ChannelPipeline
与 Channel
密切相关,它能够看做是一条流水线,数据以字节流的模式进来,通过不同 Handler
的"加工解决",
最终以字节流的模式输入。ChannelPipeline
在每条新连贯建设的时候被创立,是一条双向链表,其中每一个节点都是ChannelHadnlerContext
对象,可能通过它拿到相干的上下文信息,默认它有头节点 HeadContext
和尾结点 TailContext
。
7.1 InboundHandler 和 OutboundHandler
定义在 ChannelPipeline
中的 Handler 是可插拔的,可能实现动静编织,调用 ctx.pipeline().remove()
办法可移除,调用 ctx.pipeline().addXxx()
办法可进行增加。
InboundHandler
与 OutboundHandler
解决的事件不同,前者解决 Inbound事件
,典型的就是读取数据流并加工解决;后者会对调用 writeAndFlush()
办法的 Outbound事件
进行解决。
此外,两者的流传机制也是不同的:
InboundHandler
会从链表头一一向下调用,头节点只是简略的将该事件流传上来(ctx.fireChannelRead(mug)
),执行过程中调用findContextInbound()
办法来寻找 InboundHandler
节点,直到 TailContext
节点执行办法结束,完结调用。
个别自定义的 ChannelInboundHandler
都继承自ChannelInboundHandlerAdapter
, 如果没有笼罩channelXxx()
相干办法,那么该事件失常会遍历双向链表始终流传到尾结点,否则就会在以后节点执行完完结;当然也能够调用 fireXxx()
办法让事件从以后节点持续向下流传。
OutboundHandler
是从链表尾向链表头调用,相当于反向遍历 ChannelPipeline
双向链表,Outbound事件
会先通过TailContext
尾节点,并在执行过程中一直寻找OutboundHandler
节点加工解决,直到头节点 HeadContext
调用 Unsafe.write()
办法完结。
7.2 异样流传
异样的流传机制和 Inbound事件
的流传机制相似,在任何节点产生的异样都会向下一个节点传递。如果自定义的 Handler 没有解决异样也没有实现 exceptionCaught()
办法,最终则会落到 TailContext
节点,控制台打印异样未解决的正告信息。
通常异样解决,咱们会定义一个异样处理器,继承自ChannelDuplexHandler
,放在自定义链表节点的开端,这样就可能肯定捕捉和解决异样。
8. Reactor线程模型
8.1 NioEventLoopGroup
创立 new NioEventLoopGroup()
它的默认线程数是以后CPU线程数的2倍,最终会调用到如下源码
// 这里计算的线程数量private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}
跟进到构造方法的最终实现,会执行如下业务逻辑
其中在第2步创立 NioEventLoop
时,值得关注的是创立了一个 Selector
,以此来实现IO多路复用;另外它还创立了高性能 MPSC
(多生产者单消费者)队列,借助它来协调工作的异步执行,如此单条线程(NioEventLoop)、Selector和MPSC它们三者是一对一的关系。而每条连贯都对应一个 Channel
,每个 Channel
都绑定惟一一个 NioEventLoop
,因而单个连贯的所有操作都是在一个线程中执行,是线程平安的。
第3步骤创立线程选择器,它的作用是为连贯在NioEventLoopGroup
中抉择一个 NioEventLoop
,并将该连贯与 NioEventLoop
中的 Selector
实现绑定。
在底层有两种选择器的实现,别离是PowerOfTowEventExecutorChooser
和GenericEventExecutorChooser
,它们的原理都是从线程池里循环抉择线程,不同的是前者计算循环的索引采纳的是位运算而后者采纳的是取余运算。
8.2 Reactor线程 select 操作
源码地位 NioEventLoop
的 run()
办法, select
操作会一直轮询是否有IO事件产生,并且在轮询过程中一直查看是否有工作须要执行,保障Netty工作队列中的工作可能及时执行,轮询过程应用一个计数器避开了 JDK 的空轮询Bug
8.3 解决产生IO事件的Channel
在 Netty 的 Channel
中,有两大类型的 Channel
,一个是 NioServerSocketChannel
,由 boss NioEventLoop 解决;另一个是 NioSocketChannel
,由worker NioEventLoop 解决,所以
- 对于 boss NioEventLoop 来说,轮询到的是连贯事件,后续通过 NioServerSocketChannel 的 Pipeline 将连贯交给一个 work NioEventLoop 解决
- 对于 work NioEventLoop 来说,轮询到的是读写事件,后续通过 NioSocketChannel 的 Pipeline 将读取到的数据传递给每个ChannelHandler 解决
留神工作的执行都是异步的。
8.4 工作的收集和执行
上文中提到了咱们创立了高性能的MPSC
队列,它是用来汇集非Reactor线程创立的工作的,NioEventLoop
会在执行的过程中一直检测是否有事件产生,如果有事件产生就解决,解决完事件之后再解决非Reactor线程创立的工作。在检测是否有事件产生的时候,为了保障异步工作的及时处理,只有有工作要解决,就会进行工作检测,去解决工作,解决工作时是Reactor单线程执行。
8.5 注册连贯的流程
当 boss Reactor线程检测到 ACCEPT 事件之后,创立一个 NioSocketChannel
,并把用户设置的 ChannelOption(Option参数配置)、ChannelAttr(Channel 参数)、ChannelHandler(ChannelInitializer)封装到 NioSocketChannel
中。接着,应用线程选择器在NioEventLoopGroup
中抉择一条 NioEventLoop
(线程),把 NioSocketChannel
中包装的JDK Channel 当做Key,本身(NioSocketChannel)作为 attachment,注册 NioEventLoop 对应的 Selector上。这样,后续有读写事件产生,就能够间接获取 attachment 来解决读写数据的逻辑。
8.6 如何了解IO多路复用
简略地说:IO多路复用是指能够在一个线程内解决多个连贯的IO事件申请。以Java中的IO多路复用为例,服务端创立 Selector
对象一直的调用 select()
办法来解决各个连贯上的IO事件,之后将这些IO事件交给工作线程异步去执行,这就达到了在一个线程内同时解决多个连贯的IO申请事件的目标。