关于java:ThreadPoolExecutor-线程池分析

39次阅读

共计 4195 个字符,预计需要花费 11 分钟才能阅读完成。

1、为什么应用线程池

  1. 升高资源耗费
    通过反复利用已创立的线程升高线程创立和销毁造成的耗费
  2. 进步响应速度
    当工作达到时, 工作能够不须要等到线程创立就能立刻执行
  3. 进步线程的可管理性
    应用线程池能够对线程进行统一分配、治理和监控

3、创立线程池

应用 ThreadPoolExecutor 类创立线程池,它的构造方法一共有 7 个参数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

corePoolSize: 外围线程数,线程池的在闲暇状况下所可能放弃的线程数。

当提交一个工作到线程池时, 线程池会创立一个线程来执行工作,即便其余闲暇的根本线程可能执行新工作也会创立线程,等到须要执行的工作数大于线程池根本大小时就不再创立。如果调用了 prestartAllCoreThreads() 办法,线程池会提前创立并启动所有根本线程。

maximumPoolSize: 线程池所能保留的最大线程数量

如果阻塞队列满了并且线程池已创立的线程数小于最大线程数, 则线程池会再创立新的线程执行工作。如果应用了无界的工作队列,则该参数有效

keepAliveTime: 线程的生存工夫

当线程数大于 corePoolSize 时,多余的闲暇线程具备的生存工夫,当线程在肯定工夫内处于闲暇状态(没有执行工作),线程池会主动销毁该线程,但线程池不会销毁所有线程,而是保留最多 corePoolSize 个线程。

TimeUnit: 线程生存工夫的单位

BlockingQueue: 工作队列

用于保留期待执行的工作的阻塞队列。可选队列类型如下:

  • ArrayBlockingQueue:基于数组的有界阻塞队列,此队列按 FIFO(先进先出)原
    则对元素进行排序。
  • LinkedBlockingQueue: 一个基于链表的阻塞队列,如果创立时设置了 size 参数就是有界队列,否则是无界队列。此队列按 FIFO 排序元素, 吞吐量通常要高于数组队列。动态工厂办法 Executors.newFixedThreadPool() 应用了这个队列。
  • SynchronousQueue:存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作, 否则插入操作始终处于阻塞状态, 吞吐量通常要高于 Linked- BlockingQueue, 动态工厂办法 Executors.newCachedThreadPool() 应用了这个队列。
  • PriorityBlockingQueue: 具备优先级的有限阻塞队列。

ThreadFactory: 线程工厂

用于创立线程的工厂,能够在创立办法能够自定义线程的根本属性(如:名称、优先级、设置守护线程)

// 应用开源框架 guava 提供的 ThreadFactoryBuilder 创立线程工厂
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build() ;

RejectedExecutionHandler: 饱和策略

当阻塞队列和线程池都满了, 阐明线程池处于饱和状态,那么必须采取一种策略解决提交的新工作。在 JDK 中提供了以下 4 种策略:

  • AbortPolicy:间接抛出异样(默认)
  • DiscardPolicy:间接抛弃当前任务
  • DiscardOldestPolicy:抛弃队列里最近的工作,并执行当前任务。
  • CallerRunsPolicy:只用调用者所在线程来运行工作。

3、线程池工作流程


0)使用者向线程池提交了一个工作

1)如果以后运行的线程数少于corePoolSize,则创立新线程来执行工作(创立线程须要获取全局锁)。

2)如果运行的线程数大于等于corePoolSize,则将工作退出阻塞队列

3)如果阻塞队列已满,则创立新的线程来解决工作

4)如果创立新线程将使以后运行的线程超出最大线程数,则采取饱和策略解决

线程池采纳上述步骤的总体设计思路,是为了在执行 execute()办法时,尽可能地防止获取全局锁。在线程池实现预热之后 (以后运行的线程数大于等于 corePoolSize),简直所有的 execute() 办法调用都是执行步骤 2, 而步骤 2 不须要获取全局锁。

4、线程池的生命周期

调用线程池的 shutdown()shutdownNow()办法敞开线程池。

它们的原理是:遍历线程池中的工作线程, 而后一一调用线程的 interrupt 办法来中断线程,所以不能终止无奈响应中断的工作。

shutdownNow:首先将线程池的状态设置成 STOP,而后尝试进行所有的正在执行或暂停工作的线程, 并返回期待执行工作的列表

shutdown:只是将线程池的状态设置成 SHUTDOWN 状态,而后中断所有没有正在执行工作的线程。

5、线程池配置

要想正当地配置线程池,就必须首先剖析工作个性,性质不同的工作用不同规模的线程池离开解决。能够从以下几个角度来剖析:

  • 工作的性质:CPU 密集型工作、I0 密集型工作和混合型工作
  • 工作的优先级: 高、中和低
  • 工作的执行工夫: 长、中和短
  • 工作的依赖性: 是否依赖其余系统资源, 如数据库连贯

通过 Runtime.getRuntime().availableProcessors() 办法取得以后设施的 CPU 个数

CPU 密集型工作应配置尽可能小的线程,如配置 N + 1 个线程的线程池
IO 密集型工作的线程不是始终在执行工作(被 IO 阻塞),应配置尽可能多的线程,如:2*Ncpu

优先级不同的工作能够应用优先级队列 PriorityBlockingQueue 来解决。它能够让优先级高的工作先执行。留神如果始终有优先级高的工作提交到队列里, 那么优先级低的工作可能永远不能执行

执行工夫不同的工作能够交给不同规模的线程池来解决,或者能够应用优先级队列,让执行工夫短的工作先执行。

依赖数据库连接池的工作, 因为线程提交 SQL 后须要期待数据库返回后果,期待的工夫越长,则 CPU 闲暇工夫就越长,那么线程数应该设置得越大, 这样能力更好地利用 CPU。

6、线程复用原理

线程池有一个外部类叫 Worker,它实现了 Runnable 接口,它有 2 个重要的外部变量 thread (创立的线程) 和 firstTask (线程的首工作)

Worker 的构造方法接管一个 Runnable 参数,将其赋值给 firstTask,还会应用线程池的线程工厂的 newThread(Runnable task) 办法创立线程,将本身作为参数传入。这样一来,当创立的线程执行时,就会执行 Worker 对象 的 run() 办法,理论就是执行 runWorker() 办法。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
    // 创立的线程
    final Thread thread;
    // 线程执行的第一个工作
    Runnable firstTask;
    
    Worker(Runnable firstTask) {setState(-1); 
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {runWorker(this);
    }

}

runWorker() 中,当 Worker 对象的 firstTask 没有执行时,会执行它的 firstTask;若已执行,则会从阻塞队列中获取工作执行,这个过程会继续循环直到线程进行为止。至此就实现了线程的复用。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        
        try {while (task != null || (task = getTask()) != null) {
                ...
                task.run();
                ...
            }
        } finally {processWorkerExit(w, completedAbruptly);
        }
    }

线程池调用 execute 办法提交工作,通过 addWorker() 创立 Worker 对象,并启动新建的线程

public void execute(Runnable command) {
    // 如果以后线程数小于外围线程数,则创立新线程
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
            return;
        c = ctl.get();}

    // 将工作存入阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果阻塞队列已满,则创立新过程解决
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
    .......
    Worker w = null;
    try {
        // 创立 Worker
        w = new Worker(firstTask);
        // 获取 Worker 中的线程对象
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            ........
            // 运行线程
            if (workerAdded) {t.start();
                workerStarted = true;
            }
        }
    } finally {if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这是线程池 execute() 办法的时序图,通过时序图便于了解下面的过程

正文完
 0