共计 10425 个字符,预计需要花费 27 分钟才能阅读完成。
概述:
线程池 (Thread Pool) 是一种基于池化思维治理线程的工具,经常出现在多线程服务器中,如 MySQL。线程过多会带来额定的开销,其中包含创立销毁线程的开销、调度线程的开销等等,同时也升高了计算机的整体性能。线程池保护多个线程,期待监督管理者调配可并发执行的工作。这种做法,一方面防止了解决工作时创立销毁线程开销的代价,另一方面防止了线程数量收缩导致的过分调度问题,保障了对内核的充分利用。
线程池的劣势:
1. 升高系统资源耗费,通过重用已存在的线程,升高线程创立和销毁造成的耗费;
2. 进步零碎响应速度,当有工作达到时,通过复用已存在的线程,无需期待新线程的创立便能立刻执行;
3. 不便线程并发数的管控。因为线程若是无限度的创立,可能会导致内存占用过多而产生 OOM,并且会造成 cpu 适度切换(cpu 切换线程是有工夫老本的(须要放弃以后执行线程的现场,并复原要执行线程的现场))。
4. 提供更弱小的性能,延时定时线程池。
Java 线程池的应用形式有如下几种:
一、JDK 内置线程池有 4 种(JDK1.5 之后)
1. 固定线程数的线程池(newFixedThreadPool)
这种线程池外面的线程被设计成寄存固定数量的线程,具体线程数能够思考为 CPU 核数 *N(N 可大可小,取决于并发的线程数,计算机可用的硬件资源等)。能够通过上面的代码来获取以后计算机的 CPU 的核数。
int processors = Runtime.getRuntime().availableProcessors();
FixedThreadPool 是通过 java.util.concurrent.Executors 创立的 ThreadPoolExecutor 实例。这个实例会复用 固定数量的线程解决一个共享的无边界队列。任何工夫点,最多有 nThreads 个线程会处于活动状态执行工作。如果当所有线程都是流动时,有多的工作被提交过去,那么它会统一在队列中期待直到有线程可用。如果任何线程在执行过程中因为谬误而停止,新的线程会代替它的地位来执行后续的工作。所有线程都会统一存于线程池中,直到显式的执行 ExecutorService.shutdown() 敞开。因为阻塞队列应用了 LinkedBlockingQueue,是一个无界队列,因而永远不可能回绝工作。LinkedBlockingQueue 在入队列和出队列时应用的是不同的 Lock,意味着他们之间不存在互斥关系,在多 CPU 状况下,他们能正在在同一时刻既生产,又生产,真正做到并行。因而这种线程池不会回绝工作,而且不会开拓新的线程,也不会因为线程的长时间不应用而销毁线程。这是典型的生产者 —- 消费者问题,这种线程池适宜用在稳固且固定的并发场景,比方服务器。上面代码给出一个固定线程数的 DEMO,每个核绑定了 5 个线程。
案例:
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
// 获取计算机有几个核
int processors = Runtime.getRuntime().availableProcessors();
// 第一种线程池: 固定个数的线程池, 能够为每个 CPU 核绑定肯定数量的线程数
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(processors * 5)
for (int i = 0; i < 10; i++) {
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
fixedThreadPool.shutdown();
}
}
2. 缓存的线程池(newCachedThreadPool)
外围池大小为 0,线程池最大线程数目为最大整型,这意味着所有的工作一提交就会退出到阻塞队列中。当线程池中的线程 60s 没有执行工作就终止,阻塞队列为 SynchronousQueue。SynchronousQueue 的 take 操作须要 put 操作期待,put 操作须要 take 操作期待,否则会阻塞(线程池的阻塞队列不能存储,所以当目前线程解决繁忙状态时,所以开拓新的线程来解决申请),线程进入 wait set。总结下来:①这是一个能够有限扩充的线程池;②适宜解决执行工夫比拟小的工作;③线程闲暇工夫超过 60s 就会被杀死,所以长时间处于闲暇状态的时候,这种线程池简直不占用资源;④阻塞队列没有存储空间,只有申请到来,就必须找到一条闲暇线程去解决这个申请,找不到则在线程池新开拓一条线程。如果主线程提交工作的速度远远大于 CachedThreadPool 的处理速度,则 CachedThreadPool 会一直地创立新线程来执行工作,这样有可能会导致系统耗尽 CPU 和内存资源,所以在应用该线程池是,肯定要留神管制并发的工作数,否则创立大量的线程可能导致重大的性能问题。
案例:
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
// 缓存线程池,无下限
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
cachedThreadPool.shutdown();
}
}
3. 单个线程的线程池(newSingleThreadExecutor)
SingleThreadExecutor 是应用单个 worker 线程的 Executor,作为繁多 worker 线程的线程池,SingleThreadExecutor 把 corePool 和 maximumPoolSize 均被设置为 1,和 FixedThreadPool 一样应用的是无界队列 LinkedBlockingQueue, 所以带来的影响和 FixedThreadPool 一样。对于 newSingleThreadExecutor()来说,也是当线程运行时抛出异样的时候会有新的线程退出线程池替他实现接下来的工作。创立一个单线程化的线程池,它只会用惟一的工作线程来执行工作,保障所有工作依照指定程序 (FIFO, LIFO, 优先级) 执行,所以这个比拟适宜那些须要按序执行工作的场景。比方:一些不太重要的收尾,日志等工作能够放到单线程的线程中去执行。日志记录个别状况会比较慢(数据量大个别可能不写入数据库),程序执行会拖慢整个接口,沉积更多申请,还可能会对数据库造成影响(事务在开启中),所以日志记录齐全能够扔到单线程的线程中去,一条条的解决,也能够认为是一个单消费者的生产者消费者模式。
案例:
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
// 繁多线程池, 永远会保护存在一条线程
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int j = i;
singleThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + “:” + j);
}
});
}
singleThreadPool.shutdown();
}
}
4. 固定个数的线程池(newScheduledThreadPool)
相比于第一个固定个数的线程池弱小在 ①能够执行延时工作,②也能够执行带有返回值的工作
案例:
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException{
// 第四种线程池: 固定个数的线程池,相比于第一个固定个数的线程池 弱小在 ①能够执行延时工作,②也能够执行
// 有返回值的工作。
// scheduledThreadPool.submit(); 执行带有返回值的工作
// scheduledThreadPool.schedule() 用来执行延时工作.
// 固定个数的线程池,能够执行延时工作,也能够执行带有返回值的工作。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
FutureTask<String> ft = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(“hello”);
return Thread.currentThread().getName();
}
});
scheduledThreadPool.submit(ft);
// 通过 FutureTask 对象取得返回值.
String result = ft.get();
System.out.println(“result : ” + result);
// 执行延时工作
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ” : bobm!”);
}
}, 3L, TimeUnit.SECONDS);
}
}
二、Spring 线程池
多线程并发解决起来通常比拟麻烦,如果你应用 spring 容器来治理业务 bean,事件就好办了多了。spring 封装了 Java 的多线程的实现,你只须要关注于并发事物的流程以及一些并发负载量等个性,具体来说如何应用 spring 来解决并发事务。
线程池的启用有两种形式:配置文件或者注解
配置文件:新增 spring 的配置文件 spring-threadpool.xml
<?xml version=”1.0″ encoding=”UTF-8″?>
<beans xmlns=”http://www.springframework.org/schema/beans”
xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”xmlns:aop=”http://www.springframework.org/schema/aop”
xmlns:tx=”http://www.springframework.org/schema/tx”xmlns:jdbc=”http://www.springframework.org/schema/jdbc”
xmlns:context=”http://www.springframework.org/schema/context”
xmlns:mvc=”http://www.springframework.org/schema/mvc”xmlns:task=”http://www.springframework.org/schema/task”
xsi:schemaLocation=”http://www.springframework.org/schema/context http://www.springframework.or…
http://www.springframework.or… http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.or… http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
http://www.springframework.or… http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
http://www.springframework.or… http://www.springframework.org/schema/aop/spring-aop-3.1.xsd http://www.springframework.or… http://www.springframework.org/schema/task/spring-task.xsd”
default-autowire=”byName”>
<description> 流量音讯 spring 线程池配置 </description>
<!– 缺省的异步工作线程池 –>
<task:annotation-driven executor=”messageExecutor”/>
<task:executor id=”asyncExecutor” pool-size=”100-10000″ queue-capacity=”10″/>
<!– 解决 message 的线程池 –>
<task:executor id=”messageExecutor” pool-size=”15-50″ queue-capacity=”100″ keep-alive=”60″
rejection-policy=”CALLER_RUNS”/>
</beans>
注解:应用 @EnableAsync 标注启用 spring 线程池,@Async 将办法标注为异步办法,spring 扫描到后,执行该办法时,会另起新线程去执行,非常简单
package cn.leadeon.message.test;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
/**
- @author LiJunJun
- @since 2018/10/11
*/
@Component
@EnableAsync
public class AsyncTest {
@Async
public void test1() {
System.out.println(“ 异步执行 test1!!!”);
System.out.println(“ 线程 id:” + Thread.currentThread().getId());
System.out.println(“ 线程名称:” + Thread.currentThread().getName());
}
@Async
public void test2() {
System.out.println(“ 异步执行 test2!!!”);
System.out.println(“ 线程 id:” + Thread.currentThread().getId());
System.out.println(“ 线程名称:” + Thread.currentThread().getName());
}
@Async
public void test3() {
System.out.println(“ 异步执行 test3!!!”);
System.out.println(“ 线程 id:” + Thread.currentThread().getId());
System.out.println(“ 线程名称:” + Thread.currentThread().getName());
}
}
应用注解引入配置文件或者在本人的 spring 配置文件中 import 即可。
package cn.leadeon.message.test;
import org.springframework.context.annotation.ImportResource;
import org.springframework.scheduling.annotation.Async; importorg.springframework.stereotype.Component; /**
这里须要留神的是 @EnableAsync 注解与等价,两者只能应用其一,不然启动会报错。
三、本人定制线程池
首先自定义线程基类,自定义异样处理器, 自定义线程名(自定义的异样处理器用 excute 办法提交线程才无效,用 submit 办法提交的线程,异样会封装在 Future 对象中返还给调用者)。
class MyAppThread extends Thread{
private static final String DEFAULT_NAME=”TomDog”;
private static volatile boolean debugLifeCycle = true;
private static final AtomicInteger created=new AtomicInteger();
private static final AtomicInteger alive=new AtomicInteger();
private static final Logger log=Logger.getAnonymousLogger();
public MyAppThread(Runnable runnable){
this(runnable,DEFAULT_NAME);
}
public MyAppThread(Runnable runnable,String name){
super(runnable,name+”-“+created.incrementAndGet());
setUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler(){
public void uncaughtException(Thread t, Throwable e) {
System.out.println(“UNCAUGHT in thread “+t.getName());
}
}
);
}
@Override
public void run(){
boolean debug=debugLifeCycle;
if (debug){
System.out.println(“Created “+ getName());
}
try {
alive.decrementAndGet();
super.run();
}finally {
alive.decrementAndGet();
if (debug){
System.out.println(“Exiting “+getName());
}
}
}
public static int getCreated() {
return created.get();
}
public static int getAlive() {
return alive.get();
}
public static void setDebug(boolean debugLifeCycle) {
MyAppThread.debugLifeCycle = debugLifeCycle;
}
}
有了线程基类接下来创立本人的线程工场,线程工场顾名思义就是生产线程的工场。
class MyThreadFactory implements ThreadFactory{
private final String threadName;
MyThreadFactory(String threadName){
this.threadName=threadName;
}
public Thread newThread(Runnable r) {
return new MyAppThread(r,threadName);
}
}
接下来创立自定义线程池,来实现记录工作的执行工夫,能够自定义重写 ThreadPoolExecutor 的 beforeExecute,afterExecute, terminated 办法来实现记录每次工作的执行工夫,工作的均匀执行工夫。
class MyTimingThreadPool extends ThreadPoolExecutor{
private final ThreadLocal<Long> startTime
=new ThreadLocal<Long>();
private final AtomicLong numTasks=new AtomicLong();
private final AtomicLong totalTime=new AtomicLong();
public MyTimingThreadPool(int corePoolSize, int maximumPoolSize, longkeepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread thread,Runnable runnable){
super.beforeExecute(thread,runnable);
System.out.println(String.format(“Thread %s:start %s”,thread,runnable));
startTime.set(System.nanoTime());
}
@Override
protected void afterExecute(Runnable runnable,Throwable throwable){
try {
long endTime=System.nanoTime();
long taskTime=endTime-startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
System.out.println(String.format(” Thread %s :end %s, time=%dns”,throwable,runnable,taskTime));
}finally {
super.afterExecute(runnable,throwable);
}
}
@Override
protected void terminated(){
try {
System.out.println(String.format(“Terminated : avg time=%dns”,totalTime.get()/numTasks.get()));
}finally {
super.terminated();
}
}
}
依据 ThreadPoolExecutor 的结构参数可知,咱们还须要为其指定工作队列,饱和策略,外围线程数,最大线程数,线程沉闷工夫,这些依据本人的业务场景来配置就能够了。
对于 cpu 密集型的工作,能够指定线程的个数为 cpu 的核数 +1 . 在 java 中能够通过 RunTime.getRunTime().availableProcessors()来获取。
接下来 new 一个自定义的线程池:
ThreadPoolExecutor executor=new MyTimingThreadPool(CPU_COUNT,maxPoolSize,keepAliveTime,timeUnit,blockingQueue,
threadFactory ,rejectedExecutionHandler);
这时本人定制的线程池就实现了。
其实咱们理论开发的零碎中,tomcat 有几个线程池,dubbo 有几个线程池,kafka client 中有几个线程池,zookeeper client 中存在着几个线程池 ……. 实现一个申请数据是怎么在多个线程之间传递的,这些都须要咱们对感兴趣的去理解革除,能力更明确的理解线程池的利用场景。
总结:线程池在开发中肯定会用到,如果能像 golang 一样,java 语言也有协程,兴许 java 程序员就少了一种包袱。看完这篇文章,你学会了么?