共计 2705 个字符,预计需要花费 7 分钟才能阅读完成。
前言
在多线程开发中,应该避免显式创建线程,而是采用线程池里面的线程。使用线程池可以减少手动创建线程,减少线程创建和回收的损耗等。那么使用线程池就需要了解它的原理。这里我们 ThreadPoolExecutor.execute() 方法内部的具体实现逻辑
流程图
源码分析
public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
// 获取状态变量
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 如果 worker 个数小于核心线程数量
if (addWorker(command, true))
return;
c = ctl.get();}
// 尝试将任务丢入工作队列中
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 工作队列满了的话再次尝试增加 worker
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取 worker 数量
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 判断是否大于核心线程数量或者超过线程池总大小
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建新的 worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 单线程处理工作者入队
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 这时候的 worker 不能已经是在运行中的
throw new IllegalThreadStateException();
// 将 worker 加入到工作者的集合中
workers.add(w);
int s = workers.size();
// 更新线程池的当前最大 size
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {mainLock.unlock();
}
if (workerAdded) {
// 如果加入到工作者集合中成功,那么就启动工作者
t.start();
workerStarted = true;
}
}
} finally {if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 这个方法是工作者的实际工作内容,看下工作者是怎么处理任务的
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
try {
// 如果工作者携带的任务或任务队列不是空的,就会一直循环
while (task != null || (task = getTask()) != null) {w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {beforeExecute(wt, task);
Throwable thrown = null;
try {task.run();
} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {
// 处理任务执行之后的逻辑
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();}
}
completedAbruptly = false;
} finally {
// 处理 worker 退出的逻辑
processWorkerExit(w, completedAbruptly);
}
}
正文完