说起 Java 8,咱们晓得 Java 8 大改变之一就是减少函数式编程,而 Stream API 便是函数编程的配角,Stream API 是一种流式的解决数据格调,也就是将要解决的数据当作流,在管道中进行传输,并在管道中的每个节点对数据进行解决,如过滤、排序、转换等。
首先咱们先看一个应用Stream API的示例,具体代码如下:
code1 Stream example
这是个很简略的一个Stream应用例子,咱们过滤掉空字符串后,转成int类型并计算出最大值,这其中包含了三个操作:filter、mapToInt、sum。置信大多数人再刚应用Stream API的时候都会有个疑难,Stream是指怎么实现的,是每一次函数调用就执行一次迭代吗?答案必定是否,因为如果真的是每一次函数调用就执行一次迭代,这个效率是很难承受的,Stream也不会那么受欢迎。
其实Stream外部是通过流水线(Pipeline)的形式来实现的,根本思维是在迭代的时候顺着流水线尽可能的执行更多的操作,从而防止屡次迭代。为了对Stream的操作有更清晰的意识,咱们汇总了Stream的所有操作。
从上表能够看出Stream将所有操作分为两类:两头操作和终止操作。其中两头操作分为无状态和有状态,终止操作分为非短路操作和短路操作,上面是针对这几个操作的含意阐明:
1、两头操作:两头操作只是一种标记,只有完结操作才会触发理论计算
- 无状态:指元素的解决不受后面元素的影响;
- 有状态:有状态的两头操作必须等到所有元素解决之后才晓得最终后果,比方排序是有状态操作,在读取所有元素之前并不能确定排序后果。
2、终止操作:顾名思义,就是得出最初计算结果的操作
- 短路操作:指不必解决全副元素就能够返回后果;
- 非短路操作:指必须解决所有元素能力失去最终后果。
Stream流水线解决方案
通过下面的介绍,咱们理解到Stream在执行两头操作时仅仅是记录,当用户调用终止操作时,会在一个迭代里将曾经记录的操作顺着流水线全副执行掉。沿着这个思路,有几个问题须要解决:
- 用户的操作如何记录?
- 操作如何叠加?
- 叠加之后的操作如何执行?
1、操作如何记录
图1-1
对于操作如何记录,在JDK源码正文中屡次用(操作)stage来标识用户的每一次操作,而通常状况下Stream的操作又须要一个回调函数,所以一个残缺的操作是由数据起源、操作、回调函数组成的三元组来示意。而在具体实现中,应用实例化的ReferencePipeline来示意,即图1-1中的Head、StatelessOp、StatefulOp的实例。接下来咱们来看下Stream几个罕用办法的源码。
code2 Collection.Stream()
code3 StreamSupport.stream()
code4 ReferencePipeline.map()
从下面源码中能够看进去,咱们调用stream()办法时最终会创立一个Head实例来示意流操作的头,当调用map()办法时则会创立无状态的两头操作实例StatelessOp,同样调用其余操作对应的办法也会生成一个ReferencePipeline实例,在这里就不一一列举。在用户调用一系列操作后,最终会造成一个双向链表,如下图所示:
图1-2
2、操作如何叠加
下面咱们阐明了Stream是通过stage记录操作,但stage只保留以后操作,它并不知道下个stage如何操作,须要什么操作。所以要执行的话还须要某种协定将各个stage关联起来。jdk中就是应用Slink接口来实现的,Slink接口定义begin()、end()、cancellationRequested()、accept()四个办法,如下表所示。
往回看code3 ReferencePipeline.map()的办法,咱们会发现咱们在创立一个ReferencePipeline实例的时候,须要重写opWrapSink办法来生成对应Sink实例。而且通过浏览源码会发现罕用的操作都会创立一个ChainedReference实例。咱们能够看下code5 ChainedReference抽象类的源码实现,因为ChainedReference只是个形象实现,不携带具体操作的个性,所以是更能体现作者的设计理念。
通过查看源码能够发现ChainedReference会持有下一个操作的Slink,并在调用begin、end、cancellationRequested办法会调用下一个操作的Slink的相应办法,以此来达到叠加的成果。
code5 ChainedReference
3、叠加之后的操作如何执行
Sink完满封装了Stream每一步操作,并给出了[解决->转发]的模式来叠加操作。这一连串的齿轮曾经咬合,就差最初一步拨动齿轮启动执行。是什么启动这一连串的操作呢?兴许你曾经想到了启动的原始能源就是完结操作(Terminal Operation),一旦调用某个完结操作,就会触发整个流水线的执行。
完结操作之后不能再有别的操作,所以完结操作不会创立新的流水线阶段(Stage),直观的说就是流水线的链表不会在往后延长了。完结操作会创立一个包装了本人操作的Sink,这也是流水线中最初一个Sink,这个Sink只须要解决数据而不须要将后果传递给上游的Sink(因为没有上游)。对于Sink的[解决->转发]模型,完结操作的Sink就是调用链的进口。
咱们再来考查一下上游的Sink是如何找到上游Sink的。一种可选的计划是在PipelineHelper中设置一个Sink字段,在流水线中找到上游Stage并拜访Sink字段即可。但Stream类库的设计者没有这么做,而是设置了一个Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)办法来失去Sink,该办法的作用是返回一个新的蕴含了以后Stage代表的操作以及可能将后果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?这是因为应用opWrapSink()能够将以后操作与上游Sink(上文中的downstream参数)联合成新Sink。试想只有从流水线的最初一个Stage开始,一直调用上一个Stage的opWrapSink()办法直到最开始(不包含stage0,因为stage0代表数据源,不蕴含操作),就能够失去一个代表了流水线上所有操作的Sink,用代码示意就是这样:
code6 AbstractPipeline.wrapSink
当初流水线上从开始到完结的所有的操作都被包装到了一个Sink里,执行这个Sink就相当于执行整个流水线,执行Sink的代码如下:
code7 AbstractPipeline.copyInto
上述代码首先调用wrappedSink.begin()办法通知Sink数据行将到来,而后调用spliterator.forEachRemaining()办法对数据进行迭代,最初调用wrappedSink.end()办法告诉Sink数据处理完结。逻辑如此清晰。
作者:Huang Rongpeng
发表回复