乐趣区

java-8-实战读书笔记-第七章-并行数据处理与性能

一、并行流

1. 将顺序流转换为并行流

对顺序流调用 parallel 方法:

public static long parallelSum(long n) {return Stream.iterate(1L, i -> i + 1) 
 .limit(n) 
 .parallel() 
 .reduce(0L, Long::sum); 
}

它在内部实际上就是设了一个 boolean 标志,表示你想让调用 parallel 之后进行的所有操作都并行执行。类似地,你只需要对并行流调用 sequential 方法就可以把它变成顺序流。但最后一次 parallel 或 sequential 调用会影响整个流水线。

2. 测量流性能

  • iterate 生成的是装箱的对象,必须拆箱成数字才能求和;
  • 我们很难把 iterate 分成多个独立块来并行执行。

iterate 很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果,整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。

3. 正确使用并行流

错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。

public class Accumulator { 
 public long total = 0; 
 public void add(long value) {total += value;} 
}

public static long sideEffectParallelSum(long n) {Accumulator accumulator = new Accumulator(); 
 LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); 
 return accumulator.total; 
}

上面的示例在本质上就是顺序的, 每次访问 total 都会出现数据竞争. 由于多个线程在同时访问累加器,执行 total += value,而这一句虽然看似简单,却不是一个原子操作。所得的结果也是不可控的(错误的)。

4. 高效使用并行流

  • 留意装箱
  • 有些操作本身在并行流上的性能就比顺序流差
  • 还要考虑流的操作流水线的总计算成本。设 N 是要处理的元素的总数,Q 是一个元素通过流水线的大致处理成本,则 N * Q 就是这个对成本的一个粗略的定性估计。Q 值较高就意味着使用并行流时性能好的可能性比较大
  • 对于较小的数据量,选择并行流几乎从来都不是一个好的决定
  • 要考虑流背后的数据结构是否易于分解
  • 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。
  • 还要考虑终端操作中合并步骤的代价是大是小

二、分支 / 合并框架(Fork/Join)

详见第六章相关内容
注意:不应该在 RecursiveTask 内部使用 ForkJoinPool 的 invoke 方法。相反,你应该始终直接调用 compute 或 fork 方法,只有顺序代码才应该用 invoke 来启动并行计算。

三、Spliterator

Spliterator是 Java 8 中加入的另一个新接口;这个名字代表“可分迭代器”(splitable iterator)。和 Iterator 一样,Spliterator 也用于遍历数据源中的元素,但它是为了并行执行而设计的。
Spliterator 接口

public interface Spliterator<T> {boolean tryAdvance(Consumer<? super T> action); 
 Spliterator<T> trySplit(); 
 long estimateSize(); 
 int characteristics();}

与往常一样,T 是 Spliterator 遍历的元素的类型。tryAdvance 方法的行为类似于普通的 Iterator,因为它会按顺序一个一个使用 Spliterator 中的元素,并且如果还有其他元素要遍历就返回 true。但 trySplit 是专为 Spliterator 接口设计的,因为它可以把一些元素划出去分给第二个 Spliterator(由该方法返回),让它们两个并行处理。Spliterator 还可通过 estimateSize 方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点.

1. 拆分过程

将 Stream 拆分成多个部分的算法是一个递归过程,如图所示。第一步是对第一个 Spliterator 调用 trySplit,生成第二个 Spliterator。第二步对这两个 Spliterator 调用 trysplit,这样总共就有了四个 Spliterator。这个框架不断对 Spliterator 调用 trySplit 直到它返回 null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的 Spliterator 在调用 trySplit 时都返回了 null。

2. 实现你自己的 Spliterator

文中提到了reduce 的三参数重载方法

<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner)

它的三个参数:

  • identity: 一个初始化的值;这个初始化的值其类型是泛型 U,与 Reduce 方法返回的类型一致;注意此时 Stream 中元素的类型是 T,与 U 可以不一样也可以一样,这样的话操作空间就大了;不管 Stream 中存储的元素是什么类型,U 都可以是任何类型,如 U 可以是一些基本数据类型的包装类型 Integer、Long 等;或者是 String,又或者是一些集合类型 ArrayList 等;后面会说到这些用法。
  • accumulator: 其类型是 BiFunction,输入是 U 与 T 两个类型的数据,而返回的是 U 类型;也就是说返回的类型与输入的第一个参数类型是一样的,而输入的第二个参数类型与 Stream 中元素类型是一样的。
  • combiner: 其类型是 BinaryOperator,支持的是对 U 类型的对象进行操作;

第三个参数 combiner 主要是使用在并行计算的场景下;如果 Stream 是非并行时,第三个参数实际上是不生效的。

代码实现:

class WordCounter { 
 private final int counter; 
 private final boolean lastSpace; 
 public WordCounter(int counter, boolean lastSpace) { 
 this.counter = counter; 
 this.lastSpace = lastSpace; 
 } 
 public WordCounter accumulate(Character c) {if (Character.isWhitespace(c)) { 
 return lastSpace ? 
 this : 
 new WordCounter(counter, true); 
 } else { 
 return lastSpace ? 
 new WordCounter(counter + 1, false) :
 this; 
 } 
 } 
 public WordCounter combine(WordCounter wordCounter) { 
 return new WordCounter(counter + wordCounter.counter, 
 wordCounter.lastSpace); 
 } 
 public int getCounter() {return counter;} 
}
class WordCounterSpliterator implements Spliterator<Character> { 
 private final String string; 
 private int currentChar = 0; 
 public WordCounterSpliterator(String string) {this.string = string;} 
 @Override 
 public boolean tryAdvance(Consumer<? super Character> action) {action.accept(string.charAt(currentChar++)); 
 return currentChar < string.length();} 
 @Override 
 public Spliterator<Character> trySplit() {int currentSize = string.length() - currentChar; 
 if (currentSize < 10) {return null;} 
 for (int splitPos = currentSize / 2 + currentChar; 
 splitPos < string.length(); splitPos++) {if (Character.isWhitespace(string.charAt(splitPos))) { 
 Spliterator<Character> spliterator = 
 new WordCounterSpliterator(string.substring(currentChar, 
 splitPos)); 
 currentChar = splitPos; 
 return spliterator; 
 } 
 } 
 return null; 
 } 
 @Override 
 public long estimateSize() {return string.length() - currentChar; 
 } 
 @Override 
 public int characteristics() {return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;} 
}
final String SENTENCE = 
 "Nel mezzo del cammin di nostra vita" + 
 "mi ritrovai in una selva oscura" + 
 "ché la dritta via era smarrita";


private int countWords(Stream<Character> stream) {WordCounter wordCounter = stream.reduce(new WordCounter(0, true), 
 WordCounter::accumulate, 
 WordCounter::combine); 
 return wordCounter.getCounter();}

Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); 
Stream<Character> stream = StreamSupport.stream(spliterator, true);

System.out.println("Found" + countWords(stream) + "words");

最后打印显示

Found 19 words
退出移动版