Preface
This article takes a look at Elasticsearch's TaskScheduler.
TaskScheduler
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/TaskScheduler.java
public class TaskScheduler {

    private final PriorityQueue<DelayedTask> tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline));

    /**
     * Schedule a task at the defined relative nanotime. When {@link #pollTask(long)} is called with a
     * relative nanotime after the scheduled time, the task will be returned. This method returns a
     * {@link Runnable} that can be run to cancel the scheduled task.
     *
     * @param task to schedule
     * @param relativeNanos defining when to execute the task
     * @return runnable that will cancel the task
     */
    public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {
        DelayedTask delayedTask = new DelayedTask(relativeNanos, task);
        tasks.offer(delayedTask);
        return delayedTask;
    }

    Runnable pollTask(long relativeNanos) {
        DelayedTask task;
        while ((task = tasks.peek()) != null) {
            if (relativeNanos - task.deadline >= 0) {
                tasks.remove();
                if (task.cancelled == false) {
                    return task.runnable;
                }
            } else {
                return null;
            }
        }
        return null;
    }

    long nanosUntilNextTask(long relativeNanos) {
        DelayedTask nextTask = tasks.peek();
        if (nextTask == null) {
            return Long.MAX_VALUE;
        } else {
            return Math.max(nextTask.deadline - relativeNanos, 0);
        }
    }

    private static class DelayedTask implements Runnable {

        private final long deadline;
        private final Runnable runnable;
        private boolean cancelled = false;

        private DelayedTask(long deadline, Runnable runnable) {
            this.deadline = deadline;
            this.runnable = runnable;
        }

        private long getDeadline() {
            return deadline;
        }

        @Override
        public void run() {
            cancelled = true;
        }
    }
}
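- TaskScheduler defines a private static nested class, DelayedTask, which implements Runnable and holds three fields: deadline, runnable, and cancelled. Its run() method does not execute the wrapped runnable; it only sets cancelled to true, which is why the object returned by scheduleAtRelativeTime acts as a canceller.
- TaskScheduler keeps its DelayedTask instances in a PriorityQueue whose comparator is Comparator.comparingLong(DelayedTask::getDeadline), so the task with the earliest deadline is always at the head.
- scheduleAtRelativeTime wraps the runnable in a DelayedTask, offers it to the queue, and returns it. pollTask peeks at the head of the queue: if a task is present and relativeNanos - task.deadline >= 0 (the deadline has been reached), it removes the task and returns task.runnable, unless the task was cancelled, in which case it moves on to the next one. If the head is not yet due, or the queue is empty, it returns null. nanosUntilNextTask returns the time remaining until the head task's deadline (never less than 0), or Long.MAX_VALUE when the queue is empty.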
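The schedule, cancel, and poll behaviour described above can be tried out with a small standalone sketch. This is not the Elasticsearch class: MiniTaskScheduler, Entry, schedule, and poll are hypothetical names chosen for illustration, and the deadlines are plain numbers rather than System.nanoTime() values.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

class MiniTaskScheduler {

    // Hypothetical stand-in for DelayedTask: running it only marks the entry as cancelled.
    static final class Entry implements Runnable {
        final long deadline;
        final Runnable runnable;
        boolean cancelled;

        Entry(long deadline, Runnable runnable) {
            this.deadline = deadline;
            this.runnable = runnable;
        }

        @Override
        public void run() {
            cancelled = true;
        }
    }

    // Earliest deadline sits at the head of the queue.
    private final PriorityQueue<Entry> tasks =
        new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deadline));

    Runnable schedule(Runnable task, long deadlineNanos) {
        Entry entry = new Entry(deadlineNanos, task);
        tasks.offer(entry);
        return entry; // the caller runs this to cancel the task
    }

    Runnable poll(long nowNanos) {
        Entry head;
        while ((head = tasks.peek()) != null) {
            if (nowNanos - head.deadline < 0) {
                return null; // the earliest deadline is still in the future
            }
            tasks.remove();
            if (!head.cancelled) {
                return head.runnable;
            }
            // a cancelled entry is dropped and the loop looks at the next head
        }
        return null;
    }

    public static void main(String[] args) {
        MiniTaskScheduler scheduler = new MiniTaskScheduler();
        StringBuilder log = new StringBuilder();
        scheduler.schedule(() -> log.append("b"), 200L);
        Runnable cancelA = scheduler.schedule(() -> log.append("a"), 100L);
        cancelA.run(); // cancel the task with the earlier deadline

        // At 150, "a" is due but cancelled and "b" is not yet due.
        System.out.println(scheduler.poll(150L)); // prints "null"
        Runnable due = scheduler.poll(250L);
        due.run();
        System.out.println(log); // prints "b"
    }
}
```

Note that cancellation does not remove the entry from the queue; a cancelled entry is only discarded once poll reaches it, which keeps cancellation cheap at the cost of holding the entry until its deadline passes.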
SSLChannelContext
elasticsearch-7.0.1/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java
public final class SSLChannelContext extends SocketChannelContext {

    //......

    @Override
    public void queueWriteOperation(WriteOperation writeOperation) {
        getSelector().assertOnSelectorThread();
        if (writeOperation instanceof CloseNotifyOperation) {
            sslDriver.initiateClose();
            long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime();
            closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos);
        } else {
            super.queueWriteOperation(writeOperation);
        }
    }

    private void channelCloseTimeout() {
        closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER;
        setCloseNow();
        getSelector().queueChannelClose(channel);
    }

    //......
}
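- SSLChannelContext's queueWriteOperation method handles a CloseNotifyOperation by calling sslDriver.initiateClose() and then using the selector's TaskScheduler (scheduleAtRelativeTime) to register channelCloseTimeout as a delayed task that becomes due CLOSE_TIMEOUT_NANOS from now; the returned Runnable is stored in closeTimeoutCanceller. All other write operations are passed to super.queueWriteOperation.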
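The pattern here, scheduling a timeout and holding on to its canceller, can be sketched in isolation. Everything below is an assumption made for illustration: OneShotScheduler is a hypothetical single-slot stand-in for TaskScheduler, NO_OP_CANCELLER and onGracefulClose are invented names, the 10-second timeout is a made-up value, and the excerpt above does not show where the real class actually runs its canceller.

```java
import java.util.concurrent.TimeUnit;

class CloseTimeoutSketch {

    // Assumed value for illustration; the excerpt does not show the real constant.
    static final long CLOSE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
    static final Runnable NO_OP_CANCELLER = () -> {};

    // Hypothetical single-slot scheduler with the same method shape as TaskScheduler.
    static final class OneShotScheduler {
        private Runnable task;
        private long deadline;

        Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {
            this.task = task;
            this.deadline = relativeNanos;
            return () -> this.task = null; // cancelling simply forgets the task
        }

        void runIfDue(long nowNanos) {
            if (task != null && nowNanos - deadline >= 0) {
                Runnable due = task;
                task = null;
                due.run();
            }
        }
    }

    private final OneShotScheduler scheduler;
    private Runnable closeTimeoutCanceller = NO_OP_CANCELLER;
    boolean forcedClose;

    CloseTimeoutSketch(OneShotScheduler scheduler) {
        this.scheduler = scheduler;
    }

    // Start a graceful close and arm the timeout that forces the close if it stalls.
    void initiateClose(long nowNanos) {
        closeTimeoutCanceller =
            scheduler.scheduleAtRelativeTime(this::onCloseTimeout, nowNanos + CLOSE_TIMEOUT_NANOS);
    }

    // Timeout fired: nothing is left to cancel, so reset the canceller and force the close.
    void onCloseTimeout() {
        closeTimeoutCanceller = NO_OP_CANCELLER;
        forcedClose = true;
    }

    // Hypothetical hook: the graceful close finished first, so disarm the timeout.
    void onGracefulClose() {
        closeTimeoutCanceller.run();
        closeTimeoutCanceller = NO_OP_CANCELLER;
    }

    public static void main(String[] args) {
        OneShotScheduler s1 = new OneShotScheduler();
        CloseTimeoutSketch timedOut = new CloseTimeoutSketch(s1);
        timedOut.initiateClose(0L);
        s1.runIfDue(CLOSE_TIMEOUT_NANOS);
        System.out.println(timedOut.forcedClose); // prints "true"

        OneShotScheduler s2 = new OneShotScheduler();
        CloseTimeoutSketch graceful = new CloseTimeoutSketch(s2);
        graceful.initiateClose(0L);
        graceful.onGracefulClose();
        s2.runIfDue(CLOSE_TIMEOUT_NANOS);
        System.out.println(graceful.forcedClose); // prints "false"
    }
}
```

Resetting the canceller to a no-op after it has fired or been used means later code can call it unconditionally without a null check.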
NioSelector
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java
public class NioSelector implements Closeable {

    //......

    public void runLoop() {
        if (runLock.tryLock()) {
            isRunningFuture.complete(null);
            try {
                setThread();
                while (isOpen()) {
                    singleLoop();
                }
            } finally {
                try {
                    cleanupAndCloseChannels();
                } finally {
                    try {
                        selector.close();
                    } catch (IOException e) {
                        eventHandler.selectorException(e);
                    } finally {
                        runLock.unlock();
                        exitedLoop.countDown();
                    }
                }
            }
        } else {
            throw new IllegalStateException("selector is already running");
        }
    }

    void singleLoop() {
        try {
            closePendingChannels();
            preSelect();
            long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());
            int ready;
            if (nanosUntilNextTask == 0) {
                ready = selector.selectNow();
            } else {
                long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
                // Only select until the next task needs to be run. Do not select with a value of 0 because
                // that blocks without a timeout.
                ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1)));
            }
            if (ready > 0) {
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey sk = keyIterator.next();
                    keyIterator.remove();
                    if (sk.isValid()) {
                        try {
                            processKey(sk);
                        } catch (CancelledKeyException cke) {
                            eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), cke);
                        }
                    } else {
                        eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), new CancelledKeyException());
                    }
                }
            }
            handleScheduledTasks(System.nanoTime());
        } catch (ClosedSelectorException e) {
            if (isOpen()) {
                throw e;
            }
        } catch (IOException e) {
            eventHandler.selectorException(e);
        } catch (Exception e) {
            eventHandler.uncaughtException(e);
        }
    }

    private void handleScheduledTasks(long nanoTime) {
        Runnable task;
        while ((task = taskScheduler.pollTask(nanoTime)) != null) {
            try {
                task.run();
            } catch (Exception e) {
                eventHandler.taskException(e);
            }
        }
    }

    //......
}
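- NioSelector's runLoop method calls singleLoop repeatedly while the selector is open. singleLoop uses taskScheduler.nanosUntilNextTask to bound how long select may block (at most 300 ms, with selectNow used when a task is already due). After processing the ready keys it calls handleScheduledTasks, which keeps calling taskScheduler.pollTask and running each returned task until no more tasks are due.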
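The way singleLoop turns the time until the next task into a select timeout can be isolated as a pure function. The sketch below is only an illustration of that arithmetic: SelectTimeoutSketch, selectTimeoutMillis, and MAX_SELECT_MILLIS are hypothetical names, and a return value of 0 is this sketch's own convention for "call selectNow() instead of select(timeout)".

```java
import java.util.concurrent.TimeUnit;

class SelectTimeoutSketch {

    // Upper bound on blocking, matching the literal 300 in the excerpt above.
    static final long MAX_SELECT_MILLIS = 300;

    // Returns 0 when a task is already due (use selectNow()), otherwise a timeout
    // between 1 and MAX_SELECT_MILLIS to pass to Selector.select(long).
    static long selectTimeoutMillis(long nanosUntilNextTask) {
        if (nanosUntilNextTask == 0) {
            return 0;
        }
        long millis = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
        // Sub-millisecond delays truncate to 0, and select(0) would block without
        // a timeout, so the value is raised to at least 1 ms.
        return Math.min(MAX_SELECT_MILLIS, Math.max(millis, 1));
    }

    public static void main(String[] args) {
        System.out.println(selectTimeoutMillis(0L));             // prints "0"
        System.out.println(selectTimeoutMillis(500_000L));       // 0.5 ms, prints "1"
        System.out.println(selectTimeoutMillis(50_000_000L));    // 50 ms, prints "50"
        System.out.println(selectTimeoutMillis(Long.MAX_VALUE)); // empty queue, prints "300"
    }
}
```

The 300 ms cap means the loop wakes up periodically even when no task is scheduled and no channel is ready, so it can notice state changes such as the selector being closed.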
Summary
- TaskScheduler defines DelayedTask, a Runnable with three fields (deadline, runnable, cancelled) whose run() method marks the task as cancelled.
- TaskScheduler stores DelayedTask instances in a PriorityQueue ordered by Comparator.comparingLong(DelayedTask::getDeadline).
- scheduleAtRelativeTime wraps the runnable in a DelayedTask and offers it to the queue. pollTask peeks at the head task; once relativeNanos has reached task.deadline it removes the task from the queue and returns task.runnable if cancelled is false, and otherwise it returns null.