关于java:Java-Stream-源码分析

45次阅读

共计 10435 个字符,预计需要花费 27 分钟才能阅读完成。

前言

Java 8 的 Stream 使得代码更加简洁易懂,本篇文章深入分析 Java Stream 的工作原理,并探讨 Steam 的性能问题。


Java 8 汇合中的 Stream 相当于高级版的 Iterator,它能够通过 Lambda 表达式对汇合进行各种十分便当、高效的聚合操作(Aggregate Operation),或者大批量数据操作 (Bulk Data Operation)。

Stream 的聚合操作与数据库 SQL 的聚合操作 sorted、filter、map 等相似。咱们在应用层就能够高效地实现相似数据库 SQL 的聚合操作了,而在数据操作方面,Stream 不仅能够通过串行的形式实现数据操作,还能够通过并行的形式解决大批量数据,进步数据的解决效率。

操作分类

官网将 Stream 中的操作分为两大类:

  • 两头操作(Intermediate operations),只对操作进行了记录,即只会返回一个流,不会进行计算操作。
  • 终结操作(Terminal operations),实现了计算操作。

两头操作又能够分为:

  • 无状态(Stateless)操作,元素的解决不受之前元素的影响。
  • 有状态(Stateful)操作,指该操作只有拿到所有元素之后能力继续下去。

终结操作又能够分为:

  • 短路(Short-circuiting)操作,指遇到某些符合条件的元素就能够失去最终后果
  • 非短路(Unshort-circuiting)操作,指必须解决完所有元素能力失去最终后果。

操作分类详情如下图所示:

源码构造

Stream 相干类和接口的继承关系如下图所示:

BaseStream

最顶端的接口类,定义了流的根本接口办法,最次要的办法为 spliterator、isParallel。

Stream

最顶端的接口类。定义了流的罕用办法,例如 map、filter、sorted、limit、skip、collect 等。

ReferencePipeline

ReferencePipeline 是一个构造类,定义外部类组装了各种操作流,定义了 HeadStatelessOpStatefulOp 三个外部类,实现了 BaseStream 与 Stream 的接口办法。

Sink

Sink 接口定义了 Stream 之间的操作行为,蕴含 begin()end()cancellationRequested()accpt()四个办法。ReferencePipeline 最终会将整个 Stream 流操作组装成一个调用链,而这条调用链上的各个 Stream 操作的高低关系就是通过 Sink 接口协议来定义实现的。

操作叠加

Stream 的根底用法就不再叙述了,这里从一段代码开始,剖析 Stream 的工作原理。

@Test
public void testStream() {List<String> names = Arrays.asList("kotlin", "java", "go");
    int maxLength = names.stream().filter(name -> name.length() <= 4).map(String::length)
            .max(Comparator.naturalOrder()).orElse(-1);
    System.out.println(maxLength);
}

当应用 Stream 时,次要有 3 局部组成,上面一一解说。

加载数据源

调用 names.stream() 办法,会首次加载 ReferencePipeline 的 Head 对象,此时为加载数据源操作。

java.util.Collection#stream

default Stream<E> stream() {return StreamSupport.stream(spliterator(), false);
}

StreamSupport 类中的 stream 办法,初始化了一个 ReferencePipeline 的 Head 外部类对象。

java.util.stream.StreamSupport#stream(java.util.Spliterator<T>, boolean)

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

两头操作

接着为 filter(name -> name.length() <= 4).mapToInt(String::length),是两头操作,分为无状态两头操作 StatelessOp 对象和有状态操作 StatefulOp 对象,此时的 Stage 并没有执行,而是通过 AbstractPipeline 生成了一个两头操作 Stage 链表。

java.util.stream.ReferencePipeline#filter

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                    StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

java.util.stream.ReferencePipeline#map

@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

能够看到 filter 和 map 办法都返回了一个新的 StatelessOp 对象。new StatelessOp 将会调用父类 AbstractPipeline 的构造函数,这个构造函数将前后的 Stage 分割起来,生成一个 Stage 链表:

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

终结操作

最初为 max(Comparator.naturalOrder()),是终结操作,会生成一个最终的 Stage,通过这个 Stage 触发之前的两头操作,从最初一个 Stage 开始,递归产生一个 Sink 链。

java.util.stream.ReferencePipeline#max

@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {return reduce(BinaryOperator.maxBy(comparator));
}

最终调用到 java.util.stream.AbstractPipeline#wrapSink,这个办法会调用 opWrapSink 生成一个 Sink 链表,对应到本文的例子,就是 filter 和 map 操作。

@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {Objects.requireNonNull(sink);

    for (@SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

在下面 opWrapSink 上断点调试,发现最终会调用到本例中的 filter 和 map 操作。

wrapAndCopyInto 生成 Sink 链表后,会通过 copyInfo 办法执行 Sink 链表的具体操作。

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();}
    else {copyIntoWithCancel(wrappedSink, spliterator);
    }
}

下面的外围代码是:

spliterator.forEachRemaining(wrappedSink);

java.util.Spliterators.ArraySpliterator#forEachRemaining

@Override
public void forEachRemaining(Consumer<? super T> action) {Object[] a; int i, hi; // hoist accesses and checks from loop
    if (action == null)
        throw new NullPointerException();
    if ((a = array).length >= (hi = fence) &&
        (i = index) >= 0 && i < (index = hi)) {do { action.accept((T)a[i]); } while (++i < hi);
    }
}

断点调试,能够发现首先进入了 filter 的 Sink,其中 accept 办法的入参是 list 中的第一个元素“kotlin”(代码中的 3 个元素是:”kotlin”, “java”, “go”)。filter 的传入是一个 Lambda 表达式:

filter(name -> name.length() <= 4)

显然这个第一个元素“kotlin”的 predicate 是不会进入的。

对于第二个元素“java”,predicate.test 会返回 true(字符串“java”的长度 <=4),则会进入 map 的 accept 办法。

本次调用 accept 办法时,empty 为 false,会将 map 后的后果(int 类型的 4)赋值给 t。

public static <T> TerminalOp<T, Optional<T>>
makeRef(BinaryOperator<T> operator) {Objects.requireNonNull(operator);
    class ReducingSink
            implements AccumulatingSink<T, Optional<T>, ReducingSink> {
        private boolean empty;
        private T state;

        public void begin(long size) {
            empty = true;
            state = null;
        }

        @Override
        public void accept(T t) {if (empty) {
                empty = false;
                state = t;
            } else {state = operator.apply(state, t);
            }
        }

        ……
        }
}

对于第三个元素“go”,也会进入 accept 办法,此时 empty 为 true, map 后的后果(int 类型的 2)会与上次的后果 4 通过自定义的比拟器相比拟,存入合乎后果的值。

public static <T> BinaryOperator<T> maxBy(Comparator<? super T> comparator) {Objects.requireNonNull(comparator);
    return (a, b) -> comparator.compare(a, b) >= 0 ? a : b;
}

本文代码中的 max 传入的比拟器为:

max(Comparator.naturalOrder())

至此会返回 int 类型的 4。

并行处理

下面的例子是串行解决的,如果要改成并行也很简略,只须要在 stream() 办法后加上 parallel() 就能够了,并行代码能够写成:

@Test
public void testStream() {List<String> names = Arrays.asList("kotlin", "java", "go");
    int maxLength = names.stream().parallel().filter(name -> name.length() <= 4)
            .map(String::length).max(Comparator.naturalOrder()).orElse(-1);
    System.out.println(maxLength);
}

Stream 的并行处理在执行终结操作之前,跟串行解决的实现是一样的。而在调用终结办法之后,实现的形式就有点不太一样,会调用 TerminalOp 的 evaluateParallel 办法进行并行处理。

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
            ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

外围是应用了 ForkJoin 框架,对 Stream 解决进行分片,最终会调用上面的代码,这里就不开展剖析了。

java.util.stream.AbstractTask#compute

@Override
public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();}
    task.setLocalResult(task.doLeaf());
    task.tryComplete();}

并行谬误的应用办法

@Test
public void testParallelWrong() {List<Integer> parallelList = new ArrayList<>();
    IntStream.range(0, 1000).boxed().parallel().filter(i -> i % 2 == 1)
            .forEach(parallelList::add);
    System.out.println(parallelList.size());
}

下面的输入后果会常常小于 500,这是因为 parallelList 的类型是 ArrayList,并不是线程平安的,在执行 add 操作时,可能正好赶上扩容或者线程被占用,会笼罩其余线程的赋好的值。

并行正确的应用办法

@Test
public void testParallelRight() {List<Integer> parallelList = IntStream.range(0, 1000).boxed().parallel()
            .filter(i -> i % 2 == 1).collect(Collectors.toList());
    System.out.println(parallelList.size());
}

性能

上面的文章参考自:JavaLambdaInternals/8-Stream Performance.md,侵删。

为保障测试后果真实可信,咱们将 JVM 运行在 -server 模式下,测试数据在 GB 量级,测试机器采纳常见的商用服务器,配置如下:

OS CentOS 6.7 x86_64
CPU Intel Xeon X5675, 12M Cache 3.06 GHz, 6 Cores 12 Threads
内存 96GB
JDK java version 1.8.0_91, Java HotSpot(TM) 64-Bit Server VM

测试所用代码在这里,测试后果汇总.

## 测试方法和测试数据

性能测试并不是容易的事,Java 性能测试更吃力,因为虚拟机对性能的影响很大,JVM 对性能的影响有两方面:

1. GC 的影响。GC 的行为是 Java 中很不好管制的一块,为减少确定性,咱们手动指定应用 CMS 收集器,并应用 10GB 固定大小的堆内存。具体到 JVM 参数就是-XX:+UseConcMarkSweepGC -Xms10G -Xmx10G
2. JIT(Just-In-Time)即时编译技术。即时编译技术会将热点代码在 JVM 运行的过程中编译成本地代码,测试时咱们会先对程序预热,触发对测试函数的即时编译。相干的 JVM 参数是-XX:CompileThreshold=10000

Stream 并行执行时用到 ForkJoinPool.commonPool() 失去的线程池,为管制并行度咱们应用 Linux 的 taskset 命令指定 JVM 可用的核数。

测试数据由程序随机生成。为避免一次测试带来的抖动,测试 4 次求出均匀工夫作为运行工夫。

## 试验一 根本类型迭代

测试内容:找出整型数组中的最小值。比照 for 循环内部迭代和 Stream API 外部迭代性能。

测试程序 IntTest,测试后果如下图:

图中展现的是 for 循环内部迭代耗时为基准的工夫比值。剖析如下:

1. 对于根本类型 Stream 串行迭代的性能开销显著高于内部迭代开销(两倍);
2. Stream 并行迭代的性能比串行迭代和内部迭代都好。

并行迭代性能跟可利用的核数无关,上图中的并行迭代应用了全副 12 个核,为考查应用核数对性能的影响,咱们专门测试了不同核数下的 Stream 并行迭代成果:

剖析,对于根本类型:

1. 应用 Stream 并行 API 在单核状况下性能很差,比 Stream 串行 API 的性能还差;
2. 随着应用核数的减少,Stream 并行成果逐步变好,比应用 for 循环内部迭代的性能还好。

以上两个测试阐明,对于根本类型的简略迭代,Stream 串行迭代性能更差,但多核状况下 Stream 迭代时性能较好。

## 试验二 对象迭代

再来看对象的迭代成果。

测试内容:找出字符串列表中最小的元素(天然程序),比照 for 循环内部迭代和 Stream API 外部迭代性能。

测试程序 StringTest,测试后果如下图:

后果剖析如下:

1. 对于对象类型 Stream 串行迭代的性能开销依然高于内部迭代开销(1.5 倍),但差距没有根本类型那么大。
2. Stream 并行迭代的性能比串行迭代和内部迭代都好。

再来独自考查 Stream 并行迭代成果:

剖析,对于对象类型:

1. 应用 Stream 并行 API 在单核状况下性能比 for 循环内部迭代差;
2. 随着应用核数的减少,Stream 并行成果逐步变好,多核带来的成果显著。

以上两个测试阐明,对于对象类型的简略迭代,Stream 串行迭代性能更差,但多核状况下 Stream 迭代时性能较好。

## 试验三 简单对象归约

从试验一、二的后果来看,Stream 串行执行的成果都比内部迭代差(很多),是不是阐明 Stream 真的不行了?先别下结论,咱们再来考查一下更简单的操作。

测试内容:给定订单列表,统计每个用户的总交易额。比照应用内部迭代手动实现和 Stream API 之间的性能。

咱们将订单简化为 <userName, price, timeStamp> 形成的元组,并用 Order 对象来示意。测试程序 ReductionTest,测试后果如下图:

剖析,对于简单的归约操作:

1. Stream API 的性能广泛好于内部手动迭代,并行 Stream 成果更佳;

再来考查并行度对并行成果的影响,测试后果如下:

剖析,对于简单的归约操作:

1. 应用 Stream 并行归约在单核状况下性能比串行归约以及手动归约都要差,简略说就是最差的;
2. 随着应用核数的减少,Stream 并行成果逐步变好,多核带来的成果显著。

以上两个试验阐明,对于简单的归约操作,Stream 串行归约成果好于手动归约,在多核状况下,并行归约成果更佳。咱们有理由置信,对于其余简单的操作,Stream API 也能体现出类似的性能体现。

## 论断

上述三个试验的后果能够总结如下:

1. 对于简略操作,比方最简略的遍历,Stream 串行 API 性能显著差于显示迭代,但并行的 Stream API 可能施展多核个性。
2. 对于简单操作,Stream 串行 API 性能能够和手动实现的成果匹敌,在并行执行时 Stream API 成果远超手动实现。

所以,如果出于性能思考,1. 对于简略操作举荐应用内部迭代手动实现,2. 对于简单操作,举荐应用 Stream API,3. 在多核状况下,举荐应用并行 Stream API 来施展多核优势,4. 单核状况下不倡议应用并行 Stream API。

# 参考文章

1. JavaLambdaInternals/6-Stream Pipelines.md
2. JavaLambdaInternals/8-Stream Performance.md
3. 极客工夫 -Java 性能调优实战 /06.Stream 如何进步遍历汇合效率?
# 公众号

coding 笔记、点滴记录,当前的文章也会同步到公众号(Coding Insight)中,心愿大家关注 ^_^

代码和思维导图在 GitHub 我的项目中,欢送大家 star!


正文完
 0