根本组件
- NioEventLoop: 监听客户端连贯和解决客户端读写
- Channel: 对一个Socket连贯的封装,可进行数据的读写
- Pipline: 对数据的逻辑解决链
- ChannelHandler: Pipline里的一个解决逻辑
- ByteBuf: 字节缓冲的容器
Netty服务端启动步骤:
1.创立服务端Channel
创立JDK定义的Channel,将它包装成netty的Channel,并创立一些根本组件绑定在Channel上
newChannel办法里通过反射调用clazz.newInstance()返回一个Channel对象,这个clazz的类型是在创立ServerBootStrap的时候传入的NioServerSocketChannel。
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, true) .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue") .handler(new ServerHandler()) .childHandler(null);
NioServerSocketChannel的创立流程:
2.初始化服务端Channel
初始化一些根本属性,增加一些逻辑解决
3.注册Selector
将JDK的Channel注册到事件轮询器Selector上,并把netty的服务端Channel作为一个attachment绑定在JDK的Channel上
4.端口绑定
调用JDK API,实现对本地端口的监听
AbstractChannel#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { ... // wasActive示意端口绑定之前的状态 boolean wasActive = isActive(); try { // 调用javaChannel().bind将JDK的channel绑定到指定端口 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } // 端口绑定之后,isActive返回true if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise);}
fireChannelActive办法流传事件调用到doBeginRead办法,增加selectionKey感兴趣的事件。
就是在服务端端口绑定胜利之后,开始承受连贯申请
protected void doBeginRead() throws Exception { // selectionKey是注册服务端channel到selector上的时候返回的 final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; // 获取以后感兴趣的事件,初始值是0 final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { // 增加感兴趣的事件,readInterestOp代表的是accept事件,承受申请 selectionKey.interestOps(interestOps | readInterestOp); }}
NioEventLoop
NioEventLoop创立
NioEventLoop在NioEventLoopGroup构造函数中被创立,默认创立 2*CPU 核数个NioEventLoop
每个NioEventLoop持有一个线程。
newChild()办法创立NioEventLoop,次要做了三件事:
- 保留线程执行器ThreadPerTaskExecutor
- 为NioEventLoop创立一个Selector
- 创立一个MpscQueue工作队列
ThreadPerTaskExecutor: 执行每个工作都去创立一个线程去执行
chooserFactory.newChooser()
chooser的作用: 采纳循环的形式为新连贯绑定一个NioEventLoop
NioEventLoop启动
NioEventLoop启动触发条件
- 服务端启动绑定端口
- 新连贯接入通过chooser绑定一个NioEventLoop
NioEventLoop执行
protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: // 轮询I/O事件 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: // fallthrough } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } }}
整个执行过程次要做三件事:
- select(): 查看是否有I/O事件
- processSelectKeys(): 解决I/O事件
- runAllTasks: 解决异步工作队列
- select():
select()办法也是在一个 for(;;) 循环里执行
- 判断select操作是否超过截止工夫
- 执行阻塞式的select
- 为防止jkd空轮训的bug,查看空轮训次数;如果超过512就创立一个新的selector,并将原有selector的selectionKey赋给新的selector
- processSelectKeys()
对NioEventLoop读取到的selectionKey对应的事件进行解决
- runAllTasks()
工作有两种,一种是一般的工作,另一种是定时工作。在解决之前,工作被增加到了不同的队列里,所以要先把两种工作合并到一个工作队列,而后顺次执行。
此外,还要查看以后工夫是否超过了容许执行工作的工夫,如果超时就间接中断,进行下一次NioEventLoop的执行循环。
新连贯接入
检测新连贯
总体流程
在服务端的NioEventLoop执行的第二个过程processSelectKeys中检测出accept事件后, 通过JDK的accept办法创立JDK的一个channel
创立NioSocketChannel
逐层调用父类构造函数,将服务端NioServerSocketChannel和accept()创立的channel作为参数;设置阻塞模式为false,保留读事件,创立unsafe、pipline组件,禁止Nagel算法
服务端channel在检测到新连贯并创立完NioSocketChannel后,会通过ServerBootStrapAcceptor对NioSocketChannel做一些解决:
- 增加childHandler
- 设置options和attrs
- 通过chooser在childGroup中抉择NioEventLoop并注册selector
创立实现之后,调用pipeline.fireChannelActive()向selector注册读事件,开始筹备承受I/O数据。
在执行工作的时候,常常会用到inEventLoop()办法,这个办法是用来判断以后线程是否是EventLoop中的线程。
如果是间接执行,不是就把工作放到MpscQueue(Multi producer single consumer)工作队列中,这样工作交给指定的NioEventLoop执行,不用加锁。
pipeline
初始化
pipeline在创立channel的时候被创立
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline();}
pipeline中的节点是ChannelHandlerContext类型的,双向链表构造;在初始化时就会创立head、tail两个节点
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head;}
tail和head别离是inbound和outbound类型的。
channelHandler的增加在用户代码配置childHandler时实现
childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new AuthHandler()); }})
增加时首先判断是否反复增加,而后创立节点(HandlerContext)增加至链表,最初回调增加实现的事件。
channelHandler的删除操作与增加相似,首先找到节点将它从链表上删除,最初回调删除handler事件。
事件流传
事件流传有两种形式:
- pipeline流传: 这种形式会从pipeline的head节点开始流传
- channelContext流传: 从以后节点开始流传
inbound事件
inBound事件包含register、active、active事件
每次流传都会找到下一个inbound类型的节点,最初流传给tail节点(tail也是inbound类型)
outBound事件
outBound事件流传方向与inBound相同,是从tail向head流传
异样事件流传与以上两种不同,它不关怀是inBoundhandler还是outboundhandler,并且只会从产生异样的节点开始向后流传,达到tail节点。
所以,最好在tail节点前创立一个专门用于解决异样的节点,重写exceptionCaught办法,这样所有异样都能被捕捉。