大家好,这篇文章咱们来介绍下动静线程池框架(DynamicTp)的adapter模块,上篇文章也大略介绍过了,该模块次要是用来适配一些第三方组件的线程池治理,让第三方组件内置的线程池也能享受到动静参数调整,监控告警这些加强性能。
DynamicTp我的项目地址
目前500多star,感激你的star,欢送pr,业务之余给开源奉献一份力量
gitee地址:https://gitee.com/yanhom/dynamic-tp
github地址:https://github.com/lyh200/dynamic-tp
系列文章
美团动静线程池实际思路,开源了
动静线程池框架(DynamicTp),监控及源码解析篇
adapter已接入组件
adapter模块目前曾经接入了SpringBoot内置的三大WebServer(Tomcat、Jetty、Undertow)的线程池治理,实现层面也是和外围模块做理解耦,利用spring的事件机制进行告诉监听解决。
能够看出有两个监听器
- 当监听到配置核心配置变更时,在更新咱们我的项目外部线程池后会公布一个RefreshEvent事件,DtpWebRefreshListener监听到该事件后会去更新对应WebServer的线程池参数。
- 同样监控告警也是如此,在DtpMonitor中执行监控工作时会公布CollectEvent事件,DtpWebCollectListener监听到该事件后会去采集相应WebServer的线程池指标数据。
要想去治理第三方组件的线程池,首先必定要对这些组件有肯定的相熟度,理解整个申请的一个处理过程,找到对应解决申请的线程池,这些线程池不肯定是JUC包下的ThreadPoolExecutor类,也可能是组件本人实现的线程池,然而基本原理都差不多。
Tomcat、Jetty、Undertow这三个都是这样,他们并没有间接应用JUC提供的线程池实现,而是本人实现了一套,或者扩大了JUC的实现;翻源码找到相应的线程池后,而后看有没有裸露public办法供咱们调用获取,如果没有就须要思考通过反射来拿了。
Tomcat外部线程池的实现
- Tomcat外部线程池没有间接应用JUC下的ThreadPoolExecutor,而是抉择继承JUC下的Executor体系类,而后重写execute()等办法,不同版本有差别。
1.继承JUC原生ThreadPoolExecutor(9.0.50版本及以下),并覆写了一些办法,次要execute()和afterExecute()
2.继承JUC的AbstractExecutorService(9.0.51版本及以上),代码根本是拷贝JUC的ThreadPoolExecutor,也相应的微调了execute()办法
留神Tomcat实现的线程池类名称也叫ThreadPoolExecutor,名字跟JUC下的是一样的,Tomcat的ThreadPoolExecutor类execute()办法如下:
public void execute(Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } }
能够看出他是先调用父类的execute()办法,而后捕捉RejectedExecutionException异样,再去判断如果工作队列类型是TaskQueue,则尝试将工作增加到工作队列中,如果增加失败,证实队列已满,而后再执行回绝策略,此处submittedCount是一个原子变量,记录提交到此线程池但未执行实现的工作数(次要在上面要提到的TaskQueue队列的offer()办法用),为什么要这样设计呢?持续往下看!
- Tomcat定义了阻塞队列TaskQueue继承自LinkedBlockingQueue,该队列次要重写了offer()办法。
@Override public boolean offer(Runnable o) { //we can't do any checks if (parent==null) return super.offer(o); //we are maxed out on threads, simply queue the object if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //we have idle threads, just add it to the queue if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o); //if we have less threads than maximum force creation of a new thread if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; //if we reached here, we need to add it to the queue return super.offer(o); }
能够看到他在入队之前做了几个判断,这里的parent就是所属的线程池对象
1.如果parent为null,间接调用父类offer办法入队
2.如果以后线程数等于最大线程数,则间接调用父类offer()办法入队
3.如果以后未执行的工作数量小于等于以后线程数,认真思考下,是不是阐明有闲暇的线程呢,那么间接调用父类offer()入队后就马上有线程去执行它
4.如果以后线程数小于最大线程数量,则间接返回false,而后回到JUC线程池的执行流程回忆下,是不是就去增加新线程去执行工作了呢
5.其余状况都间接入队
- 因为Tomcat线程池次要是来做IO工作的,做这所有的目标次要也是为了以最小代价的改变更好的反对IO密集型的场景,JUC自带的线程池次要是适宜于CPU密集型的场景,能够回忆一下JUC原生线程池ThreadPoolExecutor#execute()办法的执行流程
1.判断如果以后线程数小于外围线程池,则新建一个线程来解决提交的工作
2.如果以后线程数大于外围线程数且队列没满,则将工作放入工作队列期待执行
3.如果以后以后线程池数大于外围线程池,小于最大线程数,且工作队列已满,则创立新的线程执行提交的工作
4.如果以后线程数等于最大线程数,且队列已满,则回绝该工作
能够看出当以后线程数大于外围线程数时,JUC原生线程池首先是把工作放到队列里期待执行,而不是先创立线程执行。
如果Tomcat接管的申请数量大于外围线程数,申请就会被放到队列中,期待外围线程解决,这样会升高申请的总体处理速度,所以Tomcat并没有应用JUC原生线程池,利用TaskQueue的offer()办法奇妙的批改了JUC线程池的执行流程,改写后Tomcat线程池执行流程如下:
1.判断如果以后线程数小于外围线程池,则新建一个线程来解决提交的工作
2.如果以后以后线程池数大于外围线程池,小于最大线程数,则创立新的线程执行提交的工作
3.如果以后线程数等于最大线程数,则将工作放入工作队列期待执行
4.如果队列已满,则执行回绝策略
- Tomcat外围线程池有对应的获取办法,获取形式如下
public Executor doGetTp(WebServer webServer) { TomcatWebServer tomcatWebServer = (TomcatWebServer) webServer; return tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor(); }
- 想要动静调整Tomcat线程池的线程参数,能够在引入DynamicTp依赖后,在配置文件中增加以下配置就行,参数名称也是和SpringBoot提供的Properties配置类参数雷同,配置文件残缺示例看我的项目readme介绍
spring: dynamic: tp: // 其余配置项 tomcatTp: # tomcat web server线程池配置 minSpare: 100 # 外围线程数 max: 400 # 最大线程数
Tomcat线程池就介绍到这里吧,通过以上的一些介绍想必大家对Tomcat线程池执行工作的流程都很分明了吧。
Jetty外部线程池的实现
- Jetty外部线程池,定义了一个继承自Executor的ThreadPool顶级接口,实现类有以下几个
- 外部次要应用QueuedThreadPool这个实现类,该线程池执行流程就不在具体解读了,感兴趣的能够本人去看源码,核心思想都差不多,围绕外围线程数、最大线程数、工作队列三个参数动手,跟Tocmat比对着来看,其实也挺简略的。
public void execute(Runnable job) { // Determine if we need to start a thread, use and idle thread or just queue this job int startThread; while (true) { // Get the atomic counts long counts = _counts.get(); // Get the number of threads started (might not yet be running) int threads = AtomicBiInteger.getHi(counts); if (threads == Integer.MIN_VALUE) throw new RejectedExecutionException(job.toString()); // Get the number of truly idle threads. This count is reduced by the // job queue size so that any threads that are idle but are about to take // a job from the queue are not counted. int idle = AtomicBiInteger.getLo(counts); // Start a thread if we have insufficient idle threads to meet demand // and we are not at max threads. startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0; // The job will be run by an idle thread when available if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1)) continue; break; } if (!_jobs.offer(job)) { // reverse our changes to _counts. if (addCounts(-startThread, 1 - startThread)) LOG.warn("{} rejected {}", this, job); throw new RejectedExecutionException(job.toString()); } if (LOG.isDebugEnabled()) LOG.debug("queue {} startThread={}", job, startThread); // Start a thread if one was needed while (startThread-- > 0) startThread(); }
- Jetty线程池有提供public的获取办法,获取形式如下
public Executor doGetTp(WebServer webServer) { JettyWebServer jettyWebServer = (JettyWebServer) webServer; return jettyWebServer.getServer().getThreadPool(); }
- 想要动静调整Jetty线程池的线程参数,能够在引入DynamicTp依赖后,在配置文件中增加以下配置就行,参数名称也是和SpringBoot提供的Properties配置类参数雷同,配置文件残缺示例看我的项目readme介绍
spring: dynamic: tp: // 其余配置项 jettyTp: # jetty web server线程池配置 min: 100 # 外围线程数 max: 400 # 最大线程数
Undertow外部线程池的实现
- Undertow因为其性能彪悍,轻量,当初用的还是挺多的,wildfly(前身Jboss)从8开始外部默认的WebServer用Undertow了,之前是Tomcat吧。理解Undertow的小伙伴应该晓得,他底层是基于XNIO框架(3.X之前)来做的,这也是Jboss开发的一款基于java nio的优良网络框架。但Undertow发表从3.0开始底层网络框架要切换成Netty了,官网给的起因是说起网络编程,Netty曾经是事实上规范,用Netty的益处远大于XNIO能提供的,所以让咱们期待3.0的公布吧,只惋惜三年前就发表了,至今也没动静,不晓得是夭折了还是咋的,说实话,改变也挺大的,看啥时候公布吧,以下的介绍是基于Undertow 2.x版本来的
- Undertow外部是定义了一个叫TaskPool的线程池顶级接口,该接口有如图所示的几个实现。其实这几个实现类都是采纳组合的形式,外部都保护一个JUC的Executor体系类或者保护Jboss提供的EnhancedQueueExecutor类(也继承JUC ExecutorService类),执行流程能够本人去剖析
- 具体的创立代码如下,依据内部是否传入,如果有传入则用内部传入的类,如果没有,依据参数设置外部创立一个,具体是用JUC的ThreadPoolExecutor还是Jboss的EnhancedQueueExecutor,依据配置参数抉择
- Undertow线程池没有提供public的获取办法,所以通过反射来获取,获取形式如下
public Executor doGetTp(WebServer webServer) { UndertowWebServer undertowWebServer = (UndertowWebServer) webServer; Field undertowField = ReflectionUtils.findField(UndertowWebServer.class, "undertow"); if (Objects.isNull(undertowField)) { return null; } ReflectionUtils.makeAccessible(undertowField); Undertow undertow = (Undertow) ReflectionUtils.getField(undertowField, undertowWebServer); if (Objects.isNull(undertow)) { return null; } return undertow.getWorker(); }
- 想要动静调整Undertow线程池的线程参数,能够在引入DynamicTp依赖后,在配置文件中增加以下配置就行,配置文件残缺示例看我的项目readme介绍
spring: dynamic: tp: // 其余配置项 undertowTp: # undertow web server线程池配置 coreWorkerThreads: 100 # worker外围线程数 maxWorkerThreads: 400 # worker最大线程数 workerKeepAlive: 60 # 闲暇线程超时工夫
总结
以上介绍了Tomcat、Jetty、Undertow三大WebServer内置线程池的一些状况,重点介绍了Tomcat的,篇幅无限,其余两个感兴趣能够本人剖析,原理都差不多。同时也介绍了基于DynamicTp怎么动静调整线程池的参数,当咱们做WebServer性能调优时,能动静调整参数真的是十分好用的。
再次欢送大家应用DynamicTp框架,一起欠缺我的项目。
下篇文章打算分享一个DynamicTp应用过程中因为Tomcat版本不统一导致的监控线程halt住的奇葩问题,通过一个问题来把握ScheduledExecutorService的原理,欢送大家继续关注。
分割我
欢送加我微信或者关注公众号交换,一起变强!
公众号:CodeFox
微信:yanhom1314