本文主要研究一下Elasticsearch的TimedRunnable

TimedRunnable

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/TimedRunnable.java

class TimedRunnable extends AbstractRunnable implements WrappedRunnable {    private final Runnable original;    private final long creationTimeNanos;    private long startTimeNanos;    private long finishTimeNanos = -1;    TimedRunnable(final Runnable original) {        this.original = original;        this.creationTimeNanos = System.nanoTime();    }    @Override    public void doRun() {        try {            startTimeNanos = System.nanoTime();            original.run();        } finally {            finishTimeNanos = System.nanoTime();        }    }    @Override    public void onRejection(final Exception e) {        if (original instanceof AbstractRunnable) {            ((AbstractRunnable) original).onRejection(e);        }    }    @Override    public void onAfter() {        if (original instanceof AbstractRunnable) {            ((AbstractRunnable) original).onAfter();        }    }    @Override    public void onFailure(final Exception e) {        if (original instanceof AbstractRunnable) {            ((AbstractRunnable) original).onFailure(e);        }    }    @Override    public boolean isForceExecution() {        return original instanceof AbstractRunnable && ((AbstractRunnable) original).isForceExecution();    }    /**     * Return the time since this task was created until it finished running.     * If the task is still running or has not yet been run, returns -1.     */    long getTotalNanos() {        if (finishTimeNanos == -1) {            // There must have been an exception thrown, the total time is unknown (-1)            return -1;        }        return Math.max(finishTimeNanos - creationTimeNanos, 1);    }    /**     * Return the time this task spent being run.     * If the task is still running or has not yet been run, returns -1.     */    long getTotalExecutionNanos() {        if (startTimeNanos == -1 || finishTimeNanos == -1) {            // There must have been an exception thrown, the total time is unknown (-1)            return -1;        }        return Math.max(finishTimeNanos - startTimeNanos, 1);    }    @Override    public Runnable unwrap() {        return original;    }}
  • TimedRunnable继承了AbstractRunnable,同时实现了WrappedRunnable接口;它在doRun方法里头记录了原始Runnable的startTimeNanos及finishTimeNanos;同时提供了getTotalExecutionNanos来返回该task的执行耗时

实例

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java

public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {    //......    protected void afterExecute(Runnable r, Throwable t) {        super.afterExecute(r, t);        // A task has been completed, it has left the building. We should now be able to get the        // total time as a combination of the time in the queue and time spent running the task. We        // only want runnables that did not throw errors though, because they could be fast-failures        // that throw off our timings, so only check when t is null.        assert super.unwrap(r) instanceof TimedRunnable : "expected only TimedRunnables in queue";        final TimedRunnable timedRunnable = (TimedRunnable) super.unwrap(r);        final long taskNanos = timedRunnable.getTotalNanos();        final long totalNanos = totalTaskNanos.addAndGet(taskNanos);        final long taskExecutionNanos = timedRunnable.getTotalExecutionNanos();        assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;        executionEWMA.addValue(taskExecutionNanos);        if (taskCount.incrementAndGet() == this.tasksPerFrame) {            final long endTimeNs = System.nanoTime();            final long totalRuntime = endTimeNs - this.startNs;            // Reset the start time for all tasks. At first glance this appears to need to be            // volatile, since we are reading from a different thread when it is set, but it            // is protected by the taskCount memory barrier.            // See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html            startNs = endTimeNs;            // Calculate the new desired queue size            try {                final double lambda = calculateLambda(tasksPerFrame, Math.max(totalNanos, 1L));                final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);                final int oldCapacity = workQueue.capacity();                if (logger.isDebugEnabled()) {                    final long avgTaskTime = totalNanos / tasksPerFrame;                    logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +                                    "[{} tasks/s], optimal queue is [{}], current capacity [{}]",                            getName(),                            tasksPerFrame,                            TimeValue.timeValueNanos(totalRuntime),                            TimeValue.timeValueNanos(avgTaskTime),                            TimeValue.timeValueNanos((long)executionEWMA.getAverage()),                            String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),                            desiredQueueSize,                            oldCapacity);                }                // Adjust the queue size towards the desired capacity using an adjust of                // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max                // values the queue size can have.                final int newCapacity =                        workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);                if (oldCapacity != newCapacity && logger.isDebugEnabled()) {                    logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(),                            newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,                            oldCapacity, newCapacity);                }            } catch (ArithmeticException e) {                // There was an integer overflow, so just log about it, rather than adjust the queue size                logger.warn(() -> new ParameterizedMessage(                                "failed to calculate optimal queue size for [{}] thread pool, " +                                "total frame time [{}ns], tasks [{}], task execution time [{}ns]",                                getName(), totalRuntime, tasksPerFrame, totalNanos),                        e);            } finally {                // Finally, decrement the task count and time back to their starting values. We                // do this at the end so there is no concurrent adjustments happening. We also                // decrement them instead of resetting them back to zero, as resetting them back                // to zero causes operations that came in during the adjustment to be uncounted                int tasks = taskCount.addAndGet(-this.tasksPerFrame);                assert tasks >= 0 : "tasks should never be negative, got: " + tasks;                if (tasks >= this.tasksPerFrame) {                    // Start over, because we can potentially reach a "never adjusting" state,                    //                    // consider the following:                    // - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)                    // - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25                    // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15                    // - Since taskCount will now be incremented forever, it will never be 10 again,                    //   so there will be no further adjustments                    logger.debug(                            "[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", getName());                    totalTaskNanos.getAndSet(1);                    taskCount.getAndSet(0);                    startNs = System.nanoTime();                } else {                    // Do a regular adjustment                    totalTaskNanos.addAndGet(-totalNanos);                }            }        }    }    //......}
  • QueueResizingEsThreadPoolExecutor的afterExecute会使用timedRunnable.getTotalExecutionNanos()的来进行EWMA统计

小结

TimedRunnable继承了AbstractRunnable,同时实现了WrappedRunnable接口;它在doRun方法里头记录了原始Runnable的startTimeNanos及finishTimeNanos;同时提供了getTotalExecutionNanos来返回该task的执行耗时

doc

  • TimedRunnable