Spring Boot 2.6.0版本提供了TaskExecutorMetricsAutoConfiguration,能够自动给线程池加上metrics

TaskExecutorMetricsAutoConfiguration

spring-boot-actuator-autoconfigure-2.7.14-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/task/TaskExecutorMetricsAutoConfiguration.java

/**
 * {@link EnableAutoConfiguration Auto-configuration} that publishes metrics for every
 * available {@link ThreadPoolTaskExecutor task executor} and
 * {@link ThreadPoolTaskScheduler task scheduler}.
 *
 * @author Stephane Nicoll
 * @author Scott Frederick
 * @since 2.6.0
 */
@AutoConfiguration(after = { MetricsAutoConfiguration.class, SimpleMetricsExportAutoConfiguration.class,
        TaskExecutionAutoConfiguration.class, TaskSchedulingAutoConfiguration.class })
@ConditionalOnClass(ExecutorServiceMetrics.class)
@ConditionalOnBean({ Executor.class, MeterRegistry.class })
public class TaskExecutorMetricsAutoConfiguration {

    /**
     * Walks all injected {@link Executor} beans and binds metrics for the supported ones.
     * @param executors the executor beans, keyed by bean name
     * @param registry the registry the meters are bound to
     */
    @Autowired
    public void bindTaskExecutorsToRegistry(Map<String, Executor> executors, MeterRegistry registry) {
        executors.forEach((beanName, executor) -> bindIfSupported(registry, beanName, executor));
    }

    /**
     * Binds metrics for a single bean when it is a {@link ThreadPoolTaskExecutor} or a
     * {@link ThreadPoolTaskScheduler}; any other {@link Executor} type is ignored.
     */
    private void bindIfSupported(MeterRegistry registry, String beanName, Executor candidate) {
        if (candidate instanceof ThreadPoolTaskExecutor) {
            ThreadPoolTaskExecutor taskExecutor = (ThreadPoolTaskExecutor) candidate;
            monitor(registry, safeGetThreadPoolExecutor(taskExecutor), beanName);
            return;
        }
        if (candidate instanceof ThreadPoolTaskScheduler) {
            ThreadPoolTaskScheduler taskScheduler = (ThreadPoolTaskScheduler) candidate;
            monitor(registry, safeGetThreadPoolExecutor(taskScheduler), beanName);
        }
    }

    /**
     * Binds {@link ExecutorServiceMetrics} for the given pool, using {@code name} as the
     * executor name and no extra tags. Does nothing when the pool is {@code null}.
     */
    private void monitor(MeterRegistry registry, ThreadPoolExecutor threadPoolExecutor, String name) {
        if (threadPoolExecutor == null) {
            return;
        }
        ExecutorServiceMetrics metrics = new ExecutorServiceMetrics(threadPoolExecutor, name,
                Collections.emptyList());
        metrics.bindTo(registry);
    }

    /**
     * Returns the native pool behind the task executor, or {@code null} when asking for
     * it raises an {@link IllegalStateException}.
     */
    private ThreadPoolExecutor safeGetThreadPoolExecutor(ThreadPoolTaskExecutor taskExecutor) {
        ThreadPoolExecutor delegate = null;
        try {
            delegate = taskExecutor.getThreadPoolExecutor();
        }
        catch (IllegalStateException ignored) {
            // Deliberately swallowed: the caller skips binding for a null pool.
            // NOTE(review): presumably thrown while the executor is not initialized - confirm.
        }
        return delegate;
    }

    /**
     * Returns the native scheduled pool behind the task scheduler, or {@code null} when
     * asking for it raises an {@link IllegalStateException}.
     */
    private ThreadPoolExecutor safeGetThreadPoolExecutor(ThreadPoolTaskScheduler taskScheduler) {
        ThreadPoolExecutor delegate = null;
        try {
            delegate = taskScheduler.getScheduledThreadPoolExecutor();
        }
        catch (IllegalStateException ignored) {
            // Deliberately swallowed: the caller skips binding for a null pool.
            // NOTE(review): presumably thrown while the scheduler is not initialized - confirm.
        }
        return delegate;
    }

}
这里会遍历executors,对其中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler挨个执行monitor方法,而monitor方法则是创建ExecutorServiceMetrics然后绑定到meterRegistry

ExecutorServiceMetrics

micrometer-core-1.9.13-sources.jar!/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java

@NonNullApi@NonNullFieldspublic class ExecutorServiceMetrics implements MeterBinder {    private static boolean allowIllegalReflectiveAccess = true;    private static final InternalLogger log = InternalLoggerFactory.getInstance(ExecutorServiceMetrics.class);    private static final String DEFAULT_EXECUTOR_METRIC_PREFIX = "";    @Nullable    private final ExecutorService executorService;    private final Iterable<Tag> tags;    private final String metricPrefix;    public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName,            Iterable<Tag> tags) {        this(executorService, executorServiceName, DEFAULT_EXECUTOR_METRIC_PREFIX, tags);    }    /**     * Create an {@code ExecutorServiceMetrics} instance.     * @param executorService executor service     * @param executorServiceName executor service name which will be used as     * {@literal name} tag     * @param metricPrefix metrics prefix which will be used to prefix metric name     * @param tags additional tags     * @since 1.5.0     */    public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName,            String metricPrefix, Iterable<Tag> tags) {        this.executorService = executorService;        this.tags = Tags.concat(tags, "name", executorServiceName);        this.metricPrefix = sanitizePrefix(metricPrefix);    }    @Override    public void bindTo(MeterRegistry registry) {        if (executorService == null) {            return;        }        String className = executorService.getClass().getName();        if (executorService instanceof ThreadPoolExecutor) {            monitor(registry, (ThreadPoolExecutor) executorService);        }        else if (executorService instanceof ForkJoinPool) {            monitor(registry, (ForkJoinPool) executorService);        }        else if (allowIllegalReflectiveAccess) {            if (className.equals("java.util.concurrent.Executors$DelegatedScheduledExecutorService")) {       
         monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass()));            }            else if (className.equals("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")) {                monitor(registry,                        unwrapThreadPoolExecutor(executorService, executorService.getClass().getSuperclass()));            }            else {                log.warn("Failed to bind as {} is unsupported.", className);            }        }        else {            log.warn("Failed to bind as {} is unsupported or reflective access is not allowed.", className);        }    }           // ...... }    
这里主要是bindTo方法,区分了ThreadPoolExecutor及ForkJoinPool;对于Executors的两种包装类,在允许反射访问时会先解包出ThreadPoolExecutor再监控,其余类型只打印warn日志

monitor ThreadPoolExecutor

    private void monitor(MeterRegistry registry, @Nullable ThreadPoolExecutor tp) {        if (tp == null) {            return;        }        FunctionCounter.builder(metricPrefix + "executor.completed", tp, ThreadPoolExecutor::getCompletedTaskCount)            .tags(tags)            .description("The approximate total number of tasks that have completed execution")            .baseUnit(BaseUnits.TASKS)            .register(registry);        Gauge.builder(metricPrefix + "executor.active", tp, ThreadPoolExecutor::getActiveCount)            .tags(tags)            .description("The approximate number of threads that are actively executing tasks")            .baseUnit(BaseUnits.THREADS)            .register(registry);        Gauge.builder(metricPrefix + "executor.queued", tp, tpRef -> tpRef.getQueue().size())            .tags(tags)            .description("The approximate number of tasks that are queued for execution")            .baseUnit(BaseUnits.TASKS)            .register(registry);        Gauge.builder(metricPrefix + "executor.queue.remaining", tp, tpRef -> tpRef.getQueue().remainingCapacity())            .tags(tags)            .description("The number of additional elements that this queue can ideally accept without blocking")            .baseUnit(BaseUnits.TASKS)            .register(registry);        Gauge.builder(metricPrefix + "executor.pool.size", tp, ThreadPoolExecutor::getPoolSize)            .tags(tags)            .description("The current number of threads in the pool")            .baseUnit(BaseUnits.THREADS)            .register(registry);        Gauge.builder(metricPrefix + "executor.pool.core", tp, ThreadPoolExecutor::getCorePoolSize)            .tags(tags)            .description("The core number of threads for the pool")            .baseUnit(BaseUnits.THREADS)            .register(registry);        Gauge.builder(metricPrefix + "executor.pool.max", tp, ThreadPoolExecutor::getMaximumPoolSize)            .tags(tags)            .description("The 
maximum allowed number of threads in the pool")            .baseUnit(BaseUnits.THREADS)            .register(registry);    }
针对ThreadPoolExecutor主要是上报了executor.completed、executor.active、executor.queued、executor.queue.remaining、executor.pool.size、executor.pool.core、executor.pool.max

monitor ForkJoinPool

    /**
     * Registers the meters for a {@link ForkJoinPool}: a function counter for steals and
     * gauges for queued tasks, active threads and running threads.
     * Every meter name starts with {@code metricPrefix} and carries {@code tags}.
     * @param registry the registry the meters are registered with
     * @param fj the fork/join pool to observe
     */
    private void monitor(MeterRegistry registry, ForkJoinPool fj) {
        // Work stealing between worker queues.
        FunctionCounter.builder(metricPrefix + "executor.steals", fj, ForkJoinPool::getStealCount)
            .description("Estimate of the total number of tasks stolen from "
                    + "one thread's work queue by another. The reported value "
                    + "underestimates the actual total number of steals when the pool " + "is not quiescent")
            .tags(tags)
            .register(registry);

        // Tasks waiting in worker queues.
        Gauge.builder(metricPrefix + "executor.queued", fj, ForkJoinPool::getQueuedTaskCount)
            .description("An estimate of the total number of tasks currently held in queues by worker threads")
            .tags(tags)
            .register(registry);

        // Threads stealing or executing tasks.
        Gauge.builder(metricPrefix + "executor.active", fj, ForkJoinPool::getActiveThreadCount)
            .description("An estimate of the number of threads that are currently stealing or executing tasks")
            .tags(tags)
            .register(registry);

        // Worker threads that are not blocked.
        Gauge.builder(metricPrefix + "executor.running", fj, ForkJoinPool::getRunningThreadCount)
            .description(
                    "An estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed synchronization threads")
            .tags(tags)
            .register(registry);
    }
针对ForkJoinPool主要是上报了executor.steals、executor.queued、executor.active、executor.running

小结

Spring Boot 2.6.0版本提供了TaskExecutorMetricsAutoConfiguration,它利用micrometer的ExecutorServiceMetrics提供了对Executor的metrics上报。升级到新版本的服务就不必再手工给线程池进行指标上报了。

doc

  • TaskExecutorMetricsAutoConfiguration