大家好,这篇文章咱们来聊下动静线程池开源我的项目(DynamicTp)的告诉告警模块。目前我的项目提供以下告诉告警性能,每一个告诉项都能够独立配置是否开启、告警阈值、告警间隔时间、平台等,具体代码请看core模块notify包。
1.外围参数变更告诉
2.线程池活跃度告警
3.队列容量告警
4.回绝策略告警
5.工作执行超时告警
6.工作排队超时告警
DynamicTp我的项目地址
目前700star,感激你的star,欢送pr,业务之余一起给开源奉献一份力量
gitee地址:https://gitee.com/yanhom/dynamic-tp
github地址:https://github.com/lyh200/dynamic-tp
系列文章
美团动静线程池实际思路,开源了
动静线程池框架(DynamicTp),监控及源码解析篇
动静线程池(DynamicTp),动静调整Tomcat、Jetty、Undertow线程池参数篇
线程池解读
上篇文章里大略讲到了JUC线程池的执行流程,咱们这里再认真回顾下,上图是JUC下线程池ThreadPoolExecutor类的继承体系。
顶级接口Executor提供了一种形式,解耦工作的提交和执行,只定义了一个execute(Runnable command)办法用来提交工作,至于具体任务怎么执行则交给他的实现者去自定义实现。
ExecutorService接口继承Executor,且扩大了生命周期治理的办法、返回Futrue的办法、批量提交工作的办法
void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
AbstractExecutorService抽象类继承ExecutorService接口,对ExecutorService相干办法提供了默认实现,用RunnableFuture的实现类FutureTask包装Runnable工作,交给execute()办法执行,而后能够从该FutureTask阻塞获取执行后果,并且对批量工作的提交做了编排
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value);} public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask;}
ThreadPoolExecutor继承AbstractExecutorService,采纳池化思维治理肯定数量的线程来调度执行提交的工作,且定义了一套线程池的生命周期状态,用一个ctl变量来同时保留以后池状态(高3位)和以后池线程数(低29位)。看过源码的小伙伴会发现,ThreadPoolExecutor类里的办法大量有同时须要获取或更新池状态和池以后线程数的场景,放一个原子变量里,能够很好的保证数据的一致性以及代码的简洁性。
// 用此变量保留以后池状态(高3位)和以后线程数(低29位) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 能够承受新工作提交,也会解决工作队列中的工作 // 后果:111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; // 不承受新工作提交,但会解决工作队列中的工作 // 后果:000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 不承受新工作,不执行队列中的工作,且会中断正在执行的工作 // 后果:001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; // 工作队列为空,workerCount = 0,线程池的状态在转换为TIDYING状态时,会执行钩子办法terminated() // 后果:010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; // 调用terminated()钩子办法后进入TERMINATED状态 // 后果:010 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl // 低29位变为0,失去了线程池的状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 高3位变为为0,失去了线程池中的线程数 private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
外围入口execute()办法执行逻辑如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);}
能够总结出如下次要执行流程,当然看上述代码会有一些异样分支判断,能够本人顺理加到下述执行主流程里
1.判断线程池的状态,如果不是RUNNING状态,间接执行回绝策略
2.如果以后线程数 < 外围线程池,则新建一个线程来解决提交的工作
3.如果以后线程数 > 外围线程数且工作队列没满,则将工作放入工作队列期待执行
4.如果 外围线程池 < 以后线程池数 < 最大线程数,且工作队列已满,则创立新的线程执行提交的工作
5.如果以后线程数 > 最大线程数,且队列已满,则回绝该工作
addWorker()办法逻辑
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 获取以后池状态 int rs = runStateOf(c); // (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())) // 1.判断如果线程池状态 > SHUTDOWN,间接返回false,否则2 // 2.如果线程池状态 = SHUTDOWN,并且firstTask不为null则间接返回false,因为SHUTDOWN状态的线程池不能在承受新工作,否则3 // 3.如果线程池状态 = SHUTDOWN,并且firstTask == null,此时如果工作队列为空,则间接返回false if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 1.如果以后线程池线程数大于等于CAPACITY(实践上的最大值5亿),则返回fasle // 2.如果创立外围线程状况下以后池线程数 >= corePoolSize,则返回false // 3.如果创立非核心线程状况下以后池线程数 >= maximumPoolSize,则返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // cas 减少以后池线程数量,胜利则退出循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // cas 减少以后池线程数量失败(多线程并发),则从新获取ctl,计算出以后线程池状态,如果不等于上述计算的状态rs,则阐明线程池状态产生了扭转,须要跳到外层循环从新进行状态判断,否则执行外部循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 至此阐明线程池状态校验通过,且减少池线程数量胜利,则创立一个Worker线程来执行工作 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 拜访worker set时须要获取mainLock全局锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 1.以后池状态 < SHUTDOWN,也就是RUNNING状态,如果曾经started,抛出异样 // 2.以后池状态 = SHUTDOWN,且firstTask == null,须要解决工作队列中的工作,如果曾经started,抛出异样 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 刚创立线程增加到workers汇合中 workers.add(w); int s = workers.size(); // 判断更新历史最大线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动新建线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 启动失败,workerCount--,workers里移除该worker addWorkerFailed(w); } return workerStarted; }
线程池中的线程并不是间接用的Thread类,而是定义了一个外部工作线程Worker类,实现了AQS以及Runnable接口,而后持有一个Thread类的援用及一个firstTask(创立后第一个要执行的工作),每个Worker线程启动后会执行run()办法,该办法会调用执行外层runWorker(Worker w)办法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 1.如果task不为空,则作为该线程的第一个工作间接执行 // 2.如果task为空,则通过getTask()办法从工作队列中获取工作执行 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 线程池状态 >= STOP,则中断线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 理论执行工作前调用的钩子办法 beforeExecute(wt, task); Throwable thrown = null; try { // 理论执行工作 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 工作执行后调用的钩子办法 afterExecute(task, thrown); } } finally { // 工作置为null,从新获取新工作,实现数++ task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 无工作可执行,执行worker销毁逻辑 processWorkerExit(w, completedAbruptly); }}
getTask()办法逻辑
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 以下两种状况递加工作线程数量 // 1. rs >= STOP // 2. rs == SHUTDOWN && workQueue.isEmpty() if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // 容许外围线程超时 或者 以后线程数 > 外围线程数,有可能产生超时敞开 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // wc什么状况 > maximumPoolSize,调用setMaximumPoolSize()办法将maximumPoolSize调小了,会产生这种状况,此时须要敞开多余线程 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 阻塞队列获取工作 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 产生中断,进行重试 timedOut = false; } }}
以上内容比拟具体的介绍了ThreadPoolExecutor的继承体系,以及相干的外围源码,基于此,当初咱们来看DynamicTp提供的告警告诉能力。
外围参数变更告诉
对应配置核心的监听端监听到配置变更后,封装到DtpProperties中而后交由DtpRegistry类中的refresh()办法去做配置更新,同时告诉时会高亮显示有变更的字段
线程池活跃度告警
活跃度 = activeCount / maximumPoolSize
服务启动后会开启一个定时监控工作,每隔肯定工夫(可配置)去计算线程池的活跃度,达到配置的threshold阈值后会触发一次告警,告警距离内屡次触发不会发送告警告诉
队列容量告警
容量使用率 = queueSize / queueCapacity
服务启动后会开启一个定时监控工作,每隔肯定工夫去计算工作队列的使用率,达到配置的threshold阈值后会触发一次告警,告警距离内屡次触发不会发送告警告诉
回绝策略告警
/** * Do sth before reject. * @param executor ThreadPoolExecutor instance */default void beforeReject(ThreadPoolExecutor executor) { if (executor instanceof DtpExecutor) { DtpExecutor dtpExecutor = (DtpExecutor) executor; dtpExecutor.incRejectCount(1); Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT); AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable); }}
线程池线程数达到配置的最大线程数,且工作队列已满,再提交工作会触发回绝策略。DtpExecutor线程池用到的RejectedExecutionHandler是通过动静代理包装过的,在执行具体的回绝策略之前会执行RejectedAware类beforeReject()办法,此办法会去做回绝数量累加(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警告诉(同时重置周期累加值为0及上次告警工夫为以后工夫),告警距离内屡次触发不会发送告警告诉
工作队列超时告警
重写ThreadPoolExecutor的execute()办法和beforeExecute()办法,如果配置了执行超时或排队超时值,则会用DtpRunnable包装工作,同时记录工作的提交工夫submitTime,beforeExecute依据以后工夫和submitTime的差值就能够计算到该工作在队列中的等待时间,而后判断如果差值大于配置的queueTimeout则累加排队超时工作数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警告诉(同时重置周期累加值为0及上次告警工夫为以后工夫),告警距离内屡次触发不会发送告警告诉
@Overridepublic void execute(Runnable command) { if (CollUtil.isNotEmpty(taskWrappers)) { for (TaskWrapper t : taskWrappers) { command = t.wrap(command); } } if (runTimeout > 0 || queueTimeout > 0) { command = new DtpRunnable(command); } super.execute(command);}
@Overrideprotected void beforeExecute(Thread t, Runnable r) { if (!(r instanceof DtpRunnable)) { super.beforeExecute(t, r); return; } DtpRunnable runnable = (DtpRunnable) r; long currTime = System.currentTimeMillis(); if (runTimeout > 0) { runnable.setStartTime(currTime); } if (queueTimeout > 0) { long waitTime = currTime - runnable.getSubmitTime(); if (waitTime > queueTimeout) { queueTimeoutCount.incrementAndGet(); Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT); AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask); } } super.beforeExecute(t, r);}
工作执行超时告警
重写ThreadPoolExecutor的afterExecute()办法,依据以后工夫和beforeExecute()中设置的startTime的差值即可算出工作的理论执行工夫,而后判断如果差值大于配置的runTimeout则累加排队超时工作数量(总数值累加、周期值累加)。且判断如果周期累计值达到配置的阈值,则会触发一次告警告诉(同时重置周期累加值为0及上次告警工夫为以后工夫),告警距离内屡次触发不会发送告警告诉
@Overrideprotected void afterExecute(Runnable r, Throwable t) { if (runTimeout > 0) { DtpRunnable runnable = (DtpRunnable) r; long runTime = System.currentTimeMillis() - runnable.getStartTime(); if (runTime > runTimeout) { runTimeoutCount.incrementAndGet(); Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT); AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask); } } super.afterExecute(r, t);}
告警告诉相干配置项
如果想应用告诉告警性能,配置文件必须要配置platforms字段,且能够配置多个平台,如钉钉、企微等;notifyItems配置具体告警项,包含阈值、平台、告警距离等。
spring: dynamic: tp: # 省略其余项 platforms: # 告诉平台 - platform: wechat urlKey: 38a98-0c5c3b649c receivers: test - platform: ding urlKey: f80db3e801d593604f4a08dcd6a secret: SECb5444a6f375d5b9d21 receivers: 17811511815 executors: # 动静线程池配置,都有默认值,采纳默认值的能够不配置该项,缩小配置量 - threadPoolName: dtpExecutor1 executorType: common # 线程池类型common、eager:实用于io密集型 corePoolSize: 2 maximumPoolSize: 4 queueCapacity: 200 queueType: VariableLinkedBlockingQueue # 工作队列,查看源码QueueTypeEnum枚举类 rejectedHandlerType: CallerRunsPolicy # 回绝策略,查看RejectedTypeEnum枚举类 keepAliveTime: 50 allowCoreThreadTimeOut: false threadNamePrefix: dtp1 # 线程名前缀 waitForTasksToCompleteOnShutdown: false # 参考spring线程池设计 awaitTerminationSeconds: 5 # 单位(s) preStartAllCoreThreads: false # 是否预热外围线程,默认false runTimeout: 200 # 工作执行超时阈值,目前只做告警用,单位(ms) queueTimeout: 100 # 工作在队列期待超时阈值,目前只做告警用,单位(ms) taskWrapperNames: ["ttl"] # 工作包装器名称,集成TaskWrapper接口 notifyItems: # 报警项,不配置主动会按默认值配置(变更告诉、容量报警、活性报警、回绝报警、工作超时报警) - type: capacity # 报警项类型,查看源码 NotifyTypeEnum枚举类 threshold: 80 # 报警阈值 platforms: [ding,wechat] # 可选配置,不配置默认拿下层platforms配置的所以平台 interval: 120 # 报警距离(单位:s) - type: change - type: liveness threshold: 80 interval: 120 - type: reject threshold: 1 interval: 160 - type: run_timeout threshold: 1 interval: 120 - type: queue_timeout threshold: 1 interval: 140
总结
本文结尾介绍了线程池ThreadPoolExecutor的继承体系,外围流程的源码解读。而后介绍了DynamicTp提供的以上6种告警告诉能力,心愿通过监控+告警能够让咱们及时感知到咱们业务线程池的执行负载状况,第一工夫做出调整,避免事变的产生。
分割我
对我的项目有什么想法或者倡议,能够加我微信交换,或者创立issues,一起欠缺我的项目
公众号:CodeFox
微信:yanhom1314