关于java:动态线程池DynamicTp动态调整TomcatJettyUndertow线程池参数篇

34次阅读

共计 7197 个字符,预计需要花费 18 分钟才能阅读完成。

大家好,这篇文章咱们来介绍下动静线程池框架(DynamicTp)的 adapter 模块,上篇文章也大略介绍过了,该模块次要是用来适配一些第三方组件的线程池治理,让第三方组件内置的线程池也能享受到动静参数调整,监控告警这些加强性能。


DynamicTp 我的项目地址

目前 500 多 star,感激你的 star,欢送 pr,业务之余给开源奉献一份力量

gitee 地址 :https://gitee.com/yanhom/dynamic-tp

github 地址 :https://github.com/lyh200/dynamic-tp


系列文章

美团动静线程池实际思路,开源了

动静线程池框架(DynamicTp),监控及源码解析篇


adapter 已接入组件

adapter 模块目前曾经接入了 SpringBoot 内置的三大 WebServer(Tomcat、Jetty、Undertow)的线程池治理,实现层面也是和外围模块做理解耦,利用 spring 的事件机制进行告诉监听解决。

能够看出有两个监听器

  1. 当监听到配置核心配置变更时,在更新咱们我的项目外部线程池后会公布一个 RefreshEvent 事件,DtpWebRefreshListener 监听到该事件后会去更新对应 WebServer 的线程池参数。
  2. 同样监控告警也是如此,在 DtpMonitor 中执行监控工作时会公布 CollectEvent 事件,DtpWebCollectListener 监听到该事件后会去采集相应 WebServer 的线程池指标数据。

要想去治理第三方组件的线程池,首先必定要对这些组件有肯定的相熟度,理解整个申请的一个处理过程,找到对应解决申请的线程池,这些线程池不肯定是 JUC 包下的 ThreadPoolExecutor 类,也可能是组件本人实现的线程池,然而基本原理都差不多。

Tomcat、Jetty、Undertow 这三个都是这样,他们并没有间接应用 JUC 提供的线程池实现,而是本人实现了一套,或者扩大了 JUC 的实现;翻源码找到相应的线程池后,而后看有没有裸露 public 办法供咱们调用获取,如果没有就须要思考通过反射来拿了。


Tomcat 外部线程池的实现

  • Tomcat 外部线程池没有间接应用 JUC 下的 ThreadPoolExecutor,而是抉择继承 JUC 下的 Executor 体系类,而后重写 execute() 等办法,不同版本有差别。

1. 继承 JUC 原生 ThreadPoolExecutor(9.0.50 版本及以下),并覆写了一些办法,次要 execute() 和 afterExecute()

2. 继承 JUC 的 AbstractExecutorService(9.0.51 版本及以上),代码根本是拷贝 JUC 的 ThreadPoolExecutor,也相应的微调了 execute() 办法

留神 Tomcat 实现的线程池类名称也叫 ThreadPoolExecutor,名字跟 JUC 下的是一样的,Tomcat 的 ThreadPoolExecutor 类 execute() 办法如下:

public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();
        try {super.execute(command);
        } catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();
                try {if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                    }
                } catch (InterruptedException x) {submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {submittedCount.decrementAndGet();
                throw rx;
            }

        }
    }

能够看出他是先调用父类的 execute() 办法,而后捕捉 RejectedExecutionException 异样,再去判断如果工作队列类型是 TaskQueue,则尝试将工作增加到工作队列中,如果增加失败,证实队列已满,而后再执行回绝策略,此处 submittedCount 是一个原子变量,记录提交到此线程池但未执行实现的工作数(次要在上面要提到的 TaskQueue 队列的 offer() 办法用),为什么要这样设计呢?持续往下看!

  • Tomcat 定义了阻塞队列 TaskQueue 继承自 LinkedBlockingQueue,该队列次要重写了 offer() 办法。
 @Override
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

能够看到他在入队之前做了几个判断,这里的 parent 就是所属的线程池对象

1. 如果 parent 为 null,间接调用父类 offer 办法入队

2. 如果以后线程数等于最大线程数,则间接调用父类 offer() 办法入队

3. 如果以后未执行的工作数量小于等于以后线程数,认真思考下,是不是阐明有闲暇的线程呢,那么间接调用父类 offer() 入队后就马上有线程去执行它

4. 如果以后线程数小于最大线程数量,则间接返回 false,而后回到 JUC 线程池的执行流程回忆下,是不是就去增加新线程去执行工作了呢

5. 其余状况都间接入队

  • 因为 Tomcat 线程池次要是来做 IO 工作的,做这所有的目标次要也是为了以最小代价的改变更好的反对 IO 密集型的场景,JUC 自带的线程池次要是适宜于 CPU 密集型的场景,能够回忆一下 JUC 原生线程池 ThreadPoolExecutor#execute() 办法的执行流程

1. 判断如果以后线程数小于外围线程池,则新建一个线程来解决提交的工作

2. 如果以后线程数大于外围线程数且队列没满,则将工作放入工作队列期待执行

3. 如果以后以后线程池数大于外围线程池,小于最大线程数,且工作队列已满,则创立新的线程执行提交的工作

4. 如果以后线程数等于最大线程数,且队列已满,则回绝该工作

能够看出当以后线程数大于外围线程数时,JUC 原生线程池首先是把工作放到队列里期待执行,而不是先创立线程执行。

如果 Tomcat 接管的申请数量大于外围线程数,申请就会被放到队列中,期待外围线程解决,这样会升高申请的总体处理速度,所以 Tomcat 并没有应用 JUC 原生线程池,利用 TaskQueue 的 offer() 办法奇妙的批改了 JUC 线程池的执行流程,改写后 Tomcat 线程池执行流程如下:

1. 判断如果以后线程数小于外围线程池,则新建一个线程来解决提交的工作

2. 如果以后以后线程池数大于外围线程池,小于最大线程数,则创立新的线程执行提交的工作

3. 如果以后线程数等于最大线程数,则将工作放入工作队列期待执行

4. 如果队列已满,则执行回绝策略

  • Tomcat 外围线程池有对应的获取办法,获取形式如下
    public Executor doGetTp(WebServer webServer) {TomcatWebServer tomcatWebServer = (TomcatWebServer) webServer;
        return tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor();
    }
  • 想要动静调整 Tomcat 线程池的线程参数,能够在引入 DynamicTp 依赖后,在配置文件中增加以下配置就行,参数名称也是和 SpringBoot 提供的 Properties 配置类参数雷同,配置文件残缺示例看我的项目 readme 介绍
spring:
  dynamic:
    tp:
      // 其余配置项
      tomcatTp:       # tomcat web server 线程池配置
        minSpare: 100   # 外围线程数
        max: 400        # 最大线程数 

Tomcat 线程池就介绍到这里吧,通过以上的一些介绍想必大家对 Tomcat 线程池执行工作的流程都很分明了吧。


Jetty 外部线程池的实现

  • Jetty 外部线程池,定义了一个继承自 Executor 的 ThreadPool 顶级接口,实现类有以下几个

  • 外部次要应用 QueuedThreadPool 这个实现类,该线程池执行流程就不在具体解读了,感兴趣的能够本人去看源码,核心思想都差不多,围绕外围线程数、最大线程数、工作队列三个参数动手,跟 Tocmat 比对着来看,其实也挺简略的。
public void execute(Runnable job)
    {
        // Determine if we need to start a thread, use and idle thread or just queue this job
        int startThread;
        while (true)
        {
            // Get the atomic counts
            long counts = _counts.get();

            // Get the number of threads started (might not yet be running)
            int threads = AtomicBiInteger.getHi(counts);
            if (threads == Integer.MIN_VALUE)
                throw new RejectedExecutionException(job.toString());

            // Get the number of truly idle threads. This count is reduced by the
            // job queue size so that any threads that are idle but are about to take
            // a job from the queue are not counted.
            int idle = AtomicBiInteger.getLo(counts);

            // Start a thread if we have insufficient idle threads to meet demand
            // and we are not at max threads.
            startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0;

            // The job will be run by an idle thread when available
            if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1))
                continue;

            break;
        }

        if (!_jobs.offer(job))
        {
            // reverse our changes to _counts.
            if (addCounts(-startThread, 1 - startThread))
                LOG.warn("{} rejected {}", this, job);
            throw new RejectedExecutionException(job.toString());
        }

        if (LOG.isDebugEnabled())
            LOG.debug("queue {} startThread={}", job, startThread);

        // Start a thread if one was needed
        while (startThread-- > 0)
            startThread();}
  • Jetty 线程池有提供 public 的获取办法,获取形式如下
    public Executor doGetTp(WebServer webServer) {JettyWebServer jettyWebServer = (JettyWebServer) webServer;
        return jettyWebServer.getServer().getThreadPool();
    }
  • 想要动静调整 Jetty 线程池的线程参数,能够在引入 DynamicTp 依赖后,在配置文件中增加以下配置就行,参数名称也是和 SpringBoot 提供的 Properties 配置类参数雷同,配置文件残缺示例看我的项目 readme 介绍
spring:
  dynamic:
    tp:
      // 其余配置项
      jettyTp:       # jetty web server 线程池配置
        min: 100     # 外围线程数
        max: 400     # 最大线程数 

Undertow 外部线程池的实现

  • Undertow 因为其性能彪悍,轻量,当初用的还是挺多的,wildfly(前身 Jboss)从 8 开始外部默认的 WebServer 用 Undertow 了,之前是 Tomcat 吧。理解 Undertow 的小伙伴应该晓得,他底层是基于 XNIO 框架(3.X 之前)来做的,这也是 Jboss 开发的一款基于 java nio 的优良网络框架。但 Undertow 发表从 3.0 开始底层网络框架要切换成 Netty 了,官网给的起因是说起网络编程,Netty 曾经是事实上规范,用 Netty 的益处远大于 XNIO 能提供的,所以让咱们期待 3.0 的公布吧,只惋惜三年前就发表了,至今也没动静,不晓得是夭折了还是咋的,说实话,改变也挺大的,看啥时候公布吧,以下的介绍是基于 Undertow 2.x 版本来的
  • Undertow 外部是定义了一个叫 TaskPool 的线程池顶级接口,该接口有如图所示的几个实现。其实这几个实现类都是采纳组合的形式,外部都保护一个 JUC 的 Executor 体系类或者保护 Jboss 提供的 EnhancedQueueExecutor 类(也继承 JUC ExecutorService 类),执行流程能够本人去剖析

  • 具体的创立代码如下,依据内部是否传入,如果有传入则用内部传入的类,如果没有,依据参数设置外部创立一个,具体是用 JUC 的 ThreadPoolExecutor 还是 Jboss 的 EnhancedQueueExecutor,依据配置参数抉择

  • Undertow 线程池没有提供 public 的获取办法,所以通过反射来获取,获取形式如下
    public Executor doGetTp(WebServer webServer) {UndertowWebServer undertowWebServer = (UndertowWebServer) webServer;
        Field undertowField = ReflectionUtils.findField(UndertowWebServer.class, "undertow");
        if (Objects.isNull(undertowField)) {return null;}
        ReflectionUtils.makeAccessible(undertowField);
        Undertow undertow = (Undertow) ReflectionUtils.getField(undertowField, undertowWebServer);
        if (Objects.isNull(undertow)) {return null;}
        return undertow.getWorker();}
  • 想要动静调整 Undertow 线程池的线程参数,能够在引入 DynamicTp 依赖后,在配置文件中增加以下配置就行,配置文件残缺示例看我的项目 readme 介绍
spring:
  dynamic:
    tp:
      // 其余配置项
      undertowTp:   # undertow web server 线程池配置
        coreWorkerThreads: 100  # worker 外围线程数
        maxWorkerThreads: 400   # worker 最大线程数
        workerKeepAlive: 60     # 闲暇线程超时工夫 

总结

以上介绍了 Tomcat、Jetty、Undertow 三大 WebServer 内置线程池的一些状况,重点介绍了 Tomcat 的,篇幅无限,其余两个感兴趣能够本人剖析,原理都差不多。同时也介绍了基于 DynamicTp 怎么动静调整线程池的参数,当咱们做 WebServer 性能调优时,能动静调整参数真的是十分好用的。

再次欢送大家应用 DynamicTp 框架,一起欠缺我的项目。

下篇文章打算分享一个 DynamicTp 应用过程中因为 Tomcat 版本不统一导致的监控线程 halt 住的奇葩问题,通过一个问题来把握 ScheduledExecutorService 的原理,欢送大家继续关注。


分割我

欢送加我微信或者关注公众号交换,一起变强!

公众号:CodeFox

微信:yanhom1314

正文完
 0