共计 9024 个字符,预计需要花费 23 分钟才能阅读完成。
怎么用 Java 8 优雅的开发业务
[TOC]
函数式编程
匿名函数
λ 演算
流式编程
基本原理
在 Java
中流式编程的基本原理有两点。
- 构建流
- 数据流转(流水线)
- 规约
IntStream.rangeClosed(1, 100) // 1. 构建流
.mapToObj(String::valueOf)// 2. 数据流转(流水线)
.collect(joining()); // 3. 规约
案例
- 英雄的主地位一共有几类,别离是什么
@Test
fun t1() {
// 英雄的主地位一共有几类,别离是什么
// 映射
val roleMains = heroes.map(Hero::getRoleMain)
// 过滤为空的数据
.filter(Objects::nonNull)
// 去重
.distinct()
println(roleMains.size)
println(roleMains)
}
@Test
public void t1() {
// 英雄的主地位一共有几类,别离是什么
List<String> roleMains = heroes.stream()
// 映射
.map(Hero::getRoleMain)
// 过滤为空的数据
.filter(Objects::nonNull)
// 去重
.distinct()
// 收集列表
.collect(toList());
System.out.println(roleMains.size());
System.out.println(roleMains);
}
- 英雄按主次地位分组后,输入每个分组有多少英雄,其中:近战英雄有多少位,近程英雄有多少位
@Test
fun t2() {
// 英雄按主次地位分组后,输入每个分组有多少英雄,其中:近战英雄有多少位,近程英雄有多少位
// 主次地位分组的英雄数量
val groupHeroCount = heroes.groupingBy {Pair.of(it.roleMain, it.roleAssist)
}.eachCount()
// 主次分组后,再按攻打范畴分组的英雄数量
val groupThenGroupCount = heroes.groupBy {Pair.of(it.roleMain, it.roleAssist)
}.map {val value = it.value.groupingBy(Hero::getAttackRange).eachCount()
Pair.of(it.key, value)
}.associateBy({it.left}, {it.value})
// 遍历输入
groupThenGroupCount.forEach {(groupKey, groupValue) ->
val groupingCount = groupHeroCount[groupKey]
print("英雄分组 key 为:$groupKey; 英雄数量:$groupingCount;")
groupValue.forEach {(countKey, countValue) ->
print("英雄攻打范畴:$countKey; 英雄数量:$countValue;")
}
println()}
}
@Test
public void t2() {
// 英雄按主次地位分组后,输入每个分组有多少英雄,其中:近战英雄有多少位,近程英雄有多少位
// 主次地位分组的英雄数量
Map<Pair<String, String>, Long> groupHeroCount = heroes.stream()
.collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), counting()));
// 主次分组后,再按攻打范畴分组的英雄数量
Map<Pair<String, String>, Map<String, Long>> groupThenGroupCount = heroes.stream()
.collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()),
groupingBy(Hero::getAttackRange, counting())));
// 遍历输入
groupThenGroupCount.forEach((groupKey, groupValue) -> {Long groupingCount = groupHeroCount.get(groupKey);
System.out.print("英雄分组 key 为:" + groupKey + "; 英雄数量:" + groupingCount + ";");
groupValue.forEach((countKey, countValue) -> System.out.print("英雄攻打范畴:" + countKey + "; 英雄数量:" + countValue + ";"));
System.out.println();});
}
- 求近战英雄 HP 初始值的加总
@Test
fun t3() {
// 求近战英雄 HP 初始值的加总
val sum = heroes.filter {"近战" == it.attackRange}
.map(Hero::getHpStart)
.filter(Objects::nonNull)
.reduce(BigDecimal::add)
println("近战英雄 HP 初始值的加总为:$sum")
}
@Test
public void t3() {
// 求近战英雄 HP 初始值的加总
BigDecimal sum = heroes.stream()
.filter(hero -> "近战".equals(hero.getAttackRange()))
.map(Hero::getHpStart)
.filter(Objects::nonNull)
.reduce(BigDecimal.ZERO, BigDecimal::add);
System.out.println("近战英雄 HP 初始值的加总为:" + sum);
}
- 通过最小列表收集器获取最小列表
@Test
public void t4() {
// 通过最小列表收集器获取最小列表
List<BigDecimal> minAttackGrowth = heroes.stream()
.map(Hero::getAttackGrowth)
.collect(new MinListCollector<>());
System.out.println(minAttackGrowth);
List<Hero> minHero = heroes.stream()
.collect(new MinListCollector<>());
System.out.println(minHero);
}
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import static java.util.stream.Collector.Characteristics.*;
/**
* 最小列表收集器
*
* @author switch
* @since 2020/8/18
*/
public class MinListCollector<T extends Comparable<? super T>> implements Collector<T, List<T>, List<T>> {
/**
* 收集器的个性
*
* @see Characteristics
*/
private final static Set<Characteristics> CHARACTERISTICS = Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));
private final static int ZERO = 0;
/**
* 最小值
*/
private final AtomicReference<T> min = new AtomicReference<>();
@Override
public Supplier<List<T>> supplier() {
// supplier 参数用于生成后果容器,容器类型为 A
return ArrayList::new;
}
@Override
public BiConsumer<List<T>, T> accumulator() {
// accumulator 用于生产元素,也就是演绎元素,这里的 T 就是元素,它会将流中的元素一个一个与后果容器 A 产生操作
return (list, element) -> {
// 获取最小值
T minValue = min.get();
if (Objects.isNull(minValue)) {
// 第一次比拟
list.add(element);
min.set(element);
} else if (element.compareTo(minValue) < ZERO) {
// 发现更小的值
list.clear();
list.add(element);
min.compareAndSet(minValue, element);
} else if (element.compareTo(minValue) == ZERO) {
// 与最小值相等
list.add(element);
}
};
}
@Override
public BinaryOperator<List<T>> combiner() {
// combiner 用于两个两个合并并行执行的线程的执行后果,将其合并为一个最终后果 A
return (left, right) -> {
// 最小值列表合并
List<T> leftList = getMinList(left);
List<T> rightList = getMinList(right);
leftList.addAll(rightList);
return leftList;
};
}
private List<T> getMinList(List<T> list) {return list.stream()
.filter(element -> element.compareTo(min.get()) == ZERO)
.collect(Collectors.toList());
}
@Override
public Function<List<T>, List<T>> finisher() {
// finisher 用于将之前整合完的后果 R 转换成为 A
return Function.identity();}
@Override
public Set<Characteristics> characteristics() {
// characteristics 示意以后 Collector 的特征值,这是个不可变 Set
return CHARACTERISTICS;
}
}
优雅的空解决
import org.junit.Test;
import java.util.Optional;
/**
* @author switch
* @since 2020/8/18
*/
public class OptionalTests {
@Test
public void t1() {
// orElse
System.out.println(Optional.ofNullable(null).orElse("张三"));
System.out.println(Optional.ofNullable(null).orElseGet(() -> "李四"));
System.out.println(Optional.ofNullable("王五").orElseThrow(NullPointerException::new));
}
@Test
public void t2() {
// isPresent
Optional<String> name = Optional.ofNullable("张三");
if (name.isPresent()) {System.out.println(name.get());
}
}
@Test
public void t3() {
// map
Optional<Integer> number = Optional.of("123456").map(Integer::valueOf);
if (number.isPresent()) {System.out.println(number.get());
}
}
@Test
public void t4() {
// flatMap
Optional<Integer> number = Optional.of("123456").flatMap(s -> Optional.of(Integer.valueOf(s)));
if (number.isPresent()) {System.out.println(number.get());
}
}
@Test
public void t5() {
// 过滤
String number = "123456";
String filterNumber = Optional.of(number).filter(s -> !s.equals(number)).orElse("654321");
System.out.println(filterNumber);
}
}
新的并发工具类CompletableFuture
单机批处理多线程执行模型
该模型实用于百万级量级的工作。超过千万数据,能够思考分组,多机器并行执行。
根本流程:
- 从数据库获取 Id 列表
- 拆分成 n 个子 Id 列表
- 通过子 Id 列表获取关联数据(留神:都须要提供批量查问接口)
- 映射到须要解决的 Model(提交到 CompletableFuture)-> 解决数据 -> 收集成 list)(java 8 流式解决)
- 收集的 list 进行 join 操作
- 收集 list
模型
模型原理:Stream+CompletableFuture+lambda
简要解释:
- CompletableFuture 是 java8 提供的一个工具类,次要是用于异步解决流程编排的。
- Stream 是 java8 提供的一个汇合流式解决工具类,次要用于数据的流水线解决。
- lambda 在 java 中是基于外部匿名类实现的,能够大幅缩小反复代码。
- 总结:在该模型中 Stream 用于汇合流水线解决、CompletableFuture 解决异步编排问题(非阻塞)、lambda 简化代码。
- 数据流动
List<List<String>> ->
Stream<List<String>> ->
Stream<List<Model>> ->
Stream<CompletableFuture<List<Model>>> ->
Stream<CompletableFuture<List< 映射类型 >>> ->
List<CompletableFuture<Void>>
案例
ThreadPoolUtil
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
public final class ThreadPoolUtil {public static ThreadPoolTaskExecutor getDefaultExecutor(Integer poolSize, Integer maxPoolSize, Integer queueCapacity) {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setAllowCoreThreadTimeOut(true);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setCorePoolSize(poolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
ThreadPoolConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ThreadPoolConfig {
/**
* 计算规定:N(thread) = N(cpu) * U(cpu) * (1 + w/c)
* N(thread):线程池大小
* N(cpu):处理器核数
* U(cpu):冀望 CPU 利用率(该值应该介于 0 和 1 之间)* w/c:是等待时间与计算工夫的比率,比如说 IO 操作即为等待时间,计算解决即为计算工夫
*/
private static final Integer TASK_POOL_SIZE = 50;
private static final Integer TASK_MAX_POOL_SIZE = 100;
private static final Integer TASK_QUEUE_CAPACITY = 1000;
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {return ThreadPoolUtil.getDefaultExecutor(TASK_POOL_SIZE, TASK_MAX_POOL_SIZE, TASK_QUEUE_CAPACITY);
}
}
#getFuturesStream
public Stream<CompletableFuture<List<Model>>> getFuturesStream(List<List<String>> idSubLists) {return idSubLists.stream()
.map(ids ->
CompletableFuture.supplyAsync(() -> modelService.listByIds(ids), taskExecutor)
);
}
#standardisation
public void standardisation() {List<CompletableFuture<Void>> batchFutures = getFuturesStream(idSubLists)
.map(future -> future.thenApply(this::listByNormalize))
.map(future -> future.thenAccept(modelService::batchUpdateData))
.collect(Collectors.toList());
List<Void> results = batchFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
调整线程池的大小
《Java 并发编程实战》一书中,Brian Goetz 和合著者们为线程池大小的优化提供了不少中肯的倡议。这十分重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,节约大量的工夫在上下文切换上。反之,如果线程的数目过少,正如你的利用所面临的状况,处理器的一些核可能就无奈充分利用。Brian Goetz 倡议,线程池大小与处理器的利用率之比能够应用上面的公式进行估算:
$$N_{threads} = N_{CPU} * U_{CPU} * (1 + \frac{W}{C})$$其中:
- $N_{CPU}$ 是处理器的核的数目,能够通过
Runtime.getRuntime().availableProcessors()
失去- $U_{CPU}$ 是冀望的 CPU 利用率(该值应该介于 0 和 1 之间)
- $\frac{W}{C}$ 是等待时间与计算工夫的比率,比如说 IO 操作即为等待时间,计算解决即为计算工夫
并行——应用流还是 CompletableFutures?
对汇合进行并行计算有两种形式:要么将其转化为并行流,利用 map 这样的操作发展工作,要么枚举出汇合中的每一个元素,创立新的线程,在 CompletableFuture 内对其进行操作。后者提供了更多的灵活性,能够调整线程池的大小,而这能帮忙确保整体的计算不会因为线程都在期待 I / O 而产生阻塞。
应用这些 API 的倡议如下:
- 如果进行的是计算密集型的操作,并且没有 I /O,那么举荐应用 Stream 接口,因为实现简略,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创立比处理器核数更多的线程)。
- 反之,如果并行的工作单元还波及期待 I / O 的操作(包含网络连接期待),那么应用 CompletableFuture 灵活性更好,能够根据期待 / 计算,或者 $\frac{W}{C}$ 的比率设定须要应用的线程数。这种状况不应用并行流的另一个起因是,解决流的流水线中如果产生 I / O 期待,流的提早个性很难判断到底什么时候触发了期待。
日期和工夫 API
使用指南:https://www.yuque.com/docs/sh…(明码:gtag)《时区工具类使用指南》
我的项目地址
GitHub:java8-fluent
参考
- Java 8 实战学习笔记
- Java 8 函数式编程学习笔记
- 深刻了解 Java 函数式编程和 Streams API
分享并记录所学所见
正文完