关于java:Netty四事件循环EventLoop与EventLoopGroup

45次阅读

共计 18092 个字符,预计需要花费 46 分钟才能阅读完成。

一、简介

在 netty 中,事件循环 EventLoop是一个很重要的组件,用于解决 已注册 Channel的各种IO 事件,而 EventLoopGroup 对应了一个或多个 EventLoop,能够看做EvenLoopGroup 就是 EventLoop 的汇合。上面是 EventLoop 和 EventLoopGroup 相干类图:

从下面类图能够看到,netty 在 jdk 原生接口 ScheduledExecutorService 上衍生了 EventExecutorGroup 接口,其通过next() 办法 来获取 EventExecutor 事件执行器,并在 ScheduledExecutorService 的根底上增加了 优雅敞开、是否正在敞开 等操作,如下图

而 EventLoopGroup 继承了 EventExecutorGroup 接口,重写 next()办法 并增加 注册 Channel的操作,如下图

EventLoop 接口自身比较简单,继承于 EventExecutor 及 EventLoopGroup 接口,如下

最常应用的 NioEventLoopGroup 和 NioEventLoop,别离继承于抽象类 MultithreadEventLoopGroup 和 SingleThreadEventLoop,而这两个抽象类自身实现不难,其次要是继承了 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor,所以上面来看下 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor 的次要代码逻辑

二、MultithreadEventExecutorGroup 和 SingleThreadEventExecutor

2.1 MultithreadEventExecutorGroup

MultithreadEventExecutorGroup 示意 通过多个 EventExecutor 来解决所提交的工作

2.1.1 重要属性

有两个较为重要的属性children 和 chooser,children 对应 EventExecutor 数组,而 chooser 选取器的作用是从 children 选取 EventExecutor 来执行工作。如下

// 对应的 EventExecutor 数组
private final EventExecutor[] children;

// 选取器,作用是从 children 里选取 EventExecutor 来执行工作
private final EventExecutorChooserFactory.EventExecutorChooser chooser;

2.1.2 初始化

MultithreadEventExecutorGroup 的构造函数会对 children 和 chooser 进行初始化,大抵步骤如下:

  1. 依据传进来的 nThreads 线程数来初始化 children 数组 children = new EventExecutor[nThreads]
  2. 通过 newChild() 办法 来实例化 children 的每个 EventExecutor,newChild()为形象办法,须要子类 (如 NioEventLoopGroup) 具体实现。如果没有胜利的实例化 children 数组,则逐个优雅敞开 EventExecutor
  3. 初始化 chooser 选取器
  4. 给 children 中的每个 EventExecutor 增加terminationListener 终止监听器,每有一个 EventExecutor 终止了,就会将 terminatedChildren 加 1,等到 terminatedChildren==children 总数时,阐明所有的 EventExecutor 曾经全副终止

源码如下:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {checkPositive(nThreads, "nThreads");

    if (executor == null) {
        // 如果传进的 executor 执行器为空,设置为 ThreadPerTaskExecutor 执行器,该执行器会独自创立一个线程来解决每个工作
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 依据传进来的 nThreads 线程数来实例化 children
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // newChild 作用是生成具体的 EventExecutor,其为形象办法,须要子类(如 NioEventLoopGroup)去具体实现
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // 如果没有胜利的实例化 children 数组,则逐个优雅敞开 EventExecutor
            if (!success) {for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}

                // 期待终止所有的 EventExecutor
                for (int j = 0; j < i; j ++) {EventExecutor e = children[j];
                    try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // 初始化 chooser 选取器
    chooser = chooserFactory.newChooser(children);

    // 给 children 中的每个 EventExecutor 增加终止监听器
    // 每有一个 EventExecutor 终止了,就会将 terminatedChildren 加 1
    // 等到 terminatedChildren==children 总数时,阐明所有的 EventExecutor 曾经全副终止
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);
    }
}

2.1.3 提交工作

MultithreadEventExecutorGroup 提交工作的大抵流程如下图:

提交工作时,MultithreadEventExecutorGroup 是间接应用父类 AbstractEventExecutorGroup 的 submit 办法来提交,而该 submit 办法中是 通过调用 next()办法来选取到某个 EventExecutor,再调用 EventExecutor 的 submit()办法来提交的,如下

@Override
public Future<?> submit(Runnable task) {return next().submit(task);
}

而 next()办法则是通过 chooser 选取器来选取到某个 EventExecutor 的,如下

@Override
public EventExecutor next() {return chooser.next();
}

2.2 SingleThreadEventExecutor

从下面咱们能够得悉 MultithreadEventExecutorGroup 提交工作时,本质上是 选取到某个 EventExecutor,再由该 EventExecutor 来进行提交

因为咱们罕用的 NioEventLoop 的大多数操作其实 是由 SingleThreadEventExecutor 提供了默认实现(当然 NioEventLoop 也有其具体的一些操作,后续会详解),所以在深刻 NioEventLoop 之前,有必要先理解一下 SingleThreadEventExecutor

2.2.1 重要属性

SingleThreadEventExecutor 中有一个 寄存工作的 taskQueue 工作队列 ,还有一个 与之绑定的 thread 线程,还有一些优雅敞开相干属性,如下

// 寄存工作的队列
private final Queue<Runnable> taskQueue;

// 与该 SingleThreadEventExecutor 绑定的 thread
private volatile Thread thread;

// 执行器,首次启动时通过该执行器来启动线程,再由该线程来生产 taskQueue 的工作
private final Executor executor;

// 该属性很重要,示意 addTask 增加工作时,是否主动唤醒线程,如果不能主动唤醒,须要被动调用 wakeup 办法来唤醒
// 如:DefaultEventExecutor 的 addTaskWakesUp 为 true,而 NioEventLoop 为 false
private final boolean addTaskWakesUp;

// 队列的最大容量
private final int maxPendingTasks;

// 优雅敞开的静默工夫
private volatile long gracefulShutdownQuietPeriod;

// 优雅敞开的超时工夫
private volatile long gracefulShutdownTimeout;

// 优雅敞开的开始工夫
private long gracefulShutdownStartTime;

2.2.2 状态治理

SingleThreadEventExecutor 总共有 5 种状态,如下

  • ST_NOT_STARTED (未启动)
  • ST_STARTED (启动)
  • ST_SHUTTING_DOWN (敞开中)
  • ST_SHUTDOWN (已敞开)
  • ST_TERMINATED (已终止)

初始状态为ST_NOT_STARTED 未启动,如下

private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
private volatile int state = ST_NOT_STARTED;

状态转换:ST_NOT_STARTED –> ST_STARTED –> ST_SHUTTING_DOWN –> ST_SHUTDOWN –> ST_TERMINATED

2.2.3 提交工作

SingleThreadEventExecutor 提交工作的流程图如下:

SingleThreadEventExecutor 在 首次提交工作 时,会将state 设置为已启动,启动工作线程,并将该工作线程与 thread 属性进行绑定,后续再次提交工作时,只会将工作增加到 taskQueue 工作队列中。源码如下

private void execute(Runnable task, boolean immediate) {
    // 判断以后线程与该 SingleThreadEventExecutor 绑定的线程是否是同一个
    boolean inEventLoop = inEventLoop();
    // 增加工作至 taskQueue 工作队列
    addTask(task);
    if (!inEventLoop) {
        // 如果 state 为未启动,则将 state 更新为已启动,启动工作线程,并将工作线程与该 SingleThreadEventExecutor 绑定
        startThread();
         // 如果 state 为已敞开,则回绝增加工作
        if (isShutdown()) {
            boolean reject = false;
            try {if (removeTask(task)) {reject = true;}
            } catch (UnsupportedOperationException e) { }
            if (reject) {reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);
    }
}
private void startThread() {
    // 如果 state 为未启动
    if (state == ST_NOT_STARTED) {
        // 将 state 更新为已启动
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // 启动工作线程
                doStartThread();
                success = true;
            } finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

doStartThread 局部代码如下:

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // 绑定工作线程
            thread = Thread.currentThread();
            if (interrupted) {thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // run 办法,从 taskQueue 中获取工作来执行,由子类去具体实现
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {logger.warn("Unexpected exception from an event executor:", t);
            } 

2.2.4 优雅敞开

下面曾经介绍了有 3 个属性是跟 优雅敞开 相干的,有gracefulShutdownQuietPeriod 静默工夫、gracefulShutdownTimeout 超时工夫、gracefulShutdownStartTime 开始工夫

gracefulShutdownQuietPeriod:如果以后工夫 - 上一次执行工夫 < 静默工夫,那么临时先不敞开,否则进行敞开
gracefulShutdownTimeout:如果以后工夫 - 优雅敞开的开始工夫 > 超时工夫,那么进行敞开
gracefulShutdownStartTime:优雅敞开的开始工夫

接下来看下优雅敞开 shutdownGracefully 办法,该办法会将 state 状态设置为敞开中,并间接返回 terminationFuture,源码如下:

public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
    if (timeout < quietPeriod) {
        throw new IllegalArgumentException("timeout:" + timeout + "(expected >= quietPeriod (" + quietPeriod + "))");
    }
    ObjectUtil.checkNotNull(unit, "unit");
    
    // 如果 state >= 敞开中,间接返回 terminationFuture
    if (isShuttingDown()) {return terminationFuture();
    }

    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    for (;;) {
        // 再次判断,如果 state >= 敞开中,间接返回 terminationFuture
        if (isShuttingDown()) {return terminationFuture();
        }
        int newState;
        wakeup = true;    // 是否须要唤醒
        oldState = state;
        if (inEventLoop) {newState = ST_SHUTTING_DOWN;} else {switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:  // 如果旧状态为已启动,则设置新状态为敞开中
                    newState = ST_SHUTTING_DOWN;
                    break;
                default:  // 如果旧状态 >= 已启动,那么将 wakeup 设置为 false,不唤醒工作线程
                    newState = oldState;
                    wakeup = false;
            }
        }
        // 通过 CAS 操作来更新状态
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {break;}
    }
    gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    gracefulShutdownTimeout = unit.toNanos(timeout);

    if (ensureThreadStarted(oldState)) {return terminationFuture;}
    // 如果须要唤醒,则将 WAKEUP_TASK 放到队列中,来唤醒工作线程
    if (wakeup) {taskQueue.offer(WAKEUP_TASK);
        if (!addTaskWakesUp) {wakeup(inEventLoop);
        }
    }

    return terminationFuture();}

state 状态更新为敞开中后,工作线程从 taskQueue 队列中每次拿到工作后,将会对 state 状态进行判断,如果是敞开中,会进一步判断是否确认敞开,如果确认敞开,则会跳出 run 办法,工作线程执行完结,最终该 SingleThreadEventExecutor 的 state 状态更新为已终止。这里咱们能够通过查看 SingleThreadEventExecutor 的默认实现DefaultEventExecutor 的 run 办法,源码如下:

protected void run() {for (;;) {
        // 从 taskQueue 中获取工作,如果是 WAKEUP_TASK,则拿到的 task 为 null
        Runnable task = takeTask();
        if (task != null) {
            // 执行工作
            runTask(task);
            // 更新上一次执行工夫
            updateLastExecutionTime();}
        // 确认是否敞开,如果是,跳出死循环
        if (confirmShutdown()) {break;}
    }
}

confirmShutdown 的源码如下:

protected boolean confirmShutdown() {
    // 如果 state < 敞开中,间接返回 false
    if (!isShuttingDown()) {return false;}

    // 勾销定时工作
    cancelScheduledTasks();

    if (gracefulShutdownStartTime == 0) {
        // 设置优雅敞开的开始工夫为以后工夫
        gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();}
    // 如果 taskQueue 里还有工作,运行所有工作,否则运行敞开钩子
    if (runAllTasks() || runShutdownHooks()) {if (isShutdown()) {return true;}
        // 如果静默期为 0,返回 true 确认敞开
        if (gracefulShutdownQuietPeriod == 0) {return true;}
        taskQueue.offer(WAKEUP_TASK);
        return false;
    }

    final long nanoTime = ScheduledFutureTask.nanoTime();
    // 如果 state >= 已敞开,返回 true 确认敞开
    // 否则判断以后工夫 - 优雅敞开的开始工夫 是否大于 超时工夫,如果大于,返回 true 确认敞开
    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {return true;}
    // 如果以后工夫 - 上一次工作执行工夫 小于等于 静默工夫,阐明在这一段时间内还有工作执行,则线程休眠 100 毫秒,返回 false 暂不敞开
    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        // 休眠 100 毫秒后,用于持续唤醒工作线程
        taskQueue.offer(WAKEUP_TASK);
        try {Thread.sleep(100);
        } catch (InterruptedException e) { }
        return false;
    }

    return true;
}

当 confirmShutdown 返回 true 跳出 run 办法后

  1. 工作线程开始收尾工作
  2. 因为此时可能又有一些工作被增加到 taskQueue 外面,所以须要再次调用 confirmShutdown 办法
  3. state 状态更新为已敞开(这时曾经不能再接管新的工作了)
  4. 再次调用 confirmShutdown 办法(理由同上,如果是这样的话,第一次的 confirmShutdown 调用是不是没有必要?)。
  5. 最终调用 cleanup()钩子办法,并 将 state 状态更新为已终止,设置 terminationFuture 后果为胜利

三、NioEventLoopGroup 和 NioEventLoop

3.1 NioEventLoopGroup

NioEventLoopGroup能够说是 netty 中咱们最相熟的类之一,继承于 MultithreadEventLoopGroup,而 MultithreadEventLoopGroup 继承于 MultithreadEventExecutorGroup(下面曾经剖析过该类),如下图

因为 NioEventLoopGroup 自身实现比较简单,所以这里咱们只看下它的 newChild() 办法,这个办法之前在介绍 MultithreadEventExecutorGroup 也有提及,该办法是一个形象办法,须要子类具体实现,源码如下:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // SelectorProvider 有关上选择器 openSelector、关上服务端通道 openServerSocketChannel 等办法
    SelectorProvider selectorProvider = (SelectorProvider) args[0];
    // select 策略工厂,用于产生 SelectStrategy
    SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
    // 拒绝执行处理器
    RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
    // taskQueue 工厂,taskQueue 之前已有提及
    EventLoopTaskQueueFactory taskQueueFactory = null;
    // tailTaskQueue 工厂
    EventLoopTaskQueueFactory tailTaskQueueFactory = null;

    int argsLength = args.length;
    if (argsLength > 3) {taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
    }
    if (argsLength > 4) {tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
    }
    // new 一个 NioEventLoop 实例
    return new NioEventLoop(this, executor, selectorProvider,
            selectStrategyFactory.newSelectStrategy(),
            rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}

能够看到,newChild 办法的最初就是new 一个 NioEventLoop 实例,所以最初咱们须要来看下 NioEventLoop 中的源码,看看它到底是如何来运作的?

3.2 NioEventLoop

学习过 NIO 的同学都晓得,多个 Channel 能够注册到一个 Selector,这样咱们就能够在单线程中通过一个 Selector 来治理多个 Channel,这就是IO 多路复用,而NioEventLoop 就是 IO 多路复用的一个具体实现

在最开始的类图咱们能够看到 NioEventLoop 和 EpollEventLoop 都继承于 SingleThreadEventLoop,因为 NioEventLoop 更为常常应用,所以这里只介绍 NioEventLoop。NioEventLoop 继承于 SingleThreadEventLoop,而 SingleThreadEventLoop 继承于 SingleThreadEventExecutor,如下图

SinleThreadEventLoop 在 SingleThreadEventExecutor 的根底上,增加了一个 tailTasks 工作队列(runAllTasks 办法中执行完 taskQueue 中的工作后,会再执行 tailTasks 中的工作

3.2.1 构造函数

NioEventLoop 的构造函数,接管多个参数,有 executor 执行器、select 策略、拒绝执行处理器、taskQueue 工厂、tailTaskQueue 工厂,如下

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
             EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
            rejectedExecutionHandler);
    this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    // select 策略
    this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    // 创立 SelectorTuple,final SelectorTuple selectorTuple = openSelector();
    // 包装后的 Selector,类型为 SelectedSelectionKeySetSelector,蕴含了 SelectedSelectionKeySet
    this.selector = selectorTuple.selector;
    // 未包装的 Selector
    this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

3.2.2 工作流程

在之前 SingleThreadEventExecutor 的介绍中,咱们曾经晓得,它会在启动工作线程后,调用 run 办法,而 run 办法由子类具体实现,所以 NioEventLoop 的 run 办法就是其工作流程,大抵的工作如下:

  1. 判断 taskQueue 和 tailTaskQueue 是否有工作,如果没有工作,则通过调用Selector.select 办法来阻塞或超时阻塞获取 IO 事件
  2. 如果有工作,调用 Selector.selectNow 办法非阻塞获取 IO 事件
  3. 判断是否有 IO 事件筹备好,如果有,先解决 IO 事件
  4. 再解决 taskQueue 和 tailTaskQueue 中的工作
  5. 返回第一步,有限循环

流程图如下:

联合源码来看,源码如下:

protected void run() {
    int selectCnt = 0;
    // 死循环
    for (;;) {
        try {
            int strategy;
            try {
                // 计算 strategy 的值
                // 如果 hasTasks 为 true,代表 taskQueue 或 tailQueue 里有工作,则间接调用 Selector.selectNow()来获取以后已筹备好的 IO 事件数量,并间接返回
                // 如果 hasTask 为 false,代表 taskQueue 或 tailQueue 里没有工作,则返回 SelectStrategy.SELECT(值为 -1)
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                case SelectStrategy.SELECT:
                    // 查看 scheduledTaskQueue 里的定时工作,如果定时工作不为空,将定时工作的 deadlineNanos 过期工夫赋值给 curDeadlineNanos
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE;}
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        // 如果没有工作,则进一步判断
                        // 如果 curDeadlineNanos 为 NONE,则调用 Selector.select()进行阻塞,直到有 IO 事件筹备好
                        // 如果 curDeadlineNanos 不为 NONE,则调用 selector.select(timeoutMillis)进行超时阻塞
                        if (!hasTasks()) {strategy = select(curDeadlineNanos);
                        }
                    } finally {nextWakeupNanos.lazySet(AWAKE);
                    }
                default:
                }
            } catch (IOException e) {// 这里是 selector.select()的一个 bug,即在某种状况下,在没有 IO 事件筹备好时,select()也没有进行阻塞,此时须要重建 Selector
                // 后续会进行具体介绍
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }
            // select 的次数
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            // ioRatio 示意 processSelectedKeys 办法(解决 IO 事件)和 runAllTasks()办法所用的事件占比
            // 如果 ioRatio 为 50,则工夫比为 1:1,如果为 60,则工夫比为 3:2
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {    // 如果 ioRatio 为 100
                try {if (strategy > 0) {
                        // 解决筹备好的 IO 事件
                        processSelectedKeys();}
                } finally {
                    // 执行 taskQueue 里的所有工作
                    ranTasks = runAllTasks();}
            } else if (strategy > 0) {final long ioStartTime = System.nanoTime();
                try {
                    // 解决筹备好的 IO 事件
                    processSelectedKeys();} finally {final long ioTime = System.nanoTime() - ioStartTime;
                    // 限时执行,工夫到了之后须要先返回,所以可能只能执行 taskQueue 里的局部工作
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                // 只执行一个工作
                ranTasks = runAllTasks(0);
            }

            // 如果至多有一个工作执行胜利,runTasks 则为 true,则重置 selectCnt 为 0
            if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) {    // 如果未预料到的 Selector 被唤醒,阐明可能是 bug 呈现了,重建 Selector 并重置 selectCnt
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + "raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {throw e;} catch (Throwable t) {handleLoopException(t);
        } finally {// 判断是否敞开,代码这里省略}
    }
}

3.2.3 解决 IO 事件

下面曾经介绍了 NioEventLoop 的工作流程,那么在判断如果有筹备好的 IO 事件,那么会调用 processSelectedKeys 来解决这些 IO 事件,接下来就来看下它的源码

private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {
        // 获取对应 IO 事件的 SelectionKey,以及它的附件
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            // 真正来解决对应的 IO 事件,将 SelectionKey 以及对应的 NioChannel 传入
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

这里来看下 processSelectedKey 是如何真正的来解决每个 IO 事件的

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // 判断 SelectionKey 是否无效,如果有效,间接敞开 channel 通道并返回
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {return;}

        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        // 获取以后 SelectionKey 的已筹备好的事件集
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {  // 如果有 OP_CONNECT 连贯事件
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);    // 设置 ops,示意该 SelectionKey 不再关注连贯事件

            unsafe.finishConnect();    // 下一章节会具体介绍,外部就是开始执行 pipeline,去执行各个 ChannelHandler 对应的办法}

        if ((readyOps & SelectionKey.OP_WRITE) != 0) {  // 如果有 OP_WRITE 写事件
            ch.unsafe().forceFlush();    // 下一章节会具体介绍,外部就是开始执行 pipeline,去执行各个 ChannelHandler 对应的办法
        }

        // 如果有 OP_READ 读事件或者 OP_ACCEPT 接管事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();    // 下一章节会具体介绍,外部就是开始执行 pipeline,去执行各个 ChannelHandler 对应的办法
        }
    } catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());
    }
}

3.2.4 空轮询问题

在执行 Selector.select()办法时,失常状况下如果没有筹备好的 Channel 时,线程会被阻塞。
空轮询是因为 Selector.select()没有正确工作,在没有筹备好的 Channel 时,就间接被唤醒,而没有进行阻塞。从而导致 run 办法始终在死循环,CPU 达到了 100%

那么在什么时候,代表该 bug 呈现了?
空轮询次数 selectCnt 大于等于 SELECTOR_AUTO_REBUILD_THRESHOLD(默认为 512)时,示意该 bug 呈现,则进行 重建 Selector,源码如下:

private boolean unexpectedSelectorWakeup(int selectCnt) {if (Thread.interrupted()) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because" +
                    "Thread.currentThread().interrupt() was called. Use" +
                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
        }
        return true;
    }
    // 判断如果空轮询次数 selectCnt 大于 SELECTOR_AUTO_REBUILD_THRESHOLD 时
    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                selectCnt, selector);
        // 重建 Selector
        rebuildSelector();
        return true;
    }
    return false;
}

如何重建 Selector?
大抵步骤:

  1. 从新关上一个 Selector
  2. 将旧 Selector 的所有 Channel 和对应的附件,都注册到新的 Selector 上
  3. 敞开旧 Selector
private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {return;}
    // 关上一个新的 Selector
    newSelectorTuple = openSelector();
    int nChannels = 0;
    for (SelectionKey key: oldSelector.keys()) {Object a = key.attachment();
        try {if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {continue;}
            int interestOps = key.interestOps();
            key.cancel();
            // 将旧 Selector 上的 channel 都注册到新的 Selector
            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {// 省略局部代码}
    }
    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;
    try {
        // 敞开旧 Selector
        oldSelector.close();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("Failed to close the old Selector.", t);
        }
    }
}

四、总结

EventLoop 和 EventLoopGroup 是 netty 中最重要的组件之一,也是 netty 可能构建高性能程序的关键所在,理解其工作机制与原理是十分有必要的。

正文完
 0