修炼内功Java8-Stream是怎么工作的

25次阅读

共计 12351 个字符,预计需要花费 31 分钟才能阅读完成。

Java8 中新增的 Stream,相信使用过的同学都已经感受到了它的便利,允许你以声明性的方式处理集合,而不用去做繁琐的 for-loop/while-loop,并且可以以极低的成本并行地处理集合数据

如果需要从菜单中筛选出卡路里在 400 以下的菜品,并按卡路里排序后,输出菜品名称

在 java8 之前,需要进行两次显示迭代,并且还需要借助中间结果存储

List<Menu> lowCaloricDishes = new LinkedList<>();

// 按照热量值进行筛选
for(Dish dish : dishes) {if (dish.getCalories() < 400) {lowCaloricDishes.add(dish);
  }
}

// 按照热量进行排序
lowCaloricDishes.sort(new Comparator<Dish>() {
  @Override
  public int compare(Dish d1, Dish d2) {return d1.getCalories().compareTo(d2.getCalories);
  }
})

// 提取名称
List<String> lowCaloricDishesName = new LinkedList<>();
for(Dish dish : lowCaloricDishes) {lowCaloricDishesName.add(dish.getName());
}

如果使用 Stream API,只需要

List<String> lowCaloricDishesName = 
    dishes.parallelStream() // 开启并行处理
          .filter(d -> d.getCalories() < 400) // 按照热量值进行筛选
          .sorted(Comparator.comparing(Dish::getCalories)) // 按照热量进行排序
          .map(Dish::getName) // 提取名称
          .collect(Collectors.toList()); // 将结果存入 List

甚至,可以写出更复杂的功能

Map<Integer, List<String>> lowCaloricDishesNameGroup = 
    dishes.parallelStream() // 开启并行处理
          .filter(d -> d.getCalories() < 400) // 按照热量值进行筛选
          .sorted(comparing(Dish::getCalories)) // 按照热量进行排序
          .collect(Collectors.groupingBy( // 将菜品名按照热量进行分组
              Dish::getCalories, 
              Collectors.mapping(Dish::getName, Collectors.toList())
          ));

是不是非常简洁,并且越发形似 SQL
如此简洁的 API 是如何实现的?中间过程是如何衔接起来的?每一步都会进行一次迭代么,需要中间结果存储么?并行处理是怎么做到的?

什么是 Stream?

Stream 使用一种类似 SQL 语句的方式,提供对集合运算的高阶抽象,可以将其处理的元素集合看做一种数据流,流在管道中传输,数据在管道节点上进行处理,比如筛选、排序、聚合等

数据流在管道中经过中间操作 (intermediate operation) 处理,由终止操作 (terminal operation) 得到前面处理的结果
和以往的集合操作不同,Stream 操作有两个基础特征:

  • pipelining:中间操作 会返回流对象,多个操作最终串联成一个管道,管道并不直接操作数据,最终由 终止操作 触发数据在管道中的流动及处理,并收集最终的结果

    Stream 的实现使用流水线(pipelining)的方式巧妙的避免了多次迭代,其基本思想是在 一次迭代 中尽可能多的执行用户指定的操作

  • 内部迭代:区别于以往使用 iterator 或者 for-each 等显示地在集合外部进行迭代计算的方式,内部迭代隐式的在集合内部进行迭代计算

Stream 操作分为两类:中间操作 终止操作

  • 中间操作:将流一层层的进行处理,并向下一层进行传递,如 filter map sorted

    中间操作又分为有状态 (stateful) 及无状态(stateless)

    • 有状态:必须等上一步操作完拿到全部元素后才可操作,如sorted
    • 无状态:该操作的数据不收上一步操作的影响,如filter map
  • 终止操作:触发数据的流动,并收集结果,如 collect findFirst forEach

    终止操作又分为短路操作 (short-circuiting) 及非短路操作(non-short-circuiting)

    • 短路操作:会在适当的时刻终止遍历,类似于 break,如 anyMatch findFirst
    • 非短路操作:会遍历所有元素,如 collect max

Stream 采用某种方式记录用户每一步的操作,当用户调用终止操作时将之前记录的操作叠加到一起,尽可能地在一次迭代中全部执行掉,那么

  1. 用户的操作如何记录?
  2. 操作如何叠加?
  3. 叠加后的操作如何执行?
  4. 执行后的结果(如果有)在哪里?

Stream 如何实现

操作如何记录

Stream 中使用 Stage 的概念来描述一个完整的操作,并用某种实例化后的 PipelineHelper 来代表 Stage,将各 Pipeline 按照先后顺序连接到一起,就构成了整个流水线

与 Stream 相关类和接口的继承关系如下图

Head 用于表示第一个 Stage,该 Stage 不包含任何操作
StatelessOp 和 StatefulOp 分别表示无状态和有状态的 Stage

使用 Collection.stream Arrays.streamStream.of等接口会生成 Head,其内部均采用StreamSupport.stream 方法,将原始数据包装为 Spliterator 存放在 Stage 中

  • Head 记录 Stream 起始操作,将包装为 Spliterator 的原始数据存放在 Stage 中
  • StatelessOp 记录无状态的中间操作
  • StatefulOp 记录有状态的中间操作
  • TerminalOp 用于触发数据数据在各 Stage 间的流动及处理,并收集最终数据(如果有)

Head StatelessOp StatefulOp 三个操作实例化会指向其父类 AbstractPipeline

对于 Head

/**
 * Constructor for the head of a stream pipeline.
 *
 * @param source {@code Spliterator} describing the stream source
 * @param sourceFlags the source flags for the stream source, described in
 * {@link StreamOpFlag}
 * @param parallel {@code true} if the pipeline is parallel
 */
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

其会将包装为 Spliterator 的原始数据存放在 Stage 中,并将自身存放在 sourceStage 中

对于 StatelessOp 及 StatefulOp

/**
 * Constructor for appending an intermediate operation stage onto an
 * existing pipeline.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 * {@link StreamOpFlag}
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

每一个 Stage 都会存放原始的 sourceStage,即 Head

通过 previousStage 及 nextStage,将各 Stage 串联为一个双向链表,使得每一步都知道上一步与下一步的操作

操作如何叠加

以上已经解决了如何记录操作的问题,想要让 pipeline 运行起来,需要一种将所有操作叠加到一起的方案

由于前面的 Stage 并不知道后面的 Stage 导致需要执行何种操作,只有当前 Stage 本身知道该如何执行自己包含的动作,这就需要某种协议来协调相邻 Stage 之间的调用关系

Stream 类库采用了 Sink 接口来协调各 Stage 之间的关系

interface Sink<T> extends Consumer<T> {
    /**
     * Resets the sink state to receive a fresh data set.  This must be called
     * before sending any data to the sink.  After calling {@link #end()},
     * you may call this method to reset the sink for another calculation.
     * @param size The exact size of the data to be pushed downstream, if
     * known or {@code -1} if unknown or infinite.
     *
     * <p>Prior to this call, the sink must be in the initial state, and after
     * this call it is in the active state.
     *
     * 开始遍历前调用,通知 Sink 做好准备
     */
    default void begin(long size) {}

    /**
     * Indicates that all elements have been pushed.  If the {@code Sink} is
     * stateful, it should send any stored state downstream at this time, and
     * should clear any accumulated state (and associated resources).
     *
     * <p>Prior to this call, the sink must be in the active state, and after
     * this call it is returned to the initial state.
     *
     * 所有元素遍历完成后调用,通知 Sink 没有更多元素了
     */
    default void end() {}
    
    /**
     * Indicates that this {@code Sink} does not wish to receive any more data.
     *
     * @implSpec The default implementation always returns false.
     *
     * @return true if cancellation is requested
     *
     * 是否可以结束操作,可以让短路操作尽早结束
     */
    default boolean cancellationRequested() {}

    /**
     * Accepts a value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException if this sink does not accept values
     *
     * 遍历时调用,接收的一个待处理元素,并对元素进行处理
     * Stage 把自己包含的操作和回调方法封装到该方法里,前一个 Stage 只需要调用当前 Stage.accept 方法即可
     */
    default void accept(T value) {}}

Sink 的四个接口方法常常相互协作,共同完成计算任务

实际上 Stream API 内部实现的的本质,就是如何重载 Sink 的这四个接口方法,下面结合具体源码来理解 Stage 是如何将自身的操作包装秤 Sink,以及 Sink 是如何将处理结果转发给下一个 Sink 的

无状态 Stage,Stream.map

// Stream.map 将生成一个新 Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {// 该方法将回调函数 (处理逻辑) 包装成 Sink
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    // 接收数据,使用当前包装的回调函数处理数据,并传递给下游 Sink
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

上述代码逻辑非常简单,接下来可以看一下有状态 Stage,Stream.sorted

private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    // 存放用于排序的元素
    private ArrayList<T> list;

    RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {super(sink, comparator);
    }

    @Override
    public void begin(long size) {if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        // 创建用于存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();}

    @Override
    public void end() {
        // 只有在接收到所有元素后才开始排序
        list.sort(comparator);
        downstream.begin(list.size());
        // 排序完成后,将数据传递给下游 Sink
        if (!cancellationWasRequested) {
            // 下游 Sink 不包含短路操作,将数据依次传递给下游 Sink
            list.forEach(downstream::accept);
        }
        else {
            // 下游 Sink 包含短路操作
            for (T t : list) {
                // 对于每一个元素,都要询问是否可以结束处理
                if (downstream.cancellationRequested()) break;
                // 将元素传递给下游 Sink
                downstream.accept(t);
            }
        }
        // 告知下游 Sink 数据传递完毕
        downstream.end();
        list = null;
    }

    @Override
    public void accept(T t) {
        // 依次将需要排序的元素加入到临时列表中
        list.add(t);
    }
}

Stream.sorted 会在接收到所有元素之后再进行排序,在此之后才开始将数据依次传递给下游 Sink

叠加后的操作如何执行

Sink 就如齿轮,每一步的操作逻辑是封装在 Sink 中的,那各 Sink 是如何串联咬合在一起的,首个 Sink 又是如何启动来触发整个 pipeline 执行的?
结束操作 (TerminalOp) 之后不能再有别的操作,结束操作会创建一个包装了自己操作的 Sink,这个 Sink 只处理数据而不会将数据传递到下游 Sink

TerminalOp 的类图非常简单

FindOp: 用于查找,如findFirstfindAny,生成FindSink
ReduceOp: 用于规约,如reduce collect,生成ReduceSink
MatchOp: 用于匹配,如allMatch anyMatch,生成MatchSink
ForEachOp: 用于遍历,如forEach,生成ForEachSink

在调用 Stream 的终止操作时,会执行AbstractPipeline.evaluate

/**
 * Evaluate the pipeline with a terminal operation to produce a result.
 *
 * @param <R> the type of result
 * @param terminalOp the terminal operation to be applied to the pipeline.
 * @return the result
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各种终止操作 */) {assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
}

最终会根据是否并行执行 TerminalOp 中不同的的 evaluate 方法,在 TerminalOp 的 evaluate 方法中会调用 helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get() 来串联各层 Sink,触发 pipeline,并获取最终结果,那 TerminalOp 到底是如何串联各层 Sink 的?

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink /* TerminalSink */, Spliterator<P_IN> spliterator) {copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

其中玄机尽在warpSink

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {Objects.requireNonNull(sink);

    // AbstractPipeline.this,最后一层 Stage
    for (@SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        // 从下游向上游遍历,不断包装 Sink
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink /* 下一层 Stage 的 Sink */);
    }
    return (Sink<P_IN>) sink;
}

还记得 opWrapSink 么?它会返回一个新的 Sink,实现 begin end accept 等方法,当前 Stage 的处理逻辑封装在其中,并将处理后的结果传递给下游的 Sink
这样,便将从开始到结束的所有操作都包装到了一个 Sink 里,执行这个 Sink 就相当于执行首个 Sink,并带动所有下游的 Sink,使整个 pipeline 运行起来

有了包含所有操作的 Sink,如何执行 Sink 呢?wrapAndCopyInto中还有一个 copyInto 方法

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        // 不包含短路操作
        
        // 1. begin
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // 2. 遍历调用 sink.accept
        spliterator.forEachRemaining(wrappedSink);
        // 3. end
        wrappedSink.end();}
    else {
        // 包含短路操作
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {@SuppressWarnings({"rawtypes","unchecked"})
    AbstractPipeline p = AbstractPipeline.this;
    while (p.depth > 0) {p = p.previousStage;}
    // 1. begin
    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    // 2. 遍历调用 sink.accept
    //    每一次遍历都询问 cancellationRequested 结果
    //    如果 cancellationRequested 为 true,则中断遍历
    p.forEachWithCancel(spliterator, wrappedSink);
    // 3. end
    wrappedSink.end();}

copyInto会根据不同的情况依次

  1. 调用sink.bigin
  2. 遍历调用sink.accept

    如果包含短路操作,则每次遍历都需要询问 cancellationRequested,适时中断遍历

  3. 调用sink.end

执行后的结果在哪里

各层 Stage 通过 Sink 协议将所有的操作串联到一起,遍历原始数据并执行,终止操作会创建一个包装了自己操作的 TerminalSink,该 Sink 中处理最终的数据并做数据收集(如果需要),每一种 TerminalSink 中均会提供一个获取最终数据的方法

TerminalOp 通过调用 TerminalSink 中的对应方法,获取最终的数据并返回,如 ReduceOp 中

@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {return helper.wrapAndCopyInto(makeSink(), spliterator)/* 执行各 Sink */.get()/* 获取最终数据 */;}

并发是如何做到的

使用 Collection.parallelStreamStream.parallel等方法可以将当前的流 标记 为并发,重新来看AbstractPipeline.evaluate,该方法会在终止操作时被执行

/**
 * Evaluate the pipeline with a terminal operation to produce a result.
 *
 * @param <R> the type of result
 * @param terminalOp the terminal operation to be applied to the pipeline.
 * @return the result
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各种终止操作 */) {assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
}

如果被标记为 sequential,则会调用TerminalOp.evaluateSequential,evaluateSequential 的调用过程上文已经讲述的很清楚
如果被标记为 parallel,则会调用TerminalOp.evaluateParallel,对于该方法不同的 TerminalOp 会有不同的实现,但都使用了 ForkJoin 框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential 类似的动作,最后将每一个单元计算的结果依次整合,得到最终结果

默认情况下,ForkJoin 的线程数即为机器的 CPU 核数,如果想自定义 Stream 并行执行的线程数,可以参考 Custom Thread Pools In Java 8 Parallel Streams

在将原始数据进行拆分的时候,拆分的策略是什么?拆分的粒度又是什么(拆分到什么程度)?

还记得上文所说,原始数据是如何存放的么?Spliterator(可分迭代器 splitable iterator),无论使用何种 API,均会将原始数据封装为 Spliterator 后存放在 Stage 中,在进行 parallel 计算时,对原始数据的拆分以及拆分粒度都是基于 Spliterator 的,和 Iterator 一样,Spliterator 也用于遍历数据源中的数据,但它是专门为并行执行而设计的

public interface Spliterator<T> {
    /**
     * 如果还有元素需要遍历,则遍历该元素并执行 action,返回 true,否则返回 false
     */
    boolean tryAdvance(Consumer<? super T> action);
    
    /**
     * 如果可以,则将一部分元素划分出去,构造另一个 Spliterator,使得两个 Spliterator 可以并行处理
     */
    Spliterator<T> trySplit();
    
    /**
     * 估算还有多少元素需要遍历
     */
    long estimateSize();
    
    /**
     * 遍历所有未遍历的元素
     */
    default void forEachRemaining(Consumer<? super T> action) {do {} while (tryAdvance(action));
    }
}

动图如下

在使用 Stream parallel 时,如果默认 Spliterator 的拆分逻辑不能满足你的需求,便可以自定义 Spliterator,具体示例可以参考《Java 8 in Action》中『7.3.2 实现你自己的 Spliterator』

结语

  1. Head会生成一个不包含任何操作的 Stage,并将原始数据 Spliterator 存放在 sourceStage
  2. 中间操作 StagelessOp StagefulOp 将当前操作封装在 Sink 中,生成一个新的 Stage,并使用双链表结构将前后的 Stage 链接在一起,Sink 用于调用当前指定的操作处理数据,并将处理后的结果传递给下游 Sink
  3. 终止操作 TerminalOp 生成一个 TerminalSink,从下游向上游遍历 Stage,不断包装各 Stage 中的 Sink,最终生成一个串联了所有操作的 TerminalSink,适时调用该 Sink 的begin accept end 等方法,触发整个 pipeline 的数据流动及处理,最终调用 TerminalSink 的 get 方法,获取最终结果(如果有)
  4. 被标记为 parallel 的流,会使用 ForkJoin 框架,将原始流拆分为更小的单元,对每一个单元分别作计算,并将各单元的计算结果进行整合,得到最终结果

JavaLambdaInternals – 6-Stream Pipelines)
java8 实战:Stream 执行原理

正文完
 0