序
本文主要研究一下 Elasticsearch 的 TimedRunnable
TimedRunnable
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java
/**
 * Wraps a {@link Runnable} and records, via {@link System#nanoTime()}, when the wrapper was
 * created, when the wrapped runnable started running and when it finished running.
 *
 * <p>{@link #getTotalNanos()} reports creation-to-finish time, so it includes any time that
 * elapsed before {@link #doRun()} was invoked (e.g. waiting in a queue);
 * {@link #getTotalExecutionNanos()} reports start-to-finish time only.
 *
 * <p>{@code onRejection}, {@code onAfter}, {@code onFailure} and {@code isForceExecution} are
 * forwarded to the wrapped runnable when it is itself an {@link AbstractRunnable}; otherwise the
 * first three do nothing and {@code isForceExecution} returns {@code false}.
 *
 * <p>NOTE(review): the timestamp fields are plain (non-volatile) longs; reading the timings from a
 * thread other than the one that ran the task presumably relies on happens-before ordering
 * provided by the caller — confirm against callers.
 */
class TimedRunnable extends AbstractRunnable implements WrappedRunnable {
    private final Runnable original;
    // nanoTime() captured in the constructor.
    private final long creationTimeNanos;
    // nanoTime() when doRun() began; -1 until then (same sentinel convention as finishTimeNanos).
    private long startTimeNanos = -1;
    // nanoTime() when doRun() completed, normally or by throwing; -1 until then.
    private long finishTimeNanos = -1;

    TimedRunnable(final Runnable original) {
        this.original = original;
        this.creationTimeNanos = System.nanoTime();
    }

    /** Runs the wrapped runnable, recording its start and finish times. */
    @Override
    public void doRun() {
        try {
            startTimeNanos = System.nanoTime();
            original.run();
        } finally {
            // Set in finally so a finish time is recorded even when original.run() throws.
            finishTimeNanos = System.nanoTime();
        }
    }

    @Override
    public void onRejection(final Exception e) {
        if (original instanceof AbstractRunnable) {
            ((AbstractRunnable) original).onRejection(e);
        }
    }

    @Override
    public void onAfter() {
        if (original instanceof AbstractRunnable) {
            ((AbstractRunnable) original).onAfter();
        }
    }

    @Override
    public void onFailure(final Exception e) {
        if (original instanceof AbstractRunnable) {
            ((AbstractRunnable) original).onFailure(e);
        }
    }

    @Override
    public boolean isForceExecution() {
        return original instanceof AbstractRunnable && ((AbstractRunnable) original).isForceExecution();
    }

    /**
     * Return the time since this task was created until it finished running.
     * If the task is still running or has not yet been run, returns -1.
     *
     * @return nanoseconds from creation to finish, clamped to at least 1, or -1 if the task has
     *     not finished
     */
    long getTotalNanos() {
        if (finishTimeNanos == -1) {
            // Not finished yet (never run, or still running), so the total time is unknown.
            // A task that threw does not end up here: finishTimeNanos is set in a finally block.
            return -1;
        }
        // Clamp so a finished task never reports a duration below 1ns.
        return Math.max(finishTimeNanos - creationTimeNanos, 1);
    }

    /**
     * Return the time this task spent being run.
     * If the task is still running or has not yet been run, returns -1.
     *
     * @return nanoseconds from start to finish of {@link #doRun()}, clamped to at least 1, or -1
     *     if the task has not started or not finished
     */
    long getTotalExecutionNanos() {
        if (startTimeNanos == -1 || finishTimeNanos == -1) {
            // Not started or not finished yet, so the execution time is unknown.
            // A task that threw does not end up here: finishTimeNanos is set in a finally block.
            return -1;
        }
        // Clamp so a finished task never reports a duration below 1ns.
        return Math.max(finishTimeNanos - startTimeNanos, 1);
    }

    /** Returns the runnable this instance wraps. */
    @Override
    public Runnable unwrap() {
        return original;
    }
}
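- TimedRunnable 继承了 AbstractRunnable，同时实现了 WrappedRunnable 接口；它在构造器中记录 creationTimeNanos，在 doRun 方法里记录原始 Runnable 的 startTimeNanos 及 finishTimeNanos（finishTimeNanos 在 finally 中设置，因此即使 run 抛出异常也会被记录）；同时提供了 getTotalNanos 来返回该 task 从创建到执行结束的总耗时（包含执行前的等待时间），以及 getTotalExecutionNanos 来返回该 task 的执行耗时；onRejection、onAfter、onFailure 及 isForceExecution 则在原始 Runnable 同为 AbstractRunnable 时委托给它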
实例
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java
/**
 * Excerpt of an executor that records per-task timings from {@link TimedRunnable} and
 * periodically adjusts the capacity of its work queue. Other members are elided ("//......").
 */
public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {
//......
/**
 * Hook run after each task completes. Records the task's timings and, when the completed-task
 * counter reaches exactly {@code tasksPerFrame}, recomputes the desired queue size, adjusts
 * {@code workQueue}'s capacity towards it, and then rolls the per-frame counters forward.
 *
 * @param r the runnable that was executed; expected (per the assertion below) to unwrap to a
 *          {@link TimedRunnable}
 * @param t passed to {@code super.afterExecute}; not otherwise consulted in this method
 */
protected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);
// A task has been completed, it has left the building. We should now be able to get the
// total time as a combination of the time in the queue and time spent running the task. We
// only want runnables that did not throw errors though, because they could be fast-failures
// that throw off our timings, so only check when t is null.
// NOTE(review): despite the comment above, nothing below checks t; timings are recorded for
// every task. Confirm whether a `t == null` guard is intended.
assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";
final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);
// Creation-to-finish time of the TimedRunnable, i.e. it includes time spent before the task ran.
final long taskNanos = timedRunnable.getTotalNanos();
// Running sum of task times for the current frame, including this task.
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
// Start-to-finish time of the task's run only; this is what feeds the execution-time EWMA.
final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos();
assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got:" + taskExecutionNanos;
executionEWMA.addValue(taskExecutionNanos);
// Each incrementAndGet() returns a distinct value, so only the thread whose increment lands
// exactly on tasksPerFrame performs the adjustment for this frame.
if (taskCount.incrementAndGet() == this.tasksPerFrame) {final long endTimeNs = System.nanoTime();
// Wall-clock length of the frame that just ended.
final long totalRuntime = endTimeNs - this.startNs;
// Reset the start time for all tasks. At first glance this appears to need to be
// volatile, since we are reading from a different thread when it is set, but it
// is protected by the taskCount memory barrier.
// See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
startNs = endTimeNs;
// Calculate the new desired queue size
// NOTE(review): calculateLambda/calculateL are not shown in this excerpt. lambda appears to be a
// rate per nanosecond (the debug log below multiplies it by one second in nanos to print
// tasks/s); calculateL presumably derives a queue length from lambda and the targeted response
// time (Little's law, L = lambda * W) — confirm against their definitions.
try {final double lambda = calculateLambda(tasksPerFrame, Math.max(totalNanos, 1L));
final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
final int oldCapacity = workQueue.capacity();
if (logger.isDebugEnabled()) {
final long avgTaskTime = totalNanos / tasksPerFrame;
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}]," +
"[{} tasks/s], optimal queue is [{}], current capacity [{}]",
getName(),
tasksPerFrame,
TimeValue.timeValueNanos(totalRuntime),
TimeValue.timeValueNanos(avgTaskTime),
TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
desiredQueueSize,
oldCapacity);
}
// Adjust the queue size towards the desired capacity using an adjust of
// QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
// values the queue size can have.
final int newCapacity =
workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
// NOTE(review): this log reports the change as +/-QUEUE_ADJUSTMENT_AMOUNT; if adjustCapacity
// clamps to minQueueSize/maxQueueSize, the actual change (newCapacity - oldCapacity) may be
// smaller — confirm against adjustCapacity.
if (oldCapacity != newCapacity && logger.isDebugEnabled()) {logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(),
newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
oldCapacity, newCapacity);
}
} catch (ArithmeticException e) {
// There was an integer overflow, so just log about it, rather than adjust the queue size
// NOTE(review): presumably thrown by calculateLambda/calculateL (not shown) — confirm.
logger.warn(() -> new ParameterizedMessage("failed to calculate optimal queue size for [{}] thread pool," +
"total frame time [{}ns], tasks [{}], task execution time [{}ns]",
getName(), totalRuntime, tasksPerFrame, totalNanos),
e);
} finally {
// Finally, decrement the task count and time back to their starting values. We
// do this at the end so there is no concurrent adjustments happening. We also
// decrement them instead of resetting them back to zero, as resetting them back
// to zero causes operations that came in during the adjustment to be uncounted
int tasks = taskCount.addAndGet(-this.tasksPerFrame);
assert tasks >= 0 : "tasks should never be negative, got:" + tasks;
if (tasks >= this.tasksPerFrame) {
// Start over, because we can potentially reach a "never adjusting" state,
//
// consider the following:
// - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
// - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
// - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
// - Since taskCount will now be incremented forever, it will never be 10 again,
// so there will be no further adjustments
logger.debug("[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", getName());
// NOTE(review): totalTaskNanos is reset to 1 rather than 0; the reason is not visible in this
// excerpt.
totalTaskNanos.getAndSet(1);
taskCount.getAndSet(0);
startNs = System.nanoTime();} else {
// Do a regular adjustment
// Subtract only this frame's sum so that time added by tasks completing concurrently with the
// adjustment is carried over into the next frame.
totalTaskNanos.addAndGet(-totalNanos);
}
}
}
}
//......
}
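- QueueResizingEsThreadPoolExecutor 的 afterExecute 会使用 timedRunnable.getTotalExecutionNanos() 来进行 EWMA 统计；同时使用 timedRunnable.getTotalNanos() 累计每个 frame 内任务的总耗时，每当完成的任务数达到 tasksPerFrame 时，据此计算 lambda 及期望的队列大小，并调整 workQueue 的容量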
小结
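TimedRunnable 继承了 AbstractRunnable，同时实现了 WrappedRunnable 接口；它在构造器中记录 creationTimeNanos，在 doRun 方法里记录原始 Runnable 的 startTimeNanos 及 finishTimeNanos；同时提供了 getTotalNanos 来返回该 task 从创建到执行结束的总耗时（包含执行前的等待时间），以及 getTotalExecutionNanos 来返回该 task 的执行耗时。QueueResizingEsThreadPoolExecutor 在 afterExecute 中用前者累计的总耗时来计算并调整队列容量，用后者进行 EWMA 统计。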
doc
- TimedRunnable