共计 11750 个字符,预计需要花费 30 分钟才能阅读完成。
大家好,这篇文章咱们来聊下动静线程池开源我的项目(DynamicTp)的告诉告警模块。目前我的项目提供以下告诉告警性能,每一个告诉项都能够独立配置是否开启、告警阈值、告警间隔时间、平台等,具体代码请看 core 模块 notify 包。
1. 外围参数变更告诉
2. 线程池活跃度告警
3. 队列容量告警
4. 回绝策略告警
5. 工作执行超时告警
6. 工作排队超时告警
DynamicTp 我的项目地址
目前 700star,感激你的 star,欢送 pr,业务之余一起给开源奉献一份力量
gitee 地址 :https://gitee.com/yanhom/dynamic-tp
github 地址 :https://github.com/lyh200/dynamic-tp
系列文章
美团动静线程池实际思路,开源了
动静线程池框架(DynamicTp),监控及源码解析篇
动静线程池(DynamicTp),动静调整 Tomcat、Jetty、Undertow 线程池参数篇
线程池解读
上篇文章里大略讲到了 JUC 线程池的执行流程,咱们这里再认真回顾下,上图是 JUC 下线程池 ThreadPoolExecutor 类的继承体系。
顶级接口 Executor 提供了一种形式,解耦工作的提交和执行,只定义了一个 execute(Runnable command) 办法用来提交工作,至于具体任务怎么执行则交给他的实现者去自定义实现。
ExecutorService 接口继承 Executor,且扩大了生命周期治理的办法、返回 Futrue 的办法、批量提交工作的办法
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
AbstractExecutorService 抽象类继承 ExecutorService 接口,对 ExecutorService 相干办法提供了默认实现,用 RunnableFuture 的实现类 FutureTask 包装 Runnable 工作,交给 execute() 办法执行,而后能够从该 FutureTask 阻塞获取执行后果,并且对批量工作的提交做了编排
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
}
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
ThreadPoolExecutor 继承 AbstractExecutorService,采纳池化思维治理肯定数量的线程来调度执行提交的工作,且定义了一套线程池的生命周期状态,用一个 ctl 变量来同时保留以后池状态(高 3 位)和以后池线程数(低 29 位)。看过源码的小伙伴会发现,ThreadPoolExecutor 类里的办法大量有同时须要获取或更新池状态和池以后线程数的场景,放一个原子变量里,能够很好的保证数据的一致性以及代码的简洁性。
// 用此变量保留以后池状态(高 3 位)和以后线程数(低 29 位)private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 能够承受新工作提交,也会解决工作队列中的工作
// 后果:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 不承受新工作提交,但会解决工作队列中的工作
// 后果:000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不承受新工作,不执行队列中的工作,且会中断正在执行的工作
// 后果:001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 工作队列为空,workerCount = 0,线程池的状态在转换为 TIDYING 状态时,会执行钩子办法 terminated()
// 后果:010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 调用 terminated() 钩子办法后进入 TERMINATED 状态
// 后果:010 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 低 29 位变为 0,失去了线程池的状态
private static int runStateOf(int c) {return c & ~CAPACITY;}
// 高 3 位变为为 0,失去了线程池中的线程数
private static int workerCountOf(int c) {return c & CAPACITY;}
private static int ctlOf(int rs, int wc) {return rs | wc;}
外围入口 execute() 办法执行逻辑如下:
public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
能够总结出如下次要执行流程,当然看上述代码会有一些异样分支判断,能够本人顺理加到下述执行主流程里
1. 判断线程池的状态,如果不是 RUNNING 状态,间接执行回绝策略
2. 如果以后线程数 < 外围线程池,则新建一个线程来解决提交的工作
3. 如果以后线程数 > 外围线程数且工作队列没满,则将工作放入工作队列期待执行
4. 如果 外围线程池 < 以后线程池数 < 最大线程数,且工作队列已满,则创立新的线程执行提交的工作
5. 如果以后线程数 > 最大线程数,且队列已满,则回绝该工作
addWorker() 办法逻辑
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {int c = ctl.get();
// 获取以后池状态
int rs = runStateOf(c);
// (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
// 1. 判断如果线程池状态 > SHUTDOWN,间接返回 false,否则 2
// 2. 如果线程池状态 = SHUTDOWN,并且 firstTask 不为 null 则间接返回 false,因为 SHUTDOWN 状态的线程池不能在承受新工作,否则 3
// 3. 如果线程池状态 = SHUTDOWN,并且 firstTask == null,此时如果工作队列为空,则间接返回 false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {int wc = workerCountOf(c);
// 1. 如果以后线程池线程数大于等于 CAPACITY(实践上的最大值 5 亿),则返回 fasle
// 2. 如果创立外围线程状况下以后池线程数 >= corePoolSize,则返回 false
// 3. 如果创立非核心线程状况下以后池线程数 >= maximumPoolSize,则返回 false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas 减少以后池线程数量,胜利则退出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// cas 减少以后池线程数量失败(多线程并发),则从新获取 ctl,计算出以后线程池状态,如果不等于上述计算的状态 rs,则阐明线程池状态产生了扭转,须要跳到外层循环从新进行状态判断,否则执行外部循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 至此阐明线程池状态校验通过,且减少池线程数量胜利,则创立一个 Worker 线程来执行工作
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 拜访 worker set 时须要获取 mainLock 全局锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 1. 以后池状态 < SHUTDOWN,也就是 RUNNING 状态,如果曾经 started,抛出异样
// 2. 以后池状态 = SHUTDOWN,且 firstTask == null,须要解决工作队列中的工作,如果曾经 started,抛出异样
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 刚创立线程增加到 workers 汇合中
workers.add(w);
int s = workers.size();
// 判断更新历史最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {mainLock.unlock();
}
if (workerAdded) {
// 启动新建线程
t.start();
workerStarted = true;
}
}
} finally {if (! workerStarted)
// 启动失败,workerCount--,workers 里移除该 worker
addWorkerFailed(w);
}
return workerStarted;
}
线程池中的线程并不是间接用的 Thread 类,而是定义了一个外部工作线程 Worker 类,实现了 AQS 以及 Runnable 接口,而后持有一个 Thread 类的援用及一个 firstTask(创立后第一个要执行的工作),每个 Worker 线程启动后会执行 run() 办法,该办法会调用执行外层 runWorker(Worker w) 办法
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 1. 如果 task 不为空,则作为该线程的第一个工作间接执行
// 2. 如果 task 为空,则通过 getTask() 办法从工作队列中获取工作执行
while (task != null || (task = getTask()) != null) {w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 线程池状态 >= STOP,则中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 理论执行工作前调用的钩子办法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 理论执行工作
task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {
// 工作执行后调用的钩子办法
afterExecute(task, thrown);
}
} finally {
// 工作置为 null,从新获取新工作,实现数 ++
task = null;
w.completedTasks++;
w.unlock();}
}
completedAbruptly = false;
} finally {
// 无工作可执行,执行 worker 销毁逻辑
processWorkerExit(w, completedAbruptly);
}
}
getTask() 办法逻辑
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?
for (;;) {int c = ctl.get();
int rs = runStateOf(c);
// 以下两种状况递加工作线程数量
// 1. rs >= STOP
// 2. rs == SHUTDOWN && workQueue.isEmpty()
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 容许外围线程超时 或者 以后线程数 > 外围线程数,有可能产生超时敞开
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc 什么状况 > maximumPoolSize,调用 setMaximumPoolSize() 办法将 maximumPoolSize 调小了,会产生这种状况,此时须要敞开多余线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 阻塞队列获取工作
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 产生中断,进行重试
timedOut = false;
}
}
}
以上内容比拟具体的介绍了 ThreadPoolExecutor 的继承体系,以及相干的外围源码,基于此,当初咱们来看 DynamicTp 提供的告警告诉能力。
外围参数变更告诉
对应配置核心的监听端监听到配置变更后,封装到 DtpProperties 中而后交由 DtpRegistry 类中的 refresh() 办法去做配置更新,同时告诉时会高亮显示有变更的字段
线程池活跃度告警
活跃度 = activeCount / maximumPoolSize
服务启动后会开启一个定时监控工作,每隔肯定工夫(可配置)去计算线程池的活跃度,达到配置的 threshold 阈值后会触发一次告警,告警距离内屡次触发不会发送告警告诉
队列容量告警
容量使用率 = queueSize / queueCapacity
服务启动后会开启一个定时监控工作,每隔肯定工夫去计算工作队列的使用率,达到配置的 threshold 阈值后会触发一次告警,告警距离内屡次触发不会发送告警告诉
回绝策略告警
/**
* Do sth before reject.
* @param executor ThreadPoolExecutor instance
*/
default void beforeReject(ThreadPoolExecutor executor) {if (executor instanceof DtpExecutor) {DtpExecutor dtpExecutor = (DtpExecutor) executor;
dtpExecutor.incRejectCount(1);
Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT);
AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable);
}
}
线程池线程数达到配置的最大线程数,且工作队列已满,再提交工作会触发回绝策略。DtpExecutor 线程池用到的 RejectedExecutionHandler 是通过动静代理包装过的,在执行具体的回绝策略之前会执行 RejectedAware 类 beforeReject() 办法,此办法会去做回绝数量累加(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警告诉(同时重置周期累加值为 0 及上次告警工夫为以后工夫),告警距离内屡次触发不会发送告警告诉
工作队列超时告警
重写 ThreadPoolExecutor 的 execute() 办法和 beforeExecute() 办法,如果配置了执行超时或排队超时值,则会用 DtpRunnable 包装工作,同时记录工作的提交工夫 submitTime,beforeExecute 依据以后工夫和 submitTime 的差值就能够计算到该工作在队列中的等待时间,而后判断如果差值大于配置的 queueTimeout 则累加排队超时工作数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警告诉(同时重置周期累加值为 0 及上次告警工夫为以后工夫),告警距离内屡次触发不会发送告警告诉
@Override
public void execute(Runnable command) {if (CollUtil.isNotEmpty(taskWrappers)) {for (TaskWrapper t : taskWrappers) {command = t.wrap(command);
}
}
if (runTimeout > 0 || queueTimeout > 0) {command = new DtpRunnable(command);
}
super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {if (!(r instanceof DtpRunnable)) {super.beforeExecute(t, r);
return;
}
DtpRunnable runnable = (DtpRunnable) r;
long currTime = System.currentTimeMillis();
if (runTimeout > 0) {runnable.setStartTime(currTime);
}
if (queueTimeout > 0) {long waitTime = currTime - runnable.getSubmitTime();
if (waitTime > queueTimeout) {queueTimeoutCount.incrementAndGet();
Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT);
AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask);
}
}
super.beforeExecute(t, r);
}
工作执行超时告警
重写 ThreadPoolExecutor 的 afterExecute() 办法,依据以后工夫和 beforeExecute() 中设置的 startTime 的差值即可算出工作的理论执行工夫,而后判断如果差值大于配置的 runTimeout 则累加排队超时工作数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警告诉(同时重置周期累加值为 0 及上次告警工夫为以后工夫),告警距离内屡次触发不会发送告警告诉
@Override
protected void afterExecute(Runnable r, Throwable t) {if (runTimeout > 0) {DtpRunnable runnable = (DtpRunnable) r;
long runTime = System.currentTimeMillis() - runnable.getStartTime();
if (runTime > runTimeout) {runTimeoutCount.incrementAndGet();
Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT);
AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask);
}
}
super.afterExecute(r, t);
}
告警告诉相干配置项
如果想应用告诉告警性能,配置文件必须要配置 platforms 字段,且能够配置多个平台,如钉钉、企微等;notifyItems 配置具体告警项,包含阈值、平台、告警距离等。
spring:
dynamic:
tp:
# 省略其余项
platforms: # 告诉平台
- platform: wechat
urlKey: 38a98-0c5c3b649c
receivers: test
- platform: ding
urlKey: f80db3e801d593604f4a08dcd6a
secret: SECb5444a6f375d5b9d21
receivers: 17811511815
executors: # 动静线程池配置,都有默认值,采纳默认值的能够不配置该项,缩小配置量
- threadPoolName: dtpExecutor1
executorType: common # 线程池类型 common、eager:实用于 io 密集型
corePoolSize: 2
maximumPoolSize: 4
queueCapacity: 200
queueType: VariableLinkedBlockingQueue # 工作队列,查看源码 QueueTypeEnum 枚举类
rejectedHandlerType: CallerRunsPolicy # 回绝策略,查看 RejectedTypeEnum 枚举类
keepAliveTime: 50
allowCoreThreadTimeOut: false
threadNamePrefix: dtp1 # 线程名前缀
waitForTasksToCompleteOnShutdown: false # 参考 spring 线程池设计
awaitTerminationSeconds: 5 # 单位(s)preStartAllCoreThreads: false # 是否预热外围线程,默认 false
runTimeout: 200 # 工作执行超时阈值,目前只做告警用,单位(ms)queueTimeout: 100 # 工作在队列期待超时阈值,目前只做告警用,单位(ms)taskWrapperNames: ["ttl"] # 工作包装器名称,集成 TaskWrapper 接口
notifyItems: # 报警项,不配置主动会按默认值配置(变更告诉、容量报警、活性报警、回绝报警、工作超时报警)- type: capacity # 报警项类型,查看源码 NotifyTypeEnum 枚举类
threshold: 80 # 报警阈值
platforms: [ding,wechat] # 可选配置,不配置默认拿下层 platforms 配置的所以平台
interval: 120 # 报警距离(单位:s)- type: change
- type: liveness
threshold: 80
interval: 120
- type: reject
threshold: 1
interval: 160
- type: run_timeout
threshold: 1
interval: 120
- type: queue_timeout
threshold: 1
interval: 140
总结
本文结尾介绍了线程池 ThreadPoolExecutor 的继承体系,外围流程的源码解读。而后介绍了 DynamicTp 提供的以上 6 种告警告诉能力,心愿通过监控 + 告警能够让咱们及时感知到咱们业务线程池的执行负载状况,第一工夫做出调整,避免事变的产生。
分割我
对我的项目有什么想法或者倡议,能够加我微信交换,或者创立 issues,一起欠缺我的项目
公众号:CodeFox
微信:yanhom1314