指标

  1. 实现动静调整线程池参数
  2. 对线程池运行状况进行监控

实现

一,线程池可调整的参数

  1. 外围线程数
  2. 超时工夫
  3. 最大线程数
  4. 回绝策略


而队列BlockingQueue因为是final类型,所以没有对外批改入口。但能够通过重写LinkedBlockingQueue并把capacity设置为非final。

二,联合配置核心实现动静调整

这里的配置核心应用Apollo, 通过监听配置核心变动,而后更新线程池配置。示例代码如下:

@Slf4j@Componentpublic class DynamicThreadPoolConfig {    /** 线程执行器 **/    private volatile ThreadPoolExecutor executor;    /** 外围线程数 **/    private Integer corePoolSize = 10;    /** 最大值线程数 **/    private Integer maximumPoolSize = 20;    /** 待执行工作的队列的长度 **/    private Integer workQueueSize = 1000;    /** 线程闲暇工夫 **/    private Long keepAliveTime = 1000L;    /** 线程名 **/    private String threadName;    private Config config = ConfigService.getConfig("lepu-activity-center");;    public DynamicThreadPoolConfig() {        init(config);    }    /** * 初始化 */    private void init(Config config) {        log.info("线程池初始化中..........");        if (executor == null) {            synchronized (DynamicThreadPoolConfig.class) {                if (executor == null) {                    String corePoolSizeProperty = config.getProperty("corePoolSize", corePoolSize.toString());                    log.info("批改前的外围线程池:{}",corePoolSizeProperty);                    String maximumPoolSizeProperty = config.getProperty("maximumPoolSize", maximumPoolSize.toString());                    String keepAliveTImeProperty = config.getProperty("keepAliveTime", keepAliveTime.toString());                    BlockingQueue<Runnable> workQueueProperty = new LinkedBlockingQueue<>(workQueueSize);                    executor = new ThreadPoolExecutor(Integer.valueOf(corePoolSizeProperty), Integer.valueOf(maximumPoolSizeProperty),                            Long.valueOf(keepAliveTImeProperty), TimeUnit.MILLISECONDS, workQueueProperty);                }            }        }    }    /**     * 监听到配置核心发生变化后,更新线程池配置     * @param changeEvent     */    @ApolloConfigChangeListener    public void onChange(ConfigChangeEvent changeEvent){        log.info("线程池参数配置发生变化,namespace:{}",changeEvent.getNamespace());            for(String key : changeEvent.changedKeys()){                ConfigChange change = changeEvent.getChange(key);                String newValue = change.getNewValue();                refreshThreadPool(key,newValue);            }    }    /**     * 更新线程池配置     * @param key     * @param newValue     */    private void refreshThreadPool(String key, String newValue) {        if (executor == null) {            return;        }        if (ParamsEnum.CORE_POOL_SIZE.getParam().equals(key)) {            executor.setCorePoolSize(Integer.valueOf(newValue));            log.info("批改外围线程数key={},value={}",key,newValue);        }        if (ParamsEnum.MAXIMUM_POOL_SIZE.getParam().equals(key)) {            executor.setMaximumPoolSize(Integer.valueOf(newValue));            log.info("批改最大线程数key={},value={}", key, newValue);        }        if (ParamsEnum.KEEP_ALIVE_TIME.getParam().equals(key)) {            executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);            log.info("批改线程闲暇工夫key={},value={}", key, newValue);        }    }    public ThreadPoolExecutor getExecutor() {        return executor;    }}

三,监控形式

批改线程池无关参数重要,但晓得何时批改同样重要,能够思考距离一段时间进行采集,通过日志输入,达到临界点后告警。
同样,ThreadPoolExecutor也提供获取线程池相干信息的API:

具体实现再看~