关于java:Java-线程池详解

38次阅读

共计 13713 个字符,预计需要花费 35 分钟才能阅读完成。

1 概述

本文次要解说了 Java 外面线程池的接口以及实现类,以及它们的根本应用办法,内容包含:

  • Executor/Executors
  • ExecutorService
  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

2 两个重要的接口:Executor+ExecutorService

Executor是一个接口,外面只是定义了一个简略的工作提交办法:

//Executor
package java.util.concurrent;

public interface Executor {void execute(Runnable var1);
}

ExecutorService 也是一个接口,继承了Executor,并且提供了更多用于工作提交和治理的一些办法,比方进行工作的执行等:

//ExecutorService
package java.util.concurrent;

import java.util.Collection;
import java.util.List;

public interface ExecutorService extends Executor {void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long var1, TimeUnit var3) throws InterruptedException;

    <T> Future<T> submit(Callable<T> var1);

    <T> Future<T> submit(Runnable var1, T var2);

    Future<?> submit(Runnable var1);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> var1, long var2, TimeUnit var4) throws InterruptedException, ExecutionException, TimeoutException;
}

上面将具体讲述 ExecutorService 的两个重要实现:

  • ThreadPoolExecutor
  • ScheduledThreadPoolExecutor

3 ThreadPoolExecutor

这就是通常所说的线程池类,通常来说,一个线程池有如下特色:

  • 线程池有肯定数量的工作线程
  • 线程数量以及工作数量会受到肯定的管制和治理
  • 工作的执行以异步的形式进行
  • 线程池会负责执行工作的信息统计

3.1 一个简略的例子

先来看一个简略的例子:

public class Main {public static void main(String[] args) throws Exception {ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        // 执行没有返回值的工作
        executor.execute(()-> System.out.println("Execute the runnable task."));
        // 执行带返回值的工作,用到了 Future 泛型类
        Future<String> future = executor.submit(()->"Execute the callable task and this is the result.");
        // 通过 get()获取工作后果,get()会在工作未实现时始终阻塞
        System.out.println(future.get());
           // 手动敞开线程池
        executor.shutdown();}
}

从这个简略的例子能够看到,线程池能够执行带返回值以及不带返回值的工作,带返回值的话须要应用 get() 办法阻塞获取。另外,运行结束后须要手动敞开线程池,否则 JVM 不会退出,因为线程池中有指定数量的沉闷线程数量,而 JVM 失常退出的条件是 JVM 过程中不存在任何运行着的非守护过程。

3.2 构造方法

构造方法的源码如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

尽管提供了四个构造方法,但实质上调用的是最初一个构造方法,该构造方法带有 7 个参数,别离是:

  • corePoolSize:外围线程数量,即便当线程池中的外围线程不工作,外围线程的数量也不会缩小。该参数的最小值为 0,且小于等于maximumPoolSize
  • maximumPoolSize:用于设置线程池中容许的线程数量的最大值
  • keepAliveTime:当线程池中的线程数量超过外围线程数并且处于闲暇时,线程池将会回收一部分线程让出系统资源,该参数可用于设置超过 corePoolSize 数量的线程在多长时间后被回收,与后一个示意工夫单位的参数 unit 配合应用
  • unit:用于设定 keepAliveTime 的工夫单位
  • workQueure:用于寄存已提交至线程池但未被执行的工作
  • threadFactory:用于创立线程的工厂,开发者能够自定义 ThreadFactory 来创立线程
  • handler:回绝策略,当工作超过阻塞队列的边界时,线程池会回绝新增的工作,次要用于设置回绝策略

3.3 工作执行流程

线程池被胜利创立后,外部的运行线程并不会被立刻创立,ThreadPoolExecutor会采纳一种 Lazy 的形式去创立并且运行。首次调用执行工作办法时才会创立线程,比方:

ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
assert executor.getActiveCount() == 0;
assert executor.getMaximumPoolSize() == 4;
assert executor.getCorePoolSize() == 2;
executor.execute(()-> System.out.println("Execute the runnable task."));
assert executor.getActiveCount() == 1;
assert executor.getMaximumPoolSize() == 4;
assert executor.getCorePoolSize() == 2;

(运行的时候请加上 -ea 参数)

上面看一下工作的具体执行流程:

  • 如果运行中线程数少于外围线程数,创立新线程并立刻执行工作
  • 如果运行中的线程大于等于外围线程数,且工作队列未满时,会将工作先放进工作队列,直到运行中的线程数执行实现本人的工作后,再去轮询工作队列以获取工作运行
  • 如果工作队列已满,且运行中的线程数量小于最大线程数量时,线程池会创立线程执行工作,创立的线程数量会少于最大线程数
  • 如果工作队列已满且运行中的线程数量已达到最大线程数量,并且此刻没有闲暇的运行线程,会执行工作回绝策略,取决于RejectedEcecutionHandler
  • 若线程池中的线程是闲暇的,且闲暇工夫达到 keepAliveTime 指定工夫,会回收线程,直到保留 corePoolSize 个外围线程为止(不过外围线程也能够设置被超时回收,默认不开启外围线程超时)

3.4 线程工厂

线程工厂 ThreadFactory 是一个接口:

package java.util.concurrent;

public interface ThreadFactory {Thread newThread(Runnable var1);
}

应用线程工厂能够在创立线程时退出自定义配置,比方指定名字、优先级、是否为守护线程等,比方上面是线程工厂的一个简略实现:

public class TestThreadFactory implements ThreadFactory {
    private final static String PREFIX = "Test thread[";
    private final static String SUFFIX = "]";
    private final static AtomicInteger THREAD_NUM = new AtomicInteger();
    @Override
    public Thread newThread(Runnable runnable) {ThreadGroup group = new ThreadGroup("My pool");
        Thread thread = new Thread(group,runnable,PREFIX+THREAD_NUM.getAndIncrement()+SUFFIX);
        thread.setPriority(5);
        return thread;
    }
}

3.5 回绝策略

默认状况下,ThreadPoolExecutor提供了四种回绝策略:

  • DiscardPolicy:抛弃策略,间接抛弃工作
  • AbortPolicy:终止策略,抛出RejectedExecutionException
  • DiscardOldestPolicy:抛弃队列中最老工作的策略(严格意义来说须要依据工作队列去抉择,因为不是所有的队列都是 FIFO 的)
  • CallerRunsPolicy:调用者线程执行策略,工作会在以后线程中阻塞执行

当然,如果不能满足需要,能够实现 RejectedExecutionHandler 接口去自定义策略:

public interface RejectedExecutionHandler {void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}

3.6 敞开线程池

如果不须要线程池,那么须要手动对线程池敞开。线程池提供了如下三种形式:

  • 有序敞开:shutdown()
  • 立刻敞开:shutdownNow()
  • 组合敞开:shutdown()+shutdownNow()

3.6.1 有序敞开

shutdown()提供了一种有序敞开的形式去敞开线程池,调用该办法后,会期待以后执行的工作全副执行实现而后敞开,同时新提交工作将会被回绝。留神该办法是非阻塞,立刻返回的。如果须要查看敞开状态,能够应用:

  • isShutdown():返回是否调用了 shutdown() 的后果
  • isTerminating():返回是否正在完结中
  • isTerminated():返回是否曾经完结

3.6.2 立刻敞开

shutdownNow()办法首先将线程池状态批改为 shutdown 状态,而后将未被执行的工作挂起,接着将尝试中断运行中的线程,最初返回未执行的工作:

public static void main(String[] args) throws Exception {ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    IntStream.range(0,10).forEach(i-> executor.execute(()-> {
        try{TimeUnit.SECONDS.sleep(5);
        }catch (Exception e){e.printStackTrace();
        }
    }));
    List<Runnable> runnables = executor.shutdownNow();
    System.out.println(runnables.size());
}

输入:

8

BUILD SUCCESSFUL in 326ms
2 actionable tasks: 2 executed
java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at java.base/java.lang.Thread.sleep(Thread.java:339)
    at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
    at com.company.Main.lambda$main$0(Main.java:29)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at java.base/java.lang.Thread.sleep(Thread.java:339)
    at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:446)
    at com.company.Main.lambda$main$0(Main.java:29)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
3:14:36 AM: Task execution finished 'Main.main()'.

3.6.3 组合敞开

为了确保安全敞开线程池,个别会应用组合形式敞开,确保正在运行的工作被失常执行的同时又能进步线程池被敞开的成功率,例子如下:

 ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new TestThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
IntStream.range(0,10).forEach(i-> executor.execute(()-> {
    try{TimeUnit.SECONDS.sleep(5);
    }catch (Exception e){e.printStackTrace();
    }
}));

// 首先调用 shutdown()尝试敞开
executor.shutdown();
try{
    // 如果期待一段时间后还没敞开
    if(!executor.awaitTermination(10,TimeUnit.SECONDS)){
        // 强制敞开
        executor.shutdownNow();
        // 如果强制敞开失败,比方运行的线程异样耗时且不能被中断
        if(!executor.awaitTermination(10,TimeUnit.SECONDS)){
            // 其余解决,这里只是输入中断失败的信息
            System.out.println("Terminate failed.");
        }
    }
}catch (InterruptedException e){
    // 如果以后线程被中断,并且捕捉了异样,执行立刻敞开办法
    executor.shutdownNow();
    // 从新抛出中断信号
    Thread.currentThread().interrupt();
}

4 ScheduledThreadPoolExecutor

ScheduledExecutorService继承了 ExecutorService,并且提供了工作被定时执行的个性,能够应用ScheduledThreadPoolExecutor 去实现某些非凡的工作执行。当然实现固定工作的办法或者框架有很多,有原生的 shell 实现,老式的 Timer/TimerTask 实现,或者专门的框架 Quartz 实现,这里要说的是 JDK 外部的实现ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承了 ThreadPoolExecutor,除了具备ThreadPoolExecutor 的所有办法外,还定义了 4 个与 schedule 无关的办法:

  • <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):一个one-shot(只执行一次)的办法,工作(callable)会在单位工夫(delay)后被执行,并且立刻返回ScheduledFuture
  • ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit):也是一个 one-shot 办法,工作会在单位工夫后被执行,与第一个办法不同的是返回的 ScheduledFuture 不蕴含任何执行后果,然而能够通过返回的 ScheduledFuture 判断工作是否执行完结
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):工作会依据固定的速率在 initialDelay 后一直被执行
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):工作将以固定提早单位工夫的形式执行工作

对于后两者的区别如下:

public static void main(String[] args) throws Exception {ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
    Runnable runnable = ()->{long startTimestamp = System.currentTimeMillis();
        System.out.println("current timestamp:"+startTimestamp);
        try{TimeUnit.MILLISECONDS.sleep(current().nextInt(100));
        }catch (Exception e){e.printStackTrace();
        }
        System.out.println("elapsed time:"+(System.currentTimeMillis() - startTimestamp));
    };

    executor.scheduleAtFixedRate(runnable,10,1000,TimeUnit.MILLISECONDS);
//        executor.scheduleWithFixedDelay(runnable,10,1000,TimeUnit.MILLISECONDS);
}

输入:

current timestamp: 1619351675438
elapsed time: 97
current timestamp: 1619351676438
elapsed time: 85
current timestamp: 1619351677438
elapsed time: 1
current timestamp: 1619351678438
elapsed time: 1
current timestamp: 1619351679438
elapsed time: 68
current timestamp: 1619351680438
elapsed time: 99

能够看到工作始终以一种固定的速率运行,每次运行的开始工夫始终相隔1000ms

而应用 FixedDelay 的输入如下:

current timestamp: 1619351754890
elapsed time: 53
current timestamp: 1619351755944
elapsed time: 30
current timestamp: 1619351756974
elapsed time: 13
current timestamp: 1619351757987
elapsed time: 80
current timestamp: 1619351759068
elapsed time: 94
current timestamp: 1619351760162
elapsed time: 29

每次开始的工夫为上一次执行实现后的工夫再加上工夫距离(1000ms)。

5 Executors中的线程池

Executors类提供了六种创立线程池的静态方法:

  • FixedThreadPool
  • SingleThreadExecutor
  • CachedThreadPool
  • ScheduledThreadPool
  • SingleThreadScheduledExecutor
  • WorkStealingPool

上面别离来看一下。

5.1 FixedThreadPool

源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory);
}

FixedThreadPool底层调用的是ThreadPoolExecutor,默认创立的外围线程数与最大线程数相等,工作队列为无边界的LinkedBlockingQueue

5.2 SingleThreadExecutor

相干源码如下:

public static ExecutorService newSingleThreadExecutor() {return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory));
}

private static class FinalizableDelegatedExecutorService extends Executors.DelegatedExecutorService {FinalizableDelegatedExecutorService(ExecutorService executor) {super(executor);
    }

    protected void finalize() {super.shutdown();
    }
}

能够看到 SingleThreadPool 实际上是外部类 FinalizableDelegatedExecutorService 的包装,外围线程与最大线程数均为 1,工作队列为无边界的 LinkedBlockingQueue。产生GC 的时候,会调用 shutdown() 办法。

5.3 CachedThreadPool

源码如下:

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory);
}

CachedThreadPool会依据须要创立新线程,通常用于执行量大的,耗时较短的异步工作。未被应用且闲暇工夫超过 60s 的线程会被回收。

5.4 ScheduledThreadPool

源码如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

创立指定外围数ScheduledThreadPoolExecutor

5.5 SingleThreadScheduledExecutor

源码如下:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));
}

private static class DelegatedScheduledExecutorService extends Executors.DelegatedExecutorService implements ScheduledExecutorService {
    private final ScheduledExecutorService e;

    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {super(executor);
        this.e = executor;
    }

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {return this.e.schedule(command, delay, unit);
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {return this.e.schedule(callable, delay, unit);
    }

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {return this.e.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {return this.e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
}

其实就是SingelThreadPool+ScheduledThreadPool

5.6 WorkStealingPool

源码如下:

public static ExecutorService newWorkStealingPool(int parallelism) {return new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true);
}

public static ExecutorService newWorkStealingPool() {return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, (UncaughtExceptionHandler)null, true);
}

WorkStealingPoolJDK8 引入的线程池,返回的是 ForkJoinPool。在WorkStealingPool 中,如果每个线程解决的工作执行比拟耗时,那么它负责的工作会被其余线程“窃取”,进而进步并发解决的效率。

正文完
 0