一、并行流

1.将顺序流转换为并行流

对顺序流调用parallel方法:

public static long parallelSum(long n) {  return Stream.iterate(1L, i -> i + 1)  .limit(n)  .parallel()  .reduce(0L, Long::sum); }

它在内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。类似地,你只需要对并行流调用sequential方法就可以把它变成顺序流。但最后一次parallel或sequential调用会影响整个流水线。

2.测量流性能

  • iterate生成的是装箱的对象,必须拆箱成数字才能求和;
  • 我们很难把iterate分成多个独立块来并行执行。

iterate很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果,整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。

3.正确使用并行流

错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。

public class Accumulator {  public long total = 0;  public void add(long value) { total += value; } }public static long sideEffectParallelSum(long n) {  Accumulator accumulator = new Accumulator();  LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);  return accumulator.total; }
上面的示例在本质上就是顺序的,每次访问total都会出现数据竞争.由于多个线程在同时访问累加器,执行total += value,而这一句虽然看似简单,却不是一个原子操作。所得的结果也是不可控的(错误的)。

4.高效使用并行流

  • 留意装箱
  • 有些操作本身在并行流上的性能就比顺序流差
  • 还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大
  • 对于较小的数据量,选择并行流几乎从来都不是一个好的决定
  • 要考虑流背后的数据结构是否易于分解
  • 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。
  • 还要考虑终端操作中合并步骤的代价是大是小

二、分支/合并框架(Fork/Join)

详见第六章相关内容
注意:不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。

三、Spliterator

Spliterator是Java 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitable iterator)。和Iterator一样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。
Spliterator接口

public interface Spliterator<T> {  boolean tryAdvance(Consumer<? super T> action);  Spliterator<T> trySplit();  long estimateSize();  int characteristics(); }

与往常一样,T是Spliterator遍历的元素的类型。tryAdvance方法的行为类似于普通的Iterator,因为它会按顺序一个一个使用Spliterator中的元素,并且如果还有其他元素要遍历就返回true。但trySplit是专为Spliterator接口设计的,因为它可以把一些元素划出去分给第二个Spliterator(由该方法返回),让它们两个并行处理。Spliterator还可通过estimateSize方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点.

1.拆分过程

将Stream拆分成多个部分的算法是一个递归过程,如图所示。第一步是对第一个Spliterator调用trySplit,生成第二个Spliterator。第二步对这两个Spliterator调用trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit直到它返回null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的Spliterator在调用trySplit时都返回了null。

2.实现你自己的 Spliterator

文中提到了reduce的三参数重载方法

<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner)

它的三个参数:

  • identity: 一个初始化的值;这个初始化的值其类型是泛型U,与Reduce方法返回的类型一致;注意此时Stream中元素的类型是T,与U可以不一样也可以一样,这样的话操作空间就大了;不管Stream中存储的元素是什么类型,U都可以是任何类型,如U可以是一些基本数据类型的包装类型Integer、Long等;或者是String,又或者是一些集合类型ArrayList等;后面会说到这些用法。
  • accumulator: 其类型是BiFunction,输入是U与T两个类型的数据,而返回的是U类型;也就是说返回的类型与输入的第一个参数类型是一样的,而输入的第二个参数类型与Stream中元素类型是一样的。
  • combiner: 其类型是BinaryOperator,支持的是对U类型的对象进行操作;

第三个参数combiner主要是使用在并行计算的场景下;如果Stream是非并行时,第三个参数实际上是不生效的。

代码实现:

class WordCounter {  private final int counter;  private final boolean lastSpace;  public WordCounter(int counter, boolean lastSpace) {  this.counter = counter;  this.lastSpace = lastSpace;  }  public WordCounter accumulate(Character c) {  if (Character.isWhitespace(c)) {  return lastSpace ?  this :  new WordCounter(counter, true);  } else {  return lastSpace ?  new WordCounter(counter + 1, false) : this;  }  }  public WordCounter combine(WordCounter wordCounter) {  return new WordCounter(counter + wordCounter.counter,  wordCounter.lastSpace);  }  public int getCounter() {  return counter;  } }
class WordCounterSpliterator implements Spliterator<Character> {  private final String string;  private int currentChar = 0;  public WordCounterSpliterator(String string) {  this.string = string;  }  @Override  public boolean tryAdvance(Consumer<? super Character> action) {  action.accept(string.charAt(currentChar++));  return currentChar < string.length();  }  @Override  public Spliterator<Character> trySplit() {  int currentSize = string.length() - currentChar;  if (currentSize < 10) {  return null;  }  for (int splitPos = currentSize / 2 + currentChar;  splitPos < string.length(); splitPos++) {  if (Character.isWhitespace(string.charAt(splitPos))) {  Spliterator<Character> spliterator =  new WordCounterSpliterator(string.substring(currentChar,  splitPos));  currentChar = splitPos;  return spliterator;  }  }  return null;  }  @Override  public long estimateSize() {  return string.length() - currentChar;  }  @Override  public int characteristics() {  return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;  } }
final String SENTENCE =  " Nel mezzo del cammin di nostra vita " +  "mi ritrovai in una selva oscura" +  " ché la dritta via era smarrita ";private int countWords(Stream<Character> stream) {  WordCounter wordCounter = stream.reduce(new WordCounter(0, true),  WordCounter::accumulate,  WordCounter::combine);  return wordCounter.getCounter(); }Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true);System.out.println("Found " + countWords(stream) + " words");

最后打印显示

Found 19 words