多线程精通ThreadPoolExecutor

55次阅读

共计 2705 个字符,预计需要花费 7 分钟才能阅读完成。

前言

在多线程开发中,应该避免显式创建线程,而是采用线程池里面的线程。使用线程池可以减少手动创建线程,减少线程创建和回收的损耗等。那么使用线程池就需要了解它的原理。这里我们 ThreadPoolExecutor.execute() 方法内部的具体实现逻辑

流程图

源码分析

    public void execute(Runnable command) {if (command == null)
            throw new NullPointerException();
       // 获取状态变量
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
      // 如果 worker 个数小于核心线程数量
            if (addWorker(command, true))
                return;
            c = ctl.get();}
        // 尝试将任务丢入工作队列中
        if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
      // 工作队列满了的话再次尝试增加 worker
        else if (!addWorker(command, false))
            reject(command);
    }
  private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                 // 获取 worker 数量
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    // 判断是否大于核心线程数量或者超过线程池总大小
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建新的 worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 单线程处理工作者入队
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 这时候的 worker 不能已经是在运行中的
                            throw new IllegalThreadStateException();
                      // 将 worker 加入到工作者的集合中
                        workers.add(w);
                        int s = workers.size();
                      //   更新线程池的当前最大 size
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {mainLock.unlock();
                }
                if (workerAdded) {
                    //   如果加入到工作者集合中成功,那么就启动工作者
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
// 这个方法是工作者的实际工作内容,看下工作者是怎么处理任务的
  final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 允许中断
        boolean completedAbruptly = true;
        try {
            // 如果工作者携带的任务或任务队列不是空的,就会一直循环
            while (task != null || (task = getTask()) != null) {w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {task.run();
                    } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {
                        // 处理任务执行之后的逻辑
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {
            // 处理 worker 退出的逻辑
            processWorkerExit(w, completedAbruptly);
        }
    }

正文完
 0