Java™ 教程(执行器)

47次阅读

共计 4709 个字符,预计需要花费 12 分钟才能阅读完成。

执行器
在前面的所有示例中,由新的线程(由其 Runnable 对象定义)和线程本身(由 Thread 对象定义)完成的任务之间存在紧密的联系,这适用于小型应用程序,但在大型应用程序中,将线程管理和创建与应用程序的其余部分分开是有意义的,封装这些函数的对象称为执行器,以下小节详细描述了执行器。

执行器接口定义三个执行器对象类型。
线程池是最常见的执行器实现类型。
Fork/Join 是一个利用多个处理器的框架(JDK 7 中的新增功能)。

执行器接口
java.util.concurrent 包定义了三个执行器接口:

Executor,一个支持启动新任务的简单接口。

ExecutorService,Executor 的子接口,它添加了有助于管理生命周期的功能,包括单个任务和执行器本身。

ScheduledExecutorService,ExecutorService 的子接口,支持将来和 / 或定期执行任务。

通常,引用执行器对象的变量被声明为这三种接口类型之一,而不是执行器类类型。
Executor 接口
Executor 接口提供单个方法 execute,旨在成为常见线程创建语法的替代方法,如果 r 是 Runnable 对象,并且 e 是 Executor 对象,则可以替换
(new Thread(r)).start();

e.execute(r);
但是,execute 的定义不太具体,低级别语法创建一个新线程并立即启动它,根据 Executor 实现,execute 可能会做同样的事情,但更有可能使用现有的工作线程来运行 r,或者将 r 放在队列中以等待工作线程变为可用(我们将在线程池的部分中描述工作线程)。
java.util.concurrent 中的执行器实现旨在充分利用更高级的 ExecutorService 和 ScheduledExecutorService 接口,尽管它们也可以与基本 Executor 接口一起使用。
ExecutorService 接口
ExecutorService 接口使用类似但更通用的 submit 方法补充 execute,与 execute 一样,submit 接受 Runnable 对象,但也接受 Callable 对象,这允许任务返回一个值。submit 方法返回一个 Future 对象,该对象用于检索 Callable 返回值并管理 Callable 和 Runnable 任务的状态。
ExecutorService 还提供了提交大量 Callable 对象的方法,最后,ExecutorService 提供了许多用于管理执行器关闭的方法,为了支持立即关闭,任务应该正确处理中断。
ScheduledExecutorService 接口
ScheduledExecutorService 接口使用 schedule 补充其父级 ExecutorService 的方法,在指定的延迟后执行 Runnable 或 Callable 任务,此外,接口定义了 scheduleAtFixedRate 和 scheduleWithFixedDelay,它们以定义的间隔重复执行指定的任务。
线程池
java.util.concurrent 中的大多数执行器实现都使用由工作线程组成的线程池,这种线程与它执行的 Runnable 和 Callable 任务分开存在,通常用于执行多个任务。
使用工作线程可以最小化由于创建线程而带来的开销,线程对象使用大量内存,在大型应用程序中,分配和释放许多线程对象会产生大量的内存管理开销。
一种常见类型的线程池是固定线程池,这种类型的池始终具有指定数量的线程,如果一个线程在它仍在使用时以某种方式被终止,它将自动被一个新线程替换,任务通过内部队列提交到池中,当活动任务多于线程时,该队列将保存额外的任务。
固定线程池的一个重要优点是使用它的应用程序可以优雅地降级,要理解这一点,请考虑一个 Web 服务器应用程序,其中每个 HTTP 请求都由一个单独的线程处理。如果应用程序只是为每个新的 HTTP 请求创建一个新线程,并且系统接收的请求数量超过了可以立即处理的数量,当所有这些线程的开销超过系统容量时,应用程序将突然停止响应所有请求。由于可以创建的线程数量有限制,应用程序不会像 HTTP 请求进入时那样快地为它们提供服务,而是以系统能够承受的最快速度为它们提供服务。
创建使用固定线程池的执行器的一种简单方法是在 java.util.concurrent.Executors 中调用 newFixedThreadPool 工厂方法,该类还提供以下工厂方法:

newCachedThreadPool 方法使用可扩展线程池创建执行器,此执行器适用于启动许多短期任务的应用程序。

newSingleThreadExecutor 方法创建一次执行单个任务的执行器。
有几个工厂方法是上述执行器的 ScheduledExecutorService 版本。

如果上述工厂方法提供的执行器均无法满足你的需求,构造 java.util.concurrent.ThreadPoolExecutor 或 java.util.concurrent.ScheduledThreadPoolExecutor 的实例将为你提供额外选项。
Fork/Join
fork/join 框架是 ExecutorService 接口的一个实现,可帮助你利用多个处理器,它专为可以递归分解成小块的工作而设计,目标是使用所有可用的处理能力来增强应用程序的性能。
与任何 ExecutorService 实现一样,fork/join 框架将任务分配给线程池中的工作线程,fork/join 框架是不同的,因为它使用了工作窃取算法,没有事情可做的工作线程可以从仍然忙碌的其他线程中窃取任务。
fork/join 框架的中心是 ForkJoinPool 类,它是 AbstractExecutorService 类的扩展,ForkJoinPool 实现了核心工作窃取算法,可以执行 ForkJoinTask 进程。
基础用法
使用 fork/join 框架的第一步是编写执行工作片段的代码,你的代码应类似于以下伪代码:
if (我的工作部分足够小)
直接做这项工作
else
把我的工作分成两块
调用这两块并等待结果
将此代码包装在 ForkJoinTask 子类中,通常使用其更专业的类型之一,RecursiveTask(可以返回结果)或 RecursiveAction。
在 ForkJoinTask 子类准备就绪后,创建表示要完成的所有工作的对象,并将其传递给 ForkJoinPool 实例的 invoke() 方法。
模糊清晰度
为了帮助你了解 fork/join 框架的工作原理,请考虑以下示例,假设你想模糊图像,原始源图像由整数数组表示,其中每个整数包含单个像素的颜色值,模糊的目标图像也由与源相同大小的整数数组表示。
通过一次一个像素地处理源数组来完成模糊,将每个像素与其周围像素进行平均 (对红色、绿色和蓝色组件进行平均),并将结果放置在目标数组中,由于图像是大型数组,因此此过程可能需要很长时间,通过使用 fork/join 框架实现的算法,你可以利用多处理器系统上的并发处理,这是一个可能的实现:
public class ForkBlur extends RecursiveAction {
private int[] mSource;
private int mStart;
private int mLength;
private int[] mDestination;

// Processing window size; should be odd.
private int mBlurWidth = 15;

public ForkBlur(int[] src, int start, int length, int[] dst) {
mSource = src;
mStart = start;
mLength = length;
mDestination = dst;
}

protected void computeDirectly() {
int sidePixels = (mBlurWidth – 1) / 2;
for (int index = mStart; index < mStart + mLength; index++) {
// Calculate average.
float rt = 0, gt = 0, bt = 0;
for (int mi = -sidePixels; mi <= sidePixels; mi++) {
int mindex = Math.min(Math.max(mi + index, 0),
mSource.length – 1);
int pixel = mSource[mindex];
rt += (float)((pixel & 0x00ff0000) >> 16)
/ mBlurWidth;
gt += (float)((pixel & 0x0000ff00) >> 8)
/ mBlurWidth;
bt += (float)((pixel & 0x000000ff) >> 0)
/ mBlurWidth;
}

// Reassemble destination pixel.
int dpixel = (0xff000000) |
(((int)rt) << 16) |
(((int)gt) << 8) |
(((int)bt) << 0);
mDestination[index] = dpixel;
}
}


现在,你实现抽象的 compute() 方法,该方法可以直接执行模糊或将其拆分为两个较小的任务,简单的数组长度阈值有助于确定是执行还是拆分工作。
protected static int sThreshold = 100000;

protected void compute() {
if (mLength < sThreshold) {
computeDirectly();
return;
}

int split = mLength / 2;

invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
new ForkBlur(mSource, mStart + split, mLength – split,
mDestination));
}
如果以前的方法在 RecursiveAction 类的子类中,那么将任务设置为在 ForkJoinPool 中运行是很简单的,涉及以下步骤:

创建一个代表要完成的所有工作的任务。
// source image pixels are in src
// destination image pixels are in dst
ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

创建将运行任务的 ForkJoinPool。
ForkJoinPool pool = new ForkJoinPool();

运行任务。
pool.invoke(fb);

有关完整源代码(包括创建目标图像文件的一些额外代码),请参阅 ForkBlur 示例。
标准实现
除了使用 fork/join 框架来实现在多处理器系统上同时执行任务的自定义算法(例如 ForkBlur.java 示例),Java SE 中已经使用 fork/join 框架实现了一些通常有用的功能,在 Java SE 8 中引入的一种这样的实现被 java.util.Arrays 类用于其 parallelSort() 方法,这些方法类似于 sort(),但通过 fork/join 框架利用并发性。在多处理器系统上运行时,大型数组的并行排序比顺序排序更快,但是,这些方法如何利用 fork/join 框架超出了 Java 教程的范围,有关此信息,请参阅 Java API 文档。
fork/join 框架的另一个实现由 java.util.streams 包中的方法使用,这是 Project Lambda 计划用于 Java SE 8 版本的一部分,有关更多信息,请参阅 Lambda 表达式部分。

上一篇:Lock 对象

正文完
 0