怎么用Java 8优雅的开发业务
[TOC]
函数式编程
匿名函数
演算
流式编程
基本原理
在Java
中流式编程的基本原理有两点。
- 构建流
- 数据流转(流水线)
- 规约
IntStream.rangeClosed(1, 100) // 1. 构建流 .mapToObj(String::valueOf)// 2. 数据流转(流水线) .collect(joining()); // 3. 规约
案例
- 英雄的主地位一共有几类,别离是什么
@Testfun t1() { // 英雄的主地位一共有几类,别离是什么 // 映射 val roleMains = heroes.map(Hero::getRoleMain) // 过滤为空的数据 .filter(Objects::nonNull) // 去重 .distinct() println(roleMains.size) println(roleMains)}
@Testpublic void t1() { // 英雄的主地位一共有几类,别离是什么 List<String> roleMains = heroes.stream() // 映射 .map(Hero::getRoleMain) // 过滤为空的数据 .filter(Objects::nonNull) // 去重 .distinct() // 收集列表 .collect(toList()); System.out.println(roleMains.size()); System.out.println(roleMains);}
- 英雄按主次地位分组后,输入每个分组有多少英雄,其中:近战英雄有多少位,近程英雄有多少位
@Testfun t2() { // 英雄按主次地位分组后,输入每个分组有多少英雄,其中:近战英雄有多少位,近程英雄有多少位 // 主次地位分组的英雄数量 val groupHeroCount = heroes.groupingBy { Pair.of(it.roleMain, it.roleAssist) }.eachCount() // 主次分组后,再按攻打范畴分组的英雄数量 val groupThenGroupCount = heroes.groupBy { Pair.of(it.roleMain, it.roleAssist) }.map { val value = it.value.groupingBy(Hero::getAttackRange).eachCount() Pair.of(it.key, value) }.associateBy({ it.left }, { it.value }) // 遍历输入 groupThenGroupCount.forEach { (groupKey, groupValue) -> val groupingCount = groupHeroCount[groupKey] print("英雄分组key为:$groupKey;英雄数量:$groupingCount;") groupValue.forEach { (countKey, countValue) -> print("英雄攻打范畴:$countKey;英雄数量:$countValue;") } println() }}
@Testpublic void t2() { // 英雄按主次地位分组后,输入每个分组有多少英雄,其中:近战英雄有多少位,近程英雄有多少位 // 主次地位分组的英雄数量 Map<Pair<String, String>, Long> groupHeroCount = heroes.stream() .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), counting())); // 主次分组后,再按攻打范畴分组的英雄数量 Map<Pair<String, String>, Map<String, Long>> groupThenGroupCount = heroes.stream() .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), groupingBy(Hero::getAttackRange, counting()))); // 遍历输入 groupThenGroupCount.forEach((groupKey, groupValue) -> { Long groupingCount = groupHeroCount.get(groupKey); System.out.print("英雄分组key为:" + groupKey + ";英雄数量:" + groupingCount + ";"); groupValue.forEach((countKey, countValue) -> System.out.print("英雄攻打范畴:" + countKey + ";英雄数量:" + countValue + ";")); System.out.println(); });}
- 求近战英雄HP初始值的加总
@Testfun t3() { // 求近战英雄HP初始值的加总 val sum = heroes.filter { "近战" == it.attackRange } .map(Hero::getHpStart) .filter(Objects::nonNull) .reduce(BigDecimal::add) println("近战英雄HP初始值的加总为:$sum")}
@Testpublic void t3() { // 求近战英雄HP初始值的加总 BigDecimal sum = heroes.stream() .filter(hero -> "近战".equals(hero.getAttackRange())) .map(Hero::getHpStart) .filter(Objects::nonNull) .reduce(BigDecimal.ZERO, BigDecimal::add); System.out.println("近战英雄HP初始值的加总为:" + sum);}
- 通过最小列表收集器获取最小列表
@Testpublic void t4() { // 通过最小列表收集器获取最小列表 List<BigDecimal> minAttackGrowth = heroes.stream() .map(Hero::getAttackGrowth) .collect(new MinListCollector<>()); System.out.println(minAttackGrowth); List<Hero> minHero = heroes.stream() .collect(new MinListCollector<>()); System.out.println(minHero);}
import java.util.*;import java.util.concurrent.atomic.AtomicReference;import java.util.function.BiConsumer;import java.util.function.BinaryOperator;import java.util.function.Function;import java.util.function.Supplier;import java.util.stream.Collector;import java.util.stream.Collectors;import static java.util.stream.Collector.Characteristics.*;/** * 最小列表收集器 * * @author switch * @since 2020/8/18 */public class MinListCollector<T extends Comparable<? super T>> implements Collector<T, List<T>, List<T>> { /** * 收集器的个性 * * @see Characteristics */ private final static Set<Characteristics> CHARACTERISTICS = Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH)); private final static int ZERO = 0; /** * 最小值 */ private final AtomicReference<T> min = new AtomicReference<>(); @Override public Supplier<List<T>> supplier() { // supplier参数用于生成后果容器,容器类型为A return ArrayList::new; } @Override public BiConsumer<List<T>, T> accumulator() { // accumulator用于生产元素,也就是演绎元素,这里的T就是元素,它会将流中的元素一个一个与后果容器A产生操作 return (list, element) -> { // 获取最小值 T minValue = min.get(); if (Objects.isNull(minValue)) { // 第一次比拟 list.add(element); min.set(element); } else if (element.compareTo(minValue) < ZERO) { // 发现更小的值 list.clear(); list.add(element); min.compareAndSet(minValue, element); } else if (element.compareTo(minValue) == ZERO) { // 与最小值相等 list.add(element); } }; } @Override public BinaryOperator<List<T>> combiner() { // combiner用于两个两个合并并行执行的线程的执行后果,将其合并为一个最终后果A return (left, right) -> { // 最小值列表合并 List<T> leftList = getMinList(left); List<T> rightList = getMinList(right); leftList.addAll(rightList); return leftList; }; } private List<T> getMinList(List<T> list) { return list.stream() .filter(element -> element.compareTo(min.get()) == ZERO) .collect(Collectors.toList()); } @Override public Function<List<T>, List<T>> finisher() { // finisher用于将之前整合完的后果R转换成为A return Function.identity(); } @Override public Set<Characteristics> characteristics() { // characteristics示意以后Collector的特征值,这是个不可变Set return CHARACTERISTICS; }}
优雅的空解决
import org.junit.Test;import java.util.Optional;/** * @author switch * @since 2020/8/18 */public class OptionalTests { @Test public void t1() { // orElse System.out.println(Optional.ofNullable(null).orElse("张三")); System.out.println(Optional.ofNullable(null).orElseGet(() -> "李四")); System.out.println(Optional.ofNullable("王五").orElseThrow(NullPointerException::new)); } @Test public void t2() { // isPresent Optional<String> name = Optional.ofNullable("张三"); if (name.isPresent()) { System.out.println(name.get()); } } @Test public void t3() { // map Optional<Integer> number = Optional.of("123456").map(Integer::valueOf); if (number.isPresent()) { System.out.println(number.get()); } } @Test public void t4() { // flatMap Optional<Integer> number = Optional.of("123456").flatMap(s -> Optional.of(Integer.valueOf(s))); if (number.isPresent()) { System.out.println(number.get()); } } @Test public void t5() { // 过滤 String number = "123456"; String filterNumber = Optional.of(number).filter(s -> !s.equals(number)).orElse("654321"); System.out.println(filterNumber); }}
新的并发工具类CompletableFuture
单机批处理多线程执行模型
该模型实用于百万级量级的工作。超过千万数据,能够思考分组,多机器并行执行。
根本流程:
- 从数据库获取Id列表
- 拆分成n个子Id列表
- 通过子Id列表获取关联数据(留神:都须要提供批量查问接口)
- 映射到须要解决的Model(提交到CompletableFuture)->解决数据->收集成list)(java 8流式解决)
- 收集的list进行join操作
- 收集list
模型
模型原理:Stream+CompletableFuture+lambda
简要解释:
- CompletableFuture是java8提供的一个工具类,次要是用于异步解决流程编排的。
- Stream是java8提供的一个汇合流式解决工具类,次要用于数据的流水线解决。
- lambda在java中是基于外部匿名类实现的,能够大幅缩小反复代码。
- 总结:在该模型中Stream用于汇合流水线解决、CompletableFuture解决异步编排问题(非阻塞)、lambda简化代码。
- 数据流动
List<List<String>> -> Stream<List<String>> -> Stream<List<Model>> -> Stream<CompletableFuture<List<Model>>> -> Stream<CompletableFuture<List<映射类型>>> -> List<CompletableFuture<Void>>
案例
ThreadPoolUtil
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;public final class ThreadPoolUtil { public static ThreadPoolTaskExecutor getDefaultExecutor(Integer poolSize, Integer maxPoolSize, Integer queueCapacity) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setAllowCoreThreadTimeOut(true); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setCorePoolSize(poolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; }}
ThreadPoolConfig
import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configurationpublic class ThreadPoolConfig { /** * 计算规定:N(thread) = N(cpu) * U(cpu) * (1 + w/c) * N(thread):线程池大小 * N(cpu):处理器核数 * U(cpu):冀望CPU利用率(该值应该介于0和1之间) * w/c:是等待时间与计算工夫的比率,比如说IO操作即为等待时间,计算解决即为计算工夫 */ private static final Integer TASK_POOL_SIZE = 50; private static final Integer TASK_MAX_POOL_SIZE = 100; private static final Integer TASK_QUEUE_CAPACITY = 1000; @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { return ThreadPoolUtil.getDefaultExecutor(TASK_POOL_SIZE, TASK_MAX_POOL_SIZE, TASK_QUEUE_CAPACITY); }}
#getFuturesStream
public Stream<CompletableFuture<List<Model>>> getFuturesStream(List<List<String>> idSubLists) { return idSubLists.stream() .map(ids -> CompletableFuture.supplyAsync(() -> modelService.listByIds(ids), taskExecutor) );}
#standardisation
public void standardisation() { List<CompletableFuture<Void>> batchFutures = getFuturesStream(idSubLists) .map(future -> future.thenApply(this::listByNormalize)) .map(future -> future.thenAccept(modelService::batchUpdateData)) .collect(Collectors.toList()); List<Void> results = batchFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList());}
调整线程池的大小
《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的倡议。这十分重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,节约大量的工夫在上下文切换上。反之,如果线程的数目过少,正如你的利用所面临的状况,处理器的一些核可能就无奈充分利用。Brian Goetz倡议,线程池大小与处理器的利用率之比能够应用上面的公式进行估算:
$$N_{threads} = N_{CPU} * U_{CPU} * (1 + \frac{W}{C})$$其中:
- $N_{CPU}$是处理器的核的数目,能够通过
Runtime.getRuntime().availableProcessors()
失去- $U_{CPU}$是冀望的CPU利用率(该值应该介于0和1之间)
- $\frac{W}{C}$是等待时间与计算工夫的比率,比如说IO操作即为等待时间,计算解决即为计算工夫
并行——应用流还是CompletableFutures?
对汇合进行并行计算有两种形式:要么将其转化为并行流,利用map这样的操作发展工作,要么枚举出汇合中的每一个元素,创立新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,能够调整线程池的大小,而这能帮忙确保整体的计算不会因为线程都在期待I/O而产生阻塞。
应用这些API的倡议如下:
- 如果进行的是计算密集型的操作,并且没有I/O,那么举荐应用Stream接口,因为实现简略,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创立比处理器核数更多的线程)。
- 反之,如果并行的工作单元还波及期待I/O的操作(包含网络连接期待),那么应用CompletableFuture灵活性更好,能够根据期待/计算,或者$\frac{W}{C}$的比率设定须要应用的线程数。这种状况不应用并行流的另一个起因是,解决流的流水线中如果产生I/O期待,流的提早个性很难判断到底什么时候触发了期待。
日期和工夫API
使用指南:https://www.yuque.com/docs/sh...(明码:gtag) 《时区工具类使用指南》
我的项目地址
GitHub:java8-fluent
参考
- Java 8 实战学习笔记
- Java 8 函数式编程学习笔记
- 深刻了解Java函数式编程和Streams API
分享并记录所学所见