Netty Source Code Analysis: The Reactor Thread Model in Detail

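The previous article analyzed how a Netty server is initialized during startup. Today we analyze the Reactor thread model in Netty.

Before looking at the source, let's first identify where EventLoop is used:

  • registering the NioServerSocketChannel to listen for connections
  • registering NioSocketChannels for I/O events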

NioServerSocketChannel: listening for connections

In the initAndRegister() method of AbstractBootstrap, once the NioServerSocketChannel has been initialized, the line marked with a comment in the code below registers it.

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        // omitted...
    }
    // Register the channel with the selector of a boss thread
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

AbstractNioChannel.doRegister

Following the call chain, execution eventually reaches AbstractNioChannel.doRegister().

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Call ServerSocketChannel.register() to register this server channel with the boss thread's selector
            // (interest set 0 for now; `this` is stored as the key's attachment)
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
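To see what doRegister() does at the plain JDK level, here is a minimal sketch. The class name ZeroOpsRegistration and the attachment string are made up for illustration. The sketch registers a non-blocking ServerSocketChannel with interest set 0 and an attachment, just like the register(..., 0, this) call above, and only afterwards adds OP_ACCEPT. Netty also registers the server channel before binding it (initAndRegister() runs before doBind0()), so the sketch does not bind either. The code above does not show where Netty later enables OP_ACCEPT, so treat that second step as an assumption of the sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class; the attachment string stands in for the Netty channel (`this`).
final class ZeroOpsRegistration {

    /** Returns {interest ops after register, interest ops after enabling accept, 1 if the attachment survived}. */
    static int[] run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // register() requires non-blocking mode
            Object attachment = "netty-channel";

            // Like doRegister(): interest set 0, so the selector reports nothing for this key yet
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();

            // Later step (assumed here): start listening for new connections
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
            int after = key.interestOps();

            return new int[] {before, after, key.attachment() == attachment ? 1 : 0};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Registering with an empty interest set lets the channel, its selector and its attachment be wired together first. The decision about which events to watch comes later.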

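How NioEventLoop starts

A NioEventLoop is backed by a single thread. That thread is started lazily, as follows.

In AbstractBootstrap.doBind0(), the NioEventLoop of the NioServerSocketChannel is obtained and then used to execute the task that binds the port.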

private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {

    // Submitting this task to the event loop is what starts its thread
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

SingleThreadEventExecutor.execute

From there, execution reaches SingleThreadEventExecutor.execute(), which calls startThread() to start the thread.

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread(); // start the thread if it has not been started yet
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

startThread

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread(); // perform the actual startup
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}
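Taken together, execute(), startThread() and doStartThread() amount to a lazily started single-thread executor. The first task submitted from outside the loop wins a CAS on the state field and starts the one thread. The sketch below models only that idea, under simplifying assumptions:

- The class LazyStartLoop is hypothetical.
- It uses an AtomicInteger instead of Netty's AtomicIntegerFieldUpdater.
- A blocking queue stands in for select().
- There is no shutdown handling.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of SingleThreadEventExecutor's lazy thread start.
final class LazyStartLoop implements Executor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Executor executor;   // supplies the thread that becomes the loop thread
    private volatile Thread thread;    // set once the loop is running
    final AtomicInteger threadsStarted = new AtomicInteger(); // demo bookkeeping only

    LazyStartLoop(Executor executor) {
        this.executor = executor;
    }

    @Override
    public void execute(Runnable task) {
        taskQueue.add(task);
        if (Thread.currentThread() != thread) { // caller is not the loop thread
            startThread();
        }
    }

    private void startThread() {
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    state.compareAndSet(ST_STARTED, ST_NOT_STARTED); // let a later call retry
                }
            }
        }
    }

    private void doStartThread() {
        threadsStarted.incrementAndGet();
        executor.execute(() -> {
            thread = Thread.currentThread(); // this thread now is the event loop
            try {
                for (;;) {
                    taskQueue.take().run(); // a real NioEventLoop would select() here as well
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit the loop
            }
        });
    }

    /** Submits n tasks from the calling thread; returns how many loop threads were started, or -1 on timeout. */
    static int demo(int n) {
        Executor daemonThreads = r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.start();
        };
        LazyStartLoop loop = new LazyStartLoop(daemonThreads);
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            loop.execute(done::countDown);
        }
        try {
            return done.await(5, TimeUnit.SECONDS) ? loop.threadsStarted.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }
}
```

Only the CAS winner calls doStartThread(). So even if several threads submit their first tasks concurrently, exactly one loop thread is started.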

startThread() then calls doStartThread(), which submits a task via executor.execute(). The thread that runs this task becomes the NioEventLoop's thread.

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() { // run a task on the underlying executor
        @Override
        public void run() {
            thread = Thread.currentThread(); // the executing thread becomes the event loop's thread
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run(); // call NioEventLoop.run() (here, the boss loop) to start polling
            }
            // omitted (catch/finally blocks)...
        }
    });
}

The NioEventLoop polling loop

Once the NioEventLoop thread has started, it goes straight into NioEventLoop.run().

protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                 selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                             selector, e);
            }
        } catch (Error e) {
            throw (Error) e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Error e) {
                throw (Error) e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

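NioEventLoop's execution flow

NioEventLoop.run() is an infinite loop that mainly does three things, as shown in Figure 9-1.

Figure 9-1

  • Poll for I/O events (select): poll the Selector for the I/O readiness events of all registered Channels.
  • Process I/O events: if any Channel has ready I/O events, call processSelectedKeys() to handle them.
  • Run asynchronous tasks (runAllTasks): a very important duty of the Reactor thread is to run the non-I/O tasks in its task queue. Netty provides the ioRatio parameter to adjust the ratio between the time spent on I/O and the time spent on tasks.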

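Polling for I/O readiness events

Let's first look at the code related to I/O events:

  1. Obtain the current strategy via selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()).
  2. Branch on that strategy to decide what this iteration of the loop does.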
protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                }
                // omitted (catch block and the rest of the loop)...
            }
        }
}

selectStrategy logic

@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

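If hasTasks is true, meaning the NioEventLoop thread has pending asynchronous tasks, selectSupplier.get() is called. With tasks waiting, the loop must not block in select(). Otherwise SELECT is returned directly.

selectSupplier.get() is defined as follows: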

private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

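It calls selectNow(), a non-blocking Selector method that returns immediately:

  • if some Channels are already ready, it returns the number of ready Channels (strictly, the number of keys whose ready sets were updated by this selection);
  • otherwise, it returns 0.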
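The strategy decision above can be reproduced with the JDK alone. A few assumptions in this sketch:

- The class StrategyDemo is hypothetical.
- java.util.function.IntSupplier replaces Netty's own io.netty.util.IntSupplier.
- SELECT is a local constant. It only has to be negative so it can never be mistaken for a ready-key count. Netty's SelectStrategy constants are negative as well, presumably for the same reason.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.function.IntSupplier;

// Hypothetical mirror of the default calculateStrategy(); uses java.util.function.IntSupplier.
final class StrategyDemo {
    // Negative, so it can never collide with a ready-key count (always >= 0).
    static final int SELECT = -1;

    static int calculateStrategy(IntSupplier selectNowSupplier, boolean hasTasks) {
        // Tasks pending: must not block, so only peek with selectNow().
        // No tasks: tell the loop it may block in select().
        return hasTasks ? selectNowSupplier.getAsInt() : SELECT;
    }

    /** Evaluates both cases against a fresh Selector with no registered channels. */
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            IntSupplier selectNow = () -> {
                try {
                    return selector.selectNow(); // non-blocking: 0 when nothing is ready
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            };
            return new int[] {
                calculateStrategy(selectNow, true),  // tasks pending -> result of selectNow()
                calculateStrategy(selectNow, false)  // no tasks -> SELECT
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```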

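Branching

Having obtained strategy in the previous step, the loop branches on its value:

  • CONTINUE: retry, i.e. skip straight to the next loop iteration.
  • BUSY_WAIT: NIO does not support busy-waiting, so BUSY_WAIT falls through and is handled exactly like SELECT.
  • SELECT: call select() to obtain the ready Channels. This strategy is returned when the NioEventLoop has no pending asynchronous tasks, i.e. when its task queue is empty.
  • Any value >= 0 (a selectNow() result) goes to default and is used directly as the number of ready Channels.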
switch (strategy) {
    case SelectStrategy.CONTINUE:
        continue;

    case SelectStrategy.BUSY_WAIT:
        // fall-through to SELECT since the busy-wait is not supported with NIO

    case SelectStrategy.SELECT:
        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
        if (curDeadlineNanos == -1L) {
            curDeadlineNanos = NONE; // nothing on the calendar
        }
        nextWakeupNanos.set(curDeadlineNanos);
        try {
            if (!hasTasks()) {
                strategy = select(curDeadlineNanos);
            }
        } finally {
            // This update is just to help block unnecessary selector wakeups
            // so use of lazySet is ok (no race condition)
            nextWakeupNanos.lazySet(AWAKE);
        }
        // fall through
    default:
}

SelectStrategy.SELECT

When the NioEventLoop has no pending asynchronous tasks, it executes the SELECT strategy:

// Deadline of the next scheduled task; -1L when no scheduled task is pending (the default)
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
    curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
    if (!hasTasks()) {
        // The taskQueue has been drained, so block in select()
        strategy = select(curDeadlineNanos);
    }
} finally {
    // This update is just to help block unnecessary selector wakeups
    // so use of lazySet is ok (no race condition)
    nextWakeupNanos.lazySet(AWAKE);
}

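select() is defined as follows. When no scheduled task is pending, deadlineNanos is NONE, so it simply blocks in selector.select().

private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        return selector.select();
    }
    // Timeout for the blocking select(): adding 995000ns rounds the remaining delay up to whole
    // milliseconds (unless the sub-millisecond remainder is under 5 µs)
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;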
    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
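The timeout arithmetic is easy to misread, so here it is in isolation. This is a sketch with two assumptions:

- The helper takes the remaining delay in nanoseconds directly. This stands in for deadlineToDelayNanos(), which never returns a negative value.
- The class name SelectTimeout is hypothetical.

```java
// Hypothetical helper restating the timeout computation in select(long deadlineNanos).
final class SelectTimeout {
    /**
     * Converts the remaining delay until the next scheduled task (ns) into a select() timeout (ms).
     * Adding 995_000 ns before the integer division rounds the remainder up unless it is below 5 µs.
     */
    static long timeoutMillis(long delayNanos) {
        return Math.max(0L, delayNanos + 995_000L) / 1_000_000L;
    }

    /** Mirrors the final line of select(): a non-positive timeout means "do not block at all". */
    static String selectCall(long delayNanos) {
        long timeoutMillis = timeoutMillis(delayNanos);
        return timeoutMillis <= 0 ? "selectNow()" : "select(" + timeoutMillis + ")";
    }
}
```

Two worked cases:

- A remaining delay of 1.5 ms gives (1 500 000 + 995 000) / 1 000 000 = 2 ms.
- A delay of 4 µs gives 0, so the loop calls selectNow() instead of blocking.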

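It ultimately returns the number of ready channels. The rest of the loop uses that number to decide what to do next.

Processing work in NioEventLoop.run

The processing logic is comparatively easy to follow:

  • If there are ready channels, process their I/O events.
  • After that, run the tasks in the asynchronous task queue on the same thread.
  • In addition, to work around the spinning problem in Java NIO, selectCnt counts idle iterations. Each iteration that does no work (no I/O to process and no task run) is recorded in selectCnt. If idle iterations keep occurring in a row (selectCnt reaches a threshold), Netty assumes the NIO bug has been triggered and handles it in unexpectedSelectorWakeup().

Java NIO has a bug: the epoll empty-polling problem on Linux. Even when zero channels are ready, select() can return from what should be a blocking call. The loop then spins, and CPU usage reaches 100%.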

@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        // omitted...
        selectCnt++; // counts selects that returned without useful work (idle spins of the event loop); used to work around the NIO bug
        cancelledKeys = 0;
        needsToSelectAgain = false;
        final int ioRatio = this.ioRatio;
        boolean ranTasks;
        if (ioRatio == 100) { // ioRatio is the percentage of loop time given to I/O; the default is 50
            try {
                if (strategy > 0) { // strategy > 0 means some SocketChannels are ready
                    processSelectedKeys(); // process the ready SocketChannels
                }
            } finally {
                // Note: ioRatio = 100 does not mean tasks are skipped; the whole task queue is drained every time
                ranTasks = runAllTasks(); // always run the queued tasks
            }
        } else if (strategy > 0) { // strategy > 0 means some SocketChannels are ready
            final long ioStartTime = System.nanoTime(); // time at which I/O processing started
            try {
                processSelectedKeys(); // process the ready I/O events
            } finally {
                // time spent on I/O in this iteration
                final long ioTime = System.nanoTime() - ioStartTime;
                // from this iteration's I/O time and ioRatio, compute how long asynchronous tasks may run at most
                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        } else {
            // Here strategy == 0 and ioRatio < 100, so the time limit is 0: run as few asynchronous tasks as possible.
            // This is effectively the strategy > 0 branch with zero I/O time; the code is just simplified.
            ranTasks = runAllTasks(0); // This will run the minimum number of tasks
        }

        if (ranTasks || strategy > 0) { // ranTasks or strategy > 0: the event loop did work instead of spinning, so reset selectCnt
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                             selectCnt - 1, selector);
            }
            selectCnt = 0;
        }
        // unexpectedSelectorWakeup() handles the NIO bug
        else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
            selectCnt = 0;
        }
    }
}
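The time budget passed to runAllTasks(long) is plain arithmetic on the measured I/O time. The sketch below isolates it in a hypothetical class, IoRatioBudget, so the ratio is easy to check:

- With the default ioRatio of 50, tasks get as much time as the I/O just took.
- With 80, tasks get a quarter of it.

The range check is an addition of this sketch. run() itself never computes a budget for ioRatio == 100, because that branch drains the whole queue.

```java
// Hypothetical helper isolating the task-time budget computed in run().
final class IoRatioBudget {
    /**
     * ioRatio is the share (in percent) of loop time intended for I/O. After I/O took ioTimeNanos,
     * tasks may run for ioTime * (100 - ioRatio) / ioRatio, so I/O : tasks is roughly ioRatio : (100 - ioRatio).
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) { // range check added for this sketch
            throw new IllegalArgumentException("ioRatio must be in (0, 100]: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```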

processSelectedKeys

select() tells us how many I/O events are ready. When there are any, processSelectedKeys() is invoked.

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

There are two branches for processing I/O events:

  • one processes the selectedKeys optimized by Netty;
  • the other is the plain processing path.

processSelectedKeys() chooses between them depending on whether selectedKeys is set. By default Netty uses its optimized selectedKeys, an object of type SelectedSelectionKeySet.

processSelectedKeysOptimized

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        // 1. Take the ready key (the I/O event) and its channel
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null; // null out the reference so it can be GC'ed; this also marks the event as handled so it is not processed twice

        final Object a = k.attachment(); // the attachment stored at registration time; for the boss loop this is the NioServerSocketChannel
        // Process the current channel
        if (a instanceof AbstractNioChannel) {
            // For the boss NioEventLoop, the polled events are mostly accept events; its pipeline then hands each new connection to a worker NioEventLoop
            // For a worker NioEventLoop, the polled events are mostly read/write events; its pipeline passes the bytes read to each ChannelHandler
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // The channel has no event loop, so this loop has no authority to close it; just return
            return;
        }
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps(); // the ready-operation set of this key

        if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // a connect has completed (client side)
            // remove OP_CONNECT from the interest set, otherwise select() would keep returning immediately
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) { // the channel is writable
            ch.unsafe().forceFlush();
        }
        // Readable or ACCEPT: call unsafe.read(); for a NioServerSocketChannel the unsafe is a NioMessageUnsafe
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
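readyOps is a bit set, so one key can trigger several of the branches above in a single call. The following sketch uses a hypothetical class, ReadyOpsDispatch, to decode a readyOps value into the actions processSelectedKey() would take, in the same order. It uses the real SelectionKey constants: OP_READ = 1, OP_WRITE = 4, OP_CONNECT = 8, OP_ACCEPT = 16. The action names are only labels. Netty's source comments describe the readyOps == 0 case as a guard against a possible JDK spin-loop bug.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for the readyOps checks in processSelectedKey(); action names are labels only.
final class ReadyOpsDispatch {
    /** Lists, in processSelectedKey() order, the actions a readyOps value would trigger. */
    static List<String> actions(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect"); // Netty first clears OP_CONNECT from the interest set
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is also routed to read(), guarding against a possible JDK spin-loop bug
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }
}
```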

NioMessageUnsafe.read()

Suppose the event is a read, or a client establishing a connection. The code then runs as follows:

@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline(); // for the server channel, this pipeline contains the ServerBootstrapAcceptor
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i)); // invoke channelRead on the pipeline's handlers
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception); // invoke exceptionCaught on the pipeline's handlers
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

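The SelectedSelectionKeySet optimization

Netty implements its own SelectedSelectionKeySet to optimize the structure of the Selector's original selected-key set. How does it do that? Let's look at its definition first:

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    // remaining methods (size(), iterator(), reset(), increaseCapacity(), ...) omitted
}

Internally, SelectedSelectionKeySet uses a SelectionKey array. processSelectedKeysOptimized() can therefore take the ready I/O events straight out of the array by iterating over it.

The JDK's original Set<SelectionKey> is a HashSet. An add on the SelectionKey[] instead computes no hashes, handles no hash collisions and allocates no per-entry node. It just writes the next slot, which is O(1) (amortized, since the array occasionally grows).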
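To make the trade-off concrete, here is a minimal array-backed set in the same spirit. It is a sketch, not Netty's class:

- The name ArrayBackedKeySet is hypothetical.
- It is generic rather than tied to SelectionKey.
- It starts with 4 slots (Netty starts with 1024) so growth is easy to observe.
- Like the add() shown above, it performs no duplicate check. Strictly, that bends the Set contract; the sketch keeps the behavior to mirror the code above.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical append-only, array-backed set modeled on the SelectedSelectionKeySet shown above.
final class ArrayBackedKeySet<E> extends AbstractSet<E> {
    Object[] keys = new Object[4]; // small initial size so growth is visible; Netty uses 1024
    int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        keys[size++] = e; // append: no hashing, no collision handling, no node allocation
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1); // grow by doubling, so appends are amortized O(1)
        }
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @SuppressWarnings("unchecked")
            @Override
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[idx++];
            }
        };
    }

    /** Clears entries from index start onward so they can be garbage-collected, then empties the set. */
    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }
}
```

The event loop can walk keys[0..size) with a plain indexed loop, as processSelectedKeysOptimized() does, and then reset the set for the next select.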

Initializing SelectedSelectionKeySet

Netty uses reflection to replace the selectedKeys and publicSelectedKeys fields inside the Selector object with a SelectedSelectionKeySet.

Both fields are originally HashSet-based (in the JDK's SelectorImpl, publicSelectedKeys is a view over the selectedKeys HashSet). After the replacement, both point to the SelectedSelectionKeySet. When keys become ready, they are filled directly into the SelectedSelectionKeySet's array, and afterwards Netty only has to iterate over it.

private SelectorTuple openSelector() {
    // ... creation of unwrappedSelector and loading of maybeSelectorImplClass omitted
    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    // Use reflection
    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // the selectedKeys field inside the Selector
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                // the publicSelectedKeys field inside the Selector
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    // offset of the selectedKeys field
                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                    // offset of the publicSelectedKeys field
                    long publicSelectedKeysFieldOffset =
                        PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                        // replace both fields with selectedKeySet
                        PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                        PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                        return null;
                    }
                    // We could not retrieve the offset, lets try reflection as last-resort.
                }
                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                if (cause != null) {
                    return cause;
                }
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });
    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
        return new SelectorTuple(unwrappedSelector);
    }
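    selectedKeys = selectedKeySet;
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));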
}
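The fallback path in openSelector() is ordinary reflection: look up the private fields, make them accessible, and overwrite them. The sketch below shows the same sequence against a stand-in class. A stand-in is needed because the real fields live in JDK-internal classes (sun.nio.ch.SelectorImpl), whose access is restricted on JDK 9 and later. That restriction is why openSelector() tries Unsafe offsets first, and why it returns the exception instead of throwing when both routes fail. FakeSelectorImpl and FieldSwapDemo are hypothetical names.

```java
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a JDK selector implementation with two private key-set fields.
class FakeSelectorImpl {
    private Set<String> selectedKeys = new HashSet<>();
    private Set<String> publicSelectedKeys = selectedKeys; // the JDK uses a view here; same object for simplicity

    Set<String> selectedKeys() {
        return publicSelectedKeys;
    }
}

final class FieldSwapDemo {
    /** Replaces both private fields via reflection; returns null on success or the failure otherwise. */
    static Throwable instrument(Object selector, Set<String> replacement) {
        try {
            Field selectedKeysField = selector.getClass().getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selector.getClass().getDeclaredField("publicSelectedKeys");
            // On JDK 9+ this can throw InaccessibleObjectException for JDK-internal classes
            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);
            selectedKeysField.set(selector, replacement);
            publicSelectedKeysField.set(selector, replacement);
            return null;
        } catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) {
            return e; // like openSelector(): report the failure so the caller can fall back
        }
    }
}
```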

Executing asynchronous tasks

With the flow above covered, let's look at how NioEventLoop.run() handles asynchronous tasks:

@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        // ... omitted
        ranTasks = runAllTasks();
    }
}

runAllTasks

Note that NioEventLoop also supports scheduled tasks, which are submitted via nioEventLoop.schedule().

protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        fetchedAll = fetchFromScheduledTaskQueue(); // move due scheduled tasks into the ordinary task queue
        if (runAllTasksFrom(taskQueue)) { // run every task currently in taskQueue
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // repeat until every due scheduled task has been moved over

    if (ranAtLeastOne) { // if at least one task ran, record when execution finished
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks(); // run the tail (wrap-up) tasks
    return ranAtLeastOne;
}

fetchFromScheduledTaskQueue

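It moves every scheduled task in scheduledTaskQueue whose deadline has passed into taskQueue. If taskQueue has no room, the task is put back into scheduledTaskQueue and the method returns false. runAllTasks() then fetches again after draining taskQueue.

private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }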
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}
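Together, runAllTasks() and fetchFromScheduledTaskQueue() funnel due scheduled tasks into the ordinary FIFO queue, so that a single loop runs both kinds. The sketch below models that under simplifying assumptions:

- The class TaskMerging is hypothetical.
- Time is passed in explicitly instead of being read from a clock.
- taskQueue is unbounded, so the "queue full, put it back and fetch again" path (fetchedAll == false) never occurs.

```java
import java.util.ArrayDeque;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical, simplified model of runAllTasks() / fetchFromScheduledTaskQueue().
final class TaskMerging {
    /** A scheduled task with an absolute deadline on a caller-supplied clock (ns). */
    static final class Scheduled {
        final long deadlineNanos;
        final Runnable task;

        Scheduled(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }
    }

    final Queue<Runnable> taskQueue = new ArrayDeque<>();
    final PriorityQueue<Scheduled> scheduledTaskQueue =
        new PriorityQueue<>((Scheduled a, Scheduled b) -> Long.compare(a.deadlineNanos, b.deadlineNanos));

    /** Moves every scheduled task whose deadline has passed into taskQueue (unbounded, so it always fits). */
    void fetchFromScheduledTaskQueue(long nowNanos) {
        Scheduled head;
        while ((head = scheduledTaskQueue.peek()) != null && head.deadlineNanos <= nowNanos) {
            taskQueue.add(scheduledTaskQueue.poll().task);
        }
    }

    /** Runs due scheduled tasks and ordinary tasks in one FIFO pass; returns true if at least one ran. */
    boolean runAllTasks(long nowNanos) {
        fetchFromScheduledTaskQueue(nowNanos);
        boolean ranAtLeastOne = false;
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
            ranAtLeastOne = true;
        }
        return ranAtLeastOne;
    }
}
```

Due scheduled tasks are appended behind the ordinary tasks that were already queued. Tasks whose deadlines lie in the future stay in the priority queue until a later pass.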

Adding tasks: execute

NioEventLoop has two very important asynchronous task queues: the ordinary task queue and the scheduled task queue. Two methods add tasks to them respectively:

  • execute()
  • schedule()

The execute method is defined as follows.

private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task); // add the task to the task queue
    if (!inEventLoop) { // the caller is not the NioEventLoop's own thread
        startThread(); // start the thread if it has not been started yet
        if (isShutdown()) { // this NioEventLoop has already been shut down
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

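The NIO empty-polling problem

Normally, when we call selector.select() and no SocketChannel is ready, the current thread blocks. Empty polling means the thread is woken up anyway, even though no SocketChannel is ready.

Such a wakeup carries no read or write request. The thread keeps polling for nothing, and CPU usage becomes high.

The root cause of the problem is as follows.

In some Linux 2.6 kernels, poll and epoll set the returned eventSet to POLLHUP (or possibly POLLERR) for a socket whose connection is abruptly interrupted. Because the eventSet has changed, the Selector may be woken up. The behavior comes from the operating system. The JDK is meant to abstract over all OS platforms, but unfortunately the problem was not fixed in JDK 5 or the early JDK 6 releases (strictly speaking, several JDK versions are affected). The JDK left the problem to the operating system side instead. That is why the bug was only finally fixed in 2013, and why its impact was so wide.

How does Netty work around this? Let's go back to NioEventLoop.run():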

@Override
protected void run() {
    int selectCnt = 0;
    for (;;) {
        // ... (select and I/O processing omitted)
        // selectCnt counts selects that returned without useful work (idle spins of the event loop); used to work around the NIO bug
        selectCnt++;
        // ranTasks or strategy > 0: the event loop did work instead of spinning, so reset selectCnt
        if (ranTasks || strategy > 0) {
            // if the counter exceeds MIN_PREMATURE_SELECTOR_RETURNS, log at debug level
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                             selectCnt - 1, selector);
            }
            selectCnt = 0;
        }
        // unexpectedSelectorWakeup() handles the NIO bug
        else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
            selectCnt = 0;
        }
    }
}

unexpectedSelectorWakeup

private boolean unexpectedSelectorWakeup(int selectCnt) {
    if (Thread.interrupted()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely because " +
                         "Thread.currentThread().interrupt() was called. Use " +
                         "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
        }
        return true;
    }
    // If the rebuild threshold is positive (default 512) and the number of consecutive premature returns has reached it, rebuild the Selector
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
        // The selector returned prematurely many times in a row.
        // Rebuild the selector to work around the problem.
        logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
        rebuildSelector();
        return true;
    }
    return false;
}
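Strip away the logging and the interrupt check, and the selectCnt bookkeeping in run() and unexpectedSelectorWakeup() is simply a counter of consecutive unproductive iterations. Once that counter reaches the threshold, a rebuild is triggered. The class below, SpinDetector, is a hypothetical model of just that counter. The rebuild is only counted, not performed. The example uses a threshold of 3 instead of the default 512.

```java
// Hypothetical model of the selectCnt counter; rebuildSelector() is replaced by a counter.
final class SpinDetector {
    private final int rebuildThreshold; // Netty's default SELECTOR_AUTO_REBUILD_THRESHOLD is 512
    private int selectCnt;
    int rebuilds; // how many times a rebuild would have been triggered

    SpinDetector(int rebuildThreshold) {
        this.rebuildThreshold = rebuildThreshold;
    }

    /** Call once per loop iteration, after select() returned and any work was done. */
    void afterIteration(boolean didWork) {
        selectCnt++;
        if (didWork) {
            selectCnt = 0; // productive iteration: not a spin
        } else if (rebuildThreshold > 0 && selectCnt >= rebuildThreshold) {
            rebuilds++;    // stands in for rebuildSelector()
            selectCnt = 0;
        }
    }

    int selectCnt() {
        return selectCnt;
    }
}
```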

rebuildSelector()

public void rebuildSelector() {
    if (!inEventLoop()) { // not on the event loop thread: submit the rebuild as a task to the event loop
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    rebuildSelector0();
}

rebuildSelector0

This method's main job is to create a new Selector and replace the current event loop's Selector with it.

private void rebuildSelector0() {
    final Selector oldSelector = selector; // the old Selector
    final SelectorTuple newSelectorTuple; // will hold the new Selector

    if (oldSelector == null) { // no old Selector: return right away
        return;
    }

    try {
        newSelectorTuple = openSelector(); // create a new Selector
    } catch (Exception e) {
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // Register all channels to the new Selector.
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) { // iterate over all keys registered with the old Selector
        Object a = key.attachment();
        try {
            // skip keys that are invalid or whose channel is already registered with the new Selector
            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }
            // the key's interest set
            int interestOps = key.interestOps();
            key.cancel(); // cancel the old key
            // register the channel with the new Selector, keeping its interest set and attachment
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) { // for a Netty NIO channel, update its stored SelectionKey
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }
    // switch the event loop over to the new Selector
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // time to close the old selector as everything else is registered to the new one
        oldSelector.close(); // close the old Selector
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}

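This shows that Netty works around the NIO empty-polling problem by rebuilding the Selector. At the core of the rebuild, every channel registered on the old Selector is registered again on a new Selector, with the same interest set and attachment. This neatly sidesteps the JDK epoll empty-polling bug.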
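The core of rebuildSelector0() can be exercised with nothing but the JDK. The sketch uses a hypothetical class, SelectorRebuild, and a Pipe's read end stands in for a socket so no network is needed. It proceeds in three steps:

1. Register one channel with OP_READ and an attachment.
2. Move it to a fresh Selector with the same cancel-and-register steps.
3. Close the old Selector.

Netty additionally updates AbstractNioChannel.selectionKey and closes channels that fail to re-register. The sketch leaves both out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical plain-JDK model of the re-registration loop in rebuildSelector0().
final class SelectorRebuild {
    /** Moves every valid registration from oldSelector to a new Selector, then closes the old one. */
    static Selector rebuild(Selector oldSelector) throws IOException {
        Selector newSelector = Selector.open();
        for (SelectionKey key : oldSelector.keys()) {
            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                continue; // skip cancelled keys and channels that were already moved
            }
            int interestOps = key.interestOps();
            Object attachment = key.attachment();
            key.cancel(); // drop the old registration
            key.channel().register(newSelector, interestOps, attachment); // same interest set and attachment
        }
        oldSelector.close(); // release the old (possibly misbehaving) selector
        return newSelector;
    }

    /** Registers a pipe's read end, rebuilds, and describes the result as "keys:sameOps:attachment:oldOpen". */
    static String demo() {
        try {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            Selector oldSelector = Selector.open();
            pipe.source().register(oldSelector, SelectionKey.OP_READ, "attachment");

            Selector newSelector = rebuild(oldSelector);
            SelectionKey moved = pipe.source().keyFor(newSelector);
            String result = newSelector.keys().size() + ":" + (moved.interestOps() == SelectionKey.OP_READ)
                    + ":" + moved.attachment() + ":" + oldSelector.isOpen();
            newSelector.close();
            pipe.source().close();
            pipe.sink().close();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```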

Establishing and handling connections

Section 9.2.4.3 mentioned that when a client connection or a read event reaches the server, NioMessageUnsafe.read() is called.

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                // 1 if a client connection was accepted, 0 otherwise
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead); // accumulate the number of messages read
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size(); // iterate over the accepted client connections
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i)); // invoke channelRead on the pipeline's handlers
        }
        readBuf.clear(); // clear the list
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete(); // invoke channelReadComplete on the pipeline's handlers

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

pipeline.fireChannelRead(readBuf.get(i))

Next, let's look at how the pipeline fires the event. For a connection event, the handler in the server channel's pipeline that receives it is ServerBootstrap$ServerBootstrapAcceptor.

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m); // already on the handler's executor thread: call the next handler's channelRead directly
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
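The inEventLoop() check is the core of the threading rule here: a handler is invoked directly if the caller is already on its executor's thread, and otherwise the call is handed off to that executor. A minimal sketch of the same rule, assuming an event loop can be modeled as a single-thread ExecutorService; MiniEventLoop and its methods are hypothetical names for illustration, not Netty classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class MiniEventLoop {
    private final ExecutorService executor;
    private volatile Thread thread; // the single thread backing this loop, recorded when it is created

    MiniEventLoop(String name) {
        executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            thread = t;
            return t;
        });
    }

    // True only when the caller is already running on this loop's thread
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        executor.execute(task);
    }

    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Same dispatch rule as invokeChannelRead: call inline on the loop thread, otherwise hand off
    static void invokeChannelRead(MiniEventLoop loop, Consumer<Object> handler, Object msg) {
        if (loop.inEventLoop()) {
            handler.accept(msg);
        } else {
            loop.execute(() -> handler.accept(msg));
        }
    }
}
```

Calling invokeChannelRead from the loop thread runs the handler immediately; calling it from any other thread queues the call, so a given handler is only ever driven from its own loop thread.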

ServerBootstrapAcceptor

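ServerBootstrapAcceptor is a special handler in the NioServerSocketChannel's pipeline that handles client connection events. The main job of its channelRead method is to add the handler chain configured for child SocketChannels (childHandler) to the pipeline of the newly accepted NioSocketChannel, apply the child options and attributes, and register the channel with the worker group (childGroup).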

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);  // add the childHandler configured on the server to the NioSocketChannel's pipeline

    setChannelOptions(child, childOptions, logger); // apply the child channel options
    setAttributes(child, childAttrs);               // apply the child channel attributes

    try {
        // register the NioSocketChannel with a worker EventLoop's Selector and listen for the asynchronous result
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
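In Netty, childGroup.register(child) delegates to the group's next() method, which by default picks an EventLoop in round-robin order, so accepted connections are spread evenly across the worker threads. The sketch below models only that assignment; RoundRobinChooserSketch, registerAll, and the "worker-N" names are hypothetical, and real EventLoops are replaced by strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinChooserSketch {
    private final String[] eventLoops;
    private final AtomicInteger idx = new AtomicInteger();

    RoundRobinChooserSketch(int nThreads) {
        eventLoops = new String[nThreads];
        for (int i = 0; i < nThreads; i++) {
            eventLoops[i] = "worker-" + i;
        }
    }

    // Pick the next event loop in turn; floorMod keeps the index non-negative after int overflow.
    String next() {
        return eventLoops[Math.floorMod(idx.getAndIncrement(), eventLoops.length)];
    }

    // Simulate childGroup.register(child) for each accepted child: record which loop owns it.
    Map<String, List<String>> registerAll(List<String> children) {
        Map<String, List<String>> owners = new LinkedHashMap<>();
        for (String child : children) {
            owners.computeIfAbsent(next(), k -> new ArrayList<>()).add(child);
        }
        return owners;
    }
}
```

Once a child channel is registered, all of its I/O events are handled by that one EventLoop for the channel's whole lifetime.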

How the pipeline is built

The child in section 9.6.2 is actually a NioSocketChannel, which NioServerSocketChannel creates when it accepts a new connection:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch)); // here: wrap the accepted JDK SocketChannel in a NioSocketChannel
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
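doReadMessages relies on the JDK's non-blocking accept(): when a ServerSocketChannel is in non-blocking mode, accept() returns null if no connection is pending, which is why the method returns 0 in that case. A minimal sketch with plain java.nio on the loopback interface; NonBlockingAcceptDemo and acceptOnce are hypothetical names, and the short retry loop is only there to tolerate a brief delay before a connection shows up in the accept queue.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

class NonBlockingAcceptDemo {
    // Same contract as doReadMessages: 1 if a connection was accepted, 0 otherwise.
    static int acceptOnce(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode when nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    // Returns {result before any client connects, result after a client connects}.
    static int[] run() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // port 0: any free port
            server.configureBlocking(false);
            List<SocketChannel> accepted = new ArrayList<>();
            int before = acceptOnce(server, accepted); // nobody has connected yet
            int after = 0;
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                for (int attempt = 0; attempt < 50 && after == 0; attempt++) {
                    after = acceptOnce(server, accepted);
                    if (after == 0) {
                        Thread.sleep(10);
                    }
                }
            } finally {
                for (SocketChannel ch : accepted) {
                    ch.close();
                }
            }
            return new int[] {before, after};
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Netty never spins on accept() like this; it only calls doReadMessages after the Selector reports OP_ACCEPT, so a 0 result simply ends the read loop.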

The NioSocketChannel constructor calls the constructor of its parent class AbstractChannel, which initializes a pipeline:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

DefaultChannelPipeline

The default pipeline implementation is DefaultChannelPipeline. Its constructor is:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

This creates a head node and a tail node linked into a doubly linked list, as shown in Figure 9-2.

[Figure 9-2: the head and tail nodes linked as a doubly linked list]
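The head/tail structure, and the way addLast0 splices each new context in just before tail, can be modeled with a few lines of plain Java. MiniPipeline and Ctx are hypothetical simplifications that keep only the prev/next links of AbstractChannelHandlerContext; real contexts also carry the handler, its executor, and event-mask bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;

class MiniPipeline {
    static final class Ctx {
        final String name;
        Ctx prev, next;
        Ctx(String name) { this.name = name; }
    }

    final Ctx head = new Ctx("head");
    final Ctx tail = new Ctx("tail");

    MiniPipeline() {
        // Same starting state as the constructor above: head <-> tail
        head.next = tail;
        tail.prev = head;
    }

    // Splice the new node in just before tail, like addLast0
    MiniPipeline addLast(String name) {
        Ctx newCtx = new Ctx(name);
        Ctx prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    // Inbound events travel head -> tail
    List<String> forward() {
        List<String> names = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) names.add(c.name);
        return names;
    }

    // Outbound events travel tail -> head
    List<String> backward() {
        List<String> names = new ArrayList<>();
        for (Ctx c = tail; c != null; c = c.prev) names.add(c.name);
        return names;
    }
}
```

Because head and tail are fixed sentinels, addLast never needs a null check, and every user handler always sits between them.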

How the NioSocketChannel handler chain is built

Back in ServerBootstrapAcceptor's channelRead method: when a client connection arrives, it adds handlers to the NioSocketChannel's pipeline through child.pipeline().addLast(childHandler).

Here is DefaultChannelPipeline's addLast method:

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ObjectUtil.checkNotNull(handlers, "handlers");

    for (ChannelHandler h: handlers) { // iterate over the handlers; here the handler is the ChannelInitializer callback
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}

addLast

This adds the ChannelHandler configured on the server to the pipeline. Note that at this point the pipeline holds the ChannelInitializer callback, not the final business handlers.

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler); // make sure a non-@Sharable handler is not added more than once
        // create a new DefaultChannelHandlerContext node
        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);  // link the new DefaultChannelHandlerContext into the ChannelPipeline

        // Not registered on an EventLoop yet: defer handlerAdded(...) until registration
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

When is this callback triggered? In ServerBootstrapAcceptor's channelRead method, when the current NioSocketChannel is registered:

childGroup.register(child).addListener(new ChannelFutureListener() { /* ... */ });

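Following the same path as in the previous lesson's source analysis, this ends up in the register0 method of AbstractChannel (in its inner class AbstractUnsafe):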

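private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // run the pending handlerAdded(...) callbacks before the promise is notified
        pipeline.invokeHandlerAddedIfNeeded();

        // ... (the rest of the method, which notifies the promise and fires channelRegistered, is omitted)
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}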

callHandlerAddedForAllHandlers

pipeline.invokeHandlerAddedIfNeeded() eventually calls the callHandlerAddedForAllHandlers method of DefaultChannelPipeline:

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }

    // take the pending handler callbacks from the list and run them one by one
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

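Note that the singly linked list headed by pendingHandlerCallbackHead is populated by callHandlerCallbackLater, which in turn is called from addLast.

Together they close the asynchronous loop: addLast queues handlerAdded callbacks while the channel is not yet registered, and registration runs them.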
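This deferred-callback loop can be modeled in isolation. In the single-threaded sketch below (PendingCallbackSketch and its members are hypothetical; Netty guards the same state with synchronized (this)), addLast queues a callback while the channel is unregistered, appending at the tail of a singly linked list so callbacks keep insertion order, and register() detaches the list and runs every callback. Handlers added after registration get their callback immediately.

```java
import java.util.ArrayList;
import java.util.List;

class PendingCallbackSketch {
    // Hypothetical stand-in for PendingHandlerCallback: one node of a singly linked list.
    static final class PendingCallback {
        final String handlerName;
        PendingCallback next;
        PendingCallback(String handlerName) { this.handlerName = handlerName; }
    }

    final List<String> handlerAddedCalls = new ArrayList<>(); // log of handlerAdded invocations
    private PendingCallback pendingHead;
    private boolean registered;

    // Like addLast: run handlerAdded now if registered, otherwise queue it.
    void addLast(String handlerName) {
        if (!registered) {
            callHandlerCallbackLater(new PendingCallback(handlerName));
            return;
        }
        handlerAddedCalls.add(handlerName);
    }

    // Like callHandlerCallbackLater: append at the tail to preserve insertion order.
    private void callHandlerCallbackLater(PendingCallback task) {
        if (pendingHead == null) {
            pendingHead = task;
            return;
        }
        PendingCallback p = pendingHead;
        while (p.next != null) {
            p = p.next;
        }
        p.next = task;
    }

    // Like callHandlerAddedForAllHandlers: flip the flag, detach the list, run each callback.
    void register() {
        registered = true;
        PendingCallback task = pendingHead;
        pendingHead = null; // drop the reference so the nodes can be GC'ed
        while (task != null) {
            handlerAddedCalls.add(task.handlerName);
            task = task.next;
        }
    }
}
```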

ChannelInitializer.handlerAdded

task.execute() follows this call path:

callHandlerAdded0(ctx)
  -> ctx.callHandlerAdded(), i.e. AbstractChannelHandlerContext.callHandlerAdded()
    -> ChannelInitializer.handlerAdded(ctx)

ChannelInitializer.handlerAdded then calls initChannel to initialize the NioSocketChannel's pipeline.

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        if (initChannel(ctx)) {

            // We are done with init the Channel, removing the initializer now.
            removeState(ctx);
        }
    }
}

Next, initChannel(ctx) calls the abstract initChannel(C) method, which the concrete subclass implements:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
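The net effect of handlerAdded plus initChannel is that the initializer installs the real handlers and then removes itself, so it runs at most once per channel. A minimal model of that behavior, assuming a pipeline can be reduced to an ordered list of handler names; InitializerSketch, Pipeline, and Initializer are hypothetical, and unlike Netty this sketch lets an exception from initChannel propagate instead of routing it to exceptionCaught.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class InitializerSketch {
    // Hypothetical: the pipeline is just an ordered list of handler names.
    static final class Pipeline {
        final List<String> handlers = new ArrayList<>();
    }

    // Hypothetical stand-in for ChannelInitializer: adds the real handlers, then removes itself.
    static final class Initializer {
        static final String NAME = "initializer";
        private final Consumer<Pipeline> initChannel; // plays the role of the abstract initChannel(C)
        private boolean initialized;                  // plays the role of the initMap re-entrance guard

        Initializer(Consumer<Pipeline> initChannel) {
            this.initChannel = initChannel;
        }

        // Called once the channel is registered, like handlerAdded(ctx)
        boolean handlerAdded(Pipeline pipeline) {
            if (initialized) {
                return false; // already initialized: do nothing
            }
            initialized = true;
            try {
                initChannel.accept(pipeline);
            } finally {
                pipeline.handlers.remove(NAME); // remove the initializer even if initChannel threw
            }
            return true;
        }
    }
}
```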

Here, the ChannelInitializer implementation is the anonymous inner class in our own server code. This callback is therefore what builds the pipeline of the current NioSocketChannel.

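public static void main(String[] args) throws InterruptedException {
    // 1. thread group that accepts client connections (boss)
    EventLoopGroup boss = new NioEventLoopGroup();
    // 2. thread group that handles read/write operations on accepted client connections (worker)
    EventLoopGroup work = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(boss, work)    // bind the two thread groups
            .channel(NioServerSocketChannel.class)    // use the NIO server channel
            // initialize the pipeline of each accepted channel
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline()
                        .addLast(new LengthFieldBasedFrameDecoder(1024, 9, 4, 0, 0))
                        .addLast(new MessageRecordEncoder())
                        .addLast(new MessageRecordDecode())
                        .addLast(new ServerHandler());
                }
            });
        // bind and block until the server channel is closed (port 8080 is an illustrative value)
        b.bind(8080).sync().channel().closeFuture().sync();
    } finally {
        boss.shutdownGracefully();
        work.shutdownGracefully();
    }
}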
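The decoder in this example is LengthFieldBasedFrameDecoder(1024, 9, 4, 0, 0), i.e. maxFrameLength = 1024, lengthFieldOffset = 9, lengthFieldLength = 4, lengthAdjustment = 0, initialBytesToStrip = 0. The frame-length arithmetic it applies is: total frame length = length field value + lengthAdjustment + (lengthFieldOffset + lengthFieldLength). The sketch below reproduces just that arithmetic with java.nio.ByteBuffer. LengthFieldMath and its frame helper are hypothetical; the real decoder also handles fail-fast discarding of oversized frames, byte-order options, and stripping. The layout of the 9 header bytes depends on MessageRecordEncoder, which is not shown here, so the sketch fills them with zeros.

```java
import java.nio.ByteBuffer;

class LengthFieldMath {
    static final int MAX_FRAME_LENGTH = 1024;
    static final int LENGTH_FIELD_OFFSET = 9;
    static final int LENGTH_FIELD_LENGTH = 4;
    static final int LENGTH_ADJUSTMENT = 0;
    // initialBytesToStrip = 0: the whole frame, header included, is passed to the next handler

    // Returns the total frame length, or -1 if the length field has not fully arrived yet.
    static int frameLength(ByteBuffer in) {
        int lengthFieldEndOffset = LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH;
        if (in.remaining() < lengthFieldEndOffset) {
            return -1;
        }
        // Absolute big-endian read (ByteBuffer's default order), treated as unsigned
        long bodyLength = in.getInt(in.position() + LENGTH_FIELD_OFFSET) & 0xFFFFFFFFL;
        long frameLength = bodyLength + LENGTH_ADJUSTMENT + lengthFieldEndOffset;
        if (frameLength > MAX_FRAME_LENGTH) {
            throw new IllegalStateException("frame length " + frameLength + " exceeds " + MAX_FRAME_LENGTH);
        }
        return (int) frameLength;
    }

    // Helper for the example: 9 placeholder header bytes, a 4-byte length, then the body.
    static ByteBuffer frame(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(LENGTH_FIELD_OFFSET + LENGTH_FIELD_LENGTH + body.length);
        buf.put(new byte[LENGTH_FIELD_OFFSET]);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip();
        return buf;
    }
}
```

For a 20-byte body the frame is 9 + 4 + 20 = 33 bytes, and any body longer than 1024 - 13 = 1011 bytes exceeds maxFrameLength.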

Copyright notice: unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. When reposting, please credit Mic 带你学架构.