关于java:深入理解线程池

什么是线程池?

线程池是一种利用池化技术思维来实现的线程治理技术,次要是为了复用线程、便当地治理线程和工作、并将线程的创立和工作的执行解耦开来。咱们能够创立线程池来复用曾经创立的线程来升高频繁创立和销毁线程所带来的资源耗费。在JAVA中次要是应用ThreadPoolExecutor类来创立线程池。

线程池的长处

升高资源耗费,复用已创立的线程来升高创立和销毁线程的耗费。
进步响应速度,工作达到时,能够不须要期待线程的创立立刻执行。
进步线程的可管理性,应用线程池可能对立的调配、调优和监控。

线程池的状态及生命周期

ThreadPoolExecutor的构造方法参数

corePoolSize,外围线程数量
maximumPoolSize,最大线程数量
keepAliveTime,线程闲暇时存活的工夫
unit,闲暇存活工夫单位
workQueue,工作队列,用于寄存已提交的工作(ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue)
threadFactory,线程工厂,用于创立线程执行工作
handler,回绝策略,当线程池处于饱和时,应用某种策略来回绝工作提交(AbortPolicy:抛异样;DiscardPolicy:间接抛弃;DiscardOldestPolicy:丢第一个;CallerRunsPolicy:谁提交工作,谁执行)

从execute源码剖析线程池执行工作的流程

线程池外围属性

// AtomicInteger,就是一个int,写操作用CAS实现,保障了原子性
// ctl保护这线程池的2个核心内容:
// 1:线程池状态(高3位,保护着线程池状态)
// 2:工作线程数量(外围线程+非核心线程,低29位,保护着工作线程个数)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 工作线程的最大个数
// 00100000 00000000 00000000 00000000 - 1
// 000111111111111111111111111111111  
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;


private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 拿到线程池状态
// 011... 
// 111...
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 拿到工作线程个数
// ...0000000111111
// ...1111111111111
private static int workerCountOf(int c)  { return c & CAPACITY; }

execute()

public void execute(Runnable command) {
    // 非空!!
    if (command == null)
        throw new NullPointerException();
    // 拿到ctl
    int c = ctl.get();
    // 通过ctl获取当前工作线程个数
    if (workerCountOf(c) < corePoolSize) {
        // true:代表是外围线程,false:代表是非核心线程
        if (addWorker(command, true))
            // 如果增加外围线程胜利,return完结掉
            return;
        // 如果增加失败,从新获取ctl
        c = ctl.get();
    }
    // 外围线程数曾经到了最大值、增加时,线程池状态变为SHUTDOWN/STOP
    // 判断线程池是否是运行状态 && 增加工作到工作队列
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次获取ctl的值
        int recheck = ctl.get();
        // 再次判断线程池状态。  DCL
        // 如果状态不是RUNNING,把工作从工作队列移除。
        if (! isRunning(recheck) && remove(command))
            // 走一波回绝策略。
            reject(command);
        // 线程池状态是RUNNING。
        // 判断工作线程数是否是0个。
        // 能够将外围线程设置为0,所有工作线程都是非核心线程。
        // 外围线程也能够通过keepAlived超时被销毁,所以如果凑巧外围线程被销毁,也会呈现以后成果
        else if (workerCountOf(recheck) == 0)
            // 增加空工作的非核心线程去解决工作队列中的工作
            addWorker(null, false);
    }
    // 可能工作队列中的工作存满了,没增加进去,到这就要增加非核心线程去解决工作
    else if (!addWorker(command, false))
        // 执行回绝策略!
        reject(command);
}

从execute()办法中,能够看到,当有一个工作进来后
1.首先会判断以后线程池里工作线程的个数是否小于设置的外围线程个数,如果外围线程个数未满,会尝试调用addWorker()办法,新增一个外围线程,如果新增失败,则从新获取一次线程池的状态,走上面逻辑。同时如果外围线程个数已满。也走上面的逻辑。
2.判断线程池状态是否是运行状态,如果是,调用阻塞队列的offer()办法将此工作增加到阻塞队列中,如果不是或者阻塞队列已满不能增加进去,就执行回绝策略
3.如果在增加到阻塞队列后,DCL查看线程池的状态不是运行状态,就会把此工作从阻塞队列中移除,执行回绝策略。如果是运行状态,就再检查一下线程池的线程数量,如果是0,就增加一个空工作的非核心线程去解决工作队列中的工作。

addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    xxx:
    for (;;) {
        // 阿巴阿巴…………
        int c = ctl.get();
        int rs = runStateOf(c);
        // 判断线程池状态
        if (rs >= SHUTDOWN &&
              // 判断如果线程池的状态为SHUTDOWN,还要解决工作队列中的工作
              // 如果你增加工作线程的形式,是工作的非核心线程,并且工作队列还有工作
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        // 判断工作线程个数
        for (;;) {
            // 阿巴阿巴……
            int wc = workerCountOf(c);
            // 判断1:工作线程是否曾经 == 工作线程最大个数
            // 判断2-true判断:判断是外围线程么?如果是判断是否超过外围线程个数
            // 判断2-false判断:如果是非核心线程,查看是否超过设置的最大线程数
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 对工作线程进行 + 1操作
            if (compareAndIncrementWorkerCount(c))
                // +1胜利,跳出外层循环,执行增加工作线程的业务
                // 以CAS形式,对ctl+1,多线程并发操作,只有会有一个胜利
                break xxx;
            // 从新拿ctl,
            c = ctl.get();
            // 判断线程池状态是否有变动
            if (runStateOf(c) != rs)
                continue xxx;
        }
    }

    // 增加工作线程的业务  
    // 工作线程启动了吗?
    boolean workerStarted = false;
    // 工作线程增加了吗?
    boolean workerAdded = false;
    // Worker就是工作线程
    Worker w = null;
    try {
        // 创立工作线程,将工作传到Worker中
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // 只有你写的线程工厂返回的是null,这里才会为null
        if (t != null) {
            // 获取锁资源
            final ReentrantLock mainLock = this.mainLock;
            // 加锁。  因为我要在启动这个工作线程时,防止线程池状态发生变化,加锁。
            mainLock.lock();
            try {
                // 从新获取ctl,拿到线程池状态
                int rs = runStateOf(ctl.get());
                // DCL i think you know~~~
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                       // 判断Worker中的thread是否曾经启动了,个别不会启动,除非你在线程工厂把他启动了
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 将工作线程存储到hashSet中
                    workers.add(w);
                    // 获取工作线程个数,判断是否须要批改最大工作线程数记录。
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 工作线程增加胜利     0
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果增加胜利
            if (workerAdded) {
                // 启动工作线程
                t.start();
                // 设置标识为true
                workerStarted = true;
            }
        }
    } finally {
        // 如果工作线程启动失败
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker()办法有两个参数,一个是要执行的工作,另一个是是否要增加的是外围线程的布尔类型变量。
1.第一局部是一个双重死循环,第一层循环是判断以后线程池是否还有增加线程的必要,当线程池的
状态为STOP以上的状态或者是SHUTDOWN且阻塞队列中没有工作的时候,不必增加线程,且不解决此次进来的工作,间接返回false;第二层循环是依据咱们传的布尔类型的core变量判断,如果咱们要增加的是外围线程,就判断外围线程数量是否已满,如果满了,间接返回false,在execute()办法中,增加失败后,会把工作扔到阻塞队列里。如果增加的是非核心线程,判断以后线程池的线程数量是否超过设置的最大值,如果超过了,间接返回false,在execute()办法中,增加失败后,会间接执行回绝策略。当所有判断都满足后,用CAS的形式对线程池的工作线程数量+1,胜利则跳出循环,失败则持续自旋,直到增加胜利,或者下面条件不满足,增加失败。
2.第二局部就是开始增加工作线程了,先将此次要执行的工作封装到Worker对象中,用ReentrantLock对整个线程池加锁,防止此次在增加工作线程的时候,线程池被其余线程干掉。而后将此工作线程增加到线程池的工作线程汇合中,而后解锁。如果增加胜利,间接启动线程,因为Worker类继承了Runnable接口并重写了run(),所以这里持续追踪Worker类中的runWorker()办法,如果增加失败,则执行addWorkerFailed()办法。

addWorkerFailed()

// 如果增加工作线程失败,执行
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 阐明worker可能寄存到了workers的hashSet中。
        if (w != null)
            // 移除!
            workers.remove(w);
        // 减掉workerCount的数值 -1
        decrementWorkerCount();
        // 尝试干掉本人
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

runWorker()

final void runWorker(Worker w) {
    // 拿到以后线程对象
    Thread wt = Thread.currentThread();
    // 拿到worker中寄存的Runnable
    Runnable task = w.firstTask;
    // 将worker中的工作清空
    w.firstTask = null;
    // 揍是一个标识
    boolean completedAbruptly = true;
    try {
        // 如果Worker本身携带工作,间接执行
        // 如果Worker携带的是null,通过getTask去工作队列获取工作
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 判断线程池状态是否大于等于STOP,如果是要中断以后线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 // 中断以后线程(DCL)
                 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
  
            try {
                // 前置钩子
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行工作
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 后置钩子
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 当前工作执行完一个工作,就++
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

在这个办法中,入参是上一步传进来的worker对象,因为这个worker对象有可能是曾经封装了工作的,也有可能是没有封装工作的,所以这里循环条件是先从本人这里拿,如果没有,就用getTask()去阻塞队列中拿,如果是非核心线程或者是外围线程且容许超时调用的是阻塞队列的poll()办法,如果阻塞队列里始终没有工作,线程在这期待直到超时,如果是不容许超时的外围线程调用的是阻塞队列的take()办法,如果阻塞队列里始终没有工作,就会应用await()使线程阻塞住,直到队列是有工作增加,会singal()唤醒。在真正执行工作前后有两个钩子办法,beforeExecute(),afterExecute()能够由咱们本人定义内容。

getTask()

private Runnable getTask() {
    // 超时-false
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池状态判断
        // 如果线程池状态为SHUTDOWN && 工作队列为空
        // 如果线程池状态为STOP
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 对工作线程个数--
            decrementWorkerCount();
            return null;
        }

        // 对数量的判断。
        int wc = workerCountOf(c);

        // 判断外围线程是否容许超时?
        // 工作线程个数是否大于外围线程数
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 判断工作线程是否超过了最大线程数 && 工作队列为null
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            // 工作线程数有问题,必须-1,干掉当前工作线程
            // 工作线程是否超过了外围线程,如果超时,就干掉以后线程
            // 对工作线程个数--
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果是非核心,走poll,拉取工作队列工作,
            // 如果是外围线程,走take始终阻塞,拉取工作队列工作
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // 当工作队列没有工作时,这时就会被Condition通过await阻塞线程
                // 当有工作增加到工作线程后,这是增加完工作后,就会用过Condition.signal唤醒阻塞的线程
                workQueue.take();
            if (r != null)
                return r;
            // 执行的poll办法,并且在指定工夫没拿到工作,
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

至此,线程池执行的的流程完结了,流程如下图:

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理