指标
- 实现动静调整线程池参数
- 对线程池运行状况进行监控
实现
一,线程池可调整的参数
- 外围线程数
- 超时工夫
- 最大线程数
- 回绝策略
而队列BlockingQueue因为是final类型,所以没有对外批改入口。但能够通过重写LinkedBlockingQueue并把capacity设置为非final。
二,联合配置核心实现动静调整
这里的配置核心应用Apollo, 通过监听配置核心变动,而后更新线程池配置。示例代码如下:
@Slf4j@Componentpublic class DynamicThreadPoolConfig { /** 线程执行器 **/ private volatile ThreadPoolExecutor executor; /** 外围线程数 **/ private Integer corePoolSize = 10; /** 最大值线程数 **/ private Integer maximumPoolSize = 20; /** 待执行工作的队列的长度 **/ private Integer workQueueSize = 1000; /** 线程闲暇工夫 **/ private Long keepAliveTime = 1000L; /** 线程名 **/ private String threadName; private Config config = ConfigService.getConfig("lepu-activity-center");; public DynamicThreadPoolConfig() { init(config); } /** * 初始化 */ private void init(Config config) { log.info("线程池初始化中.........."); if (executor == null) { synchronized (DynamicThreadPoolConfig.class) { if (executor == null) { String corePoolSizeProperty = config.getProperty("corePoolSize", corePoolSize.toString()); log.info("批改前的外围线程池:{}",corePoolSizeProperty); String maximumPoolSizeProperty = config.getProperty("maximumPoolSize", maximumPoolSize.toString()); String keepAliveTImeProperty = config.getProperty("keepAliveTime", keepAliveTime.toString()); BlockingQueue<Runnable> workQueueProperty = new LinkedBlockingQueue<>(workQueueSize); executor = new ThreadPoolExecutor(Integer.valueOf(corePoolSizeProperty), Integer.valueOf(maximumPoolSizeProperty), Long.valueOf(keepAliveTImeProperty), TimeUnit.MILLISECONDS, workQueueProperty); } } } } /** * 监听到配置核心发生变化后,更新线程池配置 * @param changeEvent */ @ApolloConfigChangeListener public void onChange(ConfigChangeEvent changeEvent){ log.info("线程池参数配置发生变化,namespace:{}",changeEvent.getNamespace()); for(String key : changeEvent.changedKeys()){ ConfigChange change = changeEvent.getChange(key); String newValue = change.getNewValue(); refreshThreadPool(key,newValue); } } /** * 更新线程池配置 * @param key * @param newValue */ private void refreshThreadPool(String key, String newValue) { if (executor == null) { return; } if (ParamsEnum.CORE_POOL_SIZE.getParam().equals(key)) { executor.setCorePoolSize(Integer.valueOf(newValue)); log.info("批改外围线程数key={},value={}",key,newValue); } if (ParamsEnum.MAXIMUM_POOL_SIZE.getParam().equals(key)) { executor.setMaximumPoolSize(Integer.valueOf(newValue)); log.info("批改最大线程数key={},value={}", key, newValue); } if (ParamsEnum.KEEP_ALIVE_TIME.getParam().equals(key)) { executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS); log.info("批改线程闲暇工夫key={},value={}", key, newValue); } } public ThreadPoolExecutor getExecutor() { return executor; }}
三,监控形式
批改线程池无关参数重要,但晓得何时批改同样重要,能够思考距离一段时间进行采集,通过日志输入,达到临界点后告警。
同样,ThreadPoolExecutor也提供获取线程池相干信息的API:
具体实现再看~