0 文章概述
动静线程池是指能够动静调节线程池某些参数,本文咱们联合 Apollo 和线程池实现一个动静线程池。
1 线程池根底
1.1 七个参数
咱们首先回顾 Java 线程池七大参数,这对后续设置线程池参数有帮忙。咱们查看 ThreadPoolExecutor 构造函数如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
corePoolSize
线程池外围线程数,类比业务大厅开设的固定窗口。例如业务大厅开设 2 个固定窗口,那么这两个窗口不会敞开,全天都会进行业务办理
workQueue
存储已提交但尚未执行的工作,类比业务大厅等待区。例如业务大厅一开门进来很多顾客,2 个固定窗口进行业务办理,其余顾客到等待区期待
maximumPoolSize
线程池能够包容同时执行最大线程数,类比业务大厅最大窗口数。例如业务大厅最大窗口数是 5 个,业务员看到 2 个固定窗口和等待区都满了,能够长期减少 3 个窗口
keepAliveTime
非核心线程数存活工夫。当业务不忙时方才新增的 3 个窗口须要敞开,闲暇工夫超过 keepAliveTime 闲暇会被敞开
unit
keepAliveTime 存活工夫单位
threadFactory
线程工厂能够用来指定线程名
handler
线程池线程数已达到 maximumPoolSize 且队列已满时执行回绝策略。例如业务大厅 5 个窗口全副处于繁忙状态且等待区已满,业务员依据理论状况抉择回绝策略
1.2 四种回绝策略
(1) AbortPolicy
默认策略间接抛出 RejectExecutionException 阻止零碎失常运行
/**
* AbortPolicy
*
* @author
*
*/
public class AbortPolicyTest {public static void main(String[] args) {
int coreSize = 1;
int maxSize = 2;
int queueSize = 1;
AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), abortPolicy);
for (int i = 0; i < 100; i++) {executor.execute(new Runnable() {
@Override
public void run() {System.out.println(Thread.currentThread().getName() + "-> run");
}
});
}
}
}
程序执行后果:
pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.xy.juc.threadpool.reject.AbortPolicyTest$1@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.xy.juc.threadpool.reject.AbortPolicyTest.main(AbortPolicyTest.java:21)
(2) CallerRunsPolicy
工作回退给调用者本人运行
/**
* CallerRunsPolicy
*
* @author
*
*/
public class CallerRunsPolicyTest {public static void main(String[] args) {
int coreSize = 1;
int maxSize = 2;
int queueSize = 1;
CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), callerRunsPolicy);
for (int i = 0; i < 10; i++) {executor.execute(new Runnable() {
@Override
public void run() {System.out.println(Thread.currentThread().getName() + "-> run");
}
});
}
}
}
程序执行后果:
main -> run
pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
main -> run
main -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
main -> run
pool-1-thread-2 -> run
(3) DiscardOldestPolicy
摈弃队列中期待最久的工作不会抛出异样
/**
* DiscardOldestPolicy
*
* @author
*
*/
public class DiscardOldestPolicyTest {public static void main(String[] args) {
int coreSize = 1;
int maxSize = 2;
int queueSize = 1;
DiscardOldestPolicy discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), discardOldestPolicy);
for (int i = 0; i < 10; i++) {executor.execute(new Runnable() {
@Override
public void run() {System.out.println(Thread.currentThread().getName() + "-> run");
}
});
}
}
}
程序执行后果:
pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
(4) DiscardPolicy
间接抛弃工作不会抛出异样
/**
* DiscardPolicy
*
* @author
*
*/
public class DiscardPolicyTest {public static void main(String[] args) {
int coreSize = 1;
int maxSize = 2;
int queueSize = 1;
DiscardPolicy discardPolicy = new ThreadPoolExecutor.DiscardPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), discardPolicy);
for (int i = 0; i < 10; i++) {executor.execute(new Runnable() {
@Override
public void run() {System.out.println(Thread.currentThread().getName() + "-> run");
}
});
}
}
}
程序执行后果:
pool-1-thread-1 -> run
pool-1-thread-2 -> run
pool-1-thread-1 -> run
1.3 批改参数
如果初始化线程池实现后,咱们是否能够批改线程池某些参数呢?答案是能够。咱们抉择线程池提供的四个批改办法进行源码剖析。
(1) setCorePoolSize
public class ThreadPoolExecutor extends AbstractExecutorService {public void setCorePoolSize(int corePoolSize) {if (corePoolSize < 0)
throw new IllegalArgumentException();
// 新外围线程数减去原外围线程数
int delta = corePoolSize - this.corePoolSize;
// 新外围线程数赋值
this.corePoolSize = corePoolSize;
// 如果以后线程数大于新外围线程数
if (workerCountOf(ctl.get()) > corePoolSize)
// 中断闲暇线程
interruptIdleWorkers();
// 如果须要新增线程则通过 addWorker 减少工作线程
else if (delta > 0) {int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {if (workQueue.isEmpty())
break;
}
}
}
}
(2) setMaximumPoolSize
public class ThreadPoolExecutor extends AbstractExecutorService {public void setMaximumPoolSize(int maximumPoolSize) {if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
// 如果以后线程数量大于新最大线程数量
if (workerCountOf(ctl.get()) > maximumPoolSize)
// 中断闲暇线程
interruptIdleWorkers();}
}
(3) setKeepAliveTime
public class ThreadPoolExecutor extends AbstractExecutorService {public void setKeepAliveTime(long time, TimeUnit unit) {if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
// 新超时工夫减去原超时工夫
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
// 如果新超时工夫小于原超时工夫
if (delta < 0)
// 中断闲暇线程
interruptIdleWorkers();}
}
(4) setRejectedExecutionHandler
public class ThreadPoolExecutor extends AbstractExecutorService {public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {if (handler == null)
throw new NullPointerException();
// 设置回绝策略
this.handler = handler;
}
}
当初咱们晓得线程池零碎上述调整参数的办法,但仅仅剖析到此是不够的,因为如果没有动静调整参数的办法,每次批改必须从新公布才能够失效,那么有没有办法不必公布就能够动静调整线程池参数呢?
2 Apollo 配置核心
2.1 外围原理
Apollo 是携程框架部门研发的分布式配置核心,可能集中化治理利用不同环境、不同集群的配置,配置批改后可能实时推送到利用端,并且具备标准的权限、流程治理等个性,实用于微服务配置管理场景。Apollo 开源地址如下:
https://github.com/ctripcorp/apollo
咱们在应用配置核心时第一步用户在配置核心批改配置项,第二步配置核心告诉 Apollo 客户端有配置更新,第三步 Apollo 客户端从配置核心拉取最新配置,更新本地配置并告诉到利用,官网根底模型图如下:
配置核心配置项发生变化客户端如何感知呢?分为推和拉两种形式。推依赖客户端和服务端放弃了一个长连贯,产生数据变动时服务端推送信息给客户端,这就是长轮询机制。拉依赖客户端定时从配置核心服务端拉取利用最新配置,这是一个 fallback 机制。官网客户端设计图如下:
本文重点剖析配置更新推送形式,咱们首先看官网服务端设计图:
ConfigService 模块提供配置的读取推送等性能,服务对象是 Apollo 客户端。AdminService 模块提供配置的批改公布等性能,服务对象是 Portal 模块即治理界面。须要阐明 Apollo 并没有援用消息中间件,官网图中发送异步音讯是指 ConfigService 定时扫描异步音讯数据表:
音讯数据保留在 MySQL 音讯表:
CREATE TABLE `releasemessage` (`Id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键',
`Message` varchar(1024) NOT NULL DEFAULT ''COMMENT' 公布的音讯内容 ',
`DataChange_LastTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最初批改工夫',
PRIMARY KEY (`Id`),
KEY `DataChange_LastTime` (`DataChange_LastTime`),
KEY `IX_Message` (`Message`(191))
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='公布音讯'
2.2 实例剖析
2.2.1 服务端装置
服务端关键步骤是导入数据库和批改端口号,具体步骤请参看官方网站:
https://ctripcorp.github.io/apollo/#/zh/deployment/quick-start
启动胜利后拜访地址:
http://localhost:8070
输出用户名 apollo、明码 admin 登录:
点击进入我创立 myApp 我的项目,咱们看到在 DEV 环境、default 集群、application 命名空间蕴含一个 timeout 配置项,100 是这个配置项的值,上面咱们在应用程序读取这个配置项:
2.2.2 应用程序
(1) 引入依赖
<dependencies>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
(2) 简略实例
public class GetApolloConfigTest extends BaseTest {
/**
* -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080
*
* myApp+DEV+default+application
*/
@Test
public void testGet() throws InterruptedException {Config appConfig = ConfigService.getAppConfig();
while (true) {String value = appConfig.getProperty("timeout", "200");
System.out.println("timeout=" + value);
TimeUnit.SECONDS.sleep(1);
}
}
}
因为上述程序是通过 while(true) 一直获取配置项的值,所以程序输入后果如下:
timeout=100
timeout=100
timeout=100
timeout=100
timeout=100
timeout=100
咱们当初把配置项的值改为 200 程序输入后果如下:
timeout=100
timeout=100
timeout=100
timeout=100
timeout=200
timeout=200
timeout=200
(3) 监听实例
生产环境咱们个别不必 while(true) 监听变动,而是通过注册监听器形式感知变动信息:
public class GetApolloConfigTest extends BaseTest {
/**
* 监听命名空间变动
*
* -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080
*
* myApp+DEV+default+application
*/
@Test
public void testListen() throws InterruptedException {Config config = ConfigService.getConfig("application");
config.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {System.out.println("发生变化命名空间 =" + changeEvent.getNamespace());
for (String key : changeEvent.changedKeys()) {ConfigChange change = changeEvent.getChange(key);
System.out.println(String.format("发生变化 key=%s,oldValue=%s,newValue=%s,changeType=%s", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType()));
}
}
});
Thread.sleep(1000000L);
}
}
咱们当初把 timeout 值从 200 改为 300,程序输入后果:
发生变化命名空间 =application
发生变化 key=timeout,oldValue=200,newValue=300,changeType=MODIFIED
3 动静线程池
当初咱们把线程池和 Apollo 联合起来构建动静线程池,具备了上述常识编写起来并不简单。首先咱们用默认值构建一个线程池,而后线程池会监听 Apollo 对于相干配置项,如果相干配置有变动则刷新相干参数。第一步在 Apollo 配置核心设置三个线程池参数(本文没有设置回绝策略):
第二步编写外围代码:
/**
* 动静线程池工厂
*
* @author
*
*/
@Slf4j
@Component
public class DynamicThreadPoolFactory {
private static final String NAME_SPACE = "threadpool-config";
/** 线程执行器 **/
private volatile ThreadPoolExecutor executor;
/** 外围线程数 **/
private Integer CORE_SIZE = 10;
/** 最大值线程数 **/
private Integer MAX_SIZE = 20;
/** 期待队列长度 **/
private Integer QUEUE_SIZE = 2000;
/** 线程存活工夫 **/
private Long KEEP_ALIVE_TIME = 1000L;
/** 线程名 **/
private String threadName;
public DynamicThreadPoolFactory() {Config config = ConfigService.getConfig(NAME_SPACE);
init(config);
listen(config);
}
/**
* 初始化
*/
private void init(Config config) {if (executor == null) {synchronized (DynamicThreadPoolFactory.class) {if (executor == null) {String coreSize = config.getProperty(KeysEnum.CORE_SIZE.getNodeKey(), CORE_SIZE.toString());
String maxSize = config.getProperty(KeysEnum.MAX_SIZE.getNodeKey(), MAX_SIZE.toString());
String keepAliveTIme = config.getProperty(KeysEnum.KEEP_ALIVE_TIME.getNodeKey(), KEEP_ALIVE_TIME.toString());
BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>(QUEUE_SIZE);
executor = new ThreadPoolExecutor(Integer.valueOf(coreSize), Integer.valueOf(maxSize), Long.valueOf(keepAliveTIme), TimeUnit.MILLISECONDS, queueToUse, new NamedThreadFactory(threadName, true), new AbortPolicyDoReport(threadName));
}
}
}
}
/**
* 监听器
*/
private void listen(Config config) {config.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {log.info("命名空间发生变化 ={}", changeEvent.getNamespace());
for (String key : changeEvent.changedKeys()) {ConfigChange change = changeEvent.getChange(key);
String newValue = change.getNewValue();
refreshThreadPool(key, newValue);
log.info("发生变化 key={},oldValue={},newValue={},changeType={}", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType());
}
}
});
}
/**
* 刷新线程池
*/
private void refreshThreadPool(String key, String newValue) {if (executor == null) {return;}
if (KeysEnum.CORE_SIZE.getNodeKey().equals(key)) {executor.setCorePoolSize(Integer.valueOf(newValue));
log.info("批改外围线程数 key={},value={}", key, newValue);
}
if (KeysEnum.MAX_SIZE.getNodeKey().equals(key)) {executor.setMaximumPoolSize(Integer.valueOf(newValue));
log.info("批改最大线程数 key={},value={}", key, newValue);
}
if (KeysEnum.KEEP_ALIVE_TIME.getNodeKey().equals(key)) {executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);
log.info("批改沉闷工夫 key={},value={}", key, newValue);
}
}
public ThreadPoolExecutor getExecutor(String threadName) {return executor;}
enum KeysEnum {CORE_SIZE("coreSize", "外围线程数"),
MAX_SIZE("maxSize", "最大线程数"),
KEEP_ALIVE_TIME("keepAliveTime", "线程沉闷工夫")
;
private String nodeKey;
private String desc;
KeysEnum(String nodeKey, String desc) {
this.nodeKey = nodeKey;
this.desc = desc;
}
public String getNodeKey() {return nodeKey;}
public void setNodeKey(String nodeKey) {this.nodeKey = nodeKey;}
public String getDesc() {return desc;}
public void setDesc(String desc) {this.desc = desc;}
}
}
/**
* 动静线程池执行器
*
* @author
*
*/
@Component
public class DynamicThreadExecutor {
@Resource
private DynamicThreadPoolFactory threadPoolFactory;
public void execute(String bizName, Runnable job) {threadPoolFactory.getExecutor(bizName).execute(job);
}
public Future<?> sumbit(String bizName, Runnable job) {return threadPoolFactory.getExecutor(bizName).submit(job);
}
}
第三步运行测试用例并联合 VisualVM 察看线程数:
/**
* 动静线程池测试
*
* @author
*
*/
public class DynamicThreadExecutorTest extends BaseTest {
@Resource
private DynamicThreadExecutor dynamicThreadExecutor;
/**
* -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080
*
* myApp+DEV+default+thread-pool
*/
@Test
public void testExecute() throws InterruptedException {while (true) {dynamicThreadExecutor.execute("bizName", new Runnable() {
@Override
public void run() {System.out.println("bizInfo");
}
});
TimeUnit.SECONDS.sleep(1);
}
}
}
咱们在配置核心批改配置项把外围线程数设置为 50,最大线程数设置为 100:
通过 VisualVM 能够察看到线程数显著回升:
4 文章总结
本文咱们首先介绍了线程池基础知识,包含七大参数和四个回绝策略,随后咱们介绍了 Apollo 配置核心的原理和利用,最初咱们将线程池和配置核心相结合,实现了动静调整线程数的成果,心愿本文对大家有所帮忙。