共计 4584 个字符,预计需要花费 12 分钟才能阅读完成。
第一版地址:https://blog.csdn.net/wandou9527/article/details/107769598
第二版优化点:
- 线程实现提早创立,对内存更敌对
- 自定义线程池实现
java.util.concurrent.Executor
接口,更加符合规范 - submit 办法改为 execute 办法,语义更贴切
- 退出对工作(command)的非空判断
优化后又离完满线程池近了一步。
如果想玩转 Java 的多线程与高并发,线程池是你永远也绕不过的山。既然绕不过,咱们就啃他,吃透线程池,玩转高并发。
浏览 Jdk 线程池源码发现,Jdk 里的线程池实现的十分欠缺,有很多简单的逻辑解决,所以造成代码较长,而且代码格局也不标准(ps. 请原谅我指点江山,人家可能有人家的理论起因),eg. if 后没有大括号;很多变量命名都是单字母,比方 c、w 等。。。
本文,精简了线程池的一些简单逻辑,从骨干性能登程,实现骨干性能,我置信更有助于咱们了解线程池,而后再一步步深刻。
Jdk 里的线程池
次要属性
private volatile int corePoolSize; // 外围线程数
private volatile int maximumPoolSize; // 最大线程数
private volatile long keepAliveTime; // 存活工夫
private final BlockingQueue<Runnable> workQueue; // 工作期待队列
private volatile ThreadFactory threadFactory; // 线程工厂
private volatile RejectedExecutionHandler handler; // 回绝策略
上面介绍一下线程池执行工作的流程,了解各个属性的意义。当一个线程池初始化,向线程池提交工作,线程池新建线程执行工作,随着线程创立,线程数逐步增多,当达到 corePoolSize
线程池将不再新建线程,而是将工作放入工作期待队列 workQueue
。再继续向线程池提交工作,当期待队列满了,这时会持续新建线程,直到达到最大线程数 maximumPoolSize
,如果还持续有工作到来,线程池无奈解决,这时就启动回绝策略。
这个过程咱们能够以生存中的例子比喻一下。大抵咱们把线程池了解为理发店。那么流程就是:来了顾客开始理发,比方只有 4 个理发师 4 个座位,相当于外围线程。那么来了过多的顾客,理发师忙不过来就会先让你去等待区稍等排队期待,后面有理完发的会叫你,相当于期待队列。期待区满了呢?事实中理发店必定不会回绝顾客的啊,他可能让你先在里面等。但如果期待区每天都爆满,那么老板可能会思考扩充店面,裁减理发师团队了。所以,这只是个大抵的比喻。
自定义手写线程池
废话不说,咱们上代码。
package com.wandou.demo.thread.post.threapool;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author liming
* @date 2020/09
* @description 自定义线程池
* new 性能
* - 线程提早创立(来工作才创立)* 欠缺性能:* - 队列满了后持续创立线程,直到达到最大线程数
* - 回绝策略
* - 线程超时销毁
*/
public class MyThreadPool implements Executor {
/**
* 外围线程数(外围理发师数量)*/
private volatile int corePoolSize;
/**
* 工作期待队列(期待区座位数)*/
private final BlockingQueue<Runnable> workQueue;
/**
* 线程容器(理发师作业区)*/
private final HashSet<MyThreadPool.Worker> workers = new HashSet<>();
private final AtomicInteger workerCount = new AtomicInteger(0);
public MyThreadPool(int corePoolSize, BlockingQueue<Runnable> workQueue) {
this.corePoolSize = corePoolSize;
this.workQueue = workQueue;
}
/**
* 运行,来顾客了,安顿
*
* @param command
* @return
*/
@Override
public void execute(Runnable command) {if (command == null) {throw new NullPointerException();
}
int count = workerCount.get();
// 如果小于外围线程数,能够建设新线程
if (count < corePoolSize) {if (addWorker(command, true)) {return;}
}
if (workQueue.offer(command)) {int recheck = workerCount.get();
if (recheck == 0) {addWorker(null, false);
}
return;
}
// 回绝
throw new RejectedExecutionException("Task" + command.toString() +
"rejected from" +
this.toString());
}
private boolean addWorker(Runnable command, boolean core) {for (; ;) {int count = workerCount.get();
if (count >= corePoolSize) {return false;}
// 线程数 +1, 胜利向下走
if (workerCount.compareAndSet(count, count + 1)) {break;}
}
Worker worker = new Worker(command);
final Thread thread = worker.thread;
workers.add(worker);
thread.start();
return true;
}
// ----------------
/**
* 线程(理发师)*/
private class Worker implements Runnable {
/**
* 工作者运行的线程
*/
final Thread thread;
/**
* 初始运行的工作,可能为 null
*/
Runnable firstTask;
/**
* 创立一个工作者
*
* @param firstTask 第一个工作,能够为 null
*/
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = new Thread(this);
}
@Override
public void run() {System.out.println("run");
runWorker(this);
}
@Override
public boolean equals(Object o) {if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Worker worker = (Worker) o;
return Objects.equals(thread, worker.thread) &&
Objects.equals(firstTask, worker.firstTask);
}
@Override
public int hashCode() {return Objects.hash(thread, firstTask);
}
}
final void runWorker(Worker worker) {
Runnable task = worker.firstTask;
worker.firstTask = null;
while (task != null || (task = getTask()) != null) {
try {task.run();
} catch (Exception e) {e.printStackTrace();
} finally {
// 运行结束
task = null;
}
}
}
private Runnable getTask() {for (; ;) {
try {
// 阻塞拿工作
return workQueue.take();} catch (InterruptedException e) {System.out.println("InterruptedException!!!");
}
}
}
}
测试代码:
package com.wandou.demo.thread.post.threapool;
import org.junit.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author liming
* @date 2020/9/17
* @description
*/
public class MyThreadPoolDemo {private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
private MyThreadPool myThreadPool = new MyThreadPool(3, workQueue);
@Test
public void t1() throws Exception {AtomicInteger atomicInteger = new AtomicInteger(0);
Runnable task = new Runnable() {
@Override
public void run() {
try {System.out.println(atomicInteger.incrementAndGet()
+ "号顾客来理发,为其理发的理发师是:"
+ Thread.currentThread().getName());
} catch (Exception e) {e.printStackTrace();
}
}
};
for (int i = 0; i < 30; i++) {myThreadPool.execute(task);
}
Thread.sleep(5000);
System.out.println("================================================");
for (int i = 0; i < 30; i++) {myThreadPool.execute(task);
}
// 让主线程阻塞期待
System.in.read();}
}
测试后果: