Stream流与Lambda表达式(四) 自定义收集器

7次阅读

共计 11133 个字符,预计需要花费 28 分钟才能阅读完成。

一、自定义 SetCustomCollector 收集器
package com.java.design.Stream.CustomCollector;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

/**
* @author 陈杨
*/

// 将 List 集合转换为 Set 集合 存放相同元素
public class SetCustomCollector<T> implements Collector<T, Set<T>, Set<T>> {

@Override
public Supplier<Set<T>> supplier() {

System.out.println(“supplier invoked!”);
// return TreeSet::new;
return HashSet::new;
}

@Override
public BiConsumer<Set<T>, T> accumulator() {

System.out.println(“accumulator invoked!”);
return Set<T>::add;
}

@Override
public BinaryOperator<Set<T>> combiner() {

System.out.println(“combiner invoked!”);
return (first, last) -> {
first.addAll(last);
return first;
};
}

@Override
public Function<Set<T>, Set<T>> finisher() {

System.out.println(“finisher invoked!”);
return Function.identity();
}

@Override
public Set<Characteristics> characteristics() {

System.out.println(“characteristics invoked!”);
return Collections.unmodifiableSet(EnumSet.of
(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED));
// return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
}

}
二、自定义 StudentCustomCollector 收集器
package com.java.design.Stream.CustomCollector;

import com.java.design.java8.entity.Student;

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

/**
* @author 陈杨
*/

// 将学生对象 按照 HashMap<Integer,Student> 存放 sid student
public class StudentCustomCollector implements Collector<Student, List<Student>, Map<Integer, Student>> {

@Override
public Supplier<List<Student>> supplier() {
System.out.println(“supplier invoked!”);
return ArrayList::new;
}

@Override
public BiConsumer<List<Student>, Student> accumulator() {
System.out.println(“accumulator invoked!”);
return (list, student) -> {
System.out.println(“accumulator:” + Thread.currentThread().getName());
list.add(student);
};
}

@Override
public BinaryOperator<List<Student>> combiner() {
System.out.println(“combiner invoked!”);
return (first, last) -> {
first.addAll(last);
return first;
};
}

@Override
public Function<List<Student>, Map<Integer, Student>> finisher() {
System.out.println(“finisher invoked!”);
return list -> {
Map<Integer, Student> map = new HashMap<>();
list.forEach(student -> map.put(student.getId(), student));
return map;
};
}

@Override
public Set<Characteristics> characteristics() {
System.out.println(“Characteristics invoked!”);
return Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT));
}

// Characteristics.IDENTITY_FINISH 从中间容器数据类型 转换为 结果类型 数据类型一致
// 若不一致 抛出类型转换异常 finisher 对中间容器数据 –> 结果类型 进行强制类型转换

// Characteristics.CONCURRENT 多个线程同时操作同一个容器 –> 并行
// Indicates that this collector is <em>concurrent</em>, meaning that
// the result container can support the accumulator function being
// called concurrently with the same result container from multiple threads.

// parallelStream(多线程)并行流 操作 多个结果容器 –> 执行 combiner

// Characteristics.CONCURRENT + parallelStream 结果容器只有 1 个 —> 不执行 combiner

// ConcurrentModificationException 并发修改异常
// 注意:并行情况下 累加器对结果容器执行单一操作
// 不要在累加器返回的函数式接口实例中做额外的操作
// 不能打印集合类容 同时向集合里添加新元素
// This exception may be thrown by methods that have detected concurrent
// modification of an object when such modification is not permissible
}
三、SetCustomCollectorTest 测试
package com.java.design.java8.Stream.CustomCollector;

import com.java.design.Stream.CustomCollector.SetCustomCollector;
import com.java.design.java8.entity.Student;
import com.java.design.java8.entity.Students;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.Set;

/**
* @author 陈杨
*/

@SpringBootTest
@RunWith(SpringRunner.class)
public class SetCustomCollectorTest {

private List<Student> students;

@Before
public void init() {
students = new Students().init();
}

@Test
public void testSetCustomCollector() {

Set<Student> set = students.stream().collect(new SetCustomCollector<>());

System.out.println(set);
}

/*public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
BiConsumer<I, ? super T> accumulator = collector.accumulator();
BinaryOperator<I> combiner = collector.combiner();
class ReducingSink extends Box<I>
implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
state = supplier.get();
}

@Override
public void accept(T t) {
accumulator.accept(state, t);
}

@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}

@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}*/

/*public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}*/

// 执行流程 方法调用顺序
// container = evaluate(ReduceOps.makeRef(collector));
// Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
// BiConsumer<I, ? super T> accumulator = collector.accumulator();
// BinaryOperator<I> combiner = collector.combiner();
// return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) 是否有序
// return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) 是否包含 IDENTITY_FINISH
// ? (R) container 注意强制类型转换(中间类型 与 返回结果类型)

// 注意强制类型转换
/*CollectorImpl(Supplier<A> supplier,
BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner,
Set<Characteristics> characteristics) {
this(supplier, accumulator, combiner, castingIdentity(), characteristics);
}

@SuppressWarnings(“unchecked”)
private static <I, R> Function<I, R> castingIdentity() {
return i -> (R) i;
}*/

// EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED)
// 包含 IDENTITY_FINISH 打印结果
// supplier invoked!
// accumulator invoked!
// combiner invoked!
// characteristics invoked!
// characteristics invoked!
// Set<Student> 集合对象

// EnumSet.of(Characteristics.UNORDERED)
// 不包含 IDENTITY_FINISH 打印结果
// supplier invoked!
// accumulator invoked!
// combiner invoked!
// characteristics invoked!
// characteristics invoked!
// finisher invoked!
// Set<Student> 集合对象

}
四、StudentCustomCollectorTest 测试
package com.java.design.java8.Stream.CustomCollector;

import com.java.design.Stream.CustomCollector.StudentCustomCollector;
import com.java.design.java8.entity.Student;
import com.java.design.java8.entity.Students;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;
import java.util.Map;

/**
* @author 陈杨
*/

@SpringBootTest
@RunWith(SpringRunner.class)
public class StudentCustomCollectorTest {

private List<Student> students;

@Before
public void init() {
students = new Students().init();
}

@Test
public void testStudentCustomCollectorTest() {

System.out.println(“ 单线程 ”);
Map<Integer, Student> sequentialMap = students.stream().collect(new StudentCustomCollector());
System.out.println(“ 串行流执行效果:\n—————————————\n”+sequentialMap);
System.out.println(“—————————————\n”);

System.out.println(“ 多线程 ”);
Map<Integer, Student> parallelMap = students.parallelStream().collect(new StudentCustomCollector());
System.out.println(“ 并行流执行效果:\n—————————————\n”+parallelMap);
System.out.println(“—————————————\n”);
}

}
五、测试结果
SetCustomCollectorTest 测试结果
. ____ _ __ _ _
/\\ / ___’_ __ _ _(_)_ __ __ _ \ \ \ \
(()\___ | ‘_ | ‘_| | ‘_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| |) ) ) )
‘ |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.2.RELEASE)

2019-02-20 17:14:45.547 INFO 3260 — [main] c.j.d.j.S.C.SetCustomCollectorTest : Starting SetCustomCollectorTest on DESKTOP-87RMBG4 with PID 3260 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 17:14:45.548 INFO 3260 — [main] c.j.d.j.S.C.SetCustomCollectorTest : No active profile set, falling back to default profiles: default
2019-02-20 17:14:46.055 INFO 3260 — [main] c.j.d.j.S.C.SetCustomCollectorTest : Started SetCustomCollectorTest in 0.686 seconds (JVM running for 1.43)
supplier invoked!
accumulator invoked!
combiner invoked!
characteristics invoked!
characteristics invoked!
[Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8), Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8), Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8)]

StudentCustomCollectorTest 测试
. ____ _ __ _ _
/\\ / ___’_ __ _ _(_)_ __ __ _ \ \ \ \
(()\___ | ‘_ | ‘_| | ‘_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| |) ) ) )
‘ |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.2.RELEASE)

2019-02-20 17:15:52.817 INFO 3292 — [main] c.j.d.j.S.C.StudentCustomCollectorTest : Starting StudentCustomCollectorTest on DESKTOP-87RMBG4 with PID 3292 (started by 46250 in E:\IdeaProjects\design)
2019-02-20 17:15:52.818 INFO 3292 — [main] c.j.d.j.S.C.StudentCustomCollectorTest : No active profile set, falling back to default profiles: default
2019-02-20 17:15:53.354 INFO 3292 — [main] c.j.d.j.S.C.StudentCustomCollectorTest : Started StudentCustomCollectorTest in 0.745 seconds (JVM running for 1.439)
单线程
supplier invoked!
accumulator invoked!
combiner invoked!
Characteristics invoked!
accumulator:main
accumulator:main
accumulator:main
accumulator:main
accumulator:main
Characteristics invoked!
finisher invoked!
串行流执行效果:
—————————————
{1=Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8), 2=Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8), 3=Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), 4=Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), 5=Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)}
—————————————

多线程
Characteristics invoked!
Characteristics invoked!
supplier invoked!
accumulator invoked!
combiner invoked!
Characteristics invoked!
accumulator:main
accumulator:ForkJoinPool.commonPool-worker-5
accumulator:ForkJoinPool.commonPool-worker-5
accumulator:ForkJoinPool.commonPool-worker-3
accumulator:main
Characteristics invoked!
finisher invoked!
并行流执行效果:
—————————————
{1=Student(id=1, name=Kirito, sex=Male, age=18, addr=Sword Art Online, salary=9.99999999E8), 2=Student(id=2, name=Asuna, sex=Female, age=17, addr=Sword Art Online, salary=9.99999999E8), 3=Student(id=3, name=Sinon, sex=Female, age=16, addr=Gun Gale Online, salary=9.99999999E8), 4=Student(id=4, name=Yuuki, sex=Female, age=15, addr=Alfheim Online, salary=9.99999999E8), 5=Student(id=5, name=Alice, sex=Female, age=14, addr=Alicization, salary=9.99999999E8)}
—————————————

正文完
 0