乐趣区

关于高并发:高并发深度解析线程池中那些重要的顶层接口和抽象类

大家好,我是冰河~~

在上一篇《【高并发】不得不说的线程池与 ThreadPoolExecutor 类浅析》一文中,从整体上介绍了 Java 的线程池。如果细细品味线程池的底层源码实现,你会发现整个线程池体系的设计是十分优雅的!这些代码的设计值得咱们去细细品味和钻研,从中学习优雅代码的设计规范,造成本人的设计思维,为我所用!哈哈,说多了,接下来,咱们就来看看线程池中那些十分重要的接口和抽象类,深度剖析下线程池中是如何将形象这一思维使用的酣畅淋漓的!

通过对线程池中接口和抽象类的剖析,你会发现,整个线程池设计的是如此的优雅和弱小,从线程池的代码设计中,咱们学到的不只是代码而已!!

题外话:膜拜 Java 大神 Doug Lea,Java 中的并发包正是这位老爷子写的,他是这个世界上对 Java 影响力最大的一个人。

一、接口和抽象类总览

说起线程池中提供的重要的接口和抽象类,基本上就是如下图所示的接口和类。

接口与类的简略阐明:

  • Executor 接口:这个接口也是整个线程池中最顶层的接口,提供了一个无返回值的提交工作的办法。
  • ExecutorService 接口:派生自 Executor 接口,扩大了很过性能,例如敞开线程池,提交工作并返回后果数据、唤醒线程池中的工作等。
  • AbstractExecutorService 抽象类:派生自 ExecutorService 接口,实现了几个十分实现的办法,供子类进行调用。
  • ScheduledExecutorService 定时工作接口,派生自 ExecutorService 接口,领有 ExecutorService 接口定义的全副办法,并扩大了定时工作相干的办法。

接下来,咱们就别离从源码角度来看下这些接口和抽象类从顶层设计上提供了哪些性能。

二、Executor 接口

Executor 接口的源码如下所示。

public interface Executor {
    // 提交运行工作,参数为 Runnable 接口对象,无返回值
    void execute(Runnable command);
}

从源码能够看出,Executor 接口非常简单,只提供了一个无返回值的提交工作的 execute(Runnable) 办法。

因为这个接口过于简略,咱们无奈得悉线程池的执行后果数据,如果咱们不再应用线程池,也无奈通过 Executor 接口来敞开线程池。此时,咱们就须要 ExecutorService 接口的反对了。

三、ExecutorService 接口

ExecutorService 接口是非定时工作类线程池的外围接口,通过 ExecutorService 接口可能向线程池中提交工作(反对有返回后果和无返回后果两种形式)、敞开线程池、唤醒线程池中的工作等。ExecutorService 接口的源码如下所示。

package java.util.concurrent;
import java.util.List;
import java.util.Collection;
public interface ExecutorService extends Executor {

    // 敞开线程池,线程池中不再承受新提交的工作,然而之前提交的工作持续运行,直到实现
    void shutdown();
    
    // 敞开线程池,线程池中不再承受新提交的工作,会尝试进行线程池中正在执行的工作。List<Runnable> shutdownNow();
    
    // 判断线程池是否曾经敞开
    boolean isShutdown();
    
    // 判断线程池中的所有工作是否完结,只有在调用 shutdown 或者 shutdownNow 办法之后调用此办法才会返回 true。boolean isTerminated();

    // 期待线程池中的所有工作执行完结,并设置超时工夫
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    
    // 提交一个 Callable 接口类型的工作,返回一个 Future 类型的后果
    <T> Future<T> submit(Callable<T> task);
    
    // 提交一个 Callable 接口类型的工作,并且给定一个泛型类型的接管后果数据参数,返回一个 Future 类型的后果
    <T> Future<T> submit(Runnable task, T result);

    // 提交一个 Runnable 接口类型的工作,返回一个 Future 类型的后果
    Future<?> submit(Runnable task);

    // 批量提交工作并取得他们的 future,Task 列表与 Future 列表一一对应
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    
    // 批量提交工作并取得他们的 future,并限定解决所有工作的工夫
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit) throws InterruptedException;
    
    // 批量提交工作并取得一个曾经胜利执行的工作的后果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException; 
    
    // 批量提交工作并取得一个曾经胜利执行的工作的后果,并限定解决工作的工夫
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

对于 ExecutorService 接口中每个办法的含意,间接上述接口源码中的正文即可,这些接口办法都比较简单,我就不一一反复列举形容了。这个接口也是咱们在应用非定时工作类的线程池中最常应用的接口。

四、AbstractExecutorService 抽象类

AbstractExecutorService 类是一个抽象类,派生自 ExecutorService 接口,在其根底上实现了几个比拟实用的办法,提供给子类进行调用。咱们还是来看下 AbstractExecutorService 类的源码。

留神:大家能够到 java.util.concurrent 包下查看残缺的 AbstractExecutorService 类的源码,这里,我将 AbstractExecutorService 源码进行拆解,详解每个办法的作用。

  • newTaskFor 办法
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}

RunnableFuture 类用于获取执行后果,在理论应用时,咱们常常应用的是它的子类 FutureTask,newTaskFor 办法的作用就是将工作封装成 FutureTask 对象,后续将 FutureTask 对象提交到线程池。

  • doInvokeAny 办法
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    // 提交的工作为空,抛出空指针异样
    if (tasks == null)
        throw new NullPointerException();
    // 记录待执行的工作的残余数量
    int ntasks = tasks.size();
    // 工作汇合中的数据为空,抛出非法参数异样
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    // 以以后实例对象作为参数构建 ExecutorCompletionService 对象
    // ExecutorCompletionService 负责执行工作,前面调用用 poll 返回第一个执行后果
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    try {
        // 记录可能抛出的执行异样
        ExecutionException ee = null;
        // 初始化超时工夫
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();
    
        // 提交工作,并将返回的后果数据增加到 futures 汇合中
        // 提交一个工作次要是确保在进入循环之前开始一个工作
        futures.add(ecs.submit(it.next()));
        --ntasks;
        // 记录正在执行的工作数量
        int active = 1;

        for (;;) {
            // 从实现工作的 BlockingQueue 队列中获取并移除下一个将要实现的工作的后果。// 如果 BlockingQueue 队列中中的数据为空,则返回 null
            // 这里的 poll() 办法是非阻塞办法
            Future<T> f = ecs.poll();
            // 获取的后果为空
            if (f == null) {
                // 汇合中仍有未执行的工作数量
                if (ntasks > 0) {
                    // 未执行的工作数量减 1
                    --ntasks;
                    // 提交实现并将后果增加到 futures 汇合中
                    futures.add(ecs.submit(it.next()));
                    // 正在执行的工作数量加•1
                    ++active;
                }
                // 所有工作执行实现,并且返回了后果数据,则退出循环
                // 之所以解决 active 为 0 的状况,是因为 poll() 办法是非阻塞办法,可能导致未返回后果时 active 为 0
                else if (active == 0)
                    break;
                // 如果 timed 为 true,则执行获取后果数据时设置超时工夫,也就是超时获取后果示意
                else if (timed) {f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();}
                // 没有设置超时,并且所有工作都被提交了,则始终阻塞,直到返回一个执行后果
                else
                    f = ecs.take();}
            // 获取到执行后果,则将正在执行的工作减 1,从 Future 中获取后果并返回
            if (f != null) {
                --active;
                try {return f.get();
                } catch (ExecutionException eex) {ee = eex;} catch (RuntimeException rex) {ee = new ExecutionException(rex);
                }
            }
        }

        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {
        // 如果从所有执行的工作中获取到一个后果数据,则勾销所有执行的工作,不再向下执行
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
}

这个办法是批量执行线程池的工作,最终返回一个后果数据的外围办法,通过源代码的剖析,咱们能够发现,这个办法只有获取到一个后果数据,就会勾销线程池中所有运行的工作,并将后果数据返回。这就好比是很多要进入一个居民小区一样,只有有一个人有门禁卡,门卫就不再查看其他人是否有门禁卡,间接放行。

在上述代码中,咱们看到提交工作应用的 ExecutorCompletionService 对象的 submit 办法,咱们再来看下 ExecutorCompletionService 类中的 submit 办法,如下所示。

public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

public Future<V> submit(Runnable task, V result) {if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture(f));
    return f;
}

能够看到,ExecutorCompletionService 类中的 submit 办法实质上调用的还是 Executor 接口的 execute 办法。

  • invokeAny 办法
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

这两个 invokeAny 办法实质上都是在调用 doInvokeAny 办法,在线程池中提交多个工作,只有返回一个后果数据即可。

间接看下面的代码,大家可能有点晕。这里,我举一个例子,咱们在应用线程池的时候,可能会启动多个线程去执行各自的工作,比方线程 A 负责 task_a,线程 B 负责 task_b,这样能够大规模晋升零碎解决工作的速度。如果咱们心愿其中一个线程执行实现返回后果数据时立刻返回,而不须要再让其余线程继续执行工作。此时,就能够应用 invokeAny 办法。

  • invokeAll 办法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    // 标识所有工作是否实现
    boolean done = false;
    try {
        // 遍历所有工作
        for (Callable<T> t : tasks) {
            将每个工作封装成 RunnableFuture 对象提交工作
            RunnableFuture<T> f = newTaskFor(t);
            // 将后果数据增加到 futures 汇合中
            futures.add(f);
            // 执行工作
            execute(f);
        }
        // 遍历后果数据汇合
        for (int i = 0, size = futures.size(); i < size; i++) {Future<T> f = futures.get(i);
            // 工作没有实现
            if (!f.isDone()) {
                try {
                    // 阻塞期待工作实现并返回后果
                    f.get();} catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}
        }
        // 工作实现(不论是失常完结还是异样实现)done = true;
        // 返回后果数据汇合
        return futures;
    } finally {
        // 如果产生中断异样 InterruptedException 则勾销曾经提交的工作
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        for (int i = 0; i < size; i++) {execute((Runnable)futures.get(i));
            // 在增加执行工作时超时判断,如果超时则立即返回 futures 汇合
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)
                return futures;
        }
         // 遍历所有工作
        for (int i = 0; i < size; i++) {Future<T> f = futures.get(i);
            if (!f.isDone()) {
                // 对后果进行判断时进行超时判断
                if (nanos <= 0L)
                    return futures;
                try {f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {} catch (ExecutionException ignore) {} catch (TimeoutException toe) {return futures;}
                // 重置工作的超时工夫
                nanos = deadline - System.nanoTime();}
        }
        done = true;
        return futures;
    } finally {if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

invokeAll 办法同样实现了无超时工夫设置和有超时工夫设置的逻辑。

无超时工夫设置的 invokeAll 办法总体逻辑为:将所有工作封装成 RunnableFuture 对象,调用 execute 办法执行工作,将返回的后果数据增加到 futures 汇合,之后对 futures 汇合进行遍历判断,检测工作是否实现,如果没有实现,则调用 get 办法阻塞工作,直到返回后果数据,此时会疏忽异样。最终在 finally 代码块中对所有工作是否实现的标识进行判断,如果存在未实现的工作,则勾销曾经提交的工作。

有超时设置的 invokeAll 办法总体逻辑与无超时工夫设置的 invokeAll 办法总体逻辑基本相同,只是在两个中央增加了超时的逻辑判断。一个是在增加执行工作时进行超时判断,如果超时,则立即返回 futures 汇合;另一个是每次对后果数据进行判断时增加了超时解决逻辑。

invokeAll 办法中实质上还是调用 Executor 接口的 execute 办法来提交工作。

  • submit 办法

submit 办法的逻辑比较简单,就是将工作封装成 RunnableFuture 对象并提交,执行工作后返回 Future 后果数据。如下所示。

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

从源码中能够看出 submit 办法提交工作时,实质上还是调用的 Executor 接口的 execute 办法。

综上所述,在非定时工作类的线程池中提交工作时,实质上都是调用的 Executor 接口的 execute 办法 。至于调用的是哪个具体实现类的 execute 办法,咱们在前面的文章中深入分析。

五、ScheduledExecutorService 接口

ScheduledExecutorService 接口派生自 ExecutorService 接口,继承了 ExecutorService 接口的所有性能,并提供了定时解决工作的能力,ScheduledExecutorService 接口的源代码比较简单,如下所示。

package java.util.concurrent;

public interface ScheduledExecutorService extends ExecutorService {

    // 延时 delay 工夫来执行 command 工作,只执行一次
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    // 延时 delay 工夫来执行 callable 工作,只执行一次
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    // 延时 initialDelay 工夫首次执行 command 工作,之后每隔 period 工夫执行一次
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
                                                  
    // 延时 initialDelay 工夫首次执行 command 工作,之后每延时 delay 工夫执行一次
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

至此,咱们剖析了线程池体系中重要的顶层接口和抽象类。

通过对这些顶层接口和抽象类的剖析,咱们须要从中感悟并领会软件开发中的抽象思维,深刻了解抽象思维在具体编码中的实现,最终,造成本人的编程思维,使用到理论的我的项目中,这也是咱们可能从源码中所能学到的泛滥细节之一。这也是高级或资深工程师和架构师必须理解源码细节的起因之一。

好了,明天就到这儿吧,我是冰河,咱们下期见~~

退出移动版