本文主要研究一下Elasticsearch的TaskScheduler

TaskScheduler

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java

public class TaskScheduler {    private final PriorityQueue<DelayedTask> tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline));    /**     * Schedule a task at the defined relative nanotime. When {@link #pollTask(long)} is called with a     * relative nanotime after the scheduled time, the task will be returned. This method returns a     * {@link Runnable} that can be run to cancel the scheduled task.     *     * @param task to schedule     * @param relativeNanos defining when to execute the task     * @return runnable that will cancel the task     */    public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {        DelayedTask delayedTask = new DelayedTask(relativeNanos, task);        tasks.offer(delayedTask);        return delayedTask;    }    Runnable pollTask(long relativeNanos) {        DelayedTask task;        while ((task = tasks.peek()) != null) {            if (relativeNanos - task.deadline >= 0) {                tasks.remove();                if (task.cancelled == false) {                    return task.runnable;                }            } else {                return null;            }        }        return null;    }    long nanosUntilNextTask(long relativeNanos) {        DelayedTask nextTask = tasks.peek();        if (nextTask == null) {            return Long.MAX_VALUE;        } else {            return Math.max(nextTask.deadline - relativeNanos, 0);        }    }    private static class DelayedTask implements Runnable {        private final long deadline;        private final Runnable runnable;        private boolean cancelled = false;        private DelayedTask(long deadline, Runnable runnable) {            this.deadline = deadline;            this.runnable = runnable;        }        private long getDeadline() {            return deadline;        }        @Override        public void run() {            cancelled = true;        }    }}
  • TaskScheduler定义了DelayedTask,它实现了Runnable接口,它包含deadline、runnable、cancelled三个属性
  • TaskScheduler定义了DelayedTask类型的PriorityQueue,其comparator为Comparator.comparingLong(DelayedTask::getDeadline)
  • scheduleAtRelativeTime方法将runnable包装为delayedTask,然后offer到priorityQueue中;pollTask则peek出来task,如果不为null则判断relativeNanos是否大于等于task.deadline,条件成立的话则将其从tasks中移除,然后在cancelled为false的时候返回task.runnable

SSLChannelContext

elasticsearch-7.0.1/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java

public final class SSLChannelContext extends SocketChannelContext {    //......    @Override    public void queueWriteOperation(WriteOperation writeOperation) {        getSelector().assertOnSelectorThread();        if (writeOperation instanceof CloseNotifyOperation) {            sslDriver.initiateClose();            long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime();            closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos);        } else {            super.queueWriteOperation(writeOperation);        }    }    private void channelCloseTimeout() {        closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER;        setCloseNow();        getSelector().queueChannelClose(channel);    }    //......}
  • SSLChannelContext的queueWriteOperation方法会使用taskScheduler的scheduleAtRelativeTime注册一个channelCloseTimeout的延时任务

NioSelector

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java

public class NioSelector implements Closeable {    //......    public void runLoop() {        if (runLock.tryLock()) {            isRunningFuture.complete(null);            try {                setThread();                while (isOpen()) {                    singleLoop();                }            } finally {                try {                    cleanupAndCloseChannels();                } finally {                    try {                        selector.close();                    } catch (IOException e) {                        eventHandler.selectorException(e);                    } finally {                        runLock.unlock();                        exitedLoop.countDown();                    }                }            }        } else {            throw new IllegalStateException("selector is already running");        }    }    void singleLoop() {        try {            closePendingChannels();            preSelect();            long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());            int ready;            if (nanosUntilNextTask == 0) {                ready = selector.selectNow();            } else {                long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);                // Only select until the next task needs to be run. Do not select with a value of 0 because                // that blocks without a timeout.                ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1)));            }            if (ready > 0) {                Set<SelectionKey> selectionKeys = selector.selectedKeys();                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();                while (keyIterator.hasNext()) {                    SelectionKey sk = keyIterator.next();                    keyIterator.remove();                    if (sk.isValid()) {                        try {                            processKey(sk);                        } catch (CancelledKeyException cke) {                            eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(),  cke);                        }                    } else {                        eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(),  new CancelledKeyException());                    }                }            }            handleScheduledTasks(System.nanoTime());        } catch (ClosedSelectorException e) {            if (isOpen()) {                throw e;            }        } catch (IOException e) {            eventHandler.selectorException(e);        } catch (Exception e) {            eventHandler.uncaughtException(e);        }    }    private void handleScheduledTasks(long nanoTime) {        Runnable task;        while ((task = taskScheduler.pollTask(nanoTime)) != null) {            try {                task.run();            } catch (Exception e) {                eventHandler.taskException(e);            }        }    }    //......}
  • NioSelector的runLoop方法调用了singleLoop方法,后者调用了handleScheduledTasks方法,而handleScheduledTasks方法则是从taskScheduler.pollTask,然后执行task.run()

小结

  • TaskScheduler定义了DelayedTask,它实现了Runnable接口,它包含deadline、runnable、cancelled三个属性
  • TaskScheduler定义了DelayedTask类型的PriorityQueue,其comparator为Comparator.comparingLong(DelayedTask::getDeadline)
  • scheduleAtRelativeTime方法将runnable包装为delayedTask,然后offer到priorityQueue中;pollTask则peek出来task,如果不为null则判断relativeNanos是否大于等于task.deadline,条件成立的话则将其从tasks中移除,然后在cancelled为false的时候返回task.runnable

doc

  • TaskScheduler