根本组件

  • NioEventLoop: 监听客户端连贯和解决客户端读写
  • Channel: 对一个Socket连贯的封装,可进行数据的读写
  • Pipline: 对数据的逻辑解决链
  • ChannelHandler: Pipline里的一个解决逻辑
  • ByteBuf: 字节缓冲的容器

Netty服务端启动步骤:

1.创立服务端Channel

创立JDK定义的Channel,将它包装成netty的Channel,并创立一些根本组件绑定在Channel上

newChannel办法里通过反射调用clazz.newInstance()返回一个Channel对象,这个clazz的类型是在创立ServerBootStrap的时候传入的NioServerSocketChannel。

ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)    .channel(NioServerSocketChannel.class)    .childOption(ChannelOption.TCP_NODELAY, true)    .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")    .handler(new ServerHandler())    .childHandler(null);

NioServerSocketChannel的创立流程:

2.初始化服务端Channel

初始化一些根本属性,增加一些逻辑解决

3.注册Selector

将JDK的Channel注册到事件轮询器Selector上,并把netty的服务端Channel作为一个attachment绑定在JDK的Channel上

4.端口绑定

调用JDK API,实现对本地端口的监听

AbstractChannel#bind

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {    ...    // wasActive示意端口绑定之前的状态    boolean wasActive = isActive();    try {        // 调用javaChannel().bind将JDK的channel绑定到指定端口        doBind(localAddress);    } catch (Throwable t) {        safeSetFailure(promise, t);        closeIfClosed();        return;    }    // 端口绑定之后,isActive返回true    if (!wasActive && isActive()) {        invokeLater(new Runnable() {            @Override            public void run() {                pipeline.fireChannelActive();            }        });    }    safeSetSuccess(promise);}

fireChannelActive办法流传事件调用到doBeginRead办法,增加selectionKey感兴趣的事件。
就是在服务端端口绑定胜利之后,开始承受连贯申请

protected void doBeginRead() throws Exception {    // selectionKey是注册服务端channel到selector上的时候返回的    final SelectionKey selectionKey = this.selectionKey;    if (!selectionKey.isValid()) {        return;    }    readPending = true;    // 获取以后感兴趣的事件,初始值是0    final int interestOps = selectionKey.interestOps();    if ((interestOps & readInterestOp) == 0) {        // 增加感兴趣的事件,readInterestOp代表的是accept事件,承受申请        selectionKey.interestOps(interestOps | readInterestOp);    }}

NioEventLoop

NioEventLoop创立

NioEventLoop在NioEventLoopGroup构造函数中被创立,默认创立 2*CPU 核数个NioEventLoop
每个NioEventLoop持有一个线程。

newChild()办法创立NioEventLoop,次要做了三件事:

  • 保留线程执行器ThreadPerTaskExecutor
  • 为NioEventLoop创立一个Selector
  • 创立一个MpscQueue工作队列

ThreadPerTaskExecutor: 执行每个工作都去创立一个线程去执行

chooserFactory.newChooser()
chooser的作用: 采纳循环的形式为新连贯绑定一个NioEventLoop

NioEventLoop启动

NioEventLoop启动触发条件

  • 服务端启动绑定端口

  • 新连贯接入通过chooser绑定一个NioEventLoop

NioEventLoop执行

protected void run() {    for (;;) {        try {            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {                case SelectStrategy.CONTINUE:                    continue;                case SelectStrategy.SELECT:                    // 轮询I/O事件                    select(wakenUp.getAndSet(false));                    if (wakenUp.get()) {                        selector.wakeup();                    }                default:                    // fallthrough            }            cancelledKeys = 0;            needsToSelectAgain = false;            final int ioRatio = this.ioRatio;            if (ioRatio == 100) {                try {                    processSelectedKeys();                } finally {                    // Ensure we always run tasks.                    runAllTasks();                }            } else {                final long ioStartTime = System.nanoTime();                try {                    processSelectedKeys();                } finally {                    // Ensure we always run tasks.                    final long ioTime = System.nanoTime() - ioStartTime;                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                }            }        } catch (Throwable t) {            handleLoopException(t);        }        // Always handle shutdown even if the loop processing threw an exception.        try {            if (isShuttingDown()) {                closeAll();                if (confirmShutdown()) {                    return;                }            }        } catch (Throwable t) {            handleLoopException(t);        }    }}

整个执行过程次要做三件事:

  • select(): 查看是否有I/O事件
  • processSelectKeys(): 解决I/O事件
  • runAllTasks: 解决异步工作队列
  1. select():

select()办法也是在一个 for(;;) 循环里执行

  • 判断select操作是否超过截止工夫
  • 执行阻塞式的select
  • 为防止jkd空轮训的bug,查看空轮训次数;如果超过512就创立一个新的selector,并将原有selector的selectionKey赋给新的selector
  1. processSelectKeys()

对NioEventLoop读取到的selectionKey对应的事件进行解决

  1. runAllTasks()

工作有两种,一种是一般的工作,另一种是定时工作。在解决之前,工作被增加到了不同的队列里,所以要先把两种工作合并到一个工作队列,而后顺次执行。
此外,还要查看以后工夫是否超过了容许执行工作的工夫,如果超时就间接中断,进行下一次NioEventLoop的执行循环。

新连贯接入

检测新连贯

总体流程

在服务端的NioEventLoop执行的第二个过程processSelectKeys中检测出accept事件后, 通过JDK的accept办法创立JDK的一个channel

创立NioSocketChannel

逐层调用父类构造函数,将服务端NioServerSocketChannel和accept()创立的channel作为参数;设置阻塞模式为false,保留读事件,创立unsafe、pipline组件,禁止Nagel算法

服务端channel在检测到新连贯并创立完NioSocketChannel后,会通过ServerBootStrapAcceptor对NioSocketChannel做一些解决:

  • 增加childHandler
  • 设置options和attrs
  • 通过chooser在childGroup中抉择NioEventLoop并注册selector

创立实现之后,调用pipeline.fireChannelActive()向selector注册读事件,开始筹备承受I/O数据。

在执行工作的时候,常常会用到inEventLoop()办法,这个办法是用来判断以后线程是否是EventLoop中的线程。
如果是间接执行,不是就把工作放到MpscQueue(Multi producer single consumer)工作队列中,这样工作交给指定的NioEventLoop执行,不用加锁。

pipeline

初始化

pipeline在创立channel的时候被创立

protected AbstractChannel(Channel parent) {    this.parent = parent;    id = newId();    unsafe = newUnsafe();    pipeline = newChannelPipeline();}

pipeline中的节点是ChannelHandlerContext类型的,双向链表构造;在初始化时就会创立head、tail两个节点

protected DefaultChannelPipeline(Channel channel) {    this.channel = ObjectUtil.checkNotNull(channel, "channel");    succeededFuture = new SucceededChannelFuture(channel, null);    voidPromise =  new VoidChannelPromise(channel, true);    tail = new TailContext(this);    head = new HeadContext(this);    head.next = tail;    tail.prev = head;}

tail和head别离是inbound和outbound类型的。

channelHandler的增加在用户代码配置childHandler时实现

childHandler(new ChannelInitializer<SocketChannel>() {    @Override    public void initChannel(SocketChannel ch) {        ch.pipeline().addLast(new AuthHandler());    }})

增加时首先判断是否反复增加,而后创立节点(HandlerContext)增加至链表,最初回调增加实现的事件。

channelHandler的删除操作与增加相似,首先找到节点将它从链表上删除,最初回调删除handler事件。

事件流传

事件流传有两种形式:

  • pipeline流传: 这种形式会从pipeline的head节点开始流传
  • channelContext流传: 从以后节点开始流传

inbound事件

inBound事件包含register、active、active事件

每次流传都会找到下一个inbound类型的节点,最初流传给tail节点(tail也是inbound类型)

outBound事件

outBound事件流传方向与inBound相同,是从tail向head流传

异样事件流传与以上两种不同,它不关怀是inBoundhandler还是outboundhandler,并且只会从产生异样的节点开始向后流传,达到tail节点。
所以,最好在tail节点前创立一个专门用于解决异样的节点,重写exceptionCaught办法,这样所有异样都能被捕捉。