共计 8630 个字符,预计需要花费 22 分钟才能阅读完成。
你好呀,我是歪歪。
明天给大家分享一个通过扩大后的线程池,且我感觉扩大的思路十分好的。
释怀,我题目党来着,我感觉面试不会有人考这个玩意,然而工作中是有可能真的会遇到相应的场景。
为了引出这个线程池,我先给大家搞个场景,不便了解。
就拿上面这个表情包来做例子吧。
假如咱们有两个程序员,就叫贫贱和旺财吧。
下面这个表情包就是这两个程序员一天的工作写照,用程序来示意是这样的。
首先咱们搞一个对象,示意程序员过后正在做的事儿:
public class CoderDoSomeThing {
private String name;
private String doSomeThing;
public CoderDoSomeThing(String name, String doSomeThing) {
this.name = name;
this.doSomeThing = doSomeThing;
}
}
而后,用代码形容一下贫贱和旺财做的事儿:
public class NbThreadPoolTest {public static void main(String[] args) {CoderDoSomeThing rich1 = new CoderDoSomeThing("贫贱", "启动 Idea");
CoderDoSomeThing rich2 = new CoderDoSomeThing("贫贱", "搞数据库, 连 tomcat,crud 一顿输入");
CoderDoSomeThing rich3 = new CoderDoSomeThing("贫贱", "嘴角疯狂上扬");
CoderDoSomeThing rich4 = new CoderDoSomeThing("贫贱", "接口拜访报错");
CoderDoSomeThing rich5 = new CoderDoSomeThing("贫贱", "心态崩了,卸载 Idea");
CoderDoSomeThing www1 = new CoderDoSomeThing("旺财", "启动 Idea");
CoderDoSomeThing www2 = new CoderDoSomeThing("旺财", "搞数据库, 连 tomcat,crud 一顿输入");
CoderDoSomeThing www3 = new CoderDoSomeThing("旺财", "嘴角疯狂上扬");
CoderDoSomeThing www4 = new CoderDoSomeThing("旺财", "接口拜访报错");
CoderDoSomeThing www5 = new CoderDoSomeThing("旺财", "心态崩了,卸载 Idea");
}
}
简略解释一下变量的名称,表明我还是通过三思而行了的。
贫贱,就是有钱,所以变量名叫做 rich。
旺财,就是汪汪汪,所以变量名叫做 www。
你看我这个类的名称,NbThreadPoolTest,就晓得我是要用到线程池了。
理论状况中,贫贱和旺财两个人是能够各干各的事儿,互不烦扰的,也就是他们应该是各自的线程。
各干各的事儿,互不烦扰,这听起来如同是能够用线程池的。
所以,我把程序修改成了上面这个样子,把线程池用起来:
public class NbThreadPoolTest {public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(5);
List<CoderDoSomeThing> coderDoSomeThingList = new ArrayList<>();
coderDoSomeThingList.add(new CoderDoSomeThing("贫贱", "启动 Idea"));
coderDoSomeThingList.add(new CoderDoSomeThing("贫贱", "搞数据库, 连 tomcat,crud 一顿输入"));
coderDoSomeThingList.add(new CoderDoSomeThing("贫贱", "嘴角疯狂上扬"));
coderDoSomeThingList.add(new CoderDoSomeThing("贫贱", "接口拜访报错"));
coderDoSomeThingList.add(new CoderDoSomeThing("贫贱", "心态崩了,卸载 Idea"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺财", "启动 Idea"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺财", "搞数据库, 连 tomcat,crud 一顿输入"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺财", "嘴角疯狂上扬"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺财", "接口拜访报错"));
coderDoSomeThingList.add(new CoderDoSomeThing("旺财", "心态崩了,卸载 Idea"));
coderDoSomeThingList.forEach(coderDoSomeThing -> {executorService.execute(() -> {System.out.println(coderDoSomeThing.toString());
});
});
}
}
下面程序就是把贫贱和旺财两人做的事件都封装到了 list 外面,而后遍历这个 list 把外面的货色,即“做的事件”都扔到线程池外面去。
那么下面的程序执行后,一种可能的输入是这样的:
.jpg)
乍一看没问题,贫贱和旺财都在同时做事。
然而认真一看,每个人做的事件的程序不对了啊。
比方旺财看起来有点“精神分裂”,刚刚启动 Idea,嘴角就开始疯狂上扬了。
所以,到这里能够引出我想要的货色了。
我想要的是什么样的货色呢?
就是在保障贫贱和旺财在同时做事的状况下,还要保障他们的做的事件是有肯定程序的,即依照我投放到线程池外面的程序来执行。
用正式一点的话来形容是这样的:
我须要这样的一个线程池,它能够确保投递进来的工作按某个维度划分出工作,而后依照工作提交的程序顺次执行。这个线程池能够通过并行处理 (多个线程) 来进步吞吐量、又要保障肯定范畴内的工作依照严格的先后顺序来运行。
用我后面的例子,“按某个维度”就是人名,就是贫贱和旺财这个维度。
请问你怎么做?
一顿剖析
我会怎么做?
首先,我能够必定的是 JDK 的线程池是干不成这个事儿的。
因为从线程池原理的角度来说,并行和先后顺序它是不能同时满足的。
你明确我意思吧?
比方我要用线程池来保障先后顺序,那么它是这样的:
只有一个线程的线程池,它能够保障先后顺序。
然而这玩意有意义吗?
有点意义,因为它并不占用主线程,然而意义不大,毕竟阉割了重要的“多线程”能力。
所以咱们怎么在这个场景下把并行能力给提上去呢?
等等,咱们如同曾经有一个能够保障先后顺序的线程池了。
那么咱们把它横向扩容,多搞几个,不就具备了并行的能力了吗?
而后后面提到的“按某个维度”,如果有多个只有一个线程的线程池了,那我也能够依照这个维度去映射“维度”和“每个线程池”呀。
用程序来说就是这样的:
标号为 ① 的中央就是搞了多个只有一个线程的线程池,目标是为了保障生产的程序性。
标号为 ② 的中央就是通过一个 map 映射人名和线程池之间的关系。这里只是一个示意,比方咱们还能够用用户号取模的形式去定位对应的线程池,比方用户号为奇数的用一个线程池,为偶数的用另外一个线程。
所以并不是“某个维度”外面有多少个数据就要定义多少个只有一个线程的线程池,它们也是能够复用的,这个中央有个小弯要转过来。
标号为 ③ 的中央就是依据名称去 map 外面去对应的线程池。
从输入后果来看,也是没有故障的:
看到这里有的敌人就要说:你这不是舞弊吗?
不是说好一个线程池吗,你这都弄了多个了。
你要这个角度看问题的话,那就把路走窄了。
你要想着有一个大的线程池,外面又放了很多个只有一个线程的线程池。
这样格局就关上了。
我下面的写法是一个十分简陋的 Demo,次要是引出这个计划的思路。
我要介绍的,就是基于这个思路搞出的一个开源我的项目。
是一位大公司的大佬写的,我看了一下源码,赞不绝口:写的真他娘的好。
我先给你上一个应用案例和输入后果:
从案例看起来,应用形式也是十分的简略。
和 JDK 原生的用法的差别点就是我框起来的局部。
首先搞一个 KeyAffinityExecutor 的对象,来代替原生的线程池。
KeyAffinityExecutor 其中波及到一个单词,Affinity。
翻译过去有类同的含意:
所以 KeyAffinityExecutor 翻译过去就是 key 类同的线程池,当你明确它的性能和作用范畴后会感觉这个名字取的是针不戳。
接着是调用了 KeyAffinityExecutor 对象的 executeEx 办法,能够多传入一个参数,这个参数就是辨别某一类雷同工作的维度,比方我这里就给的是 name 字段。
从应用案例上看来,能够说封装的十分好,开箱即用。
KeyAffinityExecutor 用法
先说说这个类的用法吧。
其对应的开源我的项目地址是这个:
https://github.com/PhantomThi…
如果你想把它用起来,得引入上面这个 maven 地址:
<dependency>
<groupId>com.github.phantomthief</groupId>
<artifactId>more-lambdas</artifactId>
<version>0.1.55</version>
</dependency>
其外围代码是这个接口:
com.github.phantomthief.pool.KeyAffinityExecutor
这个接口外面有大量的正文,大家能够拉下来看一下。
我这里次要给大家看一下接口下面,作者写的正文,他是这样介绍本人的这个工具的。
这是一个按指定的 Key 亲和程序生产的线程池。
KeyAffinityExecutor 是一个非凡的工作线程池。
它能够确保投递进来的工作按 Key 雷同的工作按照提交程序顺次执行。在既要通过并行处理来进步吞吐量、又要保障肯定范畴内的工作依照严格的先后顺序来运行的场景下十分实用。
KeyAffinityExecutor 的内建实现形式,是将指定的 Key 映射到固定的单线程线程池上,它外部会保护多个(数量可配)这样的单线程线程池,来放弃肯定的工作并行度。
须要留神的是,此接口定义的 KeyAffinityExecutor,并不要求 Key 雷同的工作在雷同的线程上运行,只管实现类能够依照这种形式来实现,但它并非一个强制性的要求,因而在应用时也请不要依赖这样的假设。
很多人问,这和本人应用一个线程池的数组,并通过简略取模的形式来实现有什么区别?
事实上,大多数场景确实差别不大,然而当数据歪斜产生时,被散列到雷同地位的数据可能会因为热点歪斜数据被延误。
本实现在并发度较低时(阈值可设置),会筛选最闲置的线程池投递,尽最大可能隔离歪斜数据,缩小对其它数据带来的影响。
在作者的这段介绍外面,简略的阐明了该项目标利用场景和外部原理,和咱们后面剖析的差不多。
除此之外,还有两个须要特地留神的中央。
第一个中央是这里:
作为辨别的工作维度的对象,如果是自定义对象,那么肯定要重写其 hashCode、equals,以确保能够起到标识作用。
这一处的揭示就和 HashMap 的 key 如果是对象的话,应该要重写 hashCode、equals 办法的起因是一样一样的。
编程根底,只提一下,不多赘述。
第二个中央得好好说一下,属于他的核心思想。
他没有采纳简略取模的形式,因为在简略取模的场景上,数据是有可能产生歪斜的。
我集体是这样了解作者的思路的。
首先阐明一下取模的数据歪斜是咋回事,举个简略的例子:
下面的代码片段中,我退出了一个新角色“摸鱼巨匠”。同时给对象新增了一个 id 字段。
假如,咱们对 id 字段用 2 取余:
那么会呈现的状况就是巨匠和贫贱对应的 id 取余后果都是 1,它们将同用一个线程池。
很显著,因为巨匠的频繁操作,导致“摸鱼”变成了热点数据,从而导致编号为 0 的连接池发了歪斜,进而影响到了贫贱的失常工作。
而 KeyAffinityExecutor 的策略是什么样的呢?
它会筛选最闲置的线程池进行投递。
怎么了解呢?
还是下面的例子,如果咱们构建这样的线程池:
KeyAffinityExecutor executorService =
KeyAffinityExecutor.newSerializingExecutor(3, 200, "MY-POOL-%d");
第一个参数 3,代表它会在这里线程池外面构建 3 个只有一个线程的线程池。
那么当用它来提交工作的时候,因为维度是 id 维度,咱们刚好三个 id,所以刚好把这个线程池占满:
这个时候是不存在数据歪斜的。
然而,如果我把后面构建线程池的参数从 3 变成 2 呢?
KeyAffinityExecutor executorService =
KeyAffinityExecutor.newSerializingExecutor(2, 200, "MY-POOL-%d");
提交形式不变,外面加上对 id 为 1 和 2 的工作提早的逻辑,目标是察看 id 为 3 的数据怎么解决:
毋庸置疑,当提交执行巨匠的摸鱼操作的时候线程池必定不够用了,怎么办?
这个时候,依据作者形容“会筛选最闲置的线程池投递”。
我用这样的数据来阐明:
所以,当执行巨匠摸鱼操作的时候,会去从仅有的两个选项当选一个进去。
怎么选?
谁的并发度低,就选谁。
因为有延迟时间在工作外面,所以咱们能够察看到执行贫贱的线程的并发度是 5,而执行旺财的线程的并发度是 6。
因而执行巨匠的摸鱼操作的时候,会抉择并发度为 5 的线程进行解决。
这个场景下就呈现了数据歪斜。然而歪斜的前提产生了变动,变成了以后曾经没有可用线程了。
所以,作者说“尽最大可能隔离歪斜数据”。
这两个计划最大的差别就是对线程资源的利用水平,如果是单纯的取模,那么有可能呈现产生数据歪斜的时候,还有可用线程。
如果是 KeyAffinityExecutor 的形式,它能够保障产生数据歪斜的时候,线程池外面的线程肯定是曾经用完了。
而后,你再品一品这两个计划之间的轻微差别。
KeyAffinityExecutor 源码
源码不算多,一共就这几个类:
然而他的源码外面绝大部分都是 lambdas 的写法,基本上都是函数式编程,如果你对这方面比拟单薄的话那么看起来会比拟吃力一点。
如果你想把握其源码的话,我倡议是把我的项目拉到本地,而后从他的测试用例动手:
https://github.com/PhantomThi…
我给大家汇报一下我看到的一些要害的中央,不便大家本人去看的时候梳理思路。
首先必定是从它的构造方法动手,每一个入参的含意作者都标注的十分分明了:
假如咱们的构造函数是这样的,含意是构建 3 个只有一个线程的线程池,每个线程池的队列大小是 200:
KeyAffinityExecutor executorService =
KeyAffinityExecutor.newSerializingExecutor(3, 200, "WHY-POOL-%d");
首先咱们要找到构建“只有一个线程的线程池”的逻辑在哪。
就藏在构造函数外面的这个办法:
com.github.phantomthief.pool.KeyAffinityExecutorUtils#executor(java.lang.String, int)
在这里能够看到咱们始终提到的“只有一个线程的线程池”,队列的长度也能够指定:
该办法返回的是一个 Supplier 接口,等下就要用到。
接下来,咱们要找到“3”这个数字是体现在哪儿的呢?
就藏在构造函数的 build 办法外面,该办法最终会调用到这个办法来:
com.github.phantomthief.pool.impl.KeyAffinityImpl#KeyAffinityImpl
你到时候在这个中央打个断点,而后 Debug 看一眼,就十分明确了:
对于框起来的这部分的几个要害参数,我解释一下:
首先是 count 参数,就是咱们定义的 3。那么 range(0,3),就是 0,1,2。
而后是 supplier,这玩意就是后面咱们说的 executor 办法返回的 supplier 接口,能够看到外面封装的就是个线程池。
接着是外面有一个十分要害的操作:map(ValueRef::new)。
这个操作外面的 ValueRef 对象,很要害:
com.github.phantomthief.pool.impl.KeyAffinityImpl.ValueRef
要害的中央就是这个对象外面的 concurrency 变量。
还记得最后面说的“筛选最闲置的执行器(线程池)”这句话吗?
怎么判断是否闲置?
靠的就是 concurrency 变量。
其对应的代码在这:
com.github.phantomthief.pool.impl.KeyAffinityImpl#select
能走到断点的中央,阐明以后这个 key 是之前没有被映射过的,所以须要为其指定一个线程池。
而指定这个线程池的操作,就是循环这个 all 汇合,汇合外面装的就是 ValueRef 对象:
所以,comparingInt(ValueRef::concurrency) 办法就是在选以后所有的线程池,并发度最小的一个。
如果这个线程池素来没有用过或者目前没有工作在应用,那么并发度必然是 0,所有会被选出来。
如果所有线程池正在被应用,就会选 concurrency 这个值最低的线程池。
我这里只是给大家说一个大略的思路,如果要深刻理解的话,本人去翻源码去。
如果你十分理解 lambdas 的用法的话,你会感觉写的真的很优雅,看起来很难受。
如果你不理解 lambdas 的话 …
那你还不连忙去学?
另外我还发现了两个相熟的货色。
敌人们,请看这是什么:
这难道不就是线程池参数的动静调整吗?
第二个是这样的:
RabbitMQ 外面的动静调整我也写过啊,也是强调过这三处中央:
- 减少 {@link #setCapacity(int)} 和 {@link #getCapacity()}
- {@link #capacity} 判断边界从 == 改为 >=
- 局部 signal() 信号触发改为 signalAll()
另外作者还提到了 RabbitMQ 的版本外面会有导致 NPE 的 BUG 的问题。
这个就没细钻研了,有趣味的能够去比照一下代码,就应该能晓得问题出在哪里。
说说 Dubbo
为什么要说一下 Dubbo 呢?
因为我仿佛在 Dubbo 外面也发现了 KeyAffinityExecutor 的形迹。
为什么说是仿佛呢?
因为最终没有被合并到代码库外面去。
其对应的链接是这里:
https://github.com/apache/dub…
这一次提交一共提交了这么多文件:
外面是能够找到咱们相熟的货色:
其实思路都是一样的,然而你会发现即便是思路一样,然而两个不同的人写进去的代码构造还是很不一样的。
Dubbo 这里把代码的层次分的更加显著一点,比方定义了一个形象的 AbstractKeyAffinity 对象,而后在去实现了随机和最小并发两种计划。
在这些细节处上是有不同的。
然而这个代码的提供者最终没有用这些代码,而是拿出了一个代替计划:
https://github.com/apache/dub…
在这一次提交外面,他次要提交了这个类:
org.apache.dubbo.common.threadpool.serial.SerializingExecutor
这个类从名字上你就晓得了,它强调的是串行化。
带大家看看它的测试用例,你就晓得它是怎么用的了:
首先是它的构造方法入参是另外一个线程池。
而后提交工作的时候用 SerializingExecutor 的 execute 办法进行提交。
在工作外部,干的事就是从 map 外面取出 val 对应的 key,而后进行加 1 操作再放回去。
大家都晓得下面的这个操作在多线程的状况是线程不平安的,最终加进去的后果肯定是小于循环次数的。
然而,如果是单线程的状况下,那必定是没问题的。
那么怎么把线程池映射为单线程呢?
SerializingExecutor 干得就是这事。
而且它的原理特地简略,外围代码就几行。
首先它本人搞了个队列:
提交进来的工作都扔到队列外面去。
接下来再一个个的执行。
怎么保障一个个的执行呢?
办法有很多,它这里是搞了个 AtomicBoolean 对象来管制:
这样就实现了把多线程工作搞成串行化的场景。
只是让我奇怪的是 SerializingExecutor 这个类目前在 Dubbo 外面并没有应用场景。
然而,如果你时候你就要实现这样奇怪的性能,比方他人给你一个线程池,然而到你的流程外面出入某种思考,须要把工作串行化,这个时候必定是不能动他人的线程池的,那么你能够想起 Dubbo 这里有一个现成的,比拟优雅的、逼格较高的解决方案。
最初说一句
好了,看到了这里了,转发、在看、点赞轻易安顿一个吧,要是你都安顿上我也不介意。写文章很累的,须要一点正反馈。
给各位读者敌人们磕一个了:
本文已收录至集体博客,欢送大家来玩。
https://www.whywhy.vip/