说起 Java 8,咱们晓得 Java 8 大改变之一就是减少函数式编程,而 Stream API 便是函数编程的配角,Stream API 是一种流式的解决数据格调,也就是将要解决的数据当作流,在管道中进行传输,并在管道中的每个节点对数据进行解决,如过滤、排序、转换等。
首先咱们先看一个应用 Stream API 的示例,具体代码如下:
code1 Stream example
这是个很简略的一个 Stream 应用例子,咱们过滤掉空字符串后,转成 int 类型并计算出最大值,这其中包含了三个操作:filter、mapToInt、sum。置信大多数人再刚应用 Stream API 的时候都会有个疑难,Stream 是指怎么实现的,是每一次函数调用就执行一次迭代吗?答案必定是否,因为如果真的是每一次函数调用就执行一次迭代,这个效率是很难承受的,Stream 也不会那么受欢迎。
其实 Stream 外部是通过流水线(Pipeline)的形式来实现的,根本思维是在迭代的时候顺着流水线尽可能的执行更多的操作,从而防止屡次迭代。为了对 Stream 的操作有更清晰的意识,咱们汇总了 Stream 的所有操作。
从上表能够看出 Stream 将所有操作分为两类:两头操作和终止操作。其中两头操作分为无状态和有状态,终止操作分为非短路操作和短路操作,上面是针对这几个操作的含意阐明:
1、两头操作:两头操作只是一种标记,只有完结操作才会触发理论计算
- 无状态:指元素的解决不受后面元素的影响;
- 有状态:有状态的两头操作必须等到所有元素解决之后才晓得最终后果,比方排序是有状态操作,在读取所有元素之前并不能确定排序后果。
2、终止操作:顾名思义,就是得出最初计算结果的操作
- 短路操作:指不必解决全副元素就能够返回后果;
- 非短路操作:指必须解决所有元素能力失去最终后果。
Stream 流水线解决方案
通过下面的介绍,咱们理解到 Stream 在执行两头操作时仅仅是记录,当用户调用终止操作时,会在一个迭代里将曾经记录的操作顺着流水线全副执行掉。沿着这个思路,有几个问题须要解决:
- 用户的操作如何记录?
- 操作如何叠加?
- 叠加之后的操作如何执行?
1、操作如何记录
图 1 -1
对于操作如何记录,在 JDK 源码正文中屡次用(操作)stage 来标识用户的每一次操作,而通常状况下 Stream 的操作又须要一个回调函数,所以一个残缺的操作是由数据起源、操作、回调函数组成的三元组来示意。而在具体实现中,应用实例化的 ReferencePipeline 来示意,即图 1 - 1 中的 Head、StatelessOp、StatefulOp 的实例。接下来咱们来看下 Stream 几个罕用办法的源码。
code2 Collection.Stream()
code3 StreamSupport.stream()
code4 ReferencePipeline.map()
从下面源码中能够看进去,咱们调用 stream()办法时最终会创立一个 Head 实例来示意流操作的头,当调用 map()办法时则会创立无状态的两头操作实例 StatelessOp,同样调用其余操作对应的办法也会生成一个 ReferencePipeline 实例,在这里就不一一列举。在用户调用一系列操作后,最终会造成一个双向链表,如下图所示:
图 1 -2
2、操作如何叠加
下面咱们阐明了 Stream 是通过 stage 记录操作,但 stage 只保留以后操作,它并不知道下个 stage 如何操作,须要什么操作。所以要执行的话还须要某种协定将各个 stage 关联起来。jdk 中就是应用 Slink 接口来实现的,Slink 接口定义 begin()、end()、cancellationRequested()、accept()四个办法,如下表所示。
往回看 code3 ReferencePipeline.map()的办法,咱们会发现咱们在创立一个 ReferencePipeline 实例的时候,须要重写 opWrapSink 办法来生成对应 Sink 实例。而且通过浏览源码会发现罕用的操作都会创立一个 ChainedReference 实例。咱们能够看下 code5 ChainedReference 抽象类的源码实现,因为 ChainedReference 只是个形象实现,不携带具体操作的个性,所以是更能体现作者的设计理念。
通过查看源码能够发现 ChainedReference 会持有下一个操作的 Slink,并在调用 begin、end、cancellationRequested 办法会调用下一个操作的 Slink 的相应办法,以此来达到叠加的成果。
code5 ChainedReference
3、叠加之后的操作如何执行
Sink 完满封装了 Stream 每一步操作,并给出了 [解决 -> 转发] 的模式来叠加操作。这一连串的齿轮曾经咬合,就差最初一步拨动齿轮启动执行。是什么启动这一连串的操作呢?兴许你曾经想到了启动的原始能源就是完结操作(Terminal Operation),一旦调用某个完结操作,就会触发整个流水线的执行。
完结操作之后不能再有别的操作,所以完结操作不会创立新的流水线阶段 (Stage),直观的说就是流水线的链表不会在往后延长了。完结操作会创立一个包装了本人操作的 Sink,这也是流水线中最初一个 Sink,这个 Sink 只须要解决数据而不须要将后果传递给上游的 Sink(因为没有上游)。对于 Sink 的[解决 -> 转发] 模型,完结操作的 Sink 就是调用链的进口。
咱们再来考查一下上游的 Sink 是如何找到上游 Sink 的。一种可选的计划是在 PipelineHelper 中设置一个 Sink 字段,在流水线中找到上游 Stage 并拜访 Sink 字段即可。但 Stream 类库的设计者没有这么做,而是设置了一个 Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)办法来失去 Sink,该办法的作用是返回一个新的蕴含了以后 Stage 代表的操作以及可能将后果传递给 downstream 的 Sink 对象。为什么要产生一个新对象而不是返回一个 Sink 字段?这是因为应用 opWrapSink()能够将以后操作与上游 Sink(上文中的 downstream 参数)联合成新 Sink。试想只有从流水线的最初一个 Stage 开始,一直调用上一个 Stage 的 opWrapSink()办法直到最开始(不包含 stage0,因为 stage0 代表数据源,不蕴含操作),就能够失去一个代表了流水线上所有操作的 Sink,用代码示意就是这样:
code6 AbstractPipeline.wrapSink
当初流水线上从开始到完结的所有的操作都被包装到了一个 Sink 里,执行这个 Sink 就相当于执行整个流水线,执行 Sink 的代码如下:
code7 AbstractPipeline.copyInto
上述代码首先调用 wrappedSink.begin()办法通知 Sink 数据行将到来,而后调用 spliterator.forEachRemaining()办法对数据进行迭代,最初调用 wrappedSink.end()办法告诉 Sink 数据处理完结。逻辑如此清晰。
作者:Huang Rongpeng