java 线程池常识汇总.
线程池参数含意
- int corePoolSize
外围线程数,也即失常状况工作线程数
- int maximumPoolSize,
最大线程数
- long keepAliveTime,
须要联合阻塞队列来了解:假如阻塞队列的长度是 3,外围数是 2,最大线程数是 5. 运行时是这样的:大于外围数时,会放到阻塞队列外面排队,如果队列满了才会启用新的工作线程,直到达到最大线程数
当达到最大线程数时,如果此时 submit 到线程池的工作变慢了,外围线程可能应答工作的话,这时线程池会动静缩小工作线程数到外围线程数。这里的 keepAliveTime 就是指 除外围线程以外的那几个线程的闲暇工夫。如果大于这个参数所指定的,线程池则会回收这些线程
- TimeUnit unit,
第三个参数的单位
- BlockingQueue<Runnable> workQueue,
比外围线程数多进去的线程会进入阻塞队列排队。别用 Executors.newFixedThreadPool 办法结构,默认指定的阻塞队列(LinkedBlockingQueue)大小是 Integer.MAX_VALUE,需显示指定
- ThreadFactory threadFactory,
用默认的就行,有需要的话辨别下线程名字
- RejectedExecutionHandler handler
回绝策略:
AbortPolicy(默认):间接抛异样
DiscardPolicy:随机抛弃
DiscardOldestPolicy:抛弃最老的线程
CallerRunsPolicy:将执行权回退给调用者线程。
源码剖析
- 准备常识(位运算)
- 原码(带符号位): 最高位为符号位, 正数为 1, 负数为 0
- 反码: 原码除符号位, 其余位取反.
- 补码(次要用于示意正数): 带符号的正数反码 + 1, 次要为了打消 -0, 只有正数才用补码示意. refer: https://www.zhihu.com/questio…
- 线程状态如何保留
线程池中, 用前三位代表运行状态, 用后 29 位代表工作线程数.
// 29
private static final int COUNT_BITS = Integer.SIZE - 3;
//1(原码示意)左移 29 位 + (-1)的补码 (32 位 1), 前三位都为 0, 后 29 位为 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 即取后 29 位
private static int workerCountOf(int c) {return c & CAPACITY;}
//CAPACITY 取反即前三位是 1, 后 29 位是 0, 故任何数与 ~CAPACITY 做与运算都只保留前 3 位.
private static int runStateOf(int c) {return c & ~CAPACITY;}
- 工作治理
submit 提交工作后的执行逻辑
int c = ctl.get();
// 获取后 29 位线程数, 还没超过外围数, 则 addWorker 更新线程数(ctl 的后 29 位)
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
return;
c = ctl.get();}
// 大于 core 线程数的话从新获取状态, 线程池还在运行的话, 则将工作放入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
// 线程池不在运行了的话, 间接回绝
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 第一个线程为空, 立刻执行, 见 Worker 类办法
addWorker(null, false);
}
// 调用 addWorker 办法, 更新 ctl 运行线程数
else if (!addWorker(command, false))
reject(command);
- Worker 线程治理
为了管理工作线程的生命周期, 设计了 worker 线程. 值得说一下的是 Worker 线程是一个 AQS, 运行之前会上锁. 运行完结之后会解锁
依赖于这一点, 再想要中断这些 idle 线程的时候, 能够通过是否可能上锁胜利来判断工作线程是否正在运行. 如果中断操作能上锁胜利, 则代表
线程没有再运行, 这时就能够调用 Worker 线程的 interrupt 办法中断线程, 并且在 hashSet 里 remove 掉 worker 线程援用. 期待 jvm 回收
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask 从阻塞队列里阻塞拿到 runnable, 而后运行
while (task != null || (task = getTask()) != null) {
//
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行队列工作.
task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();}
}
completedAbruptly = false;
} finally {
// 如果 阻塞队列外面 工作为空, 则执行回收逻辑
// 所有的 worker 线程援用会保留在一个 hashSet 外面,
processWorkerExit(w, completedAbruptly);
}
}
// 中断闲暇线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {for (Worker w : workers) {
Thread t = w.thread;
// 如果工作线程没有中断, 并且上锁胜利, 那么就执行中断操作
if (!t.isInterrupted() && w.tryLock()) {
try {t.interrupt();
} catch (SecurityException ignore) { } finally {w.unlock();
}
}
if (onlyOne)
break;
}
} finally {mainLock.unlock();
}
}
其余
- 线程池配多少个好
从教训上讲须要辨别是 cpu 密集型还是 io 密集型。cpu 密集型的话,为了防止上下文切换,数量不宜过多,个别为 cpu 核数 + 1;
如果是 io 密集型的话,能够多点,须要大抵明确 io 型工作耗时和 cpu 密集型工作耗时的比例.
- 死锁编码和定位
jstack 排查
- ExecutorService.submit(Runnable task, T result) 应该这样用
result 作为子线程和主线程沟通的桥梁, 作为结构参数传入 task 中. 子线程能够操作 result 对象.