ExecutorExecutors和ExecutorService

40次阅读

共计 5107 个字符,预计需要花费 13 分钟才能阅读完成。

Executor

Executor 是 java.util.concurrent 包中的一个接口,是一个执行提交的 Runnable 任务的对象。这个接口提供了一种方式把任务提交从每个任务会如何执行的方法中解耦,包括线城市用,调度等的细节。使用 Executor 代替了显式创建线程。例如,比起对一组 task 中的每一个调用new Thread(new(RunnableTask())).start(),你可以用:

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

但是,Executor 接口不是严格需要执行是异步的。在最简单的情况中,一个 executor 能够在调用者的线程上立即运行提交的任务:

class DirectExecutor implements Executor {public void execute(Runnable r) {r.run();
  }
}

更典型的是,任务执行在非调用者线程。下面的 executor 为每个 task 产出一个新的线程:

class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) {new Thread(r).start();}
}

很多 Executor 的实现强制加入了一些关于如何以及何时任务被调度的限制。下面的 executor 串行提交的任务到第二个 executor,表明它是一个混合的 executor:

class SerialExecutor implements Executor {final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
  final Executor executor;
  Runnable active;

  SerialExecutor(Executor executor) {this.executor = executor;}

  public synchronized void execute(final Runnable r) {tasks.offer(new Runnable() {public void run() {
        try {r.run();
        } finally {scheduleNext();
        }
      }
    });
    if (active == null) {scheduleNext();
    }
  }

  protected synchronized void scheduleNext() {if ((active = tasks.poll()) != null) {executor.execute(active);
    }
  }
}

在 java.util.concurrent 包中提供的 Executor 接口的实现(如 ThreadPoolExecutorScheduledThreadPoolExecutorForkJoinPoolAbstractExecutorService)也同时实现了ExecutorService,这是一个更广泛的接口。ThreadPoolExecutor 类提供了一个可扩展的线程池实现。Executors类为这些 Executors 提供了方便的工厂方法。

内存一致性效应:线程中在提交一个 Runnable 对象给一个 Executor 之前发生的操作 happen-before 执行这个 Runnable(可能在另一个线程中执行)。

实现 Executor 接口需要实现 execute 方法,定义如下:

void execute(Runnbale command)

这个方法在未来某个时间点执行给定的 command。这个 command 可能执行在一个新的线程中,在一个池化的线程中,或在调用者线程中,这取决于 Executor 的实现。

ExecutorService

ExecutorService 接口继承 Executor 接口,是提供管理终止的方法以及 produce 出 Future 去跟踪一个或多个异步任务进度的方法的 Executor。

一个 ExecutorService 可以被 shutdown,会导致它拒绝新的 tasks。提供了两个不同的方法去关闭一个 ExecutorService。shutdown方法允许之前提交的任务在终止之前执行,shutdownNow方法禁止等待已经开始任务并试图结束正在执行的任务。如果一个 ExecutorService 终止了,一个 executor 没有正在执行的活跃任务,没有等待执行的任务,也没有新任务能被提交。一个没有使用的 ExecutorService 应该被关闭以回收资源。

submit方法根据 Executor 的 execute(Runnable)方法扩展,创建并返回一个 Future,能够用来 cancel 执行以及等待执行完成。invokeAny方法以及 invokeAll 方法执行最普通的批量执行,执行一组任务然后等待至少一个,或者所有任务完成。

Executors 类为 ExecutorService 提供工厂方法。

使用举例

这里有一个网络服务,其中一个线程池中的线程为请求提供服务。它使用预配置的 Executors 的 newFixedThreadPool 工厂方法:

class NetworkService implements Runnable {
  private final ServerSocket serverSocket;
  private final ExecutorService pool;

  public NetworkService(int port, int poolSize) 
      throws IOException {serverSocket = new ServerSocket(port);
    pool = Executors.newFixedTreadPool(poolSize);
  }

  public void run() { // run the service
    try {for (;;) {pool.execute(new Handler(serverSocket.accept()));
      }
    } catch (IOException ex) {pool.shutdown();
    }
  }
}

class Handler implements Runnable {
  private final Socket socket;
  Handler(Socket socket) {this.socket = socket;}
  public void run() {// read and service request on socket}
}

下面的方法哦通过两步关闭一个 ExecutorService,首先调用 shutdown 以拒绝新来的 tasks,然后调用 shutdownNow(如果有必要的话),去 cancel 任何执行的任务:

void shutdownAndAwaitTermination(ExecutorService pool) {pool.shutdown(); // Disable new tasks from being submitted
  try {
    // Wait a while for existing tasks to terminate
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {pool.shutdownNow(); // Cancel currently executing tasks
      // Wait a while for tasks to respond to being cancelled
      if (!pool.awaitTermination(60, TimeUnit.SECONDS))
        System.err.println("Pool did not terminate");
    }
  } catch (InterruptedException ie) {// (Re-)Cancel if current thread also interrupted
    pool.shutdownNow();
    // Preserve interrupt status
    Thread.currentThread.interrupt();}
}

ExecutorService 中定义的方法

shutdown

void shutdown()

启动有序关闭,执行先前提交的任务,但是不接受新的任务。如果已经关闭再次执行没有影响。

这个方法不等待先前提交的任务完成执行,使用 awaitTermination 去等待任务执行完毕。

shutdownNow

List<Runnable> shutdownNow()

试图停止所有正在执行的任务,停止等待任务的处理,返回一个等待执行的任务列表。

这个方法不等待正在执行的任务终止,使用 awaitTermination 去等待任务终止。

尽力(best-effort)去停止正在执行的任务。例如,标准实现会通过 Thread 的 interrupt 去 cancel 任务,因此任何回应终止失败的任务永远不会终止。

isShutdown

boolean isShutdown()

如果 executor 已经被 shutdown 则返回 true。

isTerminated

boolean isTerminated()

如果所有任务在 shutdown 之后都执行完成则返回 true。注意 isTerminated 永远不会为 true 除非 shutdown 或 shutdownNow 先执行。

awaitTermination

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException

在一个 shutdown 请求后阻塞直到所有任务完成执行,或者 timeout 发生,或者当前线程被 interrupt,看哪个先发生。

submit

<T> Future<T> submit(Callable<T> task)

提交一个返回值的任务去执行并返回一个代表任务挂起结果的 Future 对象。Future 的 get 方法会返回成功完成的任务结果。

如果你想要立即阻塞等待一个任务完成,你可以使用result = exec.submit(aCallable).get()

submit

<T> Future<T> submit(Runnable task, T result)

提交一个 Runnable 任务去执行并返回一个代表这个任务的 Future。Future 的 get 方法会返回成功完成的任务结果。

submit

Future<?> submit(Runnable task)

提交一个 Runnable 任务去执行并返回一个代表这个任务的 Future。Future 的 get 方法会返回成功完成的任务结果 null。

invokeAll

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException

执行给定的一组任务,返回一个包含这些任务状态和结果的 Future 列表。Future 的 isDone 方法对返回列表中的每个元素调用都返回 true。

这个方法会阻塞,等待所有 task 完成。因此返回的 list 中的每个 Future 都是完成状态。

invokeAll

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
       throws InterruptedException

执行给定的一组任务,返回一个包含这些任务状态和结果的列表。所有任务完成或 timeout 超时时方法返回。Future 的 isDone 方法对返回列表中的每个元素调用都返回 true。

返回后,没有完成的任务被 cancel。

invokeAny

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
      throws InterruptedException, ExcutionException

执行给定的一组任务,如果其中一个任务成功完成(没有抛出异常)则返回。返回后,所有没完成的任务都被 cancel。

invokeAny

<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException

执行给定的一组任务,如果其中一个任务成功完成(没有抛出异常)则返回。

正文完
 0