指标
- 实现动静调整线程池参数
- 对线程池运行状况进行监控
实现
一,线程池可调整的参数
- 外围线程数
- 超时工夫
- 最大线程数
- 回绝策略
而队列 BlockingQueue 因为是 final 类型,所以没有对外批改入口。但能够通过重写 LinkedBlockingQueue 并把 capacity 设置为非 final。
二,联合配置核心实现动静调整
这里的配置核心应用 Apollo, 通过监听配置核心变动,而后更新线程池配置。示例代码如下:
@Slf4j
@Component
public class DynamicThreadPoolConfig {
/** 线程执行器 **/
private volatile ThreadPoolExecutor executor;
/** 外围线程数 **/
private Integer corePoolSize = 10;
/** 最大值线程数 **/
private Integer maximumPoolSize = 20;
/** 待执行工作的队列的长度 **/
private Integer workQueueSize = 1000;
/** 线程闲暇工夫 **/
private Long keepAliveTime = 1000L;
/** 线程名 **/
private String threadName;
private Config config = ConfigService.getConfig("lepu-activity-center");;
public DynamicThreadPoolConfig() {init(config);
}
/** * 初始化 */
private void init(Config config) {log.info("线程池初始化中..........");
if (executor == null) {synchronized (DynamicThreadPoolConfig.class) {if (executor == null) {String corePoolSizeProperty = config.getProperty("corePoolSize", corePoolSize.toString());
log.info("批改前的外围线程池:{}",corePoolSizeProperty);
String maximumPoolSizeProperty = config.getProperty("maximumPoolSize", maximumPoolSize.toString());
String keepAliveTImeProperty = config.getProperty("keepAliveTime", keepAliveTime.toString());
BlockingQueue<Runnable> workQueueProperty = new LinkedBlockingQueue<>(workQueueSize);
executor = new ThreadPoolExecutor(Integer.valueOf(corePoolSizeProperty), Integer.valueOf(maximumPoolSizeProperty),
Long.valueOf(keepAliveTImeProperty), TimeUnit.MILLISECONDS, workQueueProperty);
}
}
}
}
/**
* 监听到配置核心发生变化后,更新线程池配置
* @param changeEvent
*/
@ApolloConfigChangeListener
public void onChange(ConfigChangeEvent changeEvent){log.info("线程池参数配置发生变化,namespace:{}",changeEvent.getNamespace());
for(String key : changeEvent.changedKeys()){ConfigChange change = changeEvent.getChange(key);
String newValue = change.getNewValue();
refreshThreadPool(key,newValue);
}
}
/**
* 更新线程池配置
* @param key
* @param newValue
*/
private void refreshThreadPool(String key, String newValue) {if (executor == null) {return;}
if (ParamsEnum.CORE_POOL_SIZE.getParam().equals(key)) {executor.setCorePoolSize(Integer.valueOf(newValue));
log.info("批改外围线程数 key={},value={}",key,newValue);
}
if (ParamsEnum.MAXIMUM_POOL_SIZE.getParam().equals(key)) {executor.setMaximumPoolSize(Integer.valueOf(newValue));
log.info("批改最大线程数 key={},value={}", key, newValue);
}
if (ParamsEnum.KEEP_ALIVE_TIME.getParam().equals(key)) {executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);
log.info("批改线程闲暇工夫 key={},value={}", key, newValue);
}
}
public ThreadPoolExecutor getExecutor() {return executor;}
}
三,监控形式
批改线程池无关参数重要,但晓得何时批改同样重要,能够思考距离一段时间进行采集,通过日志输入,达到临界点后告警。
同样,ThreadPoolExecutor 也提供获取线程池相干信息的 API:
具体实现再看~