乐趣区

关于java:既生-ExecutorService-何生-CompletionService

前言

在 我会手动创立线程,为什么要应用线程池? 中具体的介绍了 ExecutorService,能够将整块工作拆分做简略的并行处理;

在 不会用 Java Future,我狐疑你泡茶没我快 中又具体的介绍了 Future 的应用,填补了 Runnable 不能获取线程执行后果的空缺

将二者联合起来应用看似要一招吃天下了(Java 有并发,并发之大,一口吃不下), but ~~ 是我太天真

ExecutorService VS CompletionService

假如咱们有 4 个工作 (A, B, C, D) 用来执行简单的计算,每个工作的执行工夫随着输出参数的不同而不同,如果将工作提交到 ExecutorService,置信你曾经能够“信手拈来”

ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));

// 遍历 Future list,通过 get() 办法获取每个 future 后果
for (Future future:futures) {Integer result = future.get();
    // 其余业务逻辑
}

先直入主题,用 CompletionService 实现同样的场景

ExecutorService executorService = Executors.newFixedThreadPool(4);

// ExecutorCompletionService 是 CompletionService 惟一实现类
CompletionService executorCompletionService= new ExecutorCompletionService<>(executorService);

List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));

// 遍历 Future list,通过 get() 办法获取每个 future 后果
for (int i=0; i<futures.size(); i++) {Integer result = executorCompletionService.take().get();
    // 其余业务逻辑
}

两种形式在代码实现上简直一毛一样,咱们已经说过 JDK 中不会反复造轮子,如果要造一个新轮子,必然是原有的轮子在某些场景的应用上有致命缺点

既然新轮子进去了,二者到底有啥不同呢?在 搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别?文中,咱们提到了 Future get() 办法的致命缺点:

如果 Future 后果没有实现,调用 get() 办法,程序会 阻塞 在那里,直至获取返回后果

先来看第一种实现形式,假如工作 A 因为参数起因,执行工夫绝对工作 B,C,D 都要长很多,然而依照程序的执行程序,程序在 get() 工作 A 的执行后果会阻塞在那里,导致工作 B,C,D 的后续工作没方法执行。又因为每个工作执行工夫是不固定的,所以无论怎样调整将工作放到 List 的程序,都不适合,这就是致命弊病

新轮子天然要解决这个问题,它的设计理念就是哪个工作先执行实现,get() 办法就会获取到相应的工作后果,这么做的益处是什么呢?来看个图你就霎时了解了

两张图一比照,执行时长高下立判了,在当今高并发的时代,这点时间差,在吞吐量上起到的成果可能不是一点半点了

那 CompletionService 是怎么做到获取最先执行完的工作后果的呢?

远看 CompletionService 轮廓

如果你应用过音讯队列,你应该秒懂我要说什么了,CompletionService 实现原理很简略

就是一个将异步工作的生产和工作实现后果的生产解耦的服务

用人话解释一下下面的抽象概念我只能再画一张图了

说白了,哪个工作执行的完,就间接将执行后果放到队列中,这样消费者拿到的后果天然就是最早拿到的那个了

从上图中看到,有 工作 ,有 后果队列,那 CompletionService 天然也要围绕着几个关键字做文章了

  • 既然是异步工作,那天然可能用到 Runnable 或 Callable
  • 既然能获取到后果,天然也会用到 Future 了

带着这些线索,咱们走进 CompletionService 源码看一看

近看 CompletionService 源码

CompletionService 是一个接口,它简略的只有 5 个办法:

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

对于 2 个 submit 办法,我在 不会用 Java Future,我狐疑你泡茶没我快 文章中做了十分具体的剖析以及案例应用阐明,这里不再过多赘述

另外 3 个办法都是从阻塞队列中获取并移除阻塞队列第一个元素,只不过他们的性能略有不同

  • Take: 如果 队列为空 ,那么调用 take() 办法的线程会 被阻塞
  • Poll: 如果 队列为空 ,那么调用 poll() 办法的线程会 返回 null
  • Poll-timeout: 以 超时的形式 获取并移除阻塞队列中的第一个元素,如果超时工夫到,队列还是空,那么该办法会返回 null

所以说,按大类划分下面 5 个办法,其实就是两个性能

  • 提交异步工作(submit)
  • 从队列中拿取并移除第一个元素 (take/poll)

CompletionService 只是接口,ExecutorCompletionService 是该接口的惟一实现类

ExecutorCompletionService 源码剖析

先来看一下类构造, 实现类外面并没有多少内容

<fancybox></fancybox>

ExecutorCompletionService 有两种构造函数:

private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;

public ExecutorCompletionService(Executor executor) {if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();}

public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

两个构造函数都须要传入一个 Executor 线程池,因为是解决异步工作的,咱们是不被容许手动创立线程的,所以这里要应用线程池也就很好了解了

另外一个参数是 BlockingQueue,如果不传该参数,就会默认队列为 LinkedBlockingQueue,工作执行后果就是退出到这个阻塞队列中的

所以要彻底了解 ExecutorCompletionService,咱们只须要晓得一个问题的答案就能够了:

它是如何将异步工作后果放到这个阻塞队列中的?

想晓得这个问题的答案,那只须要看它提交工作之后都做了些什么?

public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

咱们后面也剖析过,execute 是提交 Runnable 类型的工作,自身得不到返回值,但又能够将执行后果放到阻塞队列外面,所以必定是在 QueueingFuture 外面做了文章

从上图中看一看出,QueueingFuture 实现的接口十分多,所以说也就具备了相应的接口能力。

重中之重是,它继承了 FutureTask,FutureTask 重写了 Runnable 的 run() 办法 (办法细节剖析能够查看 FutureTask 源码剖析) 文中具体阐明了,无论是 set() 失常后果,还是 setException() 后果,都会调用 finishCompletion() 办法:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

      // 重点 重点 重点
    done();

    callable = null;        // to reduce footprint
}

上述办法会执行 done() 办法,而 QueueingFuture 凑巧重写了 FutureTask 的 done() 办法:

办法实现很简略,就是将 task 放到阻塞队列中

protected void done() {completionQueue.add(task); 
}

执行到此的 task 曾经是前序步骤 set 过后果的 task,所以就能够通过生产阻塞队列获取相应的后果了

置信到这里,CompletionService 在你背后应该没什么机密可言了

CompletionService 的主要用途

在 JDK docs 上明确给了两个例子来阐明 CompletionService 的用处:

假如你有一组针对某个问题的 solvers,每个都返回一个类型为 Result 的值,并且想要并发地运行它们,解决每个返回一个非空值的后果,在某些办法应用(Result r)

其实就是文中结尾的应用形式

 void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     for (Callable<Result> s : solvers)
         ecs.submit(s);
     int n = solvers.size();
     for (int i = 0; i < n; ++i) {Result r = ecs.take().get();
         if (r != null)
             use(r);
     }
 }

假如你想应用工作集的第一个非空后果,疏忽任何遇到异样的工作,并在第一个工作筹备好时勾销所有其余工作

void solve(Executor e,
            Collection<Callable<Result>> solvers)
     throws InterruptedException {
     CompletionService<Result> ecs
         = new ExecutorCompletionService<Result>(e);
     int n = solvers.size();
     List<Future<Result>> futures
         = new ArrayList<Future<Result>>(n);
     Result result = null;
     try {for (Callable<Result> s : solvers)
             futures.add(ecs.submit(s));
         for (int i = 0; i < n; ++i) {
             try {Result r = ecs.take().get();
                 if (r != null) {
                     result = r;
                     break;
                 }
             } catch (ExecutionException ignore) {}}
     }
     finally {for (Future<Result> f : futures)
               // 留神这里的参数给的是 true,详解同样在前序 Future 源码剖析文章中
             f.cancel(true);
     }

     if (result != null)
         use(result);
 }

这两种形式都是十分经典的 CompletionService 应用 范式,请大家认真品尝每一行代码的用意

范式没有阐明 Executor 的应用,应用 ExecutorCompletionService,须要本人创立线程池,看上去尽管有些麻烦,但益处是你能够让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能防止几个特地耗时的工作拖垮整个利用的危险(这也是咱们重复说过屡次的,不要所有业务共用一个线程池

总结

CompletionService 的利用场景还是十分多的,比方

  • Dubbo 中的 Forking Cluster
  • 多仓库文件 / 镜像下载(从最近的服务中心下载后终止其余下载过程)
  • 多服务调用(天气预报服务,最先获取到的后果)

CompletionService 岂但能满足获取最快后果,还能起到肯定 “load balancer” 作用,获取可用服务的后果,应用也非常简单,只须要遵循范式即可

并发系列 讲了这么多,剖析源码的过程也碰到各种队列,接下来咱们就看看那些让人目迷五色的队列

灵魂诘问

  1. 通常处理结果还会用异步形式进行解决,如果采纳这种形式,有哪些注意事项?
  2. 如果是你,你会抉择应用无界队列吗?为什么?

日拱一兵 | 原创

退出移动版