作者:京东批发 张宾
1.背景在后盾开发中,会常常用到线程池技术,对于线程池外围参数的配置很大水平上依附教训。然而,因为零碎运行过程中存在的不确定性,咱们很难一劳永逸地布局一个正当的线程池参数。在对线程池配置参数进行调整时,个别须要对服务进行重启,这样批改的老本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,零碎运行期间开发人员依据零碎运行状况对外围参数进行动静配置。
本文以公司DUCC配置平台作为服务配置核心,以批改线程池外围线程数、最大线程数为例,实现一个简略的动态化线程池。
2.代码实现以后我的项目中应用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又应用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员办法setCorePoolSize、setMaximumPoolSize能够在运行时设置外围线程数和最大线程数。
setCorePoolSize办法执行流程是:首先会笼罩之前构造函数设置的corePoolSize,而后,如果新的值比原始值要小,当多余的工作线程下次变成闲暇状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创立新的工作线程。流程图如下:
setMaximumPoolSize办法: 首先会笼罩之前构造函数设置的maximumPoolSize,而后,如果新的值比原来的值要小,当多余的工作线程下次变成闲暇状态的时候会被中断并销毁。
Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员办法setCorePoolSize、setMaximumPoolSize的调用。
基于以上源代码剖析,要实现一个简略的动静线程池须要以下几步:
(1)定义一个动静线程池类,继承ThreadPoolTaskExecutor,目标跟非动静配置的线程池类ThreadPoolTaskExecutor辨别开;
(2)定义和实现一个动静线程池配置定时刷的类,目标定时比照ducc配置的线程池数和本地利用中线程数是否统一,若不统一,则更新本地动静线程池线程池数;
(3)引入公司ducc配置平台相干jar包并创立一个动静线程池配置key;
(4)定义和实现一个利用启动后依据动静线程池Bean和从ducc配置平台拉取配置刷新利用中的线程数配置;
接下来代码一一实现:
(1)动静线程池类
/** * 动静线程池 * */public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {}(2)动静线程池配置定时刷新类
@Slf4jpublic class DynamicThreadPoolRefresh implements InitializingBean { /** * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor. */ private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>(); /** * @param threadPoolBeanName * @param threadPoolTaskExecutor */ public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) { log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor())); DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor); } @Override public void afterPropertiesSet() throws Exception { this.refresh(); //创立定时工作线程池 ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build()); //提早1秒执行,每个1分钟check一次 executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS); } private void refresh() { String dynamicThreadPool = ""; try { if (DTP_REGISTRY.isEmpty()) { log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty"); return; } dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL); if (StringUtils.isBlank(dynamicThreadPool)) { log.debug("DynamicThreadPool refresh dynamicThreadPool not config"); return; } log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool); List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() { }); if (CollectionUtils.isEmpty(threadPoolPropertiesList)) { log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool); return; } for (ThreadPoolProperties properties : threadPoolPropertiesList) { doRefresh(properties); } } catch (Exception e) { log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e); } } /** * @param properties */ private void doRefresh(ThreadPoolProperties properties) { if (StringUtils.isBlank(properties.getThreadPoolBeanName()) || properties.getCorePoolSize() < 1 || properties.getMaxPoolSize() < 1 || properties.getMaxPoolSize() < properties.getCorePoolSize()) { log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties); return; } DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName()); if (Objects.isNull(threadPoolTaskExecutor)) { log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName()); return; } ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor()); if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize()) && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) { log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName()); return; } if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) { threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize()); log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize()); } if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) { threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize()); log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize()); } ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor()); log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp); } private class RefreshThreadPoolConfig extends TimerTask { private RefreshThreadPoolConfig() { } @Override public void run() { DynamicThreadPoolRefresh.this.refresh(); } }}线程池配置类
...