大家好,这篇文章跟大家探讨下日常应用线程池的各种姿态,重点介绍怎么在 Spring 环境中正确应用线程池。
线程池应用姿态
首先问大家一个问题,你日常开发中是怎么应用线程池的?
我想大抵能够分为以下四种状况:
1.办法级,随用随建,用完敞开
2.类级共享,定义个 static final 润饰的 ThreadPoolExecutor,该类及子类(看修饰符)所有对象、办法共享
3.业务共享,按业务类型定义多个 ThreadPoolExecutor,雷同业务类型共享同一线程池对象
4.全局共享,服务所有中央共享同一全局线程池
一般来说,优先应用形式3,其次形式2,不要应用形式1跟4,起因如下
1.线程池呈现的目标就是为了对立治理线程资源,缩小频繁创立销毁线程带来的开销,应用池化技术复用线程执行工作,晋升零碎性能,在高并发、异步化的场景下,办法级应用基本达不到此目标,反而会使性能变低。
2.全局共享一个线程池,工作执行参差不齐,相互影响,高耗时工作会占满线程池资源,导致低耗时工作没机会执行;同时如果工作之间存在父子关系,可能会导致死锁的产生,进而引发OOM。
3.按业务类型进行线程池隔离,各工作执行互不影响,粒度也比类级共享大点,不会创立大量线程池,升高系统调度压力,像 Hystrix 线程池隔离就能够了解成这种模式。
综上,倡议大家都采纳形式3,按业务性能分类定义线程池。
Spring 我的项目中应用 ThreadPoolExecutor
Spring 作为一个 Bean 容器,咱们通常会将业务中用到的 ThreadPoolExecutor 注册到 Spring 容器中,同时 Spring 在容器刷新的时候会注入相应的 ThreadPoolExecutor 对象 到咱们的业务 Bean 中,而后就能够间接应用了,比方定义如下(ThreadPoolBuilder是封装的一个建造者模式实现):
@Configurationpublic class ThreadPoolConfiguration { @Bean public ThreadPoolExecutor jobExecutor() { return ThreadPoolBuilder.newBuilder() .corePoolSize(10) .maximumPoolSize(15) .keepAliveTime(15000) .timeUnit(TimeUnit.MILLISECONDS) .workQueue(LINKED_BLOCKING_QUEUE.getName(), 3000) .build(); } @Bean public ThreadPoolExecutor remotingExecutor() { return ThreadPoolBuilder.newBuilder() .corePoolSize(10) .maximumPoolSize(15) .keepAliveTime(15000) .timeUnit(TimeUnit.MILLISECONDS) .workQueue(SYNCHRONOUS_QUEUE.getName(), null) .build(); } @Bean public ThreadPoolExecutor consumeExecutor() { return ThreadPoolBuilder.newBuilder() .corePoolSize(10) .maximumPoolSize(15) .keepAliveTime(15000) .timeUnit(TimeUnit.MILLISECONDS) .workQueue(LINKED_BLOCKING_QUEUE.getName(), 5000) .build(); }}
以上按应用场景定义了三个线程池实例,一个用来执行耗时的定时工作、一个用来执行近程RPC调用、一个用来执行 Mq 生产。
这样应用 ThreadPoolExecutor 有个问题,Spring 容器敞开的时候可能工作队列里的工作还没解决完,有失落工作的危险。
咱们晓得 Spring 中的 Bean 是有生命周期的,如果 Bean 实现了 Spring 相应的生命周期接口(InitializingBean、DisposableBean接口),在 Bean 初始化、容器敞开的时候会调用相应的办法来做相应解决。
所以倡议最好不要间接应用 ThreadPoolExecutor 在 Spring 环境中,能够应用 Spring 提供的 ThreadPoolTaskExecutor,或者 DynamicTp 框架提供的 DtpExecutor 线程池实现。
一些 Spring 常识
这里分享一个源码浏览技巧,就是开源我的项目和Spring整合时,很多同学不知从何动手浏览源码。
咱们晓得Spring提供了很多的扩大点,第三方框架整合Spring其实大多也都是基于这些扩大接口来做的,所以咱们能够从这些扩大接口动手,断点调试,一步步深刻框架内核。
这些扩大包含但不限于以下接口:
BeanFactoryPostProcessor:在Bean实例化之前对BeanDefinition进行批改
BeanPostProcessor:在Bean初始化前后对Bean进行一些批改包装加强,比方返回代理对象
Aware:一个标记接口,实现该接口及子接口的类会收到Spring的告诉回调,赋予某种Spring框架的能力,比方ApplicationContextAware、EnvironmentAware等
ApplicationContextInitializer:在上下文筹备阶段,容器刷新之前做一些初始化工作,比方咱们罕用的配置核心client根本都是继承该初始化器,在容器刷新前将配置从近程拉到本地,而后封装成PropertySource放到Environment中供应用
ApplicationListener:Spring事件机制,监听特定的利用事件(ApplicationEvent),观察者模式的一种实现
FactoryBean:用来自定义Bean的创立逻辑(Mybatis、Feign等等)
ImportBeanDefinitionRegistrar:定义@EnableXXX注解,在注解上Import了一个 ImportBeanDefinitionRegistrar,实现注册BeanDefinition到容器中
ApplicationRunner/CommandLineRunner:容器启动后回调,执行一些初始化工作
上述列出了几个比拟罕用的接口,然而Spring扩大远不于此,还有很多扩大接口大家能够本人去理解。
DynamicTp 生成线程池对象
DynamicTp 框架外部定义了 DtpExecutor 线程池类,其继承关系如下:
EagerDtpExecutor:参考 Tomcat 线程池设计,调整了下线程池的执行流程,优先创立线程执行工作而不是放入队列中,次要用于IO密集型场景,继承 DtpExecutor
DtpExecutor:重写了 ThreadPoolExecutor 的 execute 办法、beforeExecute 办法、afterExecute 办法,次要做工作包装、执行超时、期待超时记录等,继承 DtpLifecycleSupport
DtpLifecycleSupport:实现了 Spring 中的 InitializingBean, DisposableBean 接口,在 Bean 初始化、Spring 容器销毁时执行相应的逻辑,destroy 办法逻辑如下:
@Override public void destroy() { internalShutdown(); } public void internalShutdown() { if (log.isInfoEnabled()) { log.info("Shutting down ExecutorService, poolName: {}", threadPoolName); } if (this.waitForTasksToCompleteOnShutdown) { // 如果须要期待工作执行结束,则调用shutdown()会执行先前已提交的工作,回绝新工作提交,线程池状态变成 SHUTDOWN this.shutdown(); } else { // 如果不须要期待工作执行结束,则间接调用shutdownNow()办法,尝试中断正在执行的工作,返回所有未执行的工作,线程池状态变成 STOP, 而后调用 Future 的 cancel 办法勾销 for (Runnable remainingTask : this.shutdownNow()) { cancelRemainingTask(remainingTask); } } awaitTerminationIfNecessary(); } protected void cancelRemainingTask(Runnable task) { if (task instanceof Future) { ((Future<?>) task).cancel(true); } } private void awaitTerminationIfNecessary() { if (this.awaitTerminationSeconds <= 0) { return; } try { // 配合 shutdown 应用,阻塞以后线程,期待已提交的工作执行结束或者超时 if (!awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS) && log.isWarnEnabled()) { log.warn("Timed out while waiting for executor {} to terminate", threadPoolName); } } catch (InterruptedException ex) { if (log.isWarnEnabled()) { log.warn("Interrupted while waiting for executor {} to terminate", threadPoolName); } Thread.currentThread().interrupt(); } }
DynamicTp 框架在整合 Spring 的时候,也是用到了上述说的扩大接口。
扩大1
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(DtpBeanDefinitionRegistrar.class) public @interface EnableDynamicTp { }
应用过 DynamicTp 的小伙伴应该晓得须要在启动类加 @EnableDynamicTp 注解,该注解其实就用到了 ImportBeanDefinitionRegistrar 扩大,次要代码如下:
@Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { DtpProperties dtpProperties = new DtpProperties(); // 将环境变量中的线程池相干配置绑定到 DtpProperties 对象上 PropertiesBinder.bindDtpProperties(environment, dtpProperties); val executors = dtpProperties.getExecutors(); if (CollUtil.isEmpty(executors)) { log.warn("DynamicTp registrar, no executors are configured."); return; } executors.forEach(x -> { // 判断线程池类型(common or eager) Class<?> executorTypeClass = ExecutorType.getClass(x.getExecutorType()); String beanName = x.getThreadPoolName(); // 线程池对象属性 Map<String, Object> properties = buildProperties(x); // 结构器参数 Object[] args = buildArgs(executorTypeClass, x); BeanUtil.registerIfAbsent(registry, beanName, executorTypeClass, properties, args); }); }
代码解读:
1.咱们晓得 ImportBeanDefinitionRegistrar 的实现是在 Spring 容器刷新的时候执行的,在此之前在上下文筹备阶段曾经从配置核心拉取到线程池配置放到环境变量里了,所以第一步咱们将环境变量里的线程池相干配置绑定到 DtpProperties 对象上。
2.而后结构 BeanDefinitionBuilder 对象,设置结构函数参数、设置属性值,注册到 BeanDefinition 到 Spring 容器中
public static void doRegister(BeanDefinitionRegistry registry, String beanName, Class<?> clazz, Map<String, Object> properties, Object... constructorArgs) { BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz); // 设置结构器参数,老八股文了 for (Object constructorArg : constructorArgs) { builder.addConstructorArgValue(constructorArg); } // 设置属性及值的KV对,后续在Bean populateBean 的时候会通过反射set办法赋值 if (CollUtil.isNotEmpty(properties)) { properties.forEach(builder::addPropertyValue); } registry.registerBeanDefinition(beanName, builder.getBeanDefinition()); }
3.Spring 容器刷新时会依据注册的 BeanDefinition 创立配置的线程池对象,初始化赋值,并注入到援用的 Bean 中。这样就不必在手动用 @Bean 申明线程池对象了,只须要在配置核心配置即可
扩大2
DtpPostProcessor 继承 BeanPostProcessor,在 Bean 初始化前后对 ThreadPoolExecutor 及其子类进行一些解决,次要用来获取线程池对象注册到 DynamicTp 框架外部定义的容器中(就个 Map)
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (!(bean instanceof ThreadPoolExecutor)) { return bean; } if (bean instanceof DtpExecutor) { DtpExecutor dtpExecutor = (DtpExecutor) bean; if (bean instanceof EagerDtpExecutor) { ((TaskQueue) dtpExecutor.getQueue()).setExecutor((EagerDtpExecutor) dtpExecutor); } registerDtp(dtpExecutor); return dtpExecutor; } ApplicationContext applicationContext = ApplicationContextHolder.getInstance(); DynamicTp dynamicTp; try { dynamicTp = applicationContext.findAnnotationOnBean(beanName, DynamicTp.class); if (dynamicTp == null) { return bean; } } catch (NoSuchBeanDefinitionException e) { log.error("There is no bean with the given name {}", beanName, e); return bean; } String poolName = StringUtils.isNotBlank(dynamicTp.value()) ? dynamicTp.value() : beanName; registerCommon(poolName, (ThreadPoolExecutor) bean); return bean; }
扩大3
ApplicationListener 次要用来解耦逻辑,公布监听事件,core 模块跟 adapter 模块通信次要就用该扩大,以及框架会监听 Spring 容器启动的各阶段事件,做相应的逻辑解决
public abstract class AbstractDtpHandleListener implements GenericApplicationListener { @Override public boolean supportsEventType(ResolvableType resolvableType) { Class<?> type = resolvableType.getRawClass(); if (type != null) { return RefreshEvent.class.isAssignableFrom(type) || CollectEvent.class.isAssignableFrom(type) || AlarmCheckEvent.class.isAssignableFrom(type); } return false; } @Override public void onApplicationEvent(@NonNull ApplicationEvent event) { try { if (event instanceof RefreshEvent) { doRefresh(((RefreshEvent) event).getDtpProperties()); } else if (event instanceof CollectEvent) { doCollect(((CollectEvent) event).getDtpProperties()); } else if (event instanceof AlarmCheckEvent) { doAlarmCheck(((AlarmCheckEvent) event).getDtpProperties()); } } catch (Exception e) { log.error("DynamicTp adapter, event handle failed.", e); } }}
扩大4
ApplicationRunner,等 Spring 容器启动后,会调用该钩子函数,执行一些初始化操作,DtpMonitor、DtpRegistry 等都用到了该扩大
所以 DynamicTp 的正确应用姿态,线程池只需在配置核心申明,而后服务启动时框架会基于 Spring 的这些扩大主动创立线程池对象注入到所需的 Bean 中,代码中不须要显示申明
再次介绍下 DynamicTp 框架
DynamicTp 是一个基于配置核心实现的轻量级动静线程池管理工具,次要性能能够总结为 动静调参、告诉报警、运行监控、三方包线程池治理等几大类。
通过几个版本迭代,目前最新版本v1.0.7具备以下个性
个性 ✅
- 代码零侵入:所有配置都放在配置核心,对业务代码零侵入
- 轻量简略:基于 springboot 实现,引入 starter,接入只需简略4步就可实现,顺利3分钟搞定
- 高可扩大:框架外围性能都提供 SPI 接口供用户自定义个性化实现(配置核心、配置文件解析、告诉告警、监控数据采集、工作包装等等)
- 线上大规模利用:参考美团线程池实际,美团外部曾经有该实践成熟的利用教训
- 多平台告诉报警:提供多种报警维度(配置变更告诉、活性报警、容量阈值报警、回绝触发报警、工作执行或期待超时报警),已反对企业微信、钉钉、飞书报警,同时提供 SPI 接口可自定义扩大实现
- 监控:定时采集线程池指标数据,反对通过 MicroMeter、JsonLog 日志输入、Endpoint 三种形式,可通过 SPI 接口自定义扩大实现
- 工作加强:提供工作包装性能,实现TaskWrapper接口即可,如 TtlTaskWrapper 能够反对线程池上下文信息传递,以及给工作设置标识id,不便问题追踪
- 兼容性:JUC 一般线程池也能够被框架监控,@Bean 定义时加 @DynamicTp 注解即可
- 可靠性:框架提供的线程池实现 Spring 生命周期办法,能够在 Spring 容器敞开前尽可能多的解决队列中的工作
- 多模式:参考Tomcat线程池提供了 IO 密集型场景应用的 EagerDtpExecutor 线程池
- 反对多配置核心:基于支流配置核心实现线程池参数动静调整,实时失效,已反对 Nacos、Apollo、Zookeeper、Consul,同时也提供 SPI 接口可自定义扩大实现
- 中间件线程池治理:集成治理罕用第三方组件的线程池,已集成Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix等组件的线程池治理(调参、监控报警)
我的项目地址
目前累计 1.8k star,感激你的star,欢送pr,业务之余一起给开源奉献一份力量
官网:https://dynamictp.cn
gitee地址:https://gitee.com/dromara/dynamic-tp
github地址:https://github.com/dromara/dynamic-tp