共计 12351 个字符,预计需要花费 31 分钟才能阅读完成。
Java8 中新增的 Stream,相信使用过的同学都已经感受到了它的便利,允许你以声明性的方式处理集合,而不用去做繁琐的 for-loop/while-loop,并且可以以极低的成本并行地处理集合数据
如果需要从菜单中筛选出卡路里在 400 以下的菜品,并按卡路里排序后,输出菜品名称
在 java8 之前,需要进行两次显示迭代,并且还需要借助中间结果存储
List<Menu> lowCaloricDishes = new LinkedList<>();
// 按照热量值进行筛选
for(Dish dish : dishes) {if (dish.getCalories() < 400) {lowCaloricDishes.add(dish);
}
}
// 按照热量进行排序
lowCaloricDishes.sort(new Comparator<Dish>() {
@Override
public int compare(Dish d1, Dish d2) {return d1.getCalories().compareTo(d2.getCalories);
}
})
// 提取名称
List<String> lowCaloricDishesName = new LinkedList<>();
for(Dish dish : lowCaloricDishes) {lowCaloricDishesName.add(dish.getName());
}
如果使用 Stream API,只需要
List<String> lowCaloricDishesName =
dishes.parallelStream() // 开启并行处理
.filter(d -> d.getCalories() < 400) // 按照热量值进行筛选
.sorted(Comparator.comparing(Dish::getCalories)) // 按照热量进行排序
.map(Dish::getName) // 提取名称
.collect(Collectors.toList()); // 将结果存入 List
甚至,可以写出更复杂的功能
Map<Integer, List<String>> lowCaloricDishesNameGroup =
dishes.parallelStream() // 开启并行处理
.filter(d -> d.getCalories() < 400) // 按照热量值进行筛选
.sorted(comparing(Dish::getCalories)) // 按照热量进行排序
.collect(Collectors.groupingBy( // 将菜品名按照热量进行分组
Dish::getCalories,
Collectors.mapping(Dish::getName, Collectors.toList())
));
是不是非常简洁,并且越发形似 SQL
如此简洁的 API 是如何实现的?中间过程是如何衔接起来的?每一步都会进行一次迭代么,需要中间结果存储么?并行处理是怎么做到的?
什么是 Stream?
Stream 使用一种类似 SQL 语句的方式,提供对集合运算的高阶抽象,可以将其处理的元素集合看做一种数据流,流在管道中传输,数据在管道节点上进行处理,比如筛选、排序、聚合等
数据流在管道中经过中间操作 (intermediate operation) 处理,由终止操作 (terminal operation) 得到前面处理的结果
和以往的集合操作不同,Stream 操作有两个基础特征:
-
pipelining:
中间操作
会返回流对象,多个操作最终串联成一个管道,管道并不直接操作数据,最终由终止操作
触发数据在管道中的流动及处理,并收集最终的结果Stream 的实现使用流水线(pipelining)的方式巧妙的避免了多次迭代,其基本思想是在 一次迭代 中尽可能多的执行用户指定的操作
- 内部迭代:区别于以往使用 iterator 或者 for-each 等显示地在集合外部进行迭代计算的方式,内部迭代隐式的在集合内部进行迭代计算
Stream 操作分为两类:中间操作
及终止操作
-
中间操作:将流一层层的进行处理,并向下一层进行传递,如
filter
map
sorted
等中间操作又分为有状态 (stateful) 及无状态(stateless)
- 有状态:必须等上一步操作完拿到全部元素后才可操作,如
sorted
- 无状态:该操作的数据不收上一步操作的影响,如
filter
map
- 有状态:必须等上一步操作完拿到全部元素后才可操作,如
-
终止操作:触发数据的流动,并收集结果,如
collect
findFirst
forEach
等终止操作又分为短路操作 (short-circuiting) 及非短路操作(non-short-circuiting)
- 短路操作:会在适当的时刻终止遍历,类似于 break,如
anyMatch
findFirst
等 - 非短路操作:会遍历所有元素,如
collect
max
等
- 短路操作:会在适当的时刻终止遍历,类似于 break,如
Stream 采用某种方式记录用户每一步的操作,当用户调用终止操作时将之前记录的操作叠加到一起,尽可能地在一次迭代中全部执行掉,那么
- 用户的操作如何记录?
- 操作如何叠加?
- 叠加后的操作如何执行?
- 执行后的结果(如果有)在哪里?
Stream 如何实现
操作如何记录
Stream 中使用 Stage 的概念来描述一个完整的操作,并用某种实例化后的 PipelineHelper 来代表 Stage,将各 Pipeline 按照先后顺序连接到一起,就构成了整个流水线
与 Stream 相关类和接口的继承关系如下图
Head 用于表示第一个 Stage,该 Stage 不包含任何操作
StatelessOp 和 StatefulOp 分别表示无状态和有状态的 Stage
使用 Collection.stream
Arrays.stream
或Stream.of
等接口会生成 Head
,其内部均采用StreamSupport.stream
方法,将原始数据包装为 Spliterator
存放在 Stage 中
- Head 记录 Stream 起始操作,将包装为 Spliterator 的原始数据存放在 Stage 中
- StatelessOp 记录无状态的中间操作
- StatefulOp 记录有状态的中间操作
- TerminalOp 用于触发数据数据在各 Stage 间的流动及处理,并收集最终数据(如果有)
Head StatelessOp StatefulOp 三个操作实例化会指向其父类 AbstractPipeline
对于 Head
/**
* Constructor for the head of a stream pipeline.
*
* @param source {@code Spliterator} describing the stream source
* @param sourceFlags the source flags for the stream source, described in
* {@link StreamOpFlag}
* @param parallel {@code true} if the pipeline is parallel
*/
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
this.previousStage = null;
this.sourceSpliterator = source;
this.sourceStage = this;
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
// The following is an optimization of:
// StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
其会将包装为 Spliterator 的原始数据存放在 Stage 中,并将自身存放在 sourceStage 中
对于 StatelessOp 及 StatefulOp
/**
* Constructor for appending an intermediate operation stage onto an
* existing pipeline.
*
* @param previousStage the upstream pipeline stage
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
*/
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
每一个 Stage 都会存放原始的 sourceStage,即 Head
通过 previousStage 及 nextStage,将各 Stage 串联为一个双向链表,使得每一步都知道上一步与下一步的操作
操作如何叠加
以上已经解决了如何记录操作的问题,想要让 pipeline 运行起来,需要一种将所有操作叠加到一起的方案
由于前面的 Stage 并不知道后面的 Stage 导致需要执行何种操作,只有当前 Stage 本身知道该如何执行自己包含的动作,这就需要某种协议来协调相邻 Stage 之间的调用关系
Stream 类库采用了 Sink 接口来协调各 Stage 之间的关系
interface Sink<T> extends Consumer<T> {
/**
* Resets the sink state to receive a fresh data set. This must be called
* before sending any data to the sink. After calling {@link #end()},
* you may call this method to reset the sink for another calculation.
* @param size The exact size of the data to be pushed downstream, if
* known or {@code -1} if unknown or infinite.
*
* <p>Prior to this call, the sink must be in the initial state, and after
* this call it is in the active state.
*
* 开始遍历前调用,通知 Sink 做好准备
*/
default void begin(long size) {}
/**
* Indicates that all elements have been pushed. If the {@code Sink} is
* stateful, it should send any stored state downstream at this time, and
* should clear any accumulated state (and associated resources).
*
* <p>Prior to this call, the sink must be in the active state, and after
* this call it is returned to the initial state.
*
* 所有元素遍历完成后调用,通知 Sink 没有更多元素了
*/
default void end() {}
/**
* Indicates that this {@code Sink} does not wish to receive any more data.
*
* @implSpec The default implementation always returns false.
*
* @return true if cancellation is requested
*
* 是否可以结束操作,可以让短路操作尽早结束
*/
default boolean cancellationRequested() {}
/**
* Accepts a value.
*
* @implSpec The default implementation throws IllegalStateException.
*
* @throws IllegalStateException if this sink does not accept values
*
* 遍历时调用,接收的一个待处理元素,并对元素进行处理
* Stage 把自己包含的操作和回调方法封装到该方法里,前一个 Stage 只需要调用当前 Stage.accept 方法即可
*/
default void accept(T value) {}}
Sink 的四个接口方法常常相互协作,共同完成计算任务
实际上 Stream API 内部实现的的本质,就是如何重载 Sink 的这四个接口方法,下面结合具体源码来理解 Stage 是如何将自身的操作包装秤 Sink,以及 Sink 是如何将处理结果转发给下一个 Sink 的
无状态 Stage,Stream.map
// Stream.map 将生成一个新 Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {// 该方法将回调函数 (处理逻辑) 包装成 Sink
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
// 接收数据,使用当前包装的回调函数处理数据,并传递给下游 Sink
downstream.accept(mapper.apply(u));
}
};
}
};
}
上述代码逻辑非常简单,接下来可以看一下有状态 Stage,Stream.sorted
private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
// 存放用于排序的元素
private ArrayList<T> list;
RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {super(sink, comparator);
}
@Override
public void begin(long size) {if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
// 创建用于存放排序元素的列表
list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();}
@Override
public void end() {
// 只有在接收到所有元素后才开始排序
list.sort(comparator);
downstream.begin(list.size());
// 排序完成后,将数据传递给下游 Sink
if (!cancellationWasRequested) {
// 下游 Sink 不包含短路操作,将数据依次传递给下游 Sink
list.forEach(downstream::accept);
}
else {
// 下游 Sink 包含短路操作
for (T t : list) {
// 对于每一个元素,都要询问是否可以结束处理
if (downstream.cancellationRequested()) break;
// 将元素传递给下游 Sink
downstream.accept(t);
}
}
// 告知下游 Sink 数据传递完毕
downstream.end();
list = null;
}
@Override
public void accept(T t) {
// 依次将需要排序的元素加入到临时列表中
list.add(t);
}
}
Stream.sorted 会在接收到所有元素之后再进行排序,在此之后才开始将数据依次传递给下游 Sink
叠加后的操作如何执行
Sink 就如齿轮,每一步的操作逻辑是封装在 Sink 中的,那各 Sink 是如何串联咬合在一起的,首个 Sink 又是如何启动来触发整个 pipeline 执行的?
结束操作 (TerminalOp) 之后不能再有别的操作,结束操作会创建一个包装了自己操作的 Sink,这个 Sink 只处理数据而不会将数据传递到下游 Sink
TerminalOp 的类图非常简单
FindOp: 用于查找,如findFirst
,findAny
,生成FindSink
ReduceOp: 用于规约,如reduce
collect
,生成ReduceSink
MatchOp: 用于匹配,如allMatch
anyMatch
,生成MatchSink
ForEachOp: 用于遍历,如forEach
,生成ForEachSink
在调用 Stream 的终止操作时,会执行AbstractPipeline.evaluate
/**
* Evaluate the pipeline with a terminal operation to produce a result.
*
* @param <R> the type of result
* @param terminalOp the terminal operation to be applied to the pipeline.
* @return the result
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各种终止操作 */) {assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
}
最终会根据是否并行执行 TerminalOp 中不同的的 evaluate 方法,在 TerminalOp 的 evaluate 方法中会调用 helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get()
来串联各层 Sink,触发 pipeline,并获取最终结果,那 TerminalOp 到底是如何串联各层 Sink 的?
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink /* TerminalSink */, Spliterator<P_IN> spliterator) {copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
其中玄机尽在warpSink
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {Objects.requireNonNull(sink);
// AbstractPipeline.this,最后一层 Stage
for (@SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
// 从下游向上游遍历,不断包装 Sink
sink = p.opWrapSink(p.previousStage.combinedFlags, sink /* 下一层 Stage 的 Sink */);
}
return (Sink<P_IN>) sink;
}
还记得 opWrapSink
么?它会返回一个新的 Sink,实现 begin
end
accept
等方法,当前 Stage 的处理逻辑封装在其中,并将处理后的结果传递给下游的 Sink
这样,便将从开始到结束的所有操作都包装到了一个 Sink 里,执行这个 Sink 就相当于执行首个 Sink,并带动所有下游的 Sink,使整个 pipeline 运行起来
有了包含所有操作的 Sink,如何执行 Sink 呢?wrapAndCopyInto
中还有一个 copyInto
方法
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
// 不包含短路操作
// 1. begin
wrappedSink.begin(spliterator.getExactSizeIfKnown());
// 2. 遍历调用 sink.accept
spliterator.forEachRemaining(wrappedSink);
// 3. end
wrappedSink.end();}
else {
// 包含短路操作
copyIntoWithCancel(wrappedSink, spliterator);
}
}
final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {@SuppressWarnings({"rawtypes","unchecked"})
AbstractPipeline p = AbstractPipeline.this;
while (p.depth > 0) {p = p.previousStage;}
// 1. begin
wrappedSink.begin(spliterator.getExactSizeIfKnown());
// 2. 遍历调用 sink.accept
// 每一次遍历都询问 cancellationRequested 结果
// 如果 cancellationRequested 为 true,则中断遍历
p.forEachWithCancel(spliterator, wrappedSink);
// 3. end
wrappedSink.end();}
copyInto
会根据不同的情况依次
- 调用
sink.bigin
- 遍历调用
sink.accept
如果包含短路操作,则每次遍历都需要询问 cancellationRequested,适时中断遍历
- 调用
sink.end
执行后的结果在哪里
各层 Stage 通过 Sink 协议将所有的操作串联到一起,遍历原始数据并执行,终止操作会创建一个包装了自己操作的 TerminalSink,该 Sink 中处理最终的数据并做数据收集(如果需要),每一种 TerminalSink 中均会提供一个获取最终数据的方法
TerminalOp 通过调用 TerminalSink 中的对应方法,获取最终的数据并返回,如 ReduceOp 中
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {return helper.wrapAndCopyInto(makeSink(), spliterator)/* 执行各 Sink */.get()/* 获取最终数据 */;}
并发是如何做到的
使用 Collection.parallelStream
或Stream.parallel
等方法可以将当前的流 标记
为并发,重新来看AbstractPipeline.evaluate
,该方法会在终止操作时被执行
/**
* Evaluate the pipeline with a terminal operation to produce a result.
*
* @param <R> the type of result
* @param terminalOp the terminal operation to be applied to the pipeline.
* @return the result
*/
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各种终止操作 */) {assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 并发执行 */
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行执行 */
}
如果被标记为 sequential
,则会调用TerminalOp.evaluateSequential
,evaluateSequential 的调用过程上文已经讲述的很清楚
如果被标记为 parallel
,则会调用TerminalOp.evaluateParallel
,对于该方法不同的 TerminalOp 会有不同的实现,但都使用了 ForkJoin 框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential
类似的动作,最后将每一个单元计算的结果依次整合,得到最终结果
默认情况下,ForkJoin 的线程数即为机器的 CPU 核数,如果想自定义 Stream 并行执行的线程数,可以参考 Custom Thread Pools In Java 8 Parallel Streams
在将原始数据进行拆分的时候,拆分的策略是什么?拆分的粒度又是什么(拆分到什么程度)?
还记得上文所说,原始数据是如何存放的么?Spliterator
(可分迭代器 splitable iterator),无论使用何种 API,均会将原始数据封装为 Spliterator
后存放在 Stage 中,在进行 parallel 计算时,对原始数据的拆分以及拆分粒度都是基于 Spliterator
的,和 Iterator 一样,Spliterator 也用于遍历数据源中的数据,但它是专门为并行执行而设计的
public interface Spliterator<T> {
/**
* 如果还有元素需要遍历,则遍历该元素并执行 action,返回 true,否则返回 false
*/
boolean tryAdvance(Consumer<? super T> action);
/**
* 如果可以,则将一部分元素划分出去,构造另一个 Spliterator,使得两个 Spliterator 可以并行处理
*/
Spliterator<T> trySplit();
/**
* 估算还有多少元素需要遍历
*/
long estimateSize();
/**
* 遍历所有未遍历的元素
*/
default void forEachRemaining(Consumer<? super T> action) {do {} while (tryAdvance(action));
}
}
动图如下
在使用 Stream parallel 时,如果默认 Spliterator 的拆分逻辑不能满足你的需求,便可以自定义 Spliterator,具体示例可以参考《Java 8 in Action》中『7.3.2 实现你自己的 Spliterator』
结语
-
Head
会生成一个不包含任何操作的 Stage,并将原始数据Spliterator
存放在sourceStage
中 - 中间操作
StagelessOp
StagefulOp
将当前操作封装在 Sink 中,生成一个新的 Stage,并使用双链表结构将前后的 Stage 链接在一起,Sink 用于调用当前指定的操作处理数据,并将处理后的结果传递给下游 Sink - 终止操作
TerminalOp
生成一个TerminalSink
,从下游向上游遍历 Stage,不断包装各 Stage 中的 Sink,最终生成一个串联了所有操作的 TerminalSink,适时调用该 Sink 的begin
accept
end
等方法,触发整个 pipeline 的数据流动及处理,最终调用 TerminalSink 的get
方法,获取最终结果(如果有) - 被标记为 parallel 的流,会使用 ForkJoin 框架,将原始流拆分为更小的单元,对每一个单元分别作计算,并将各单元的计算结果进行整合,得到最终结果
JavaLambdaInternals – 6-Stream Pipelines)
java8 实战:Stream 执行原理