一、收集器简介
把列表中的交易按货币分组:
Map<Currency, List<Transaction>> transactionsByCurrencies =
transactions.stream().collect(groupingBy(Transaction::getCurrency));
从 Collectors
类提供的工厂方法(例如 groupingBy)创建的收集器。它们主要提供了三大功能:
- 将流元素归约和汇总为一个值
- 元素分组
- 元素分区
二、归约和汇总
数一数菜单里有多少种菜:
long howManyDishes = menu.stream().collect(Collectors.counting());
这还可以写得更为直接:
long howManyDishes = menu.stream().count();
1. 查找流中的最大值和最小值
可以使用两个收集器,Collectors.maxBy 和 Collectors.minBy,来计算流中的最大或最小值。这两个收集器接收一个 Comparator 参数来比较流中的元素.
找出菜单中热量最高的菜:
Comparator<Dish> dishCaloriesComparator =
Comparator.comparingInt(Dish::getCalories);
Optional<Dish> mostCalorieDish =
menu.stream()
.collect(maxBy(dishCaloriesComparator));
2. 汇总
-
Collectors.summingInt
它可接受一个把对象映射为求和所需 int 的函数,并返回一个收集器;该收集器在传递给普通的 collect 方法后即执行我们需要的汇总操作。
eg:int totalCalories = menu.stream().collect(summingInt(Dish::getCalories));
另外,Collectors.summingLong 和 Collectors.summingDouble方法的作用完全一样,可以用于求和字段为 long 或 double 的情况。还有 Collectors.averagingInt,连同对应的averagingLong 和 averagingDouble 可以计算数值的平均数。
-
summarizing 操作
通过一次 summarizing 操作你可以就数出菜单中元素的个数,并得到菜肴热量 总和、平均值、最大值和最小值:IntSummaryStatistics menuStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));
这个收集器会把所有这些信息收集到一个叫作 IntSummaryStatistics 的类里,它提供了方便的取值 (getter) 方法来访问结果。打印 menuStatisticobject 会得到以下输出:
IntSummaryStatistics{count=9, sum=4300, min=120, average=477.777778, max=800}
同样,相应的 summarizingLong 和 summarizingDouble 工厂方法有相关的 LongSummaryStatistics 和 DoubleSummaryStatistics 类型。
3. 连接字符串
joining工厂方法返回的收集器会把对流中每一个对象应用 toString 方法得到的所有字符串连接成一个字符串。
String shortMenu = menu.stream().map(Dish::getName).collect(joining());
joining 工厂方法有一个重载版本可以接受元素之间的分界符
String shortMenu = menu.stream().map(Dish::getName).collect(joining(","));
4. 广义的归约汇总
可以用 reducing 方法创建的收集器来计算你菜单的总热量,如下所示:
int totalCalories = menu.stream().collect(reducing(0, Dish::getCalories, (i, j) -> i + j));
- 第一个参数是归约操作的起始值。
- 第二个参数将菜肴转换成一个表示其所含热量的 int。
- 第三个参数是一个 BinaryOperator,将两个项目累积成一个同类型的值。这里它就是对两个 int 求和。
单参数形式的 reducing 来找到热量最高的菜,如下所示:
Optional<Dish> mostCalorieDish =
menu.stream().collect(reducing((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2));
相比 stream 的 reduce 方法 collect 方法特别适合表达可变容器上的归约,更关键的是它适合并行操作
计算菜单里所有菜肴的卡路里总和,以不同的方法执行同样的操作:
第一种:
int totalCalories = menu.stream().collect(reducing(0,
Dish::getCalories,
Integer::sum));
第二种:
int totalCalories =
menu.stream().map(Dish::getCalories).reduce(Integer::sum).get();//reduce 返回的是 Optional
第三种:
int totalCalories = menu.stream().mapToInt(Dish::getCalories).sum();
最后一种最佳。
三、分组
假设你要把菜单中的菜按照类型进行分类,有肉的放一组,有鱼的放一组,其他的都放另一组。用 Collectors.groupingBy 工厂方法返回的收集器就可以轻松地完成这项任务,如下所示:
Map<Dish.Type, List<Dish>> dishesByType =
menu.stream().collect(groupingBy(Dish::getType));
其结果是下面的 Map:
{FISH=[prawns, salmon], OTHER=[french fries, rice, season fruit, pizza],
MEAT=[pork, beef, chicken]}
给 groupingBy 方法传递了一个 Function(以方法引用的形式),它提取了流中每一道 Dish 的 Dish.Type。我们把这个 Function 叫作 分类函数
如果 Dish 中没有定义类型获取方法,可以使用 lambda 表达式:
public enum CaloricLevel {DIET, NORMAL, FAT}
Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(
groupingBy(dish -> {if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return
CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
} ));
1. 多级分组
使用一个由双参数版本的 Collectors.groupingBy 工厂方法创建的收集器,它除了普通的分类函数之外,还可以接受 collector 类型的第二个参数:
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel =
menu.stream().collect(
groupingBy(Dish::getType,
groupingBy(dish -> {if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
} )
)
);
这种多级分组操作可以扩展至任意层级,n 级分组就会得到一个代表 n 级树形结构的 n 级 Map
2. 按子组收集数据
传递给第一个 groupingBy 的第二个收集器可以是任何类型,而不一定是另一 groupingBy
Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting()));
其结果是下面的 Map:
{MEAT=3, FISH=2, OTHER=4}
普通的单参数 groupingBy(f)(其中 f 是分类函数)实际上是 groupingBy(f, toList()) 的简便写法。
-
把收集器的结果转换为另一种类型
查找每个子组中热量最高的 DishMap<Dish.Type, Dish> mostCaloricByType = menu.stream() .collect(groupingBy(Dish::getType, collectingAndThen(maxBy(comparingInt(Dish::getCalories)), //maxBy 工厂方法生成的收集器的类型是 Optional Optional::get)));
包装的 Optional 没什么用,把收集器返回的结果转换为另一种类型,你可以使用 Collectors.collectingAndThen 工厂方法;返回的收集器 groupingBy 收集器只有在应用分组条件后,第一次在流中找到某个键对应的元素时才会把键加入分组 Map 中, 所以 Optional::get 这个操作放在这里是安全的,因为 reducing 收集器永远都不会返回 Optional.empty()
-
与 groupingBy 联合使用的其他收集器的例子
Map<Dish.Type, Integer> totalCaloriesByType = menu.stream().collect(groupingBy(Dish::getType, summingInt(Dish::getCalories)));
对于每种类型的 Dish,菜单中都有哪些 CaloricLevel。我们可以把 groupingBy 和 mapping 收集器结合起来,如下所示:
Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType = menu.stream().collect( groupingBy(Dish::getType, mapping( dish -> {if (dish.getCalories() <= 400) return CaloricLevel.DIET; else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; else return CaloricLevel.FAT; }, toSet())));// 生成的 CaloricLevel 流传递给一个 toSet 收集器,// 它和 toList 类似,不过是把流中的元素累积到一个 Set 而不是 List 中,以便仅保留各不相同的值。
但通过使用toCollection,你就可以有更多的控制。例如,你可以给它传递一个构造函数引用来要求 HashSet:
Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType = menu.stream().collect( groupingBy(Dish::getType, mapping(dish -> { if (dish.getCalories() <= 400) return CaloricLevel.DIET; else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; else return CaloricLevel.FAT; }, toCollection(HashSet::new) )));
四、分区
1. 分区的优势
分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组 Map 的键类型是 Boolean,于是它最多可以分为两组——true 是一组,false 是一组。例如,如果你是素食者或是请了一位素食的朋友来共进晚餐,可能会想要把菜单按照素食和非素食分开:
Map<Boolean, List<Dish>> partitionedMenu =
menu.stream().collect(partitioningBy(Dish::isVegetarian));
计算素食和非素食的数量:
menu.stream().collect(partitioningBy(Dish::isVegetarian,
counting()));
2. 将数字按质数和非质数分区
public boolean isPrime(int candidate) {int candidateRoot = (int) Math.sqrt((double) candidate);
return IntStream.rangeClosed(2, candidateRoot)
.noneMatch(i -> candidate % i == 0);
}
public Map<Boolean, List<Integer>> partitionPrimes(int n) {return IntStream.rangeClosed(2, n).boxed()
.collect(partitioningBy(candidate -> isPrime(candidate)));
}
Collectors 类的静态工厂方法:
五、收集器接口
public interface Collector<T, A, R> {Supplier<A> supplier();
BiConsumer<A, T> accumulator();
Function<A, R> finisher();
BinaryOperator<A> combiner();
Set<Characteristics> characteristics();}
- T 是流中要收集的项目的泛型。
- A 是累加器的类型,累加器是在收集过程中用于累积部分结果的对象。
- R 是收集操作得到的对象(通常但并不一定是集合)的类型。
例如,你可以实现一个 ToListCollector<T> 类,将 Stream<T> 中的所有元素收集 List<T> 里,它的签名如下:
public class ToListCollector<T> implements Collector<T, List<T>, List<T>>
1. 理解 Collector 接口声明的方法
(1)建立新的结果容器:supplier 方法
在调用时它会创建一个空的累加器实例,供数据收集过程使用
public Supplier<List<T>> supplier() {return () -> new ArrayList<T>();}
或者使用构造函数引用;
public Supplier<List<T>> supplier() {return ArrayList::new;}
(2)将元素添加到结果容器:accumulator 方法
accumulator 方法会返回执行归约操作的函数。当遍历到流中第 n 个元素时,这个函数执行时会有两个参数:保存归约结果的累加器(已收集了流中的前 n1 个项目),还有第 n 个元素本身。该函数将返回 void,因为累加器是原位更新,即函数的执行改变了它的内部状态以体现遍历的元素的效果。对于 ToListCollector,这个函数仅仅会把当前项目添加至已经遍历过的项目的列表:
public BiConsumer<List<T>, T> accumulator() {return (list, item) -> list.add(item);
}
你也可以使用方法引用,这会更为简洁:
public BiConsumer<List<T>, T> accumulator() {return List::add;}
(3)对结果容器应用最终转换:finisher 方法
在遍历完流后,finisher 方法必须返回在累积过程的最后要调用的一个函数,以便将累加器对象转换为整个集合操作的最终结果。
public Function<List<T>, List<T>> finisher() {return Function.identity(); // 累加器对象恰好符合预期的最终结果,// 因此无需进行转换。所以 finisher 方法只需返回 identity 函数
}
(4) 合并两个结果容器:combiner 方法
combiner 方法会返回一个供归约操作使用的函数,它定义了对流的各个子部分进行并行处理时,各个子部分归约所得的累加器要如何合并。
public BinaryOperator<List<T>> combiner() {return (list1, list2) -> {list1.addAll(list2);
return list1; }
}
有了这第四个方法,就可以对流进行并行归约了, 会用到 Java 7 中引入的 Fork/Join 框架和 Spliterator 抽象
Fork/Join 是什么?
Fork/Join 框架是 Java7 提供的并行执行任务框架,思想是将大任务分解成小任务,然后小任务又可以继续分解,然后每个小任务分别计算出结果再合并起来,最后将汇总的结果作为大任务结果。其思想和 MapReduce 的思想非常类似。对于任务的分割,要求各个子任务之间相互独立,能够并行独立地执行任务,互相之间不影响。Fork/Join 的运行流程图如下:
我们可以通过 Fork/Join 单词字面上的意思去理解这个框架。Fork 是叉子分叉的意思,即将大任务分解成并行的小任务,Join 是连接结合的意思,即将所有并行的小任务的执行结果汇总起来。
工作窃取算法
ForkJoin 采用了工作窃取(work-stealing)算法,若一个工作线程的任务队列为空没有任务执行时,便从其他工作线程中获取任务主动执行。为了实现工作窃取,在工作线程中维护了双端队列,窃取任务线程从队尾获取任务,被窃取任务线程从队头获取任务。这种机制充分利用线程进行并行计算,减少了线程竞争。但是当队列中只存在一个任务了时,两个线程去取反而会造成资源浪费。工作窃取的运行流程图如下:
Fork/Join 核心类
1.ForkJoinPool
ForkJoinPool 是 ForkJoin 框架中的任务调度器,和 ThreadPoolExecutor 一样实现了自己的线程池,提供了三种调度子任务的方法:
execute:异步执行指定任务,无返回结果;
invoke、invokeAll:同步执行指定任务,等待完成才返回结果;
submit:异步执行指定任务,并立即返回一个 Future 对象;
2.ForkJoinTask
Fork/Join 框架中的实际的执行任务类,有以下两种实现,一般继承这两种实现类即可。
RecursiveAction:用于无结果返回的子任务;
RecursiveTask:用于有结果返回的子任务;
Fork/Join 框架实战
下面实现一个 Fork/Join 小例子,从 1 +2+…10 亿,每个任务只能处理 1000 个数相加,超过 1000 个的自动分解成小任务并行处理;并展示了通过不使用 Fork/Join 和使用时的时间损耗对比。import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinTask extends RecursiveTask<Long> { private static final long MAX = 1000000000L; private static final long THRESHOLD = 1000L; private long start; private long end; public ForkJoinTask(long start, long end) { this.start = start; this.end = end; } public static void main(String[] args) {test(); System.out.println("--------------------"); testForkJoin();} private static void test() {System.out.println("test"); long start = System.currentTimeMillis(); Long sum = 0L; for (long i = 0L; i <= MAX; i++) {sum += i;} System.out.println(sum); System.out.println(System.currentTimeMillis() - start + "ms"); } private static void testForkJoin() {System.out.println("testForkJoin"); long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); Long sum = forkJoinPool.invoke(new ForkJoinTask(1, MAX)); System.out.println(sum); System.out.println(System.currentTimeMillis() - start + "ms"); } @Override protected Long compute() { long sum = 0; if (end - start <= THRESHOLD) {for (long i = start; i <= end; i++) {sum += i;} return sum; } else {long mid = (start + end) / 2; ForkJoinTask task1 = new ForkJoinTask(start, mid); task1.fork(); ForkJoinTask task2 = new ForkJoinTask(mid + 1, end); task2.fork(); return task1.join() + task2.join(); } } }
这里需要计算结果,所以任务继承的是 RecursiveTask 类。ForkJoinTask 需要实现 compute 方法,在这个方法里首先需要判断任务是否小于等于阈值 1000,如果是就直接执行任务。否则分割成两个子任务,每个子任务在调用 fork 方法时,又会进入 compute 方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用 join 方法会阻塞并等待子任务执行完并得到其结果。
程序输出:
test 500000000500000000 4992ms -------------------- testForkJoin 500000000500000000 508ms
需要特别注意的是:
ForkJoinPool 使用 submit 或 invoke 提交的区别:invoke 是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit 是异步执行,只有在 Future 调用 get 的时候会阻塞。
这里继承的是 RecursiveTask,还可以继承 RecursiveAction。前者适用于有返回值的场景,而后者适合于没有返回值的场景
这一点是最容易忽略的地方,其实这里执行子任务调用 fork 方法并不是最佳的选择,最佳的选择是 invokeAll 方法。leftTask.fork(); rightTask.fork();
替换为
invokeAll(leftTask, rightTask);
具体说一下原理:对于 Fork/Join 模式,假如 Pool 里面线程数量是固定的,那么调用子任务的 fork 方法相当于 A 先分工给 B,然后 A 当监工不干活,B 去完成 A 交代的任务。所以上面的模式相当于浪费了一个线程。那么如果使用 invokeAll 相当于 A 分工给 B 后,A 和 B 都去完成工作。这样可以更好的利用线程池,缩短执行的时间。
(5) characteristics 方法
返回一个不可变的 Characteristics 集合,它定义了收集器的行为——尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示。
Characteristics 是一个包含三个项目的枚举。
- UNORDERED——归约结果不受流中项目的遍历和累积顺序的影响。
- CONCURRENT——accumulator 函数可以从多个线程同时调用,且该收集器可以并行归约流。如果收集器没有标为 UNORDERED,那它仅在用于无序数据源时才可以并行归约。
- IDENTITY_FINISH——这表明完成器方法返回的函数是一个恒等函数,可以跳过。这种情况下,累加器对象将会直接用作归约过程的最终结果。这也意味着,将累加器 A 不加检查地转换为结果 R 是安全的。
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, CONCURRENT));
}
2. 进行自定义收集而不去实现 Collector
Stream 有一个重载的 collect 方法可以接受另外三个函数——supplier、accumulator 和 combiner,其语义和 Collector 接口的相应方法返回的函数完全相同。
List<Dish> dishes = menuStream.collect(
ArrayList::new,
List::add,
List::addAll);// 它永远都是一个 IDENTITY_FINISH 和 CONCURRENT 但并非 UNORDERED 的收集器。