0 文章概述

动静线程池是指能够动静调节线程池某些参数,本文咱们联合Apollo和线程池实现一个动静线程池。

1 线程池根底

1.1 七个参数

咱们首先回顾Java线程池七大参数,这对后续设置线程池参数有帮忙。咱们查看ThreadPoolExecutor构造函数如下:

public class ThreadPoolExecutor extends AbstractExecutorService {    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||                maximumPoolSize <= 0 ||                maximumPoolSize < corePoolSize ||                keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.acc = System.getSecurityManager() == null ?                   null :                   AccessController.getContext();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }}

corePoolSize

线程池外围线程数,类比业务大厅开设的固定窗口。例如业务大厅开设2个固定窗口,那么这两个窗口不会敞开,全天都会进行业务办理

workQueue

存储已提交但尚未执行的工作,类比业务大厅等待区。例如业务大厅一开门进来很多顾客,2个固定窗口进行业务办理,其余顾客到等待区期待

maximumPoolSize

线程池能够包容同时执行最大线程数,类比业务大厅最大窗口数。例如业务大厅最大窗口数是5个,业务员看到2个固定窗口和等待区都满了,能够长期减少3个窗口

keepAliveTime

非核心线程数存活工夫。当业务不忙时方才新增的3个窗口须要敞开,闲暇工夫超过keepAliveTime闲暇会被敞开

unit

keepAliveTime存活工夫单位

threadFactory

线程工厂能够用来指定线程名

handler

线程池线程数已达到maximumPoolSize且队列已满时执行回绝策略。例如业务大厅5个窗口全副处于繁忙状态且等待区已满,业务员依据理论状况抉择回绝策略

1.2 四种回绝策略

(1) AbortPolicy

默认策略间接抛出RejectExecutionException阻止零碎失常运行

/** * AbortPolicy * * @author  * */public class AbortPolicyTest {    public static void main(String[] args) {        int coreSize = 1;        int maxSize = 2;        int queueSize = 1;        AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), abortPolicy);        for (int i = 0; i < 100; i++) {            executor.execute(new Runnable() {                @Override                public void run() {                    System.out.println(Thread.currentThread().getName() + " -> run");                }            });        }    }}

程序执行后果:

pool-1-thread-1 -> runpool-1-thread-2 -> runpool-1-thread-1 -> runException in thread "main" java.util.concurrent.RejectedExecutionException: Task com.xy.juc.threadpool.reject.AbortPolicyTest$1@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)    at com.xy.juc.threadpool.reject.AbortPolicyTest.main(AbortPolicyTest.java:21)

(2) CallerRunsPolicy

工作回退给调用者本人运行

/** * CallerRunsPolicy * * @author  * */public class CallerRunsPolicyTest {    public static void main(String[] args) {        int coreSize = 1;        int maxSize = 2;        int queueSize = 1;        CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), callerRunsPolicy);        for (int i = 0; i < 10; i++) {            executor.execute(new Runnable() {                @Override                public void run() {                    System.out.println(Thread.currentThread().getName() + " -> run");                }            });        }    }}

程序执行后果:

main -> runpool-1-thread-1 -> runpool-1-thread-2 -> runpool-1-thread-1 -> runmain -> runmain -> runpool-1-thread-2 -> runpool-1-thread-1 -> runmain -> runpool-1-thread-2 -> run

(3) DiscardOldestPolicy

摈弃队列中期待最久的工作不会抛出异样

/** * DiscardOldestPolicy * * @author  * */public class DiscardOldestPolicyTest {    public static void main(String[] args) {        int coreSize = 1;        int maxSize = 2;        int queueSize = 1;        DiscardOldestPolicy discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), discardOldestPolicy);        for (int i = 0; i < 10; i++) {            executor.execute(new Runnable() {                @Override                public void run() {                    System.out.println(Thread.currentThread().getName() + " -> run");                }            });        }    }}

程序执行后果:

pool-1-thread-1 -> runpool-1-thread-2 -> runpool-1-thread-1 -> run

(4) DiscardPolicy

间接抛弃工作不会抛出异样

/** * DiscardPolicy * * @author  * */public class DiscardPolicyTest {    public static void main(String[] args) {        int coreSize = 1;        int maxSize = 2;        int queueSize = 1;        DiscardPolicy discardPolicy = new ThreadPoolExecutor.DiscardPolicy();        ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), discardPolicy);        for (int i = 0; i < 10; i++) {            executor.execute(new Runnable() {                @Override                public void run() {                    System.out.println(Thread.currentThread().getName() + " -> run");                }            });        }    }}

程序执行后果:

pool-1-thread-1 -> runpool-1-thread-2 -> runpool-1-thread-1 -> run

1.3 批改参数

如果初始化线程池实现后,咱们是否能够批改线程池某些参数呢?答案是能够。咱们抉择线程池提供的四个批改办法进行源码剖析。

(1) setCorePoolSize

public class ThreadPoolExecutor extends AbstractExecutorService {    public void setCorePoolSize(int corePoolSize) {        if (corePoolSize < 0)            throw new IllegalArgumentException();        // 新外围线程数减去原外围线程数        int delta = corePoolSize - this.corePoolSize;        // 新外围线程数赋值        this.corePoolSize = corePoolSize;        // 如果以后线程数大于新外围线程数        if (workerCountOf(ctl.get()) > corePoolSize)            // 中断闲暇线程            interruptIdleWorkers();        // 如果须要新增线程则通过addWorker减少工作线程        else if (delta > 0) {            int k = Math.min(delta, workQueue.size());            while (k-- > 0 && addWorker(null, true)) {                if (workQueue.isEmpty())                    break;            }        }    }}

(2) setMaximumPoolSize

public class ThreadPoolExecutor extends AbstractExecutorService {    public void setMaximumPoolSize(int maximumPoolSize) {        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)            throw new IllegalArgumentException();        this.maximumPoolSize = maximumPoolSize;        // 如果以后线程数量大于新最大线程数量        if (workerCountOf(ctl.get()) > maximumPoolSize)            // 中断闲暇线程            interruptIdleWorkers();    }}

(3) setKeepAliveTime

public class ThreadPoolExecutor extends AbstractExecutorService {    public void setKeepAliveTime(long time, TimeUnit unit) {        if (time < 0)            throw new IllegalArgumentException();        if (time == 0 && allowsCoreThreadTimeOut())            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");        long keepAliveTime = unit.toNanos(time);        // 新超时工夫减去原超时工夫        long delta = keepAliveTime - this.keepAliveTime;        this.keepAliveTime = keepAliveTime;        // 如果新超时工夫小于原超时工夫        if (delta < 0)            // 中断闲暇线程            interruptIdleWorkers();    }}

(4) setRejectedExecutionHandler

public class ThreadPoolExecutor extends AbstractExecutorService {    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {        if (handler == null)            throw new NullPointerException();        // 设置回绝策略        this.handler = handler;    }}

当初咱们晓得线程池零碎上述调整参数的办法,但仅仅剖析到此是不够的,因为如果没有动静调整参数的办法,每次批改必须从新公布才能够失效,那么有没有办法不必公布就能够动静调整线程池参数呢?

2 Apollo配置核心

2.1 外围原理

Apollo是携程框架部门研发的分布式配置核心,可能集中化治理利用不同环境、不同集群的配置,配置批改后可能实时推送到利用端,并且具备标准的权限、流程治理等个性,实用于微服务配置管理场景。Apollo开源地址如下:

https://github.com/ctripcorp/apollo

咱们在应用配置核心时第一步用户在配置核心批改配置项,第二步配置核心告诉Apollo客户端有配置更新,第三步Apollo客户端从配置核心拉取最新配置,更新本地配置并告诉到利用,官网根底模型图如下:

配置核心配置项发生变化客户端如何感知呢?分为推和拉两种形式。推依赖客户端和服务端放弃了一个长连贯,产生数据变动时服务端推送信息给客户端,这就是长轮询机制。拉依赖客户端定时从配置核心服务端拉取利用最新配置,这是一个fallback机制。官网客户端设计图如下:

本文重点剖析配置更新推送形式,咱们首先看官网服务端设计图:

ConfigService模块提供配置的读取推送等性能,服务对象是Apollo客户端。AdminService模块提供配置的批改公布等性能,服务对象是Portal模块即治理界面。须要阐明Apollo并没有援用消息中间件,官网图中发送异步音讯是指ConfigService定时扫描异步音讯数据表:

音讯数据保留在MySQL音讯表:

CREATE TABLE `releasemessage` (  `Id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键',  `Message` varchar(1024) NOT NULL DEFAULT '' COMMENT '公布的音讯内容',  `DataChange_LastTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最初批改工夫',  PRIMARY KEY (`Id`),  KEY `DataChange_LastTime` (`DataChange_LastTime`),  KEY `IX_Message` (`Message`(191))) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='公布音讯'

2.2 实例剖析

2.2.1 服务端装置

服务端关键步骤是导入数据库和批改端口号,具体步骤请参看官方网站:

https://ctripcorp.github.io/apollo/#/zh/deployment/quick-start

启动胜利后拜访地址:

http://localhost:8070

输出用户名apollo、明码admin登录:

点击进入我创立myApp我的项目,咱们看到在DEV环境、default集群、application命名空间蕴含一个timeout配置项,100是这个配置项的值,上面咱们在应用程序读取这个配置项:

2.2.2 应用程序

(1) 引入依赖

<dependencies>    <dependency>    <groupId>com.ctrip.framework.apollo</groupId>    <artifactId>apollo-client</artifactId>    <version>1.7.0</version>    </dependency></dependencies>    

(2) 简略实例

public class GetApolloConfigTest extends BaseTest {    /**     * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080     *     * myApp+DEV+default+application     */    @Test    public void testGet() throws InterruptedException {        Config appConfig = ConfigService.getAppConfig();        while (true) {            String value = appConfig.getProperty("timeout", "200");            System.out.println("timeout=" + value);            TimeUnit.SECONDS.sleep(1);        }    }}

因为上述程序是通过while(true)一直获取配置项的值,所以程序输入后果如下:

timeout=100timeout=100timeout=100timeout=100timeout=100timeout=100

咱们当初把配置项的值改为200程序输入后果如下:

timeout=100timeout=100timeout=100timeout=100timeout=200timeout=200timeout=200

(3) 监听实例

生产环境咱们个别不必while(true)监听变动,而是通过注册监听器形式感知变动信息:

public class GetApolloConfigTest extends BaseTest {    /**     * 监听命名空间变动     *     * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080     *     * myApp+DEV+default+application     */    @Test    public void testListen() throws InterruptedException {        Config config = ConfigService.getConfig("application");        config.addChangeListener(new ConfigChangeListener() {            @Override            public void onChange(ConfigChangeEvent changeEvent) {                System.out.println("发生变化命名空间=" + changeEvent.getNamespace());                for (String key : changeEvent.changedKeys()) {                    ConfigChange change = changeEvent.getChange(key);                    System.out.println(String.format("发生变化key=%s,oldValue=%s,newValue=%s,changeType=%s", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType()));                }            }        });        Thread.sleep(1000000L);    }}

咱们当初把timeout值从200改为300,程序输入后果:

发生变化命名空间=application发生变化key=timeout,oldValue=200,newValue=300,changeType=MODIFIED

3 动静线程池

当初咱们把线程池和Apollo联合起来构建动静线程池,具备了上述常识编写起来并不简单。首先咱们用默认值构建一个线程池,而后线程池会监听Apollo对于相干配置项,如果相干配置有变动则刷新相干参数。第一步在Apollo配置核心设置三个线程池参数(本文没有设置回绝策略):

第二步编写外围代码:

/** * 动静线程池工厂 * * @author  * */@Slf4j@Componentpublic class DynamicThreadPoolFactory {    private static final String NAME_SPACE = "threadpool-config";    /** 线程执行器 **/    private volatile ThreadPoolExecutor executor;    /** 外围线程数 **/    private Integer CORE_SIZE = 10;    /** 最大值线程数 **/    private Integer MAX_SIZE = 20;    /** 期待队列长度 **/    private Integer QUEUE_SIZE = 2000;    /** 线程存活工夫 **/    private Long KEEP_ALIVE_TIME = 1000L;    /** 线程名 **/    private String threadName;    public DynamicThreadPoolFactory() {        Config config = ConfigService.getConfig(NAME_SPACE);        init(config);        listen(config);    }    /**     * 初始化     */    private void init(Config config) {        if (executor == null) {            synchronized (DynamicThreadPoolFactory.class) {                if (executor == null) {                    String coreSize = config.getProperty(KeysEnum.CORE_SIZE.getNodeKey(), CORE_SIZE.toString());                    String maxSize = config.getProperty(KeysEnum.MAX_SIZE.getNodeKey(), MAX_SIZE.toString());                    String keepAliveTIme = config.getProperty(KeysEnum.KEEP_ALIVE_TIME.getNodeKey(), KEEP_ALIVE_TIME.toString());                    BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>(QUEUE_SIZE);                    executor = new ThreadPoolExecutor(Integer.valueOf(coreSize), Integer.valueOf(maxSize), Long.valueOf(keepAliveTIme), TimeUnit.MILLISECONDS, queueToUse, new NamedThreadFactory(threadName, true), new AbortPolicyDoReport(threadName));                }            }        }    }    /**     * 监听器     */    private void listen(Config config) {        config.addChangeListener(new ConfigChangeListener() {            @Override            public void onChange(ConfigChangeEvent changeEvent) {                log.info("命名空间发生变化={}", changeEvent.getNamespace());                for (String key : changeEvent.changedKeys()) {                    ConfigChange change = changeEvent.getChange(key);                    String newValue = change.getNewValue();                    refreshThreadPool(key, newValue);                    log.info("发生变化key={},oldValue={},newValue={},changeType={}", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType());                }            }        });    }    /**     * 刷新线程池     */    private void refreshThreadPool(String key, String newValue) {        if (executor == null) {            return;        }        if (KeysEnum.CORE_SIZE.getNodeKey().equals(key)) {            executor.setCorePoolSize(Integer.valueOf(newValue));            log.info("批改外围线程数key={},value={}", key, newValue);        }        if (KeysEnum.MAX_SIZE.getNodeKey().equals(key)) {            executor.setMaximumPoolSize(Integer.valueOf(newValue));            log.info("批改最大线程数key={},value={}", key, newValue);        }        if (KeysEnum.KEEP_ALIVE_TIME.getNodeKey().equals(key)) {            executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);            log.info("批改沉闷工夫key={},value={}", key, newValue);        }    }    public ThreadPoolExecutor getExecutor(String threadName) {        return executor;    }    enum KeysEnum {        CORE_SIZE("coreSize", "外围线程数"),        MAX_SIZE("maxSize", "最大线程数"),        KEEP_ALIVE_TIME("keepAliveTime", "线程沉闷工夫")        ;        private String nodeKey;        private String desc;        KeysEnum(String nodeKey, String desc) {            this.nodeKey = nodeKey;            this.desc = desc;        }        public String getNodeKey() {            return nodeKey;        }        public void setNodeKey(String nodeKey) {            this.nodeKey = nodeKey;        }        public String getDesc() {            return desc;        }        public void setDesc(String desc) {            this.desc = desc;        }    }}/** * 动静线程池执行器 * * @author  * */@Componentpublic class DynamicThreadExecutor {    @Resource    private DynamicThreadPoolFactory threadPoolFactory;    public void execute(String bizName, Runnable job) {        threadPoolFactory.getExecutor(bizName).execute(job);    }    public Future<?> sumbit(String bizName, Runnable job) {        return threadPoolFactory.getExecutor(bizName).submit(job);    }}

第三步运行测试用例并联合VisualVM察看线程数:

/** * 动静线程池测试 * * @author  * */public class DynamicThreadExecutorTest extends BaseTest {    @Resource    private DynamicThreadExecutor dynamicThreadExecutor;    /**     * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080     *     * myApp+DEV+default+thread-pool     */    @Test    public void testExecute() throws InterruptedException {        while (true) {            dynamicThreadExecutor.execute("bizName", new Runnable() {                @Override                public void run() {                    System.out.println("bizInfo");                }            });            TimeUnit.SECONDS.sleep(1);        }    }}

咱们在配置核心批改配置项把外围线程数设置为50,最大线程数设置为100:

通过VisualVM能够察看到线程数显著回升:

4 文章总结

本文咱们首先介绍了线程池基础知识,包含七大参数和四个回绝策略,随后咱们介绍了Apollo配置核心的原理和利用,最初咱们将线程池和配置核心相结合,实现了动静调整线程数的成果,心愿本文对大家有所帮忙。