Preface
Spring Boot 2.6.0 introduced TaskExecutorMetricsAutoConfiguration, which automatically registers metrics for thread pools.
TaskExecutorMetricsAutoConfiguration
spring-boot-actuator-autoconfigure-2.7.14-sources.jar!/org/springframework/boot/actuate/autoconfigure/metrics/task/TaskExecutorMetricsAutoConfiguration.java
/**
* {@link EnableAutoConfiguration Auto-configuration} for metrics on all available
* {@link ThreadPoolTaskExecutor task executors} and {@link ThreadPoolTaskScheduler task
* schedulers}.
*
* @author Stephane Nicoll
* @author Scott Frederick
* @since 2.6.0
*/
@AutoConfiguration(after = { MetricsAutoConfiguration.class, SimpleMetricsExportAutoConfiguration.class,
        TaskExecutionAutoConfiguration.class, TaskSchedulingAutoConfiguration.class })
@ConditionalOnClass(ExecutorServiceMetrics.class)
@ConditionalOnBean({ Executor.class, MeterRegistry.class })
public class TaskExecutorMetricsAutoConfiguration {

    @Autowired
    public void bindTaskExecutorsToRegistry(Map<String, Executor> executors, MeterRegistry registry) {
        executors.forEach((beanName, executor) -> {
            if (executor instanceof ThreadPoolTaskExecutor) {
                monitor(registry, safeGetThreadPoolExecutor((ThreadPoolTaskExecutor) executor), beanName);
            }
            else if (executor instanceof ThreadPoolTaskScheduler) {
                monitor(registry, safeGetThreadPoolExecutor((ThreadPoolTaskScheduler) executor), beanName);
            }
        });
    }

    private void monitor(MeterRegistry registry, ThreadPoolExecutor threadPoolExecutor, String name) {
        if (threadPoolExecutor != null) {
            new ExecutorServiceMetrics(threadPoolExecutor, name, Collections.emptyList()).bindTo(registry);
        }
    }

    private ThreadPoolExecutor safeGetThreadPoolExecutor(ThreadPoolTaskExecutor taskExecutor) {
        try {
            return taskExecutor.getThreadPoolExecutor();
        }
        catch (IllegalStateException ex) {
            return null;
        }
    }

    private ThreadPoolExecutor safeGetThreadPoolExecutor(ThreadPoolTaskScheduler taskScheduler) {
        try {
            return taskScheduler.getScheduledThreadPoolExecutor();
        }
        catch (IllegalStateException ex) {
            return null;
        }
    }

}
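The configuration iterates over all Executor beans and calls monitor for each ThreadPoolTaskExecutor or ThreadPoolTaskScheduler it finds. The monitor method creates an ExecutorServiceMetrics, using the bean name as the executor name, and binds it to the MeterRegistry.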
ExecutorServiceMetrics
micrometer-core-1.9.13-sources.jar!/io/micrometer/core/instrument/binder/jvm/ExecutorServiceMetrics.java
@NonNullApi
@NonNullFields
public class ExecutorServiceMetrics implements MeterBinder {

    private static boolean allowIllegalReflectiveAccess = true;

    private static final InternalLogger log = InternalLoggerFactory.getInstance(ExecutorServiceMetrics.class);

    private static final String DEFAULT_EXECUTOR_METRIC_PREFIX = "";

    @Nullable
    private final ExecutorService executorService;

    private final Iterable<Tag> tags;

    private final String metricPrefix;

    public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName,
            Iterable<Tag> tags) {
        this(executorService, executorServiceName, DEFAULT_EXECUTOR_METRIC_PREFIX, tags);
    }

    /**
     * Create an {@code ExecutorServiceMetrics} instance.
     * @param executorService executor service
     * @param executorServiceName executor service name which will be used as
     * {@literal name} tag
     * @param metricPrefix metrics prefix which will be used to prefix metric name
     * @param tags additional tags
     * @since 1.5.0
     */
    public ExecutorServiceMetrics(@Nullable ExecutorService executorService, String executorServiceName,
            String metricPrefix, Iterable<Tag> tags) {
        this.executorService = executorService;
        this.tags = Tags.concat(tags, "name", executorServiceName);
        this.metricPrefix = sanitizePrefix(metricPrefix);
    }

    @Override
    public void bindTo(MeterRegistry registry) {
        if (executorService == null) {
            return;
        }

        String className = executorService.getClass().getName();

        if (executorService instanceof ThreadPoolExecutor) {
            monitor(registry, (ThreadPoolExecutor) executorService);
        }
        else if (executorService instanceof ForkJoinPool) {
            monitor(registry, (ForkJoinPool) executorService);
        }
        else if (allowIllegalReflectiveAccess) {
            if (className.equals("java.util.concurrent.Executors$DelegatedScheduledExecutorService")) {
                monitor(registry, unwrapThreadPoolExecutor(executorService, executorService.getClass()));
            }
            else if (className.equals("java.util.concurrent.Executors$FinalizableDelegatedExecutorService")) {
                monitor(registry,
                        unwrapThreadPoolExecutor(executorService, executorService.getClass().getSuperclass()));
            }
            else {
                log.warn("Failed to bind as {} is unsupported.", className);
            }
        }
        else {
            log.warn("Failed to bind as {} is unsupported or reflective access is not allowed.", className);
        }
    }

    // ......
}
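The key method here is bindTo, which distinguishes between ThreadPoolExecutor and ForkJoinPool. When reflective access is allowed, it also unwraps the two JDK wrapper classes named in the code. Anything else is skipped with a warning.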
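To see why the wrapper branches exist, the following JDK-only sketch classifies a few executors created through the Executors factory methods. It is an illustration only: ExecutorKindDemo and kindOf are hypothetical names, and the method mirrors just the instanceof checks of bindTo, not the reflection-based unwrapping.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper, not part of Micrometer.
class ExecutorKindDemo {

    // Mirrors only the type checks in bindTo; no metrics are registered here.
    static String kindOf(ExecutorService executorService) {
        if (executorService instanceof ThreadPoolExecutor) {
            return "ThreadPoolExecutor";
        }
        if (executorService instanceof ForkJoinPool) {
            return "ForkJoinPool";
        }
        // JDK wrapper classes and any custom implementation end up here.
        return "other: " + executorService.getClass().getName();
    }

    public static void main(String[] args) {
        ExecutorService fixed = Executors.newFixedThreadPool(2);
        ExecutorService stealing = Executors.newWorkStealingPool(2);
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            System.out.println(kindOf(fixed));    // prints "ThreadPoolExecutor"
            System.out.println(kindOf(stealing)); // prints "ForkJoinPool"
            // The wrapper class name depends on the JDK version, so it is only printed.
            System.out.println(kindOf(single));
        }
        finally {
            fixed.shutdown();
            stealing.shutdown();
            single.shutdown();
        }
    }
}
```

The single-thread executor is backed by a ThreadPoolExecutor but hides it behind a delegating wrapper, so a plain instanceof check cannot reach it.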
monitor ThreadPoolExecutor
private void monitor(MeterRegistry registry, @Nullable ThreadPoolExecutor tp) {
    if (tp == null) {
        return;
    }

    FunctionCounter.builder(metricPrefix + "executor.completed", tp, ThreadPoolExecutor::getCompletedTaskCount)
        .tags(tags)
        .description("The approximate total number of tasks that have completed execution")
        .baseUnit(BaseUnits.TASKS)
        .register(registry);

    Gauge.builder(metricPrefix + "executor.active", tp, ThreadPoolExecutor::getActiveCount)
        .tags(tags)
        .description("The approximate number of threads that are actively executing tasks")
        .baseUnit(BaseUnits.THREADS)
        .register(registry);

    Gauge.builder(metricPrefix + "executor.queued", tp, tpRef -> tpRef.getQueue().size())
        .tags(tags)
        .description("The approximate number of tasks that are queued for execution")
        .baseUnit(BaseUnits.TASKS)
        .register(registry);

    Gauge.builder(metricPrefix + "executor.queue.remaining", tp, tpRef -> tpRef.getQueue().remainingCapacity())
        .tags(tags)
        .description("The number of additional elements that this queue can ideally accept without blocking")
        .baseUnit(BaseUnits.TASKS)
        .register(registry);

    Gauge.builder(metricPrefix + "executor.pool.size", tp, ThreadPoolExecutor::getPoolSize)
        .tags(tags)
        .description("The current number of threads in the pool")
        .baseUnit(BaseUnits.THREADS)
        .register(registry);

    Gauge.builder(metricPrefix + "executor.pool.core", tp, ThreadPoolExecutor::getCorePoolSize)
        .tags(tags)
        .description("The core number of threads for the pool")
        .baseUnit(BaseUnits.THREADS)
        .register(registry);

    Gauge.builder(metricPrefix + "executor.pool.max", tp, ThreadPoolExecutor::getMaximumPoolSize)
        .tags(tags)
        .description("The maximum allowed number of threads in the pool")
        .baseUnit(BaseUnits.THREADS)
        .register(registry);
}
For a ThreadPoolExecutor, it mainly reports executor.completed, executor.active, executor.queued, executor.queue.remaining, executor.pool.size, executor.pool.core, and executor.pool.max.
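To get a feel for what these values look like, here is a JDK-only sketch that reads the same getters from a small pool in a known state: one task running, two waiting. It does not use Micrometer. ThreadPoolSnapshotDemo, snapshot, and demo are hypothetical names, and the map keys simply reuse the metric names above as labels.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only calls the getters the meters are built on.
class ThreadPoolSnapshotDemo {

    static Map<String, Long> snapshot(ThreadPoolExecutor tp) {
        Map<String, Long> values = new LinkedHashMap<>();
        values.put("executor.completed", tp.getCompletedTaskCount());
        values.put("executor.active", (long) tp.getActiveCount());
        values.put("executor.queued", (long) tp.getQueue().size());
        values.put("executor.queue.remaining", (long) tp.getQueue().remainingCapacity());
        values.put("executor.pool.size", (long) tp.getPoolSize());
        values.put("executor.pool.core", (long) tp.getCorePoolSize());
        values.put("executor.pool.max", (long) tp.getMaximumPoolSize());
        return values;
    }

    // Returns two snapshots: [0] one task running and two queued, [1] after termination.
    static List<Map<String, Long>> demo() {
        ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The first task occupies the only core thread until it is released.
            tp.execute(() -> {
                started.countDown();
                try {
                    release.await();
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            // The core thread is busy and the queue is not full, so both tasks are queued.
            tp.execute(() -> { });
            tp.execute(() -> { });
            Map<String, Long> busy = snapshot(tp);

            release.countDown();
            tp.shutdown();
            tp.awaitTermination(10, TimeUnit.SECONDS);
            return Arrays.asList(busy, snapshot(tp));
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        finally {
            release.countDown();
            tp.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Map<String, Long>> result = demo();
        System.out.println("busy:     " + result.get(0));
        System.out.println("finished: " + result.get(1));
    }
}
```

In the busy snapshot the pool size stays at 1 even though the maximum is 2, because a ThreadPoolExecutor only grows beyond its core size once the queue is full. That is why executor.queued and executor.queue.remaining are worth watching together with executor.pool.size.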
monitor ForkJoinPool
private void monitor(MeterRegistry registry, ForkJoinPool fj) {
    FunctionCounter.builder(metricPrefix + "executor.steals", fj, ForkJoinPool::getStealCount)
        .tags(tags)
        .description("Estimate of the total number of tasks stolen from "
                + "one thread's work queue by another. The reported value "
                + "underestimates the actual total number of steals when the pool "
                + "is not quiescent")
        .register(registry);

    Gauge.builder(metricPrefix + "executor.queued", fj, ForkJoinPool::getQueuedTaskCount)
        .tags(tags)
        .description("An estimate of the total number of tasks currently held in queues by worker threads")
        .register(registry);

    Gauge.builder(metricPrefix + "executor.active", fj, ForkJoinPool::getActiveThreadCount)
        .tags(tags)
        .description("An estimate of the number of threads that are currently stealing or executing tasks")
        .register(registry);

    Gauge.builder(metricPrefix + "executor.running", fj, ForkJoinPool::getRunningThreadCount)
        .tags(tags)
        .description("An estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed synchronization threads")
        .register(registry);
}
For a ForkJoinPool, it mainly reports executor.steals, executor.queued, executor.active, and executor.running.
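The four ForkJoinPool getters can be inspected the same way. The sketch below is JDK-only and purely illustrative. ForkJoinSnapshotDemo, snapshot, sumRange, and SumTask are hypothetical names, and the values read after the task has run are estimates that vary between runs, so the example prints them without assuming a result.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; it only calls the getters the meters are built on.
class ForkJoinSnapshotDemo {

    static Map<String, Long> snapshot(ForkJoinPool fj) {
        Map<String, Long> values = new LinkedHashMap<>();
        values.put("executor.steals", fj.getStealCount());
        values.put("executor.queued", fj.getQueuedTaskCount());
        values.put("executor.active", (long) fj.getActiveThreadCount());
        values.put("executor.running", (long) fj.getRunningThreadCount());
        return values;
    }

    // Sums the integers in [from, to) by splitting the range into subtasks.
    static final class SumTask extends RecursiveTask<Long> {

        private final int from;
        private final int to;

        SumTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 1_000) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            SumTask left = new SumTask(from, mid);
            left.fork(); // queue the left half so another worker may steal it
            long right = new SumTask(mid, to).compute();
            return left.join() + right;
        }
    }

    static long sumRange(ForkJoinPool fj, int from, int to) {
        return fj.invoke(new SumTask(from, to));
    }

    public static void main(String[] args) {
        ForkJoinPool fj = new ForkJoinPool(2);
        try {
            System.out.println("fresh: " + snapshot(fj));
            System.out.println("sum:   " + sumRange(fj, 1, 100_001));
            System.out.println("after: " + snapshot(fj));
        }
        finally {
            fj.shutdown();
        }
    }
}
```

Unlike the ThreadPoolExecutor meters, there is no pool-size or queue-capacity value here, which matches the shorter list of ForkJoinPool metrics above.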
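Summary
Spring Boot 2.6.0 introduced TaskExecutorMetricsAutoConfiguration, which uses Micrometer's ExecutorServiceMetrics to report metrics for Executor beans. Services that have upgraded to this version or later no longer need to register thread pool metrics by hand.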
doc
- TaskExecutorMetricsAutoConfiguration