关于java:深入理解线程池

52次阅读

共计 7183 个字符,预计需要花费 18 分钟才能阅读完成。

什么是线程池?

线程池是一种利用池化技术思维来实现的线程治理技术,次要是为了复用线程、便当地治理线程和工作、并将线程的创立和工作的执行解耦开来。咱们能够创立线程池来复用曾经创立的线程来升高频繁创立和销毁线程所带来的资源耗费。在 JAVA 中次要是应用 ThreadPoolExecutor 类来创立线程池。

线程池的长处

升高资源耗费 ,复用已创立的线程来升高创立和销毁线程的耗费。
进步响应速度 ,工作达到时,能够不须要期待线程的创立立刻执行。
进步线程的可管理性,应用线程池可能对立的调配、调优和监控。

线程池的状态及生命周期

ThreadPoolExecutor 的构造方法参数

corePoolSize,外围线程数量
maximumPoolSize,最大线程数量
keepAliveTime,线程闲暇时存活的工夫
unit,闲暇存活工夫单位
workQueue,工作队列,用于寄存已提交的工作(ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue)
threadFactory,线程工厂,用于创立线程执行工作
handler,回绝策略,当线程池处于饱和时,应用某种策略来回绝工作提交(AbortPolicy:抛异样;DiscardPolicy:间接抛弃;DiscardOldestPolicy:丢第一个;CallerRunsPolicy:谁提交工作,谁执行)

从 execute 源码剖析线程池执行工作的流程

线程池外围属性

// AtomicInteger, 就是一个 int,写操作用 CAS 实现,保障了原子性
// ctl 保护这线程池的 2 个核心内容:// 1:线程池状态(高 3 位,保护着线程池状态)// 2:工作线程数量(外围线程 + 非核心线程,低 29 位,保护着工作线程个数)private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 工作线程的最大个数
// 00100000 00000000 00000000 00000000 - 1
// 000111111111111111111111111111111  
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 拿到线程池状态
// 011... 
// 111...
private static int runStateOf(int c)     {return c & ~CAPACITY;}
// 拿到工作线程个数
// ...0000000111111
// ...1111111111111
private static int workerCountOf(int c)  {return c & CAPACITY;}

execute()

public void execute(Runnable command) {
    // 非空!!if (command == null)
        throw new NullPointerException();
    // 拿到 ctl
    int c = ctl.get();
    // 通过 ctl 获取当前工作线程个数
    if (workerCountOf(c) < corePoolSize) {
        // true:代表是外围线程,false:代表是非核心线程
        if (addWorker(command, true))
            // 如果增加外围线程胜利,return 完结掉
            return;
        // 如果增加失败,从新获取 ctl
        c = ctl.get();}
    // 外围线程数曾经到了最大值、增加时,线程池状态变为 SHUTDOWN/STOP
    // 判断线程池是否是运行状态 && 增加工作到工作队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次获取 ctl 的值
        int recheck = ctl.get();
        // 再次判断线程池状态。DCL
        // 如果状态不是 RUNNING,把工作从工作队列移除。if (! isRunning(recheck) && remove(command))
            // 走一波回绝策略。reject(command);
        // 线程池状态是 RUNNING。// 判断工作线程数是否是 0 个。// 能够将外围线程设置为 0,所有工作线程都是非核心线程。// 外围线程也能够通过 keepAlived 超时被销毁,所以如果凑巧外围线程被销毁,也会呈现以后成果
        else if (workerCountOf(recheck) == 0)
            // 增加空工作的非核心线程去解决工作队列中的工作
            addWorker(null, false);
    }
    // 可能工作队列中的工作存满了,没增加进去,到这就要增加非核心线程去解决工作
    else if (!addWorker(command, false))
        // 执行回绝策略!reject(command);
}

从 execute()办法中,能够看到,当有一个工作进来后
1. 首先会判断以后线程池里工作线程的个数是否小于设置的外围线程个数,如果 外围线程个数未满 ,会尝试调用 addWorker() 办法,新增一个外围线程,如果新增失败,则从新获取一次线程池的状态,走上面逻辑。同时如果外围线程个数已满。也走上面的逻辑。
2. 判断线程池状态是否是运行状态,如果是,调用阻塞队列的 offer()办法将此工作增加到阻塞队列中,如果不是或者阻塞队列已满不能增加进去,就执行回绝策略
3. 如果在增加到阻塞队列后,DCL 查看线程池的状态不是运行状态,就会把此工作从阻塞队列中移除,执行回绝策略。如果是运行状态,就再检查一下线程池的线程数量,如果是 0,就增加一个空工作的非核心线程去解决工作队列中的工作。

addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    xxx:
    for (;;) {
        // 阿巴阿巴…………
        int c = ctl.get();
        int rs = runStateOf(c);
        // 判断线程池状态
        if (rs >= SHUTDOWN &&
              // 判断如果线程池的状态为 SHUTDOWN,还要解决工作队列中的工作
              // 如果你增加工作线程的形式,是工作的非核心线程,并且工作队列还有工作
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        // 判断工作线程个数
        for (;;) {
            // 阿巴阿巴……
            int wc = workerCountOf(c);
            // 判断 1:工作线程是否曾经 == 工作线程最大个数
            // 判断 2 -true 判断:判断是外围线程么?如果是判断是否超过外围线程个数
            // 判断 2 -false 判断:如果是非核心线程,查看是否超过设置的最大线程数
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 对工作线程进行 + 1 操作
            if (compareAndIncrementWorkerCount(c))
                // + 1 胜利,跳出外层循环,执行增加工作线程的业务
                // 以 CAS 形式,对 ctl+1,多线程并发操作,只有会有一个胜利
                break xxx;
            // 从新拿 ctl,c = ctl.get();
            // 判断线程池状态是否有变动
            if (runStateOf(c) != rs)
                continue xxx;
        }
    }

    // 增加工作线程的业务  
    // 工作线程启动了吗?boolean workerStarted = false;
    // 工作线程增加了吗?boolean workerAdded = false;
    // Worker 就是工作线程
    Worker w = null;
    try {
        // 创立工作线程,将工作传到 Worker 中
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // 只有你写的线程工厂返回的是 null,这里才会为 null
        if (t != null) {
            // 获取锁资源
            final ReentrantLock mainLock = this.mainLock;
            // 加锁。因为我要在启动这个工作线程时,防止线程池状态发生变化,加锁。mainLock.lock();
            try {
                // 从新获取 ctl,拿到线程池状态
                int rs = runStateOf(ctl.get());
                // DCL i think you know~~~
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                       // 判断 Worker 中的 thread 是否曾经启动了,个别不会启动,除非你在线程工厂把他启动了
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 将工作线程存储到 hashSet 中
                    workers.add(w);
                    // 获取工作线程个数,判断是否须要批改最大工作线程数记录。int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 工作线程增加胜利     0
                    workerAdded = true;
                }
            } finally {mainLock.unlock();
            }
            // 如果增加胜利
            if (workerAdded) {
                // 启动工作线程
                t.start();
                // 设置标识为 true
                workerStarted = true;
            }
        }
    } finally {
        // 如果工作线程启动失败
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker()办法有两个参数,一个是要执行的工作,另一个是是否要增加的是外围线程的布尔类型变量。
1. 第一局部是一个双重死循环,第一层循环是判断以后线程池是否还有增加线程的必要,当线程池的
状态为 STOP 以上的状态或者是 SHUTDOWN 且阻塞队列中没有工作的时候,不必增加线程,且不解决此次进来的工作,间接返回 false;第二层循环是依据咱们传的布尔类型的 core 变量判断,如果咱们要增加的是外围线程,就判断外围线程数量是否已满,如果满了,间接返回 false,在 execute()办法中,增加失败后,会把工作扔到阻塞队列里。如果增加的是非核心线程,判断以后线程池的线程数量是否超过设置的最大值,如果超过了,间接返回 false,在 execute()办法中,增加失败后,会间接执行回绝策略。当所有判断都满足后,用 CAS 的形式对线程池的工作线程数量 +1,胜利则跳出循环,失败则持续自旋,直到增加胜利,或者下面条件不满足,增加失败。
2. 第二局部就是开始增加工作线程了,先将此次要执行的工作封装到 Worker 对象中,用 ReentrantLock 对整个线程池加锁,防止此次在增加工作线程的时候,线程池被其余线程干掉。而后将此工作线程增加到线程池的工作线程汇合中,而后解锁。如果增加胜利,间接启动线程,因为 Worker 类继承了 Runnable 接口并重写了 run(), 所以这里持续追踪 Worker 类中的 runWorker()办法,如果增加失败,则执行 addWorkerFailed()办法。

addWorkerFailed()

// 如果增加工作线程失败,执行
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 阐明 worker 可能寄存到了 workers 的 hashSet 中。if (w != null)
            // 移除!workers.remove(w);
        // 减掉 workerCount 的数值 -1
        decrementWorkerCount();
        // 尝试干掉本人
        tryTerminate();} finally {mainLock.unlock();
    }
}

runWorker()

final void runWorker(Worker w) {
    // 拿到以后线程对象
    Thread wt = Thread.currentThread();
    // 拿到 worker 中寄存的 Runnable
    Runnable task = w.firstTask;
    // 将 worker 中的工作清空
    w.firstTask = null;
    // 揍是一个标识
    boolean completedAbruptly = true;
    try {
        // 如果 Worker 本身携带工作,间接执行
        // 如果 Worker 携带的是 null,通过 getTask 去工作队列获取工作
        while (task != null || (task = getTask()) != null) {w.lock();
            // 判断线程池状态是否大于等于 STOP,如果是要中断以后线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 // 中断以后线程(DCL)(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
  
            try {
                // 前置钩子
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行工作
                    task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                } finally {
                    // 后置钩子
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 当前工作执行完一个工作,就 ++
                w.completedTasks++;
                w.unlock();}
        }
        completedAbruptly = false;
    } finally {processWorkerExit(w, completedAbruptly);
    }
}

在这个办法中,入参是上一步传进来的 worker 对象,因为这个 worker 对象有可能是曾经封装了工作的,也有可能是没有封装工作的,所以这里循环条件是先从本人这里拿,如果没有,就用 getTask()去阻塞队列中拿,如果是非核心线程或者是外围线程且容许超时调用的是阻塞队列的 poll()办法,如果阻塞队列里始终没有工作,线程在这期待直到超时,如果是不容许超时的外围线程调用的是阻塞队列的 take()办法,如果阻塞队列里始终没有工作,就会应用 await()使线程阻塞住,直到队列是有工作增加,会 singal()唤醒。在真正执行工作前后有两个钩子办法,beforeExecute(),afterExecute()能够由咱们本人定义内容。

getTask()

private Runnable getTask() {
    // 超时 -false
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池状态判断
        // 如果线程池状态为 SHUTDOWN && 工作队列为空
        // 如果线程池状态为 STOP
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 对工作线程个数 --
            decrementWorkerCount();
            return null;
        }

        // 对数量的判断。int wc = workerCountOf(c);

        // 判断外围线程是否容许超时?// 工作线程个数是否大于外围线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 判断工作线程是否超过了最大线程数 && 工作队列为 null
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            // 工作线程数有问题,必须 -1,干掉当前工作线程
            // 工作线程是否超过了外围线程,如果超时,就干掉以后线程
            // 对工作线程个数 --
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果是非核心,走 poll,拉取工作队列工作,// 如果是外围线程,走 take 始终阻塞,拉取工作队列工作
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // 当工作队列没有工作时,这时就会被 Condition 通过 await 阻塞线程
                // 当有工作增加到工作线程后,这是增加完工作后,就会用过 Condition.signal 唤醒阻塞的线程
                workQueue.take();
            if (r != null)
                return r;
            // 执行的 poll 办法,并且在指定工夫没拿到工作,timedOut = true;
        } catch (InterruptedException retry) {timedOut = false;}
    }
}

至此,线程池执行的的流程完结了,流程如下图:

正文完
 0