上一篇文章,剖析了Netty服务端启动的初始化过程,明天咱们来剖析一下Netty中的Reactor线程模型

在剖析源码之前,咱们先剖析,哪些地方用到了EventLoop?

  • NioServerSocketChannel的连贯监听注册
  • NioSocketChannel的IO事件注册

NioServerSocketChannel连贯监听

在AbstractBootstrap类的initAndRegister()办法中,当NioServerSocketChannel初始化实现后,会调用case标记地位的代码进行注册。

final ChannelFuture initAndRegister() {    Channel channel = null;    try {        channel = channelFactory.newChannel();        init(channel);    } catch (Throwable t) {           }   //注册到boss线程的selector上。    ChannelFuture regFuture = config().group().register(channel);    if (regFuture.cause() != null) {        if (channel.isRegistered()) {            channel.close();        } else {            channel.unsafe().closeForcibly();        }    }    return regFuture;}

AbstractNioChannel.doRegister

依照代码的执行逻辑,最终会执行到AbstractNioChannel的doRegister()办法中。

@Overrideprotected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            //调用ServerSocketChannel的register办法,把以后服务端对象注册到boss线程的selector上            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            if (!selected) {                // Force the Selector to select now as the "canceled" SelectionKey may still be                // cached and not removed because no Select.select(..) operation was called yet.                eventLoop().selectNow();                selected = true;            } else {                // We forced a select operation on the selector before but the SelectionKey is still cached                // for whatever reason. JDK bug ?                throw e;            }        }    }}

NioEventLoop的启动过程

NioEventLoop是一个线程,它的启动过程如下。

在AbstractBootstrap的doBind0办法中,获取了NioServerSocketChannel中的NioEventLoop,而后应用它来执行绑定端口的工作。

private static void doBind0(    final ChannelFuture regFuture, final Channel channel,    final SocketAddress localAddress, final ChannelPromise promise) {    //启动    channel.eventLoop().execute(new Runnable() {        @Override        public void run() {            if (regFuture.isSuccess()) {                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            } else {                promise.setFailure(regFuture.cause());            }        }    });}

SingleThreadEventExecutor.execute

而后一路执行到SingleThreadEventExecutor.execute办法中,调用startThread()办法启动线程。

private void execute(Runnable task, boolean immediate) {    boolean inEventLoop = inEventLoop();    addTask(task);    if (!inEventLoop) {        startThread(); //启动线程        if (isShutdown()) {            boolean reject = false;            try {                if (removeTask(task)) {                    reject = true;                }            } catch (UnsupportedOperationException e) {                // The task queue does not support removal so the best thing we can do is to just move on and                // hope we will be able to pick-up the task before its completely terminated.                // In worst case we will log on termination.            }            if (reject) {                reject();            }        }    }    if (!addTaskWakesUp && immediate) {        wakeup(inEventLoop);    }}

startThread

private void startThread() {    if (state == ST_NOT_STARTED) {        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {            boolean success = false;            try {                doStartThread(); //执行启动过程                success = true;            } finally {                if (!success) {                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);                }            }        }    }}

接着调用doStartThread()办法,通过executor.execute执行一个工作,在该工作中启动了NioEventLoop线程

private void doStartThread() {    assert thread == null;    executor.execute(new Runnable() { //通过线程池执行一个工作        @Override        public void run() {            thread = Thread.currentThread();            if (interrupted) {                thread.interrupt();            }            boolean success = false;            updateLastExecutionTime();            try {                SingleThreadEventExecutor.this.run(); //调用boss的NioEventLoop的run办法,开启轮询            }            //省略....        }    });}

NioEventLoop的轮询过程

当NioEventLoop线程被启动后,就间接进入到NioEventLoop的run办法中。

protected void run() {    int selectCnt = 0;    for (;;) {        try {            int strategy;            try {                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());                switch (strategy) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.BUSY_WAIT:                    case SelectStrategy.SELECT:                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();                        if (curDeadlineNanos == -1L) {                            curDeadlineNanos = NONE; // nothing on the calendar                        }                        nextWakeupNanos.set(curDeadlineNanos);                        try {                            if (!hasTasks()) {                                strategy = select(curDeadlineNanos);                            }                        } finally {                            // This update is just to help block unnecessary selector wakeups                            // so use of lazySet is ok (no race condition)                            nextWakeupNanos.lazySet(AWAKE);                        }                        // fall through                    default:                }            } catch (IOException e) {                // If we receive an IOException here its because the Selector is messed up. Let's rebuild                // the selector and retry. https://github.com/netty/netty/issues/8566                rebuildSelector0();                selectCnt = 0;                handleLoopException(e);                continue;            }            selectCnt++;            cancelledKeys = 0;            needsToSelectAgain = false;            final int ioRatio = this.ioRatio;            boolean ranTasks;            if (ioRatio == 100) {                try {                    if (strategy > 0) {                        processSelectedKeys();                    }                } finally {                    // Ensure we always run tasks.                    ranTasks = runAllTasks();                }            } else if (strategy > 0) {                final long ioStartTime = System.nanoTime();                try {                    processSelectedKeys();                } finally {                    // Ensure we always run tasks.                    final long ioTime = System.nanoTime() - ioStartTime;                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                }            } else {                ranTasks = runAllTasks(0); // This will run the minimum number of tasks            }            if (ranTasks || strategy > 0) {                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                                 selectCnt - 1, selector);                }                selectCnt = 0;            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)                selectCnt = 0;            }        } catch (CancelledKeyException e) {            // Harmless exception - log anyway            if (logger.isDebugEnabled()) {                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",                             selector, e);            }        } catch (Error e) {            throw (Error) e;        } catch (Throwable t) {            handleLoopException(t);        } finally {            // Always handle shutdown even if the loop processing threw an exception.            try {                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        return;                    }                }            } catch (Error e) {                throw (Error) e;            } catch (Throwable t) {                handleLoopException(t);            }        }    }}

NioEventLoop的执行流程

NioEventLoop中的run办法是一个有限循环的线程,在该循环中次要做三件事件,如图9-1所示。

<center>图9-1</center>

  • 轮询解决I/O事件(select),轮询Selector选择器中曾经注册的所有Channel的I/O就绪事件
  • 解决I/O事件,如果存在曾经就绪的Channel的I/O事件,则调用processSelectedKeys进行解决
  • 解决异步工作(runAllTasks),Reactor线程有一个十分重要的职责,就是解决工作队列中的非I/O工作,Netty提供了ioRadio参数用来调整I/O工夫和工作解决的工夫比例。

轮询I/O就绪事件

咱们先来看I/O工夫相干的代码片段:

  1. 通过selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())获取以后的执行策略
  2. 依据不同的策略,用来管制每次轮询时的执行策略。
protected void run() {        int selectCnt = 0;        for (;;) {            try {                int strategy;                try {                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());                    switch (strategy) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.BUSY_WAIT:                        // fall-through to SELECT since the busy-wait is not supported with NIO                    case SelectStrategy.SELECT:                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();                        if (curDeadlineNanos == -1L) {                            curDeadlineNanos = NONE; // nothing on the calendar                        }                        nextWakeupNanos.set(curDeadlineNanos);                        try {                            if (!hasTasks()) {                                strategy = select(curDeadlineNanos);                            }                        } finally {                            // This update is just to help block unnecessary selector wakeups                            // so use of lazySet is ok (no race condition)                            nextWakeupNanos.lazySet(AWAKE);                        }                        // fall through                    default:                    }                }                //省略....            }        }}

selectStrategy解决逻辑

@Overridepublic int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;}

如果hasTasks为true,示意以后NioEventLoop线程存在异步工作的状况下,则调用selectSupplier.get(),否则间接返回SELECT

其中selectSupplier.get()的定义如下:

private final IntSupplier selectNowSupplier = new IntSupplier() {    @Override    public int get() throws Exception {        return selectNow();    }};

该办法中调用的是selectNow()办法,这个办法是Selector选择器中的提供的非阻塞办法,执行后会立即返回。

  • 如果以后曾经有就绪的Channel,则会返回对应就绪Channel的数量
  • 否则,返回0.

分支解决

在下面一个步骤中取得了strategy之后,会依据不同的后果进行分支解决。

  • CONTINUE,示意须要重试。
  • BUSY_WAIT,因为在NIO中并不反对BUSY_WAIT,所以BUSY_WAIT和SELECT的执行逻辑是一样的
  • SELECT,示意须要通过select办法获取就绪的Channel列表,当NioEventLoop中不存在异步工作时,也就是工作队列为空,则返回该策略。
switch (strategy) {    case SelectStrategy.CONTINUE:        continue;    case SelectStrategy.BUSY_WAIT:        // fall-through to SELECT since the busy-wait is not supported with NIO    case SelectStrategy.SELECT:        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();        if (curDeadlineNanos == -1L) {            curDeadlineNanos = NONE; // nothing on the calendar        }        nextWakeupNanos.set(curDeadlineNanos);        try {            if (!hasTasks()) {                strategy = select(curDeadlineNanos);            }        } finally {            // This update is just to help block unnecessary selector wakeups            // so use of lazySet is ok (no race condition)            nextWakeupNanos.lazySet(AWAKE);        }        // fall through    default:}

SelectStrategy.SELECT

当NioEventLoop线程中不存在异步工作时,则开始执行SELECT策略

//下一次定时工作触发截至工夫,默认不是定时工作,返回 -1Llong curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {    curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try {    if (!hasTasks()) {        //2. taskQueue中工作执行完,开始执行select进行阻塞        strategy = select(curDeadlineNanos);    }} finally {    // This update is just to help block unnecessary selector wakeups    // so use of lazySet is ok (no race condition)    nextWakeupNanos.lazySet(AWAKE);}

select办法定义如下,默认状况下deadlineNanos=NONE,所以会调用select()办法阻塞。

private int select(long deadlineNanos) throws IOException {    if (deadlineNanos == NONE) {        return selector.select();    }    //计算select()办法的阻塞超时工夫    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);}

最终返回就绪的channel个数,后续的逻辑中会依据返回的就绪channel个数来决定执行逻辑。

NioEventLoop.run中的业务解决

业务解决的逻辑相对来说比拟容易了解

  • 如果有就绪的channel,则解决就绪channel的IO事件
  • 解决实现后同步执行异步队列中的工作。
  • 另外,这里为了解决Java NIO中的空转问题,通过selectCnt记录了空转次数,一次循环产生了空转(既没有IO须要解决、也没有执行任何工作),那么记录下来(selectCnt); ,如果间断产生空转(selectCnt达到肯定值),netty认为触发了NIO的BUG(unexpectedSelectorWakeup解决);
Java Nio中有一个bug,Java nio在Linux零碎下的epoll空轮询问题。也就是在select()办法中,及时就绪的channel为0,也会从原本应该阻塞的操作中被唤醒,从而导致CPU 使用率达到100%。
@Overrideprotected void run() {    int selectCnt = 0;    for (;;) {        //省略....        selectCnt++;//selectCnt记录的是无功而返的select次数,即eventLoop空转的次数,为解决NIO BUG        cancelledKeys = 0;        needsToSelectAgain = false;        final int ioRatio = this.ioRatio;        boolean ranTasks;        if (ioRatio == 100) { //ioRadio执行工夫占比是100%,默认是50%            try {                if (strategy > 0) { //strategy>0示意存在就绪的SocketChannel                    processSelectedKeys(); //执行就绪SocketChannel的工作                }            } finally {             //留神,将ioRatio设置为100,并不代表工作不执行,反而是每次将工作队列执行完                ranTasks = runAllTasks(); //确保总是执行队列中的工作            }        } else if (strategy > 0) { //strategy>0示意存在就绪的SocketChannel            final long ioStartTime = System.nanoTime(); //io工夫解决开始工夫            try {                processSelectedKeys(); //开始解决IO就绪事件            } finally {                // io事件执行完结工夫                final long ioTime = System.nanoTime() - ioStartTime;                //基于本次循环解决IO的工夫,ioRatio,计算出执行工作耗时的下限,也就是只容许解决多长时间异步工作                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);            }        } else {            //这个分支代表:strategy=0,ioRatio<100,此时工作限时=0,意为:尽量少地执行异步工作            //这个分支和strategy>0理论是一码事,代码简化了一下而已            ranTasks = runAllTasks(0); // This will run the minimum number of tasks        }        if (ranTasks || strategy > 0) { //ranTasks=true,或strategy>0,阐明eventLoop干活了,没有空转,清空selectCnt            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                             selectCnt - 1, selector);            }            selectCnt = 0;        }          //unexpectedSelectorWakeup解决NIO BUG        else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)            selectCnt = 0;        }    }}

processSelectedKeys

通过在select办法中,咱们能够取得就绪的I/O事件数量,从而触发执行processSelectedKeys办法。

private void processSelectedKeys() {    if (selectedKeys != null) {        processSelectedKeysOptimized();    } else {        processSelectedKeysPlain(selector.selectedKeys());    }}

解决I/O事件时,有两个逻辑分支解决:

  • 一种是解决Netty优化过的selectedKeys,
  • 另一种是失常的解决逻辑

processSelectedKeys办法中依据是否设置了selectedKeys来判断应用哪种策略,默认应用的是Netty优化过的selectedKeys,它返回的对象是SelectedSelectionKeySet

processSelectedKeysOptimized

private void processSelectedKeysOptimized() {    for (int i = 0; i < selectedKeys.size; ++i) {        //1. 取出IO事件以及对应的channel        final SelectionKey k = selectedKeys.keys[i];        selectedKeys.keys[i] = null;//k的援用置null,便于gc回收,也示意该channel的事件处理实现防止反复解决        final Object a = k.attachment(); //获取保留在以后channel中的attachment,此时应该是NioServerSocketChannel        //解决以后的channel        if (a instanceof AbstractNioChannel) {             //对于boss NioEventLoop,轮询到的根本是连贯事件,后续的事件就是通过他的pipeline将连贯扔给一个worker NioEventLoop解决            //对于worker NioEventLoop来说,轮循道的根本商是IO读写事件,后续的事件就是通过他的pipeline将读取到的字节流传递给每个channelHandler来解决            processSelectedKey(k, (AbstractNioChannel) a);        } else {            @SuppressWarnings("unchecked")            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;            processSelectedKey(k, task);        }                if (needsToSelectAgain) {            // null out entries in the array to allow to have it GC'ed once the Channel close            // See https://github.com/netty/netty/issues/2363            selectedKeys.reset(i + 1);            selectAgain();            i = -1;        }    }}

processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();    if (!k.isValid()) {        final EventLoop eventLoop;        try {            eventLoop = ch.eventLoop();        } catch (Throwable ignored) {                   }        if (eventLoop == this) {            // close the channel if the key is not valid anymore            unsafe.close(unsafe.voidPromise());        }        return;    }    try {        int readyOps = k.readyOps(); //获取以后key所属的操作类型              if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//如果是连贯类型            int ops = k.interestOps();            ops &= ~SelectionKey.OP_CONNECT;            k.interestOps(ops);            unsafe.finishConnect();        }        if ((readyOps & SelectionKey.OP_WRITE) != 0) { //如果是写类型            ch.unsafe().forceFlush();        }        //如果是读类型或者ACCEPT类型。则执行unsafe.read()办法,unsafe的实例对象为 NioMessageUnsafe        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {            unsafe.read();        }    } catch (CancelledKeyException ignored) {        unsafe.close(unsafe.voidPromise());    }}

NioMessageUnsafe.read()

假如此时是一个读操作,或者是客户端建设连贯,那么代码执行逻辑如下,

@Overridepublic void read() {    assert eventLoop().inEventLoop();    final ChannelConfig config = config();    final ChannelPipeline pipeline = pipeline(); //如果是第一次建设连贯,此时的pipeline是ServerBootstrapAcceptor    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();    allocHandle.reset(config);    boolean closed = false;    Throwable exception = null;    try {        try {            do {                int localRead = doReadMessages(readBuf);                if (localRead == 0) {                    break;                }                if (localRead < 0) {                    closed = true;                    break;                }                allocHandle.incMessagesRead(localRead);            } while (continueReading(allocHandle));        } catch (Throwable t) {            exception = t;        }        int size = readBuf.size();        for (int i = 0; i < size; i ++) {            readPending = false;            pipeline.fireChannelRead(readBuf.get(i));  //调用pipeline中的channelRead办法        }        readBuf.clear();        allocHandle.readComplete();        pipeline.fireChannelReadComplete();        if (exception != null) {            closed = closeOnReadError(exception);            pipeline.fireExceptionCaught(exception); //调用pipeline中的ExceptionCaught办法        }        if (closed) {            inputShutdown = true;            if (isOpen()) {                close(voidPromise());            }        }    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

SelectedSelectionKeySet的优化

Netty中本人封装实现了一个SelectedSelectionKeySet,用来优化本来SelectorKeys的构造,它是怎么进行优化的呢?先来看它的代码定义

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {    SelectionKey[] keys;    int size;    SelectedSelectionKeySet() {        keys = new SelectionKey[1024];    }    @Override    public boolean add(SelectionKey o) {        if (o == null) {            return false;        }        keys[size++] = o;        if (size == keys.length) {            increaseCapacity();        }        return true;    }}

SelectedSelectionKeySet外部应用的是SelectionKey数组,所有在processSelectedKeysOptimized办法中能够间接通过遍历数组来取出就绪的I/O事件。

而原来的Set<SelectionKey>返回的是HashSet类型,两者相比,SelectionKey[]不须要思考哈希抵触的问题,所以能够实现O(1)工夫复杂度的add操作。

SelectedSelectionKeySet的初始化

netty通过反射的形式,把Selector对象外部的selectedKeys和publicSelectedKeys替换为SelectedSelectionKeySet。

本来的selectedKeys和publicSelectedKeys这两个字段都是HashSet类型,替换之后变成了SelectedSelectionKeySet。当有就绪的key时,会间接填充到SelectedSelectionKeySet的数组中。后续只须要遍历即可。

private SelectorTuple openSelector() {    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();    //应用反射    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {        @Override        public Object run() {            try {                //Selector外部的selectedKeys字段                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");                //Selector外部的publicSelectedKeys字段                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {                    //获取selectedKeysField字段偏移量                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);                    //获取publicSelectedKeysField字段偏移量                    long publicSelectedKeysFieldOffset =                        PlatformDependent.objectFieldOffset(publicSelectedKeysField);                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {                        //替换为selectedKeySet                        PlatformDependent.putObject(                            unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);                        PlatformDependent.putObject(                            unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);                        return null;                    }                    // We could not retrieve the offset, lets try reflection as last-resort.                }                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);                if (cause != null) {                    return cause;                }                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);                if (cause != null) {                    return cause;                }                selectedKeysField.set(unwrappedSelector, selectedKeySet);                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);                return null;            } catch (NoSuchFieldException e) {                return e;            } catch (IllegalAccessException e) {                return e;            }        }    });    if (maybeException instanceof Exception) {        selectedKeys = null;        Exception e = (Exception) maybeException;        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);        return new SelectorTuple(unwrappedSelector);    }    selectedKeys = selectedKeySet;}

异步工作的执行流程

剖析完下面的流程后,咱们持续来看NioEventLoop中的run办法中,针对异步工作的解决流程

@Overrideprotected void run() {    int selectCnt = 0;    for (;;) {        ranTasks = runAllTasks();    }}

runAllTask

须要留神,NioEventLoop能够反对定时工作的执行,通过nioEventLoop.schedule()来实现。

protected boolean runAllTasks() {    assert inEventLoop();    boolean fetchedAll;    boolean ranAtLeastOne = false;    do {        fetchedAll = fetchFromScheduledTaskQueue(); //合并定时工作到一般工作队列        if (runAllTasksFrom(taskQueue)) { //循环执行taskQueue中的工作            ranAtLeastOne = true;        }    } while (!fetchedAll);      if (ranAtLeastOne) { //如果工作全副执行实现,记录执行完实现工夫        lastExecutionTime = ScheduledFutureTask.nanoTime();    }    afterRunningAllTasks();//执行收尾工作    return ranAtLeastOne;}

fetchFromScheduledTaskQueue

遍历scheduledTaskQueue中的工作,增加到taskQueue中。

private boolean fetchFromScheduledTaskQueue() {    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {        return true;    }    long nanoTime = AbstractScheduledEventExecutor.nanoTime();    for (;;) {        Runnable scheduledTask = pollScheduledTask(nanoTime);        if (scheduledTask == null) {            return true;        }        if (!taskQueue.offer(scheduledTask)) {            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);            return false;        }    }}

工作增加办法execute

NioEventLoop外部有两个十分重要的异步工作队列,别离是一般工作和定时工作队列,针对这两个队列提供了两个办法别离向两个队列中增加工作。

  • execute()
  • schedule()

其中,execute办法的定义如下。

private void execute(Runnable task, boolean immediate) {    boolean inEventLoop = inEventLoop();    addTask(task); //把当前任务增加到阻塞队列中    if (!inEventLoop) { //如果是非NioEventLoop        startThread(); //启动线程        if (isShutdown()) { //如果以后NioEventLoop曾经是进行状态            boolean reject = false;            try {                if (removeTask(task)) {                     reject = true;                }            } catch (UnsupportedOperationException e) {                // The task queue does not support removal so the best thing we can do is to just move on and                // hope we will be able to pick-up the task before its completely terminated.                // In worst case we will log on termination.            }            if (reject) {                reject();            }        }    }    if (!addTaskWakesUp && immediate) {        wakeup(inEventLoop);    }}

Nio的空轮转问题

所谓的空轮训,是指咱们在执行selector.select()办法时,如果没有就绪的SocketChannel时,以后线程会被阻塞 。 而空轮询是指当没有就绪SocketChannel时,会被触发唤醒。

而这个唤醒是没有任何读写申请的,从而导致线程在做有效的轮询,使得CPU占用率较高。

导致这个问题的根本原因是:

在局部Linux的2.6的kernel中,poll和epoll对于忽然中断的连贯socket会对返回的eventSet事件汇合置为POLLHUP,也可能是POLLERR,eventSet事件汇合产生了变动,这就可能导致Selector会被唤醒。这是与操作系统机制有关系的,JDK尽管仅仅是一个兼容各个操作系统平台的软件,但很遗憾在JDK5和JDK6最后的版本中(严格意义上来将,JDK局部版本都是),这个问题并没有解决,而将这个帽子抛给了操作系统方,这也就是这个bug最终始终到2013年才最终修复的起因,最终影响力太广。

Netty是如何解决这个问题的呢?咱们回到NioEventLoop的run办法中

@Overrideprotected void run() {    int selectCnt = 0;    for (;;) {        //selectCnt记录的是无功而返的select次数,即eventLoop空转的次数,为解决NIO BUG        selectCnt++;         //ranTasks=true,或strategy>0,阐明eventLoop干活了,没有空转,清空selectCnt        if (ranTasks || strategy > 0) {            //如果抉择操作计数器的值,大于最小选择器重构阈值,则输入log            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                             selectCnt - 1, selector);            }            selectCnt = 0;        }         //unexpectedSelectorWakeup解决NIO BUG        else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)            selectCnt = 0;        }    }}

unexpectedSelectorWakeup

private boolean unexpectedSelectorWakeup(int selectCnt) {    if (Thread.interrupted()) {        if (logger.isDebugEnabled()) {            logger.debug("Selector.select() returned prematurely because " +                         "Thread.currentThread().interrupt() was called. Use " +                         "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");        }        return true;    }    //如果抉择重构的阈值大于0, 默认值是512次、 并且以后触发的空轮询次数大于 512次。,则触发重构    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {        // The selector returned prematurely many times in a row.        // Rebuild the selector to work around the problem.        logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",                    selectCnt, selector);        rebuildSelector();        return true;    }    return false;}

rebuildSelector()

public void rebuildSelector() {    if (!inEventLoop()) { //如果不是在eventLoop中执行,则应用异步线程执行        execute(new Runnable() {            @Override            public void run() {                rebuildSelector0();            }        });        return;    }    rebuildSelector0();}

rebuildSelector0

这个办法的次要作用: 从新创立一个选择器,代替以后事件循环中的选择器

private void rebuildSelector0() {    final Selector oldSelector = selector; //获取老的selector选择器    final SelectorTuple newSelectorTuple; //定义新的选择器    if (oldSelector == null) { //如果老的选择器为空,间接返回        return;    }    try {        newSelectorTuple = openSelector(); //创立一个新的选择器    } catch (Exception e) {        logger.warn("Failed to create a new Selector.", e);        return;    }    // Register all channels to the new Selector.    int nChannels = 0;    for (SelectionKey key: oldSelector.keys()) {//遍历注册到选择器的抉择key汇合        Object a = key.attachment();        try {             //如果抉择key有效或抉择关联的通道曾经注册到新的选择器,则跳出以后循环            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {                continue;            }             //获取key的抉择关注事件集            int interestOps = key.interestOps();            key.cancel();//勾销抉择key          //注册抉择key到新的选择器            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);            if (a instanceof AbstractNioChannel) {//如果是nio通道,则更新通道的抉择key                // Update SelectionKey                ((AbstractNioChannel) a).selectionKey = newKey;            }            nChannels ++;        } catch (Exception e) {            logger.warn("Failed to re-register a Channel to the new Selector.", e);            if (a instanceof AbstractNioChannel) {                AbstractNioChannel ch = (AbstractNioChannel) a;                ch.unsafe().close(ch.unsafe().voidPromise());            } else {                @SuppressWarnings("unchecked")                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                invokeChannelUnregistered(task, key, e);            }        }    }    //更新以后事件循环选择器    selector = newSelectorTuple.selector;    unwrappedSelector = newSelectorTuple.unwrappedSelector;    try {        // time to close the old selector as everything else is registered to the new one        oldSelector.close(); //敞开原始选择器    } catch (Throwable t) {        if (logger.isWarnEnabled()) {            logger.warn("Failed to close the old Selector.", t);        }    }    if (logger.isInfoEnabled()) {        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");    }}

从上述过程中咱们发现,Netty解决NIO空轮转问题的形式,是通过重建Selector对象来实现的,在这个重建过程中,外围是把Selector中所有的SelectionKey从新注册到新的Selector上,从而奇妙的防止了JDK epoll空轮训问题。

连贯的建设及处理过程

在9.2.4.3节中,提到了当客户端有连贯或者读事件发送到服务端时,会调用NioMessageUnsafe类的read()办法。

public void read() {    assert eventLoop().inEventLoop();    final ChannelConfig config = config();    final ChannelPipeline pipeline = pipeline();    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();    allocHandle.reset(config);    boolean closed = false;    Throwable exception = null;    try {        try {            do {                //如果有客户端连贯进来,则localRead为1,否则返回0                int localRead = doReadMessages(readBuf);                if (localRead == 0) {                    break;                }                if (localRead < 0) {                    closed = true;                    break;                }                                allocHandle.incMessagesRead(localRead); //累计减少read音讯数量            } while (continueReading(allocHandle));        } catch (Throwable t) {            exception = t;        }        int size = readBuf.size(); //遍历客户端连贯列表        for (int i = 0; i < size; i ++) {            readPending = false;            pipeline.fireChannelRead(readBuf.get(i)); //调用pipeline中handler的channelRead办法。        }        readBuf.clear(); //清空集合        allocHandle.readComplete();        pipeline.fireChannelReadComplete(); //触发pipeline中handler的readComplete办法        if (exception != null) {            closed = closeOnReadError(exception);            pipeline.fireExceptionCaught(exception);        }        if (closed) {            inputShutdown = true;            if (isOpen()) {                close(voidPromise());            }        }    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

pipeline.fireChannelRead(readBuf.get(i))

持续来看pipeline的触发办法,此时的pipeline组成,如果以后是连贯事件,那么pipeline = ServerBootstrap$ServerBootstrapAcceptor。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(m); //获取pipeline中的下一个节点,调用该handler的channelRead办法    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}

ServerBootstrapAcceptor

ServerBootstrapAcceptor是NioServerSocketChannel中一个非凡的Handler,专门用来解决客户端连贯事件,该办法中外围的目标是把针对SocketChannel的handler链表,增加到以后NioSocketChannel中的pipeline中。

public void channelRead(ChannelHandlerContext ctx, Object msg) {    final Channel child = (Channel) msg;    child.pipeline().addLast(childHandler);  //把服务端配置的childHandler,增加到以后NioSocketChannel中的pipeline中    setChannelOptions(child, childOptions, logger); //设置NioSocketChannel的属性    setAttributes(child, childAttrs);     try {        //把以后的NioSocketChannel注册到Selector上,并且监听一个异步事件。        childGroup.register(child).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    forceClose(child, future.cause());                }            }        });    } catch (Throwable t) {        forceClose(child, t);    }}

pipeline的构建过程

9.6.2节中,child其实就是一个NioSocketChannel,它是在NioServerSocketChannel中,当接管到一个新的链接时,创建对象。

@Overrideprotected int doReadMessages(List<Object> buf) throws Exception {    SocketChannel ch = SocketUtils.accept(javaChannel());    try {        if (ch != null) {            buf.add(new NioSocketChannel(this, ch)); //这里            return 1;        }    } catch (Throwable t) {        logger.warn("Failed to create a new channel from an accepted socket.", t);        try {            ch.close();        } catch (Throwable t2) {            logger.warn("Failed to close a socket.", t2);        }    }    return 0;}

而NioSocketChannel在结构时,调用了父类AbstractChannel中的构造方法,初始化了一个pipeline.

protected AbstractChannel(Channel parent) {    this.parent = parent;    id = newId();    unsafe = newUnsafe();    pipeline = newChannelPipeline();}

DefaultChannelPipeline

pipeline的默认实例是DefaultChannelPipeline,构造方法如下。

protected DefaultChannelPipeline(Channel channel) {    this.channel = ObjectUtil.checkNotNull(channel, "channel");    succeededFuture = new SucceededChannelFuture(channel, null);    voidPromise =  new VoidChannelPromise(channel, true);    tail = new TailContext(this);    head = new HeadContext(this);    head.next = tail;    tail.prev = head;}

初始化了一个头节点和尾节点,组成一个双向链表,如图9-2所示

<center>图9-2</center>

NioSocketChannel中handler链的形成

再回到ServerBootstrapAccepter的channelRead办法中,收到客户端连贯时,触发了NioSocketChannel中的pipeline的增加

以下代码是DefaultChannelPipeline的addLast办法。

@Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {   ObjectUtil.checkNotNull(handlers, "handlers");   for (ChannelHandler h: handlers) { //遍历handlers列表,此时这里的handler是ChannelInitializer回调办法       if (h == null) {           break;       }       addLast(executor, null, h);   }   return this;}

addLast

把服务端配置的ChannelHandler,增加到pipeline中,留神,此时的pipeline中保留的是ChannelInitializer回调办法。

@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {    final AbstractChannelHandlerContext newCtx;    synchronized (this) {        checkMultiplicity(handler); //查看是否有反复的handler        //创立新的DefaultChannelHandlerContext节点        newCtx = newContext(group, filterName(name, handler), handler);        addLast0(newCtx);  //增加新的DefaultChannelHandlerContext到ChannelPipeline              if (!registered) {             newCtx.setAddPending();            callHandlerCallbackLater(newCtx, true);            return this;        }        EventExecutor executor = newCtx.executor();        if (!executor.inEventLoop()) {            callHandlerAddedInEventLoop(newCtx, executor);            return this;        }    }    callHandlerAdded0(newCtx);    return this;}

这个回调办法什么时候触发调用呢?其实就是在ServerBootstrapAcceptor这个类的channelRead办法中,注册以后NioSocketChannel时

childGroup.register(child).addListener(new ChannelFutureListener() {}

最终依照之前咱们上一节课源码剖析的思路,定位到AbstractChannel中的register0办法中。

private void register0(ChannelPromise promise) {            try {                // check if the channel is still open as it could be closed in the mean time when the register                // call was outside of the eventLoop                if (!promise.setUncancellable() || !ensureOpen(promise)) {                    return;                }                boolean firstRegistration = neverRegistered;                doRegister();                neverRegistered = false;                registered = true;                //                pipeline.invokeHandlerAddedIfNeeded();            }}

callHandlerAddedForAllHandlers

pipeline.invokeHandlerAddedIfNeeded()办法,向下执行,会进入到DefaultChannelPipeline这个类中的callHandlerAddedForAllHandlers办法中

private void callHandlerAddedForAllHandlers() {    final PendingHandlerCallback pendingHandlerCallbackHead;    synchronized (this) {        assert !registered;        // This Channel itself was registered.        registered = true;        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;        // Null out so it can be GC'ed.        this.pendingHandlerCallbackHead = null;    }    //从期待被调用的handler 回调列表中,取出工作来执行。    PendingHandlerCallback task = pendingHandlerCallbackHead;    while (task != null) {        task.execute();        task = task.next;    }}

咱们发现,pendingHandlerCallbackHead这个单向链表,是在callHandlerCallbackLater办法中被增加的,

而callHandlerCallbackLater又是在addLast办法中增加的,所以形成了一个异步残缺的闭环。

ChannelInitializer.handlerAdded

task.execute()办法执行门路是

callHandlerAdded0 -> ctx.callHandlerAdded ->

-------> AbstractChannelHandlerContext.callHandlerAddded()

---------------> ChannelInitializer.handlerAdded

调用initChannel办法来初始化NioSocketChannel中的Channel.

@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {    if (ctx.channel().isRegistered()) {        // This should always be true with our current DefaultChannelPipeline implementation.        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers        // will be added in the expected order.        if (initChannel(ctx)) {            // We are done with init the Channel, removing the initializer now.            removeState(ctx);        }    }}

接着,调用initChannel形象办法,该办法由具体的实现类来实现。

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {    if (initMap.add(ctx)) { // Guard against re-entrance.        try {            initChannel((C) ctx.channel());        } catch (Throwable cause) {            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).            // We do so to prevent multiple calls to initChannel(...).            exceptionCaught(ctx, cause);        } finally {            ChannelPipeline pipeline = ctx.pipeline();            if (pipeline.context(this) != null) {                pipeline.remove(this);            }        }        return true;    }    return false;}

ChannelInitializer的实现,是咱们自定义Server中的匿名外部类,ChannelInitializer。因而通过这个回调来实现以后NioSocketChannel的pipeline的构建过程。

public static void main(String[] args){    EventLoopGroup boss = new NioEventLoopGroup();    //2 用于对承受客户端连贯读写操作的线程工作组    EventLoopGroup work = new NioEventLoopGroup();    ServerBootstrap b = new ServerBootstrap();    b.group(boss, work)    //绑定两个工作线程组        .channel(NioServerSocketChannel.class)    //设置NIO的模式        // 初始化绑定服务通道        .childHandler(new ChannelInitializer<SocketChannel>() {            @Override            protected void initChannel(SocketChannel sc) throws Exception {                sc.pipeline()                    .addLast(                    new LengthFieldBasedFrameDecoder(1024,                                                     9,4,0,0))                    .addLast(new MessageRecordEncoder())                    .addLast(new MessageRecordDecode())                    .addLast(new ServerHandler());            }        });}
版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!