乐趣区

关于架构:利用DUCC配置平台实现一个动态化线程池

作者:京东批发 张宾

1. 背景

在后盾开发中,会常常用到线程池技术,对于线程池外围参数的配置很大水平上依附教训。然而,因为零碎运行过程中存在的不确定性,咱们很难一劳永逸地布局一个正当的线程池参数。在对线程池配置参数进行调整时,个别须要对服务进行重启,这样批改的老本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,零碎运行期间开发人员依据零碎运行状况对外围参数进行动静配置。

本文以公司 DUCC 配置平台作为服务配置核心,以批改线程池外围线程数、最大线程数为例,实现一个简略的动态化线程池。

2. 代码实现

以后我的项目中应用的是 Spring 框架提供的线程池类 ThreadPoolTaskExecutor,而 ThreadPoolTaskExecutor 底层又应用里了 JDK 中线程池类 ThreadPoolExecutor,线程池类 ThreadPoolExecutor 有两个成员办法 setCorePoolSize、setMaximumPoolSize 能够在运行时设置外围线程数和最大线程数。

setCorePoolSize 办法执行流程是:首先会笼罩之前构造函数设置的 corePoolSize,而后,如果新的值比原始值要小,当多余的工作线程下次变成闲暇状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创立新的工作线程。流程图如下:

setMaximumPoolSize 办法: 首先会笼罩之前构造函数设置的 maximumPoolSize,而后,如果新的值比原来的值要小,当多余的工作线程下次变成闲暇状态的时候会被中断并销毁。

Spring 框架提供的线程池类 ThreadPoolTaskExecutor,此类封装了对 ThreadPoolExecutor 有两个成员办法 setCorePoolSize、setMaximumPoolSize 的调用。

基于以上源代码剖析,要实现一个简略的动静线程池须要以下几步:

(1)定义一个动静线程池类,继承 ThreadPoolTaskExecutor,目标跟非动静配置的线程池类 ThreadPoolTaskExecutor 辨别开;

(2)定义和实现一个动静线程池配置定时刷的类,目标定时比照 ducc 配置的线程池数和本地利用中线程数是否统一,若不统一,则更新本地动静线程池线程池数;

(3)引入公司 ducc 配置平台相干 jar 包并创立一个动静线程池配置 key;

(4)定义和实现一个利用启动后依据动静线程池 Bean 和从 ducc 配置平台拉取配置刷新利用中的线程数配置;

接下来代码一一实现:

(1)动静线程池类

/**
 * 动静线程池
 *
 */
public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {}

(2)动静线程池配置定时刷新类

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
    /**
     * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
     */
    private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();

    /**
     * @param threadPoolBeanName
     * @param threadPoolTaskExecutor
     */
    public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
        DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
    }

    @Override
    public void afterPropertiesSet() throws Exception {this.refresh();
        // 创立定时工作线程池
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
        // 提早 1 秒执行, 每个 1 分钟 check 一次
        executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
    }

    private void refresh() {
        String dynamicThreadPool = "";
        try {if (DTP_REGISTRY.isEmpty()) {log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
                return;
            }
            dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
            if (StringUtils.isBlank(dynamicThreadPool)) {log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
                return;
            }
            log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
            List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {});
            if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
                return;
            }
            for (ThreadPoolProperties properties : threadPoolPropertiesList) {doRefresh(properties);
            }
        } catch (Exception e) {log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
        }
    }

    /**
     * @param properties
     */
    private void doRefresh(ThreadPoolProperties properties) {if (StringUtils.isBlank(properties.getThreadPoolBeanName())
                || properties.getCorePoolSize() < 1
                || properties.getMaxPoolSize() < 1
                || properties.getMaxPoolSize() < properties.getCorePoolSize()) {log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
            return;
        }
        DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
        if (Objects.isNull(threadPoolTaskExecutor)) {log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
            return;
        }
        ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
                && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
            return;
        }
        if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
            log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
        }
        if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
            log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
        }
       
        ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
    }

    private class RefreshThreadPoolConfig extends TimerTask {private RefreshThreadPoolConfig() { }

        @Override
        public void run() {DynamicThreadPoolRefresh.this.refresh();
        }
    }

}

线程池配置类

@Data
public class ThreadPoolProperties {
    /**
     * 线程池名称
     */
    private String threadPoolBeanName;
    /**
     * 线程池外围线程数量
     */
    private int corePoolSize;
    /**
     * 线程池最大线程池数量
     */
    private int maxPoolSize;
}

(3) 引入公司 ducc 配置平台相干 jar 包并创立一个动静线程池配置 key

ducc 配置平台应用见:https://cf.jd.com/pages/viewp…

动静线程池配置 key:dynamic.thread.pool

配置 value:

[
  {
    "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",
    "corePoolSize": 32,
    "maxPoolSize": 128
  }
]

(4) 利用启动刷新利用本地动静线程池配置

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof DynamicThreadPoolTaskExecutor) {DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
        }
        return bean;
    }
}

3. 动静线程池利用

动静线程池 Bean 申明

    <!-- 一般线程池 -->
    <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">
        <!-- 外围线程数,默认为 -->
        <property name="corePoolSize" value="128"/>
        <!-- 最大线程数,默认为 Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="512"/>
        <!-- 队列最大长度,个别须要设置值 >=notifyScheduledMainExecutor.maxNum;默认为 Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 线程池保护线程所容许的闲暇工夫,默认为 60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 线程池对回绝工作(无线程可用)的解决策略,目前只反对 AbortPolicy、CallerRunsPolicy;默认为后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy: 间接抛出 java.util.concurrent.RejectedExecutionException 异样 -->
            <!-- CallerRunsPolicy: 主线程间接执行该工作,执行完之后尝试增加下一个工作到线程池中,能够无效升高向线程池内增加工作的速度 -->
            <!-- DiscardOldestPolicy: 摈弃旧的工作、暂不反对;会导致被抛弃的工作无奈再次被执行 -->
            <!-- DiscardPolicy: 摈弃当前任务、暂不反对;会导致被抛弃的工作无奈再次被执行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 动静线程池 -->
    <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">
        <!-- 外围线程数,默认为 -->
        <property name="corePoolSize" value="32"/>
        <!-- 最大线程数,默认为 Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="128"/>
        <!-- 队列最大长度,个别须要设置值 >=notifyScheduledMainExecutor.maxNum;默认为 Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 线程池保护线程所容许的闲暇工夫,默认为 60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 线程池对回绝工作(无线程可用)的解决策略,目前只反对 AbortPolicy、CallerRunsPolicy;默认为后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy: 间接抛出 java.util.concurrent.RejectedExecutionException 异样 -->
            <!-- CallerRunsPolicy: 主线程间接执行该工作,执行完之后尝试增加下一个工作到线程池中,能够无效升高向线程池内增加工作的速度 -->
            <!-- DiscardOldestPolicy: 摈弃旧的工作、暂不反对;会导致被抛弃的工作无奈再次被执行 -->
            <!-- DiscardPolicy: 摈弃当前任务、暂不反对;会导致被抛弃的工作无奈再次被执行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 动静线程池刷新配置 -->
    <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
    <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>

业务类注入 Spring Bean 后,间接应用即可

 @Resource
 private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;

 
 Runnable asyncTask = ()->{...};
 CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);

4. 小结

本文从理论我的项目的业务痛点场景登程,并基于公司已有的 ducc 配置平台简略实现了线程池线程数量可配置。

退出移动版