共计 13957 个字符,预计需要花费 35 分钟才能阅读完成。
前言
平时操作汇合数据,咱们个别都是 for 或者 iterator 去遍历,不是很难看。java 提供了 Stream 的概念,它能够让咱们把汇合数据当做一个个元素在解决,并且提供多线程模式
- 流的创立
- 流的各种数据操作
- 流的终止操作
- 流的聚合解决
- 并发流和 CompletableFuture 的配合应用
关注公众号,一起交换,微信搜一搜: 潜行前行
github 地址,感激 star
1 stream 的结构形式
stream 内置的构造方法
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b)
public static<T> Builder<T> builder()
public static<T> Stream<T> of(T t)
public static<T> Stream<T> empty()
public static<T> Stream<T> generate(Supplier<T> s)
Collection 申明的 stream 函数
default Stream<E> stream()
- Collection 申明了 stream 转化函数,也就是说,任意 Collection 子类都存在官网替咱们实现的由 Collection 转为 Stream 的办法
示例,List 转 Stream
public static void main(String[] args){List<String> demo = Arrays.asList("a","b","c"); long count = demo.stream().peek(System.out::println).count(); System.out.println(count); } -------result-------- a b c 3
2 接口 stream 对元素的操作方法定义
过滤 filter
Stream<T> filter(Predicate<? super T> predicate)
- Predicate 是函数式接口,能够间接用 lambda 代替;如果有简单的过滤逻辑,则用 or、and、negate 办法组合
示例
List<String> demo = Arrays.asList("a", "b", "c"); Predicate<String> f1 = item -> item.equals("a"); Predicate<String> f2 = item -> item.equals("b"); demo.stream().filter(f1.or(f2)).forEach(System.out::println); -------result-------- a b
映射转化 map
<R> Stream<R> map(Function<? super T, ? extends R> mapper)
IntStream mapToInt(ToIntFunction<? super T> mapper);
LongStream mapToLong(ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
示例
static class User{public User(Integer id){this.id = id;} Integer id; public Integer getId() { return id;} } public static void main(String[] args) {List<User> demo = Arrays.asList(new User(1), new User(2), new User(3)); // User 转为 Integer(id) demo.stream().map(User::getId).forEach(System.out::println); } -------result-------- 1 2 3
数据处理 peek
Stream<T> peek(Consumer<? super T> action);
- 与 map 的区别是其无返回值
示例
static class User{public User(Integer id){this.id = id;} Integer id; public Integer getId() { return id;} public void setId(Integer id) {this.id = id;} } public static void main(String[] args) {List<User> demo = Arrays.asList(new User(1), new User(2), new User(3)); // id 平方,User 转为 Integer(id) demo.stream().peek(user -> user.setId(user.id * user.id)).map(User::getId).forEach(System.out::println); } -------result-------- 1 4 9
映射撵平 flatMap
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);
- flatMap:将元素为 Stream\<T> 类型的流撵平成一个元素类型为 T 的 Stream 流
示例
public static void main(String[] args) {List<Stream<Integer>> demo = Arrays.asList(Stream.of(5), Stream.of(2), Stream.of(1)); demo.stream().flatMap(Function.identity()).forEach(System.out::println); } -------result-------- 5 2 1
去重 distinct
Stream<T> distinct();
示例
List<Integer> demo = Arrays.asList(1, 1, 2); demo.stream().distinct().forEach(System.out::println); -------result-------- 1 2
排序 sorted
Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
示例
List<Integer> demo = Arrays.asList(5, 1, 2); // 默认升序 demo.stream().sorted().forEach(System.out::println); // 降序 Comparator<Integer> comparator = Comparator.<Integer, Integer>comparing(item -> item).reversed(); demo.stream().sorted(comparator).forEach(System.out::println); ------- 默认升序 result-------- 1 2 5 ------- 降序 result-------- 5 2 1
个数限度 limit 和跳过 skip
// 截取前 maxSize 个元素
Stream<T> limit(long maxSize);
// 跳过前 n 个流
Stream<T> skip(long n);
示例
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6); // 跳过前两个,而后限度截取两个 demo.stream().skip(2).limit(2).forEach(System.out::println); -------result-------- 3 4
JDK9 提供的新操作
和 filter 的区别,takeWhile 是取满足条件的元素,直到不满足为止;dropWhile 是抛弃满足条件的元素,直到不满足为止
default Stream<T> takeWhile(Predicate<? super T> predicate); default Stream<T> dropWhile(Predicate<? super T> predicate);
3 stream 的终止操作 action
遍历生产
// 遍历生产
void forEach(Consumer<? super T> action);
// 程序遍历生产, 和 forEach 的区别是 forEachOrdered 在多线程 parallelStream 执行,其程序也不会乱
void forEachOrdered(Consumer<? super T> action);
示例
List<Integer> demo = Arrays.asList(1, 2, 3); demo.parallelStream().forEach(System.out::println); demo.parallelStream().forEachOrdered(System.out::println); -------forEach result-------- 2 3 1 -------forEachOrdered result-------- 1 2 3
获取数组后果
// 流转成 Object 数组
Object[] toArray();
// 流转成A[] 数组,指定类型 A
<A> A[] toArray(IntFunction<A[]> generator)
示例
List<String> demo = Arrays.asList("1", "2", "3"); //<A> A[] toArray(IntFunction<A[]> generator) String[] data = demo.stream().toArray(String[]::new);
最大最小值
// 获取最小值
Optional<T> min(Comparator<? super T> comparator)
// 获取最大值
Optional<T> max(Comparator<? super T> comparator)
示例
List<Integer> demo = Arrays.asList(1, 2, 3); Optional<Integer> min = demo.stream().min(Comparator.comparing(item->item)); Optional<Integer> max = demo.stream().max(Comparator.comparing(item->item)); System.out.println(min.get()+"-"+max.get()); -------result-------- 1-3
查找匹配
// 任意一个匹配
boolean anyMatch(Predicate<? super T> predicate)
// 全副匹配
boolean allMatch(Predicate<? super T> predicate)
// 不匹配
boolean noneMatch(Predicate<? super T> predicate)
// 查找第一个
Optional<T> findFirst();
// 任意一个
Optional<T> findAny();
归约合并
// 两两合并
Optional<T> reduce(BinaryOperator<T> accumulator)
// 两两合并,带初始值的
T reduce(T identity, BinaryOperator<T> accumulator)
// 先转化元素类型再两两合并,带初始值的
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
示例
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8); // 数字转化为字符串,而后应用“-”拼接起来 String data = demo.stream().reduce("0", (u, t) -> u + "-" + t, (s1, s2) -> s1 + "-" + s2); System.out.println(data); -------result-------- 0-1-2-3-4-5-6-7-8
计算元素个数
long count()
示例
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6); System.out.println(demo.stream().count()); -------result-------- 6
对流的聚合解决
/**
* supplier: 返回后果类型的生产者
* accumulator: 元素消费者(解决并退出 R)* combiner: 返回后果 R 怎么组合(多线程执行时,会产生多个返回值 R,须要合并)*/
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
/**
* collector 个别是由 supplier、accumulator、combiner、finisher、characteristics 组合成的聚合类
* Collectors 可提供一些内置的聚合类或者办法
*/
<R, A> R collect(Collector<? super T, A, R> collector);
- 示例,看上面
4 Collector(聚合类) 的工具类集 Collectors
接口 Collector 和实现类 CollectorImpl
// 返回值类型的生产者
Supplier<A> supplier();
// 流元素消费者
BiConsumer<A, T> accumulator();
// 返回值合并器(多个线程操作时,会产生多个返回值,须要合并)BinaryOperator<A> combiner();
// 返回值转化器(最初一步解决,理论返回后果,个别原样返回)Function<A, R> finisher();
// 流的个性
Set<Characteristics> characteristics();
public static<T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
BiConsumer<A, T> accumulator, BinaryOperator<A> combiner,
Function<A, R> finisher, Characteristics... characteristics)
流聚合转换成 List, Set
// 流转化成 List
public static <T> Collector<T, ?, List<T>> toList()
// 流转化成 Set
public static <T> Collector<T, ?, Set<T>> toSet()
示例
List<Integer> demo = Arrays.asList(1, 2, 3); List<Integer> col = demo.stream().collect(Collectors.toList()); Set<Integer> set = demo.stream().collect(Collectors.toSet());
流聚合转化成 Map
// 流转化成 Map
public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(
Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper)
/**
* mergeFunction: 雷同的 key, 值怎么合并
*/
public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(
Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction)
/**
* mergeFunction: 雷同的 key, 值怎么合并
* mapSupplier:返回值 Map 的生产者
*/
public static <T, K, U, M extends Map<K, U>> Collector<T, ?, M> toMap(
Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction,
Supplier<M> mapSupplier)
- 如果存在雷同 key 的元素,会报错; 或者应用 groupBy
示例
List<User> demo = Arrays.asList(new User(1), new User(2), new User(3)); Map<Integer,User> map = demo.stream().collect(Collectors.toMap(User::getId,item->item)); System.out.println(map); -------result------- {1=TestS$User@7b23ec81, 2=TestS$User@6acbcfc0, 3=TestS$User@5f184fc6}
字符串流聚合拼接
// 多个字符串拼接成一个字符串
public static Collector<CharSequence, ?, String> joining();
// 多个字符串拼接成一个字符串(指定分隔符)public static Collector<CharSequence, ?, String> joining(CharSequence delimiter)
示例
List<String> demo = Arrays.asList("c", "s", "c","w","潜行前行"); String name = demo.stream().collect(Collectors.joining("-")); System.out.println(name); -------result------- c-s-c-w- 潜行前行
映射解决再聚合流
相当于先 map 再 collect
/** * mapper: 映射处理器 * downstream: 映射解决后须要再次聚合解决 */ public static <T, U, A, R> Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper, Collector<? super U, A, R> downstream);
示例
List<String> demo = Arrays.asList("1", "2", "3"); List<Integer> data = demo.stream().collect(Collectors.mapping(Integer::valueOf, Collectors.toList())); System.out.println(data); -------result------- [1, 2, 3]
聚合后再转换后果
/**
* downstream: 聚合解决
* finisher: 后果转换解决
*/
public static<T,A,R,RR> Collector<T,A,RR> collectingAndThen(Collector<T,A,R> downstream,
Function<R, RR> finisher);
示例
List<Integer> demo = Arrays.asList(1, 2, 3, 4, 5, 6); // 聚合成 List, 最初提取数组的 size 作为返回值 Integer size = demo.stream().collect(Collectors.collectingAndThen(Collectors.toList(), List::size)); System.out.println(size); ---------result---------- 6
流分组(Map 是 HashMap)
/**
* classifier 指定 T 类型某一属性作为 Key 值分组
* 分组后,应用 List 作为每个流的容器
*/
public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K> classifier);
/**
* classifier: 流分组器
* downstream: 每组流的聚合处理器
*/
public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(
Function<? super T, ? extends K> classifier,Collector<? super T, A, D> downstream)
/**
* classifier: 流分组器
* mapFactory: 返回值 map 的工厂(Map 的子类)* downstream: 每组流的聚合处理器
*/
public static <T, K, D, A, M extends Map<K, D>> Collector<T, ?, M> groupingBy(
Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream)
示例
public static void main(String[] args) throws Exception {List<Integer> demo = Stream.iterate(0, item -> item + 1) .limit(15) .collect(Collectors.toList()); // 分成三组,并且每组元素转化为 String 类型 Map<Integer, List<String>> map = demo.stream() .collect(Collectors.groupingBy(item -> item % 3, HashMap::new, Collectors.mapping(String::valueOf, Collectors.toList()))); System.out.println(map); } ---------result---------- {0=[0, 3, 6, 9, 12], 1=[1, 4, 7, 10, 13], 2=[2, 5, 8, 11, 14]}
流分组 (分组应用的 Map 是 ConcurrentHashMap)
/**
* classifier: 分组器;分组后,应用 List 作为每个流的容器
*/
public static <T, K> Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(Function<? super T, ? extends K> classifier);
/**
* classifier: 分组器
* downstream: 流的聚合处理器
*/
public static <T, K, A, D> Collector<T, ?, ConcurrentMap<K, D>> groupingByConcurrent(Function<? super T, ? extends K> classifier, Collector<? super T, A, D> downstream)
/**
* classifier: 分组器
* mapFactory: 返回值类型 map 的生产工厂(ConcurrentMap 的子类)* downstream: 流的聚合处理器
*/
public static <T, K, A, D, M extends ConcurrentMap<K, D>> Collector<T, ?, M> groupingByConcurrent(
Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream);
- 用法和 groupingBy 一样
拆分流,一变二(相当于非凡的 groupingBy)
public static <T> Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate)
/**
* predicate: 二分器
* downstream: 流的聚合处理器
*/
public static <T, D, A> Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate, Collector<? super T, A, D> downstream)
示例
List<Integer> demo = Arrays.asList(1, 2,3,4, 5,6); // 奇数偶数分组 Map<Boolean, List<Integer>> map = demo.stream() .collect(Collectors.partitioningBy(item -> item % 2 == 0)); System.out.println(map); ---------result---------- {false=[1, 3, 5], true=[2, 4, 6]}
聚合求平均值
// 返回 Double 类型
public static <T> Collector<T, ?, Double> averagingDouble(ToDoubleFunction<? super T> mapper)
// 返回 Long 类型
public static <T> Collector<T, ?, Double> averagingLong(ToLongFunction<? super T> mapper)
// 返回 Int 类型
public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper)
示例
List<Integer> demo = Arrays.asList(1, 2, 5); Double data = demo.stream().collect(Collectors.averagingInt(Integer::intValue)); System.out.println(data); ---------result---------- 2.6666666666666665
流聚合查找最大最小值
// 最小值
public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator)
// 最大值
public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator)
示例
List<Integer> demo = Arrays.asList(1, 2, 5); Optional<Integer> min = demo.stream().collect(Collectors.minBy(Comparator.comparing(item -> item))); Optional<Integer> max = demo.stream().collect(Collectors.maxBy(Comparator.comparing(item -> item))); System.out.println(min.get()+"-"+max.get()); ---------result---------- 1-5
聚合计算统计后果
能够取得元素总个数,元素累计总和,最小值,最大值,平均值
// 返回 Int 类型 public static <T> Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper) // 返回 Double 类型 public static <T> Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> mapper) // 返回 Long 类型 public static <T> Collector<T, ?, LongSummaryStatistics> summarizingLong(ToLongFunction<? super T> mapper)
示例
List<Integer> demo = Arrays.asList(1, 2, 5); IntSummaryStatistics data = demo.stream().collect(Collectors.summarizingInt(Integer::intValue)); System.out.println(data); ---------result---------- IntSummaryStatistics{count=3, sum=8, min=1, average=2.666667, max=5}
JDK12 提供的新聚合办法
// 流别离通过 downstream1、downstream2 聚合解决,再合并两聚合后果
public static <T, R1, R2, R> Collector<T, ?, R> teeing(
Collector<? super T, ?, R1> downstream1,
Collector<? super T, ?, R2> downstream2,
BiFunction<? super R1, ? super R2, R> merger)
5 并发 paralleStream 的应用
- 配合 CompletableFuture 和线程池的应用
示例
public static void main(String[] args) throws Exception{List<Integer> demo = Stream.iterate(0, item -> item + 1) .limit(5) .collect(Collectors.toList()); // 示例 1 Stopwatch stopwatch = Stopwatch.createStarted(Ticker.systemTicker()); demo.stream().forEach(item -> { try {Thread.sleep(500); System.out.println("示例 1 -"+Thread.currentThread().getName()); } catch (Exception e) {}}); System.out.println("示例 1 -"+stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); // 示例 2, 留神须要 ForkJoinPool,parallelStream 才会应用 executor 指定的线程,否则还是用默认的 ForkJoinPool.commonPool() ExecutorService executor = new ForkJoinPool(10); stopwatch.reset(); stopwatch.start(); CompletableFuture.runAsync(() -> demo.parallelStream().forEach(item -> { try {Thread.sleep(1000); System.out.println("示例 2 -" + Thread.currentThread().getName()); } catch (Exception e) {}}), executor).join(); System.out.println("示例 2 -"+stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); // 示例3 stopwatch.reset(); stopwatch.start(); demo.parallelStream().forEach(item -> { try {Thread.sleep(1000); System.out.println("示例 3 -"+Thread.currentThread().getName()); } catch (Exception e) {}}); System.out.println("示例 3 -"+stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); executor.shutdown();}
——————-result————————–
示例 1 -main 示例 1 -main 示例 1 -main 示例 1 -main 示例 1 -main 示例 1 -2501 示例 2 -ForkJoinPool-1-worker-19 示例 2 -ForkJoinPool-1-worker-9 示例 2 -ForkJoinPool-1-worker-5 示例 2 -ForkJoinPool-1-worker-27 示例 2 -ForkJoinPool-1-worker-23 示例 2 -1004 示例 3 -main 示例 3 -ForkJoinPool.commonPool-worker-5 示例 3 -ForkJoinPool.commonPool-worker-7 示例 3 -ForkJoinPool.commonPool-worker-9 示例 3 -ForkJoinPool.commonPool-worker-3 示例 3 -1001
- parallelStream 的办法的确会应用多线程去运行,并且能够指定线程池,不过自定义线程必须是 ForkJoinPool 类型,否则会默认使 ForkJoinPool.commonPool() 的线程
正文完