乐趣区

关于后端:填个坑再谈线程池动态调整那点事

你好呀,我是歪歪。

前几天和一个大佬聊天的时候他说本人最近在做线程池的监控,刚刚把动静调整的性能开发实现。

想起我之前写过这方面的文章,就找出来看了一下:《如何设置线程池参数?美团给出了一个让面试官虎躯一震的答复。》

而后给我指出了一个问题,我认真思考了一下,如同的确是留了一个坑。

为了更好的形容这个坑,我先给大家回顾一下线程池动静调整的几个关键点。

首先,为什么须要对线程池的参数进行动静调整呢?

因为随着业务的倒退,有可能呈现一个线程池开始够用,然而慢慢的被塞满的状况。

这样就会导致后续提交过去的工作被回绝。

没有一劳永逸的配置计划,相干的参数应该是随着零碎的浮动而浮动的。

所以,咱们能够对线程池进行多维度的监控,比方其中的一个维度就是队列应用度的监控。

当队列应用度超过 80% 的时候就发送预警短信,揭示相应的负责人提高警惕,能够到对应的治理后盾页面进行线程池参数的调整,防止出现工作被回绝的状况。

当前有人问你线程池的各个参数怎么配置的时候,你先把分为 IO 密集型和 CPU 密集型的这个八股文答案背完之后。

加上一个:然而,除了这些计划外,我在理论解决问题的时候用的是另外一套计划”。

而后把下面的话复述一遍。

那么线程池能够批改的参数有哪些呢?

失常来说是能够调整外围线程数和最大线程数的。

线程池也间接提供了其对应的 set 办法:

然而其实还有一个要害参数也是须要调整的,那就是队列的长度。

哦,对了,阐明一下,本文默认应用的队列是 LinkedBlockingQueue

其容量是 final 润饰的,也就是说指定之后就不能批改:

所以队列的长度调整起来略微要动点脑筋。

至于怎么绕过 final 这个限度,等下就说,先先给大家上个代码。

我个别是不会贴大段的代码的,然而这次为什么贴了呢?

因为我发现我之前的那篇文章就没有贴,之前写的代码也早就不晓得去哪里了。

所以,我又苦哈哈的敲了一遍 …

import cn.hutool.core.thread.NamedThreadFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadChangeDemo {public static void main(String[] args) {dynamicModifyExecutor();
    }

    private static ThreadPoolExecutor buildThreadPoolExecutor() {
        return new ThreadPoolExecutor(2,
                5,
                60,
                TimeUnit.SECONDS,
                new ResizeableCapacityLinkedBlockingQueue<>(10),
                new NamedThreadFactory("why 技术", false));
    }

    private static void dynamicModifyExecutor() {ThreadPoolExecutor executor = buildThreadPoolExecutor();
        for (int i = 0; i < 15; i++) {executor.execute(() -> {threadPoolStatus(executor,"创立工作");
                try {TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            });
        }
        threadPoolStatus(executor,"扭转之前");
        executor.setCorePoolSize(10);
        executor.setMaximumPoolSize(10);
        ResizeableCapacityLinkedBlockingQueue<Runnable> queue = (ResizeableCapacityLinkedBlockingQueue)executor.getQueue();
        queue.setCapacity(100);
        threadPoolStatus(executor,"扭转之后");
    }

    /**
     * 打印线程池状态
     *
     * @param executor
     * @param name
     */
    private static void threadPoolStatus(ThreadPoolExecutor executor, String name) {BlockingQueue<Runnable> queue = executor.getQueue();
        System.out.println(Thread.currentThread().getName() + "-" + name + "-:" +
                "外围线程数:" + executor.getCorePoolSize() +
                "流动线程数:" + executor.getActiveCount() +
                "最大线程数:" + executor.getMaximumPoolSize() +
                "线程池活跃度:" +
                divide(executor.getActiveCount(), executor.getMaximumPoolSize()) +
                "工作实现数:" + executor.getCompletedTaskCount() +
                "队列大小:" + (queue.size() + queue.remainingCapacity()) +
                "以后排队线程数:" + queue.size() +
                "队列残余大小:" + queue.remainingCapacity() +
                "队列应用度:" + divide(queue.size(), queue.size() + queue.remainingCapacity()));
    }

    private static String divide(int num1, int num2) {return String.format("%1.2f%%", Double.parseDouble(num1 + "") / Double.parseDouble(num2 +"") * 100);
    }
}

当你把这个代码粘过来之后,你会发现你没有 NamedThreadFactory 这个类。

没有关系,我用的是 hutool 工具包外面的,你要是没有,能够自定义一个,也能够在构造函数外面不传,这不是重点,问题不大。

问题大的是 ResizeableCapacityLinkedBlockingQueue 这个玩意。

它是怎么来的呢?

在之前的文章外面提到过:

就是把 LinkedBlockingQueue 粘贴一份进去,批改个名字,而后把 Capacity 参数的 final 修饰符去掉,并提供其对应的 get/set 办法。

感觉十分的简略,就能实现 capacity 参数的动静变更。

然而,我过后写的时候就感觉是有坑的。

毕竟这么简略的话,为什么官网要把它给设计为 final 呢?

坑在哪里?

对于 LinkedBlockingQueue 的工作原理就不在这里说了,都是属于必背八股文的内容。

次要说一下后面提到的场景中,如果我间接把 final 修饰符去掉,并提供其对应的 get/set 办法,这样的做法坑在哪里。

先说一下,如果没有非凡阐明,本文中的源码都是 JDK 8 版本。

咱们看一下这个 put 办法:

次要看这个被框起来的局部。

while 条件外面的 capacity 咱们晓得代表的是以后容量。

那么 count.get 是个什么玩意呢?

就是以后队列外面有多少个元素。

count.get == capacity 就是说队列曾经满了,而后执行 notFull.await() 把以后的这个 put 操作挂起来。

来个简略的例子验证一下:

申请一个长度为 5 的队列,而后在循环外面调用 put 办法,当队列满了之后,程序就阻塞住了。

通过 dump 以后线程能够晓得主线程的确是阻塞在了咱们后面剖析的中央:

所以,你想想。如果我把队列的 capacity 批改为了另外的值,这中央会感知到吗?

它感知不到啊,它在等着他人唤醒呢。

当初咱们把队列换成我批改后的队列验证一下。

上面验证程序的思路就是在一个子线程中执行队列的 put 操作,直到容量满了,被阻塞。

而后主线程把容量批改为 100。

下面的程序其实我想要达到的成果是当容量扩充之后,子线程不应该持续阻塞。

然而通过后面的剖析,咱们晓得这里并不会去唤醒子线程。

所以,输入后果是这样的:

子线程还是阻塞着,所以并没有达到预期。

所以这个时候咱们应该怎么办呢?

当然是去被动唤醒一下啦。

也就是批改一下 setCapacity 的逻辑:

public void setCapacity(int capacity) {
    final int oldCapacity = this.capacity;
    this.capacity = capacity;
    final int size = count.get();
    if (capacity > size && size >= oldCapacity) {signalNotFull();
    }
}

外围逻辑就是发现如果容量扩充了,那么就调用一下 signalNotFull 办法:

唤醒一下被 park 起来的线程。

如果看到这里你感觉你有点懵,不晓得 LinkedBlockingQueue 的这几个玩意是干啥的:

连忙去花一小时工夫补充一下 LinkedBlockingQueue 相干的知识点。这样玩意,面试也常常考的。

好了,咱们说回来。

批改完咱们自定义的 setCapacity 办法后,再次执行程序,就呈现了咱们预期的输入:

除了改 setCapacity 办法之外,我在写文章的时候不经意间还触发了另外一个答案:

在调用完 setCapacity 办法之后,再次调用 put 办法,也能失去预期的输入:

咱们察看 put 办法就能发现其实情理是一样的:

当调用完 setCapacity 办法之后,再次调用 put 办法,因为不满足标号为 ① 的代码的条件,所以就不会被阻塞。

于是能够顺利走到标号为 ② 的中央唤醒被阻塞的线程。

所以也就变相的达到了扭转队列长度,唤醒被阻塞的工作目标。

而究根结底,就是须要执行一次唤醒的操作。

那么那一种优雅一点呢?

那必定是第一种把逻辑封装在 setCapacity 办法外面操作起来更加优雅。

第二种形式,大多实用于那种“你也不晓得为什么,反正这样写程序就是失常了”的状况。

当初咱们晓得在线程池外面动静调整队列长度的坑是什么了。

那就是队列满了之后,调用 put 办法的线程就会被阻塞住,即便此时另外的线程调用了 setCapacity 办法,扭转了队列长度,如果没有线程再次触发 put 操作,被阻塞的线程也不会被唤醒。

是不是?

了不理解?

对不对?

这是不对的,敌人们。

看到后面内容,频频点头的敌人,要留神了。

这中央要开始转弯了。

开始转弯

线程池外面往队列外面增加对象的时候,用的是 offer 命令,并没有用 put 命令:

咱们看看 offer 命令在干啥事儿:

队列满了之后,间接返回 false,不会呈现阻塞的状况。

也就是说,线程池中基本就不会呈现我后面说的须要唤醒的状况,因为基本就没有阻塞中的线程。

在和大佬交换的过程中,他提到了一个 VariableLinkedBlockingQueue 的货色。

这个类位于 MQ 包外面,我后面提到的 setCapacity 办法的批改形式就是在它这里学来的:

同时,我的项目外面也用到了它的 put 办法:

所以,它是有可能呈现咱们后面剖析的状况,有须要被唤醒的线程。

然而,你想想,线程池外面并没有应用 put 办法,是不是就刚好防止这样的状况?

是的,的确是。

然而,不够谨严,如果晓得有问题了的话,为什么要留个坑在这里呢?

你学 MQ 的 VariableLinkedBlockingQueue 思考的周全一点,就算 put 办法阻塞的时候也能用,它不香吗?

写到这里其实如同除了让你相熟一下 LinkedBlockingQueue 外,仿佛是一个没啥卵用的知识点,

然而,我能让这个没有卵用的知识点起到大作用。

因为这其实是一个小细节。

假如我进来面试,在面试的时候提到动静调整办法的时候,在不经意间拿捏一下这个小细节,即便我没有真的落地过动静调整,然而我提到这样的一个小细节,就显得很实在。

面试官一听:很不错,有整体,有部分,应该是假不了。

在 VariableLinkedBlockingQueue 外面还有几处细节,拿 put 办法来说:

判断条件从 count.get() >= capacity 变成了 count.get() = capacity,目标是为了反对 capacity 由大变小的场景。

这样的中央还有好几处,就不一一列举了。

魔鬼,都在细节外面。

同学们得好好的拿捏一下。

JDK bug

其实原打算写到后面,就打算收尾了,因为我原本就只是想补充一下我之前没有留神到的细节。

然而,我手贱,跑到 JDK bug 列表外面去搜寻了一下 LinkedBlockingQueue,想看看还有没有什么其余的播种。

我是万万没想到,的确是有一点意外播种的。

首先是这一个 bug,它是在 2019-12-29 被提出来的:

https://bugs.openjdk.java.net…

看题目的意思也是想要给 LinkedBlockingQueue 赋能,能够让它的容量进行批改。

加上他上面的场景形容,应该也想要和线程池配合,找到队列的抓手,下钻到底层逻辑,联动监控零碎,拉通配置页面,打出一套动静适应的组合拳。

然而官网并没有驳回这个倡议。

回复外面说写 concurrent 包的这些哥们对于在并发类外面加货色是十分审慎的。他们感觉给 ThreadPoolExecutor 提供可动静批改的个性会带来或者曾经带来泛滥的 bug 了。

我了解就是简略一句话:倡议还是不错的,然而我不敢动。并发这块,牵一动员全身,不晓得会出些什么幺蛾子。

所以要实现这个性能,还是得本人想方法。

这里也就解释了为什么用 final 去润饰了队列的容量,毕竟把性能缩减一下,呈现 bug 的几率也少了很多。

第二个 bug 就有意思了,和咱们动静调整线程池的需要十分匹配:

https://bugs.openjdk.java.net…

这是一个 2020 年 3 月份提出的 bug,形容的是说在更新线程池的外围线程数的时候,会抛出一个回绝异样。

在 bug 形容的那局部他贴了很多代码,然而他给的代码写的很简单,不太好了解。

好在 Martin 大佬写了一个简化版,高深莫测,就好了解的多:

这段代码是干了个啥事儿呢,简略给大家汇报一下。

首先 main 办法外面有个循环,循环外面是调用了 test 办法,当 test 办法抛出异样的时候循环完结。

而后 test 办法外面是每次都搞一个新的线程池,接着往线程池外面提交队列长度加最大线程数个工作,最初敞开这个线程池。

同时还有另外一个线程把线程池的外围线程数从 1 批改为 5。

你能够关上后面提到的 bug 链接,把这段代码贴出来跑一下,十分的匪夷所思。

Martin 大佬他也认为这是一个 BUG.

说切实的,我跑了一下案例,我感觉这应该算是一个 bug,然而通过 Doug Lea 老爷子的亲自认证,他并不感觉这是一个 Bug。

次要是这个 bug 的确也有点超出我的认知,而且在链接中并没有明确的说具体起因是什么,导致我定位的工夫十分的长,甚至一度想要放弃。

然而最终定位到问题之后也是长叹一口:害,就这?没啥意思。

先看一下问题的体现是怎么样的:

下面的程序运行起来后,会抛出 RejectedExecutionException,也就是线程池拒绝执行该工作。

然而咱们后面剖析了,for 循环的次数是线程池刚好能包容的工作数:

按理来说不应该有问题啊?

这也就是发问的哥们纳闷的中央:

他说:我很费解啊,我提交的工作数量基本就不会超过 queueCapacity+maxThreads,为什么线程池还抛出了一个 RejectedExecutionException?而且这个问题十分的难以调试,因为在工作中增加任何模式的提早,这个问题都不会复现。

他的话中有话就是:这个问题十分的莫名其妙,然而我能够稳固复现,只是每次复现呈现问题的机会都十分的随机,我搞不定了,我感觉是一个 bug,你们帮忙看看吧。

我先不说我定位到的 Bug 的次要起因是啥吧。

先看看老爷子是怎么说的:

老爷子的观点简略来说就是四个字:

老爷子说他没有压服本人下面的这段程序应该被失常运行胜利。

意思就是他感觉抛出异样也是失常的事件。然而他没有说为什么。

一天之后,他又补了一句话:

我先给大家翻译一下:

他说当线程池的 submit 办法和 setCorePoolSize 或者 prestartAllCoreThreads 同时存在,且在不同的线程中运行的时候,它们之间会有竞争的关系。

在新线程处于预启动但还没齐全就绪承受队列中的工作的时候,会有一个短暂的窗口。在这个窗口中队列还是处于满的状态。

解决方案其实也很简略,比方能够在 setCorePoolSize 办法中把预启动线程的逻辑拿掉,然而如果是用 prestartAllCoreThreads 办法,那么还是会呈现后面的问题。

然而,不论是什么状况吧,我还是不确定这是一个须要被修复的问题。

怎么样,老爷子的话看起来是不是很懵?

是的,这段话我最开始的时候读了 10 遍,都是懵的,然而当我了解到这个问题呈现的起因之后,我还是不得不感叹一句:

还是老爷子总结到位,没有一句废话。

到底啥起因?

首先咱们看一下示例代码外面操作线程池的这两个中央:

批改外围线程数的是一个线程,即 CompletableFuture 的默认线程池 ForkJoinPool 中的一个线程。

往线程池外面提交工作是另外一个线程,即主线程。

老爷子的第一句话,说的就是这回事:

racing,就是开车,就是开快车,就是与 … 较量的意思。

这是一个多线程的场景,主线程和 ForkJoinPool 中的线程正在 race,即可能呈现谁先谁后的问题。

接着咱们看看 setCorePoolSize 办法干了啥事:

标号为 ① 的中央是计算新设置的外围线程数与原外围线程数之间的差值。

得出的差值,在标号为 ② 的中央进行应用。

也就是取差值和以后队列中正在排队的工作数中小的那一个。

比方以后的外围线程数配置就是 2,这个时候我要把它批改为 5。队列外面有 10 个工作在排队。

那么差值就是 5-2=3,即标号为 ① 处的 delta=3。

workQueue.size 就是正在排队的那 10 个工作。

也就是 Math.min(3,10),所以标号为 ② 处的 k=3。

含意为须要新增 3 个外围线程数,去帮忙把排队的工作给解决一下。

然而,你想新增 3 个就肯定是对的吗?

会不会在新增的过程中,队列中的工作曾经被解决完了,有可能基本就不须要 3 个这么多了?

所以,循环终止的条件除了老老实实的循环 k 次外,还有什么?

就是队列为空的时候:

同时,你去看代码下面的那一大段正文,你就晓得,其实它形容的和我是一回事。

好,咱们接着看 addWorker 外面,我想要让你看到中央:

在这个办法外面通过一系列判断后,会走入到 new Worker() 的逻辑,即工作线程。

而后把这个线程退出到 workers 外面。

workers 就是一个寄存工作线程的 HashSet 汇合:

你看我框起来的这两局代码,从 workers.add(w)t.start()

从退出到汇合到真正的启动,两头还有一些逻辑。

执行两头的逻辑的这一小段时间,就是老爷子说的“window”。

there’s a window while new threads are in the process of being prestarted but not yet taking tasks。

就是在新线程处于预启动,但尚未接受任务时,会有一个窗口。

这个窗口会产生啥事儿呢?

就是上面这句话:

the queue may remain (transiently) full。

队列有可能还是满的,然而只是临时的。

接下来咱们连起来看:

所以怎么了解下面被划线的这句话呢?

带入一个理论的场景,也就是后面的示例代码,只是调整一下参数:

这个线程池外围线程数是 1,最大线程数是 2,队列长度是 5,最多能包容的工作数是 7。

另外有一个线程在执行把外围线程池从 1 批改为 2 的操作。

假如咱们记线程池 submit 提交了 6 个工作,正在提交第 7 个工作的工夫点为 T1。

为什么是要强调这个工夫点呢?

因为当提交第 7 个工作的时候,就须要去启用非核心线程数了。

具体的源码在这里:

java.util.concurrent.ThreadPoolExecutor#execute

也就是说此时队列满了,workQueue.offer(command) 返回的是 fasle。因而要走到 addWorker(command, false) 办法中去了。

代码走到 1378 行这个工夫点,是 T1。

如果 1378 行的 addWorker 办法返回 false,阐明增加工作线程失败,抛出回绝异样。

后面示例程序抛出回绝异样就是因为这里返回了 fasle。

那么问题就变成了:为什么 1378 行中的 addWorker 执行后返回了 false 呢?

因为以后不满足这个条件了 wc >= (core ? corePoolSize : maximumPoolSize)

wc 就是以后线程池,正在工作的线程数。

把咱们后面的条件带进去,就是这样的 wc >=(false?2:2)

即 wc=2。

为什么会等于 2,不应该是 1 吗?

多的哪一个是哪里来的呢?

假相只有一个:恰好此时 setCorePoolSize 办法中的 addWorker 也执行到了 workers.add(w),导致 wc 从 1 变成了 2。

撞车了,所以抛出回绝异样。

那么为什么大多数状况下不会抛出异样呢?

因为从 workers.add(w)t.start() 这个工夫窗口,十分的短暂。

大多数状况下,setCorePoolSize 办法中的 addWorker 执行了后,就会了解从队列外面拿一个工作进去执行。

而这个状况下,另外的工作通过线程池提交进来后,发现队列还有位子,就放到队列外面去了,基本不会去执行 addWorker 办法。

情理,就是这样一个情理。

这个多线程问题的确是比拟难复现,我是怎么定位到的呢?

加日志。

源码外面怎么加日志呢?

我不仅搞了一个自定义队列,还把线程池的源码粘进去了一份,这样就能够加日志了:

另外,其实我这个定位计划也是很不谨严的。

调试多线程的时候,最好是不要应用 System.out.println,有坑!

场景

咱们再回头看看老爷子给出的计划:

其实它给了两个。

第一个是拿掉 setCorePoolSize 办法中的 addworker 的逻辑。

第二个是说原程序中,即提问者给的程序中,应用的是 prestartAllCoreThreads 办法,这个外面必须要调用 addWorker 办法,所以还是有肯定的几率呈现后面的问题。

然而,老爷子不明确为什么会这样写?

我想兴许他是没有想到什么适合的场景?

其实后面提到的这个 Bug,其实在动静调整的这个场景下,还是有可能会呈现的。

尽管,呈现的概率非常低,条件也十分刻薄。

然而,还是有几率呈现的。

万一呈现了,当共事都在抠脑壳的时候,你就说:这个嘛,我见过,是个 Bug。不肯定每次都呈现的。

这又是一个你能够拿捏的小细节。

然而,如果你在面试的时候遇到这个问题了,这属于一个傻逼问题。

毫无意义。

属于,面试官不晓得在哪看到了一个感觉很厉害的观点,肯定要展现出本人很厉害的样子。

然而他不晓得的是,这个题:

最初说一句

好了,看到了这里了,安顿一个点赞吧。写文章很累的,须要一点正反馈。

给各位读者敌人们磕一个了:

本文已收录自集体博客,欢送大家来玩:

https://www.whywhy.vip/

退出移动版