共计 4412 个字符,预计需要花费 12 分钟才能阅读完成。
一、并行流
1. 将顺序流转换为并行流
对顺序流调用 parallel 方法:
public static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
它在内部实际上就是设了一个 boolean 标志,表示你想让调用 parallel 之后进行的所有操作都并行执行。类似地,你只需要对并行流调用 sequential 方法就可以把它变成顺序流。但最后一次 parallel 或 sequential 调用会影响整个流水线。
2. 测量流性能
- iterate 生成的是装箱的对象,必须拆箱成数字才能求和;
- 我们很难把 iterate 分成多个独立块来并行执行。
iterate 很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果,整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。
3. 正确使用并行流
错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。
public class Accumulator {
public long total = 0;
public void add(long value) {total += value;}
}
public static long sideEffectParallelSum(long n) {Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
上面的示例在本质上就是顺序的, 每次访问 total 都会出现数据竞争. 由于多个线程在同时访问累加器,执行 total += value,而这一句虽然看似简单,却不是一个原子操作。所得的结果也是不可控的(错误的)。
4. 高效使用并行流
- 留意装箱
- 有些操作本身在并行流上的性能就比顺序流差
- 还要考虑流的操作流水线的总计算成本。设 N 是要处理的元素的总数,Q 是一个元素通过流水线的大致处理成本,则 N * Q 就是这个对成本的一个粗略的定性估计。Q 值较高就意味着使用并行流时性能好的可能性比较大
- 对于较小的数据量,选择并行流几乎从来都不是一个好的决定
- 要考虑流背后的数据结构是否易于分解
- 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。
- 还要考虑终端操作中合并步骤的代价是大是小
二、分支 / 合并框架(Fork/Join)
详见第六章相关内容
注意:不应该在 RecursiveTask 内部使用 ForkJoinPool 的 invoke 方法。相反,你应该始终直接调用 compute 或 fork 方法,只有顺序代码才应该用 invoke 来启动并行计算。
三、Spliterator
Spliterator是 Java 8 中加入的另一个新接口;这个名字代表“可分迭代器”(splitable iterator)。和 Iterator 一样,Spliterator 也用于遍历数据源中的元素,但它是为了并行执行而设计的。
Spliterator 接口
public interface Spliterator<T> {boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();}
与往常一样,T 是 Spliterator 遍历的元素的类型。tryAdvance 方法的行为类似于普通的 Iterator,因为它会按顺序一个一个使用 Spliterator 中的元素,并且如果还有其他元素要遍历就返回 true。但 trySplit 是专为 Spliterator 接口设计的,因为它可以把一些元素划出去分给第二个 Spliterator(由该方法返回),让它们两个并行处理。Spliterator 还可通过 estimateSize 方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点.
1. 拆分过程
将 Stream 拆分成多个部分的算法是一个递归过程,如图所示。第一步是对第一个 Spliterator 调用 trySplit,生成第二个 Spliterator。第二步对这两个 Spliterator 调用 trysplit,这样总共就有了四个 Spliterator。这个框架不断对 Spliterator 调用 trySplit 直到它返回 null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的 Spliterator 在调用 trySplit 时都返回了 null。
2. 实现你自己的 Spliterator
文中提到了reduce 的三参数重载方法
<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner)
它的三个参数:
- identity: 一个初始化的值;这个初始化的值其类型是泛型 U,与 Reduce 方法返回的类型一致;注意此时 Stream 中元素的类型是 T,与 U 可以不一样也可以一样,这样的话操作空间就大了;不管 Stream 中存储的元素是什么类型,U 都可以是任何类型,如 U 可以是一些基本数据类型的包装类型 Integer、Long 等;或者是 String,又或者是一些集合类型 ArrayList 等;后面会说到这些用法。
- accumulator: 其类型是 BiFunction,输入是 U 与 T 两个类型的数据,而返回的是 U 类型;也就是说返回的类型与输入的第一个参数类型是一样的,而输入的第二个参数类型与 Stream 中元素类型是一样的。
- combiner: 其类型是 BinaryOperator,支持的是对 U 类型的对象进行操作;
第三个参数 combiner 主要是使用在并行计算的场景下;如果 Stream 是非并行时,第三个参数实际上是不生效的。
代码实现:
class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
public WordCounter accumulate(Character c) {if (Character.isWhitespace(c)) {
return lastSpace ?
this :
new WordCounter(counter, true);
} else {
return lastSpace ?
new WordCounter(counter + 1, false) :
this;
}
}
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.counter,
wordCounter.lastSpace);
}
public int getCounter() {return counter;}
}
class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {this.string = string;}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {action.accept(string.charAt(currentChar++));
return currentChar < string.length();}
@Override
public Spliterator<Character> trySplit() {int currentSize = string.length() - currentChar;
if (currentSize < 10) {return null;}
for (int splitPos = currentSize / 2 + currentChar;
splitPos < string.length(); splitPos++) {if (Character.isWhitespace(string.charAt(splitPos))) {
Spliterator<Character> spliterator =
new WordCounterSpliterator(string.substring(currentChar,
splitPos));
currentChar = splitPos;
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {return string.length() - currentChar;
}
@Override
public int characteristics() {return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;}
}
final String SENTENCE =
"Nel mezzo del cammin di nostra vita" +
"mi ritrovai in una selva oscura" +
"ché la dritta via era smarrita";
private int countWords(Stream<Character> stream) {WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine);
return wordCounter.getCounter();}
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(spliterator, true);
System.out.println("Found" + countWords(stream) + "words");
最后打印显示
Found 19 words