前言

平时操作汇合数据,咱们个别都是for或者iterator去遍历,不是很难看。java提供了Stream的概念,它能够让咱们把汇合数据当做一个个元素在解决,并且提供多线程模式

  • 流的创立
  • 流的各种数据操作
  • 流的终止操作
  • 流的聚合解决
  • 并发流和CompletableFuture的配合应用

1 stream的结构形式

stream内置的构造方法

public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)public static<T> Builder<T> builder()public static<T> Stream<T> of(T t)public static<T> Stream<T> empty()public static<T> Stream<T> generate(Supplier<T> s) 复制代码

Collection申明的stream函数

default Stream<E> stream() 复制代码
  • Collection申明了stream转化函数,也就是说,任意Collection子类都存在官网替咱们实现的由Collection转为Stream的办法

《2020最新Java根底精讲视频教程和学习路线!》

  • 示例,List转Stream
public static void main(String[] args){    List<String> demo =  Arrays.asList("a","b","c");    long count = demo.stream().peek(System.out::println).count();    System.out.println(count);}-------result--------abc3复制代码

2 接口stream对元素的操作方法定义

过滤 filter

Stream<T> filter(Predicate<? super T> predicate) 复制代码
  • Predicate是函数式接口,能够间接用lambda代替;如果有简单的过滤逻辑,则用or、and、negate办法组合
  • 示例
List<String> demo = Arrays.asList("a", "b", "c");Predicate<String> f1 = item -> item.equals("a");Predicate<String> f2 = item -> item.equals("b");demo.stream().filter(f1.or(f2)).forEach(System.out::println);-------result--------ab复制代码

映射转化 map

<R> Stream<R> map(Function<? super T, ? extends R> mapper)IntStream mapToInt(ToIntFunction<? super T> mapper);LongStream mapToLong(ToLongFunction<? super T> mapper);DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);复制代码
  • 示例
static class User{    public User(Integer id){this.id = id; }    Integer id; public Integer getId() {  return id; }}public static void main(String[] args) {    List<User> demo = Arrays.asList(new User(1), new User(2), new User(3));    // User 转为 Integer(id)    demo.stream().map(User::getId).forEach(System.out::println);}-------result--------123复制代码

数据处理 peek

Stream<T> peek(Consumer<? super T> action);复制代码
  • 与map的区别是其无返回值
  • 示例
static class User{    public User(Integer id){this.id = id; }    Integer id;    public Integer getId() {  return id; }    public void setId(Integer id) {  this.id = id; }}public static void main(String[] args) {    List<User> demo = Arrays.asList(new User(1), new User(2), new User(3));    // id平方,User 转为 Integer(id)    demo.stream().peek(user -> user.setId(user.id * user.id)).map(User::getId).forEach(System.out::println);}-------result--------149复制代码

映射撵平 flatMap

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);复制代码
  • flatMap:将元素为Stream<T>类型的流撵平成一个元素类型为T的Stream流
  • 示例
public static void main(String[] args) {    List<Stream<Integer>> demo = Arrays.asList(Stream.of(5), Stream.of(2), Stream.of(1));    demo.stream().flatMap(Function.identity()).forEach(System.out::println);}-------result--------521复制代码

去重 distinct

Stream<T> distinct();复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 1, 2);demo.stream().distinct().forEach(System.out::println);-------result--------12复制代码

排序 sorted

Stream<T> sorted();Stream<T> sorted(Comparator<? super T> comparator);复制代码
  • 示例
List<Integer> demo = Arrays.asList(5, 1, 2);//默认升序demo.stream().sorted().forEach(System.out::println);//降序Comparator<Integer> comparator = Comparator.<Integer, Integer>comparing(item -> item).reversed();demo.stream().sorted(comparator).forEach(System.out::println);-------默认升序 result--------125-------降序 result--------521复制代码

个数限度limit和跳过skip

//截取前maxSize个元素Stream<T> limit(long maxSize);//跳过前n个流Stream<T> skip(long n);复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);//跳过前两个,而后限度截取两个demo.stream().skip(2).limit(2).forEach(System.out::println);-------result--------34复制代码

JDK9提供的新操作

  • 和filter的区别,takeWhile是取满足条件的元素,直到不满足为止;dropWhile是抛弃满足条件的元素,直到不满足为止
default Stream<T> takeWhile(Predicate<? super T> predicate);default Stream<T> dropWhile(Predicate<? super T> predicate);复制代码

3 stream的终止操作action

遍历生产

//遍历生产void forEach(Consumer<? super T> action);//程序遍历生产,和forEach的区别是forEachOrdered在多线程parallelStream执行,其程序也不会乱void forEachOrdered(Consumer<? super T> action);复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2, 3);demo.parallelStream().forEach(System.out::println);demo.parallelStream().forEachOrdered(System.out::println);-------forEach result--------231-------forEachOrdered result--------123复制代码

获取数组后果

//流转成Object数组Object[] toArray();//流转成A[]数组,指定类型A<A> A[] toArray(IntFunction<A[]> generator)复制代码
  • 示例
List<String> demo = Arrays.asList("1", "2", "3");//<A> A[] toArray(IntFunction<A[]> generator)String[] data = demo.stream().toArray(String[]::new);复制代码

最大最小值

//获取最小值Optional<T> min(Comparator<? super T> comparator)//获取最大值Optional<T> max(Comparator<? super T> comparator) 复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2, 3);Optional<Integer> min = demo.stream().min(Comparator.comparing(item->item));Optional<Integer> max = demo.stream().max(Comparator.comparing(item->item));System.out.println(min.get()+"-"+max.get());-------result--------1-3复制代码

查找匹配

//任意一个匹配boolean anyMatch(Predicate<? super T> predicate)//全副匹配boolean allMatch(Predicate<? super T> predicate)//不匹配 boolean noneMatch(Predicate<? super T> predicate)//查找第一个Optional<T> findFirst();//任意一个Optional<T> findAny();复制代码

归约合并

//两两合并Optional<T> reduce(BinaryOperator<T> accumulator)//两两合并,带初始值的T reduce(T identity, BinaryOperator<T> accumulator)//先转化元素类型再两两合并,带初始值的<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) 复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);//数字转化为字符串,而后应用“-”拼接起来String data = demo.stream().reduce("0", (u, t) -> u + "-" + t, (s1, s2) -> s1 + "-" + s2);System.out.println(data);-------result--------0-1-2-3-4-5-6-7-8复制代码

计算元素个数

long count() 复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);System.out.println(demo.stream().count());-------result--------6复制代码

对流的聚合解决

/** * supplier:返回后果类型的生产者 * accumulator:元素消费者(解决并退出R) * combiner: 返回后果 R 怎么组合(多线程执行时,会产生多个返回值R,须要合并) */<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);/** * collector个别是由 supplier、accumulator、combiner、finisher、characteristics组合成的聚合类 * Collectors 可提供一些内置的聚合类或者办法 */<R, A> R collect(Collector<? super T, A, R> collector);复制代码
  • 示例,看上面

4 Collector(聚合类)的工具类集Collectors

接口Collector和实现类CollectorImpl

//返回值类型的生产者Supplier<A> supplier();//流元素消费者BiConsumer<A, T> accumulator();//返回值合并器(多个线程操作时,会产生多个返回值,须要合并)BinaryOperator<A> combiner();//返回值转化器(最初一步解决,理论返回后果,个别原样返回)Function<A, R> finisher();//流的个性Set<Characteristics> characteristics();public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,    BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,    Function<A, R> finisher, Characteristics... characteristics) 复制代码

流聚合转换成List, Set

//流转化成Listpublic static <T> Collector<T, ?, List<T>> toList()//流转化成Setpublic static <T> Collector<T, ?, Set<T>> toSet()复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2, 3);List<Integer> col = demo.stream().collect(Collectors.toList());Set<Integer> set = demo.stream().collect(Collectors.toSet());复制代码

流聚合转化成Map

//流转化成Mappublic static <T, K, U> Collector<T, ?, Map<K,U>> toMap(    Function<? super T, ? extends K> keyMapper,    Function<? super T, ? extends U> valueMapper)/** * mergeFunction:雷同的key,值怎么合并 */public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(    Function<? super T, ? extends K> keyMapper,    Function<? super T, ? extends U> valueMapper,    BinaryOperator<U> mergeFunction)/** * mergeFunction:雷同的key,值怎么合并 * mapSupplier:返回值Map的生产者 */public static <T, K, U, M extends Map<K, U>> Collector<T, ?, M> toMap(    Function<? super T, ? extends K> keyMapper,    Function<? super T, ? extends U> valueMapper,    BinaryOperator<U> mergeFunction,    Supplier<M> mapSupplier)复制代码
  • 如果存在雷同key的元素,会报错;或者应用groupBy
  • 示例
List<User> demo = Arrays.asList(new User(1), new User(2), new User(3));Map<Integer,User> map = demo.stream().collect(Collectors.toMap(User::getId,item->item));System.out.println(map);-------result-------{1=TestS$User@7b23ec81, 2=TestS$User@6acbcfc0, 3=TestS$User@5f184fc6}复制代码

字符串流聚合拼接

//多个字符串拼接成一个字符串public static Collector<CharSequence, ?, String> joining();//多个字符串拼接成一个字符串(指定分隔符)public static Collector<CharSequence, ?, String> joining(CharSequence delimiter)复制代码
  • 示例
List<String> demo = Arrays.asList("c", "s", "c","w","潜行前行");String name = demo.stream().collect(Collectors.joining("-"));System.out.println(name);-------result-------c-s-c-w-潜行前行复制代码

映射解决再聚合流

  • 相当于先map再collect
/** * mapper:映射处理器 * downstream:映射解决后须要再次聚合解决 */public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,         Collector<? super U, A, R> downstream);复制代码
  • 示例
List<String> demo = Arrays.asList("1", "2", "3");List<Integer> data = demo.stream().collect(Collectors.mapping(Integer::valueOf, Collectors.toList()));System.out.println(data);-------result-------[1, 2, 3]复制代码

聚合后再转换后果

/** * downstream:聚合解决 * finisher:后果转换解决 */public static<T,A,R,RR> Collector<T,A,RR> collectingAndThen(Collector<T,A,R> downstream,        Function<R, RR> finisher); 复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6);//聚合成List,最初提取数组的size作为返回值Integer size = demo.stream().collect(Collectors.collectingAndThen(Collectors.toList(), List::size));System.out.println(size);---------result----------6复制代码

流分组(Map是HashMap)

/** * classifier 指定T类型某一属性作为Key值分组 * 分组后,应用List作为每个流的容器 */public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(        Function<? super T, ? extends K> classifier);           /** * classifier: 流分组器 * downstream: 每组流的聚合处理器 */public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(        Function<? super T, ? extends K> classifier,         Collector<? super T, A, D> downstream)/** * classifier: 流分组器 * mapFactory: 返回值map的工厂(Map的子类) * downstream: 每组流的聚合处理器 */public static <T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(        Function<? super T, ? extends K> classifier,        Supplier<M> mapFactory,        Collector<? super T, A, D> downstream)复制代码
  • 示例
public static void main(String[] args) throws Exception {    List<Integer> demo = Stream.iterate(0, item -> item + 1)            .limit(15)            .collect(Collectors.toList());    // 分成三组,并且每组元素转化为String类型     Map<Integer, List<String>> map = demo.stream()            .collect(Collectors.groupingBy(item -> item % 3,                    HashMap::new,                    Collectors.mapping(String::valueOf, Collectors.toList())));    System.out.println(map);}---------result----------    {0=[0, 3, 6, 9, 12], 1=[1, 4, 7, 10, 13], 2=[2, 5, 8, 11, 14]}    复制代码

流分组(分组应用的Map是ConcurrentHashMap)

/** * classifier: 分组器 ; 分组后,应用List作为每个流的容器 */public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(        Function<? super T, ? extends K> classifier);/** * classifier: 分组器 * downstream: 流的聚合处理器 */public static <T, K, A, D> Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(        Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream)/** * classifier: 分组器 * mapFactory: 返回值类型map的生产工厂(ConcurrentMap的子类) * downstream: 流的聚合处理器 */public static <T, K, A, D, M extends ConcurrentMap<K, D>> Collector<T, ?, M> groupingByConcurrent(        Function<? super T, ? extends K> classifier,         Supplier<M> mapFactory,        Collector<? super T, A, D> downstream);复制代码
  • 用法和groupingBy一样

拆分流,一变二(相当于非凡的groupingBy)

public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(        Predicate<? super T> predicate)/** * predicate: 二分器 * downstream: 流的聚合处理器 */public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(        Predicate<? super T> predicate, Collector<? super T, A, D> downstream)复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2,3,4, 5,6);// 奇数偶数分组Map<Boolean, List<Integer>> map = demo.stream()    .collect(Collectors.partitioningBy(item -> item % 2 == 0));System.out.println(map);---------result----------{false=[1, 3, 5], true=[2, 4, 6]}复制代码

聚合求平均值

// 返回Double类型public static <T> Collector<T, ?, Double> averagingDouble(ToDoubleFunction<? super T> mapper)// 返回Long 类型public static <T> Collector<T, ?, Double> averagingLong(ToLongFunction<? super T> mapper)//返回Int 类型public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper)复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2, 5);Double data = demo.stream().collect(Collectors.averagingInt(Integer::intValue));System.out.println(data);---------result----------2.6666666666666665复制代码

流聚合查找最大最小值

//最小值public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator) //最大值public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator)    复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2, 5);Optional<Integer> min = demo.stream().collect(Collectors.minBy(Comparator.comparing(item -> item)));Optional<Integer> max = demo.stream().collect(Collectors.maxBy(Comparator.comparing(item -> item)));System.out.println(min.get()+"-"+max.get());---------result----------1-5复制代码

聚合计算统计后果

  • 能够取得元素总个数,元素累计总和,最小值,最大值,平均值
//返回Int 类型public static <T> Collector<T, ?, IntSummaryStatistics> summarizingInt(        ToIntFunction<? super T> mapper)//返回Double 类型public static <T> Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(        ToDoubleFunction<? super T> mapper)//返回Long 类型public static <T> Collector<T, ?, LongSummaryStatistics> summarizingLong(        ToLongFunction<? super T> mapper)        复制代码
  • 示例
List<Integer> demo = Arrays.asList(1, 2, 5);IntSummaryStatistics data = demo.stream().collect(Collectors.summarizingInt(Integer::intValue));System.out.println(data);---------result----------IntSummaryStatistics{count=3, sum=8, min=1, average=2.666667, max=5}复制代码

JDK12提供的新聚合办法

//流别离通过downstream1、downstream2聚合解决,再合并两聚合后果public static <T, R1, R2, R> Collector<T, ?, R> teeing(        Collector<? super T, ?, R1> downstream1,        Collector<? super T, ?, R2> downstream2,        BiFunction<? super R1, ? super R2, R> merger) 复制代码

5 并发paralleStream的应用

  • 配合CompletableFuture和线程池的应用
  • 示例
public static void main(String[] args)  throws Exception{    List<Integer> demo = Stream.iterate(0, item -> item + 1)            .limit(5)            .collect(Collectors.toList());    //示例1    Stopwatch stopwatch = Stopwatch.createStarted(Ticker.systemTicker());    demo.stream().forEach(item -> {        try {            Thread.sleep(500);            System.out.println("示例1-"+Thread.currentThread().getName());        } catch (Exception e) { }    });    System.out.println("示例1-"+stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));    //示例2, 留神须要ForkJoinPool,parallelStream才会应用executor指定的线程,否则还是用默认的 ForkJoinPool.commonPool()    ExecutorService executor = new ForkJoinPool(10);    stopwatch.reset(); stopwatch.start();    CompletableFuture.runAsync(() -> demo.parallelStream().forEach(item -> {        try {            Thread.sleep(1000);            System.out.println("示例2-" + Thread.currentThread().getName());        } catch (Exception e) { }    }), executor).join();    System.out.println("示例2-"+stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));    //示例3    stopwatch.reset(); stopwatch.start();    demo.parallelStream().forEach(item -> {        try {            Thread.sleep(1000);            System.out.println("示例3-"+Thread.currentThread().getName());        } catch (Exception e) { }    });    System.out.println("示例3-"+stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));    executor.shutdown();}复制代码
  • -------------------result--------------------------
示例1-main示例1-main示例1-main示例1-main示例1-main示例1-2501示例2-ForkJoinPool-1-worker-19示例2-ForkJoinPool-1-worker-9示例2-ForkJoinPool-1-worker-5示例2-ForkJoinPool-1-worker-27示例2-ForkJoinPool-1-worker-23示例2-1004示例3-main示例3-ForkJoinPool.commonPool-worker-5示例3-ForkJoinPool.commonPool-worker-7示例3-ForkJoinPool.commonPool-worker-9示例3-ForkJoinPool.commonPool-worker-3示例3-1001复制代码
  • parallelStream的办法的确会应用多线程去运行,并且能够指定线程池,不过自定义线程必须是ForkJoinPool类型,否则会默认使ForkJoinPool.commonPool()的线程

链接:https://juejin.cn/post/690333...