关于线程池:Java多线程与线程池技术

39次阅读

共计 7578 个字符,预计需要花费 19 分钟才能阅读完成。

一、序言

Java 多线程编程线程池被宽泛应用,甚至成为了标配。

线程池实质是 池化技术 的利用,和连接池相似,创立连贯与敞开连贯属于耗时操作,创立线程与销毁线程也属于重操作,为了提高效率,先提前创立好一批线程,当有须要应用线程时从线程池取出,用完后放回线程池,这样防止了频繁创立与销毁线程。

// 工作
Runnable runnable = () -> System.out.println(Thread.currentThread().getId());

在利用中优先选用线程池执行异步工作,依据不同的场景选用不同的线程池,进步异步工作执行效率。

1、一般执行
new Thread(runnable).start();
2、线程池执行
Executors.newSingleThreadExecutor().execute(runnable)

二、线程池根底

(一)外围参数

1、外围参数

线程池的外围参数决定了池的类型,进而决定了池的个性。

参数 解释 行为
corePoolSize 外围线程数 池中长期保护的线程数量,不被动回收
maximumPoolSize 最大线程数 最大线程数大于等于外围线程数
keepAliveTime 线程最大闲暇工夫 非核心线程最大闲暇工夫,超时回收线程
workQueue 工作队列 工作队列间接决定线程池的类型
2、参数与池的关系

Executors 类默认创立线程池与参数对应关系。

线程池 corePoolSize maximumPoolSize keepAliveTime workQueue
newCachedThreadPool 0 Integer.MAX_VALUE 60 SynchronousQueue
newSingleThreadExecutor 1 1 0 LinkedBlockingQueue
newFixedThreadPool N N 0 LinkedBlockingQueue
newScheduledThreadPool N Integer.MAX_VALUE 0 DelayedWorkQueue

(二)线程池比照

依据应用场景抉择对应的线程池。

1、通用比照
线程池 特点 实用场景
newCachedThreadPool 超时未应用的线程回主动销毁,有新工作时主动创立 实用于低频、轻量级的工作。回收线程的目标是节约线程长时间闲暇而占有的资源。
newSingleThreadExecutor 线程池中有且只有一个线程 程序执行工作
newFixedThreadPool 线程池中有固定数量的线程,且始终存在 实用于高频的工作,即线程在大多数工夫里都处于工作状态。
newScheduledThreadPool 定时线程池 与定时调度相关联
2、拓展比照

保护仅有一个线程的线程池有如下两种形式,失常应用的状况下,二者差别不大;简单应用环境下,二者存在轻微的差别。用 newSingleThreadExecutor 形式创立的线程池在任何时刻至少只有一个线程,因而能够了解为用异步的形式执行程序工作;后者初始化的时候也只有一个线程,应用过程中可能会呈现最大线程数超过 1 的状况,这时要求线性执行的工作会并行执行,业务逻辑可能会呈现问题,与理论场景无关。

private final static ExecutorService executor = Executors.newSingleThreadExecutor();
private final static ExecutorService executor = Executors.newFixedThreadPool(1);

(三)线程池原理

线程池次要解决流程,工作提交之后是怎么执行的。大抵如下:

  1. 判断外围线程池是否已满,如果不是,则创立线程执行工作
  2. 如果外围线程池满了,判断队列是否满了,如果队列没满,将工作放在队列中
  3. 如果队列满了,则判断线程池是否已满,如果没满,创立线程执行工作
  4. 如果线程池也满了,则依照回绝策略对工作进行解决

(四)提交工作的形式

往线程池中提交工作,次要有两种办法:提交无返回值的工作和提交有返回值的工作。

1、无返回值工作

execute用于提交不须要返回后果的工作。

public static void main(String[] args) {ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.execute(() -> System.out.println("hello"));
}
2、有返回值工作

submit()用于提交一个须要返回果的工作。

该办法返回一个 Future 对象,通过调用这个对象的 get() 办法,咱们就能取得返回后果。get()办法会始终阻塞,直到返回后果返回。

咱们也能够应用它的重载办法get(long timeout, TimeUnit unit),这个办法也会阻塞,然而在超时工夫内依然没有返回后果时,将抛出异样TimeoutException

public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newFixedThreadPool(2);
    Future<Long> future = executor.submit(() -> {System.out.println("task is executed");
        return System.currentTimeMillis();});
    System.out.println("task execute time is:" + future.get());
}

在提交工作时,如果无返回值工作,优先应用execute

(无)敞开线程池

在线程池应用实现之后,咱们须要对线程池中的资源进行开释操作,这就波及到敞开性能。咱们能够调用线程池对象的 shutdown()shutdownNow()办法来敞开线程池。

这两个办法都是敞开操作,又有什么不同呢?

  1. shutdown()会将线程池状态置为SHUTDOWN,不再承受新的工作,同时会期待线程池中已有的工作执行实现再完结。
  2. shutdownNow()会将线程池状态置为 SHUTDOWN,对所有线程执行interrupt() 操作,清空队列,并将队列中的工作返回回来。

另外,敞开线程池波及到两个返回 boolean 的办法,isShutdown()isTerminated,别离示意是否敞开和是否终止。

三、Executors

Executors是一个线程池工厂,提供了很多的工厂办法,咱们来看看它大略能创立哪些线程池。

// 创立繁多线程的线程池
public static ExecutorService newSingleThreadExecutor();
// 创立固定数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads);
// 创立带缓存的线程池
public static ExecutorService newCachedThreadPool();
// 创立定时调度的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 创立流式(fork-join)线程池
public static ExecutorService newWorkStealingPool();
1、创立繁多线程的线程池

任何时候线程池中至少只有一个线程,当线程执行异样终止时会主动创立一个新线程替换。如果既有异步执行工作的需要又心愿工作得以程序执行,那么此类型线程池是首选。

若多个工作被提交到此线程池,那么会被缓存到队列。当线程闲暇的时候,依照 FIFO 的形式进行解决。

2、创立固定数量的线程池

创立外围线程与最大线程数相等的固定线程数的线程池,任何时刻至少有固定数目的线程,当线程因异样而终止时则会主动创立线程替换。

当有新工作退出时,如果池内线程均处于沉闷状态,则工作进入期待队列中,直到有闲暇线程,队列中的工作才会被程序执行;如果池内有非沉闷线程,则工作能够立即得以执行。

  • 如果线程的数量未达到指定数量,则创立线程来执行工作
  • 如果线程池的数量达到了指定数量,并且有线程是闲暇的,则取出闲暇线程执行工作
  • 如果没有线程是闲暇的,则将工作缓存到队列(队列长度为Integer.MAX_VALUE)。当线程闲暇的时候,依照 FIFO 的形式进行解决
3、创立可伸缩的线程池

这种形式创立的线程池,外围线程池的长度为 0,线程池最大长度为 Integer.MAX_VALUE。因为自身应用SynchronousQueue 作为期待队列的缘故,导致往队列外面每插入一个元素,必须期待另一个线程从这个队列删除一个元素。

  • 线程池可保护 0 到 Integer.MAX_VALUE 个线程资源,闲暇线程默认状况下超过 60 秒未应用则会被销毁,长期闲置的池占用较少的资源。
  • 当有新工作退出时,如果池中有闲暇且尚未销毁的线程,则将工作交给此线程执行;如果没有可用的线程,则创立一个新线程执行工作并增加到池中。
4、创立定时调度的线程池

和下面 3 个工厂办法返回的线程池类型有所不同,它返回的是 ScheduledThreadPoolExecutor 类型的线程池。平时咱们实现定时调度性能的时候,可能更多的是应用第三方类库,比方:quartz 等。然而对于更底层的性能,咱们依然须要理解。

四、手动创立线程池

实践上,咱们能够通过 Executors 来创立线程池,这种形式非常简单。但正是因为简略,所以限度了线程池的性能。比方:无长度限度的队列,可能因为工作沉积导致 OOM,这是十分重大的 bug,应尽可能地防止。怎么防止?归根结底,还是须要咱们通过更底层的形式来创立线程池。

抛开定时调度的线程池不论,咱们看看ThreadPoolExecutor。它提供了好几个构造方法,然而最底层的构造方法却只有一个。那么,咱们就从这个构造方法着手剖析。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler);

这个构造方法有 7 个参数,咱们逐个来进行剖析。

  1. corePoolSize,线程池中的外围线程数
  2. maximumPoolSize,线程池中的最大线程数
  3. keepAliveTime,闲暇工夫,当线程池数量超过外围线程数时,多余的闲暇线程存活的工夫,即:这些线程多久被销毁。
  4. unit,闲暇工夫的单位,能够是毫秒、秒、分钟、小时和天,等等
  5. workQueue,期待队列,线程池中的线程数超过外围线程数时,工作将放在期待队列,它是一个 BlockingQueue 类型的对象
  6. threadFactory,线程工厂,咱们能够应用它来创立一个线程
  7. handler,回绝策略,当线程池和期待队列都满了之后,须要通过该对象的回调函数进行回调解决

这些参数外面,根本类型的参数都比较简单,咱们不做进一步的剖析。咱们更关怀的是 workQueuethreadFactoryhandler,接下来咱们将进一步剖析。

(一)期待队列 -workQueue

期待队列是 BlockingQueue 类型的,实践上只有是它的子类,咱们都能够用来作为期待队列。

同时,jdk 外部自带一些阻塞队列,咱们来看看大略有哪些。

  1. ArrayBlockingQueue,队列是有界的,基于数组实现的阻塞队列
  2. LinkedBlockingQueue,队列能够有界,也能够无界。基于链表实现的阻塞队列
  3. SynchronousQueue,不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将始终处于阻塞状态。该队列也是 Executors.newCachedThreadPool() 的默认队列
  4. PriorityBlockingQueue,带优先级的无界阻塞队列

通常状况下,咱们须要指定阻塞队列的上界(比方 1024)。另外,如果执行的工作很多,咱们可能须要将工作进行分类,而后将不同分类的工作放到不同的线程池中执行。

(二)线程工厂 -threadFactory

ThreadFactory是一个接口,只有一个办法。既然是线程工厂,那么咱们就能够用它生产一个线程对象。来看看这个接口的定义。

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

Executors的实现应用了默认的线程工厂 -DefaultThreadFactory。它的实现次要用于创立一个线程,线程的名字为pool-{poolNum}-thread-{threadNum}

static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

很多时候,咱们须要自定义线程名字。咱们只须要本人实现ThreadFactory,用于创立特定场景的线程即可。

(三)回绝策略 -handler

所谓回绝策略,就是当线程池满了、队列也满了的时候,咱们对工作采取的措施。或者抛弃、或者执行、或者其余 …

jdk 自带 4 种回绝策略,咱们来看看。

  1. CallerRunsPolicy // 在调用者线程执行
  2. AbortPolicy // 间接抛出 RejectedExecutionException 异样
  3. DiscardPolicy // 工作间接抛弃,不做任何解决
  4. DiscardOldestPolicy // 抛弃队列里最旧的那个工作,再尝试执行当前任务

这四种策略各有优劣,比拟罕用的是 DiscardPolicy,然而这种策略有一个弊病就是工作执行的轨迹不会被记录下来。所以,咱们往往须要实现自定义的回绝策略,通过实现RejectedExecutionHandler 接口的形式。

五、其它

配置线程池的参数

后面咱们讲到了手动创立线程池波及到的几个参数,那么咱们要如何设置这些参数才算是正确的利用呢?实际上,须要依据工作的个性来剖析。

  1. 工作的性质:CPU 密集型、IO 密集型和混淆型
  2. 工作的优先级:高中低
  3. 工作执行的工夫:长中短
  4. 工作的依赖性:是否依赖数据库或者其余系统资源

不同的性质的工作,咱们采取的配置将有所不同。在《Java 并发编程实际》中有相应的计算公式。

通常来说,如果工作属于 CPU 密集型,那么咱们能够将线程池数量设置成 CPU 的个数,以缩小线程切换带来的开销。如果工作属于 IO 密集型,咱们能够将线程池数量设置得更多一些,比方 CPU 个数 *2。

PS:咱们能够通过 Runtime.getRuntime().availableProcessors() 来获取 CPU 的个数。

线程池监控

如果零碎中大量用到了线程池,那么咱们有必要对线程池进行监控。利用监控,咱们能在问题呈现前提前感知到,也能够依据监控信息来定位可能呈现的问题。

那么咱们能够监控哪些信息?又有哪些办法可用于咱们的扩大反对呢?

首先,ThreadPoolExecutor自带了一些办法。

  1. long getTaskCount(),获取曾经执行或正在执行的工作数
  2. long getCompletedTaskCount(),获取曾经执行的工作数
  3. int getLargestPoolSize(),获取线程池已经创立过的最大线程数,依据这个参数,咱们能够晓得线程池是否满过
  4. int getPoolSize(),获取线程池线程数
  5. int getActiveCount(),获取沉闷线程数(正在执行工作的线程数)

其次,ThreadPoolExecutor留给咱们自行处理的办法有 3 个,它在 ThreadPoolExecutor 中为空实现(也就是什么都不做)。

  1. protected void beforeExecute(Thread t, Runnable r) // 工作执行前被调用
  2. protected void afterExecute(Runnable r, Throwable t) // 工作执行后被调用
  3. protected void terminated() // 线程池完结后被调用

六、总结

  1. 尽量应用手动的形式创立线程池,防止应用 Executors 工厂类
  2. 依据场景,正当设置线程池的各个参数,包含线程池数量、队列、线程工厂和回绝策略

喜爱本文点个♥️赞♥️反对一下,如有须要,可通过微信 dream4s 与我分割。相干源码在 GitHub,视频解说在 B 站,本文珍藏在博客天地。


正文完
 0