关于java:怎样用Java-8优雅的开发业务

3次阅读

共计 9024 个字符,预计需要花费 23 分钟才能阅读完成。

怎么用 Java 8 优雅的开发业务

[TOC]

函数式编程

匿名函数

λ 演算

流式编程

基本原理

Java 中流式编程的基本原理有两点。

  1. 构建流
  2. 数据流转(流水线)
  3. 规约
IntStream.rangeClosed(1, 100) // 1. 构建流
    .mapToObj(String::valueOf)// 2. 数据流转(流水线)
    .collect(joining());      // 3. 规约
案例
  • 英雄的主地位一共有几类,别离是什么
@Test
fun t1() {
    // 英雄的主地位一共有几类,别离是什么
    // 映射
    val roleMains = heroes.map(Hero::getRoleMain)
        // 过滤为空的数据
        .filter(Objects::nonNull)
        // 去重
        .distinct()
    println(roleMains.size)
    println(roleMains)
}
@Test
public void t1() {
    // 英雄的主地位一共有几类,别离是什么
    List<String> roleMains = heroes.stream()
            // 映射
            .map(Hero::getRoleMain)
            // 过滤为空的数据
            .filter(Objects::nonNull)
            // 去重
            .distinct()
            // 收集列表
            .collect(toList());
    System.out.println(roleMains.size());
    System.out.println(roleMains);
}

  • 英雄按主次地位分组后,输入每个分组有多少英雄,其中:近战英雄有多少位,近程英雄有多少位
@Test
fun t2() {
    // 英雄按主次地位分组后,输入每个分组有多少英雄,其中:近战英雄有多少位,近程英雄有多少位

    // 主次地位分组的英雄数量
    val groupHeroCount = heroes.groupingBy {Pair.of(it.roleMain, it.roleAssist)
    }.eachCount()

    // 主次分组后,再按攻打范畴分组的英雄数量
    val groupThenGroupCount = heroes.groupBy {Pair.of(it.roleMain, it.roleAssist)
    }.map {val value = it.value.groupingBy(Hero::getAttackRange).eachCount()
        Pair.of(it.key, value)
    }.associateBy({it.left}, {it.value})

    // 遍历输入
    groupThenGroupCount.forEach {(groupKey, groupValue) ->
        val groupingCount = groupHeroCount[groupKey]
        print("英雄分组 key 为:$groupKey; 英雄数量:$groupingCount;")
        groupValue.forEach {(countKey, countValue) ->
            print("英雄攻打范畴:$countKey; 英雄数量:$countValue;")
        }
        println()}
}
@Test
public void t2() {
    // 英雄按主次地位分组后,输入每个分组有多少英雄,其中:近战英雄有多少位,近程英雄有多少位

    // 主次地位分组的英雄数量
    Map<Pair<String, String>, Long> groupHeroCount = heroes.stream()
            .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), counting()));

    // 主次分组后,再按攻打范畴分组的英雄数量
    Map<Pair<String, String>, Map<String, Long>> groupThenGroupCount = heroes.stream()
            .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()),
                    groupingBy(Hero::getAttackRange, counting())));

    // 遍历输入
    groupThenGroupCount.forEach((groupKey, groupValue) -> {Long groupingCount = groupHeroCount.get(groupKey);
        System.out.print("英雄分组 key 为:" + groupKey + "; 英雄数量:" + groupingCount + ";");
        groupValue.forEach((countKey, countValue) -> System.out.print("英雄攻打范畴:" + countKey + "; 英雄数量:" + countValue + ";"));
        System.out.println();});
}

  • 求近战英雄 HP 初始值的加总
@Test
fun t3() {
    // 求近战英雄 HP 初始值的加总
    val sum = heroes.filter {"近战" == it.attackRange}
        .map(Hero::getHpStart)
        .filter(Objects::nonNull)
        .reduce(BigDecimal::add)
    println("近战英雄 HP 初始值的加总为:$sum")
}
@Test
public void t3() {
    // 求近战英雄 HP 初始值的加总
    BigDecimal sum = heroes.stream()
            .filter(hero -> "近战".equals(hero.getAttackRange()))
            .map(Hero::getHpStart)
            .filter(Objects::nonNull)
            .reduce(BigDecimal.ZERO, BigDecimal::add);
    System.out.println("近战英雄 HP 初始值的加总为:" + sum);
}

  • 通过最小列表收集器获取最小列表
@Test
public void t4() {
    // 通过最小列表收集器获取最小列表
    List<BigDecimal> minAttackGrowth = heroes.stream()
            .map(Hero::getAttackGrowth)
            .collect(new MinListCollector<>());
    System.out.println(minAttackGrowth);
    List<Hero> minHero = heroes.stream()
            .collect(new MinListCollector<>());
    System.out.println(minHero);
}
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.util.stream.Collector.Characteristics.*;

/**
 * 最小列表收集器
 *
 * @author switch
 * @since 2020/8/18
 */
public class MinListCollector<T extends Comparable<? super T>> implements Collector<T, List<T>, List<T>> {
    /**
     * 收集器的个性
     *
     * @see Characteristics
     */
    private final static Set<Characteristics> CHARACTERISTICS = Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));
    private final static int ZERO = 0;

    /**
     * 最小值
     */
    private final AtomicReference<T> min = new AtomicReference<>();

    @Override
    public Supplier<List<T>> supplier() {
        // supplier 参数用于生成后果容器,容器类型为 A
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        // accumulator 用于生产元素,也就是演绎元素,这里的 T 就是元素,它会将流中的元素一个一个与后果容器 A 产生操作
        return (list, element) -> {
            // 获取最小值
            T minValue = min.get();
            if (Objects.isNull(minValue)) {
                // 第一次比拟
                list.add(element);
                min.set(element);
            } else if (element.compareTo(minValue) < ZERO) {
                // 发现更小的值
                list.clear();
                list.add(element);
                min.compareAndSet(minValue, element);
            } else if (element.compareTo(minValue) == ZERO) {
                // 与最小值相等
                list.add(element);
            }
        };
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        // combiner 用于两个两个合并并行执行的线程的执行后果,将其合并为一个最终后果 A
        return (left, right) -> {
            // 最小值列表合并
            List<T> leftList = getMinList(left);
            List<T> rightList = getMinList(right);
            leftList.addAll(rightList);
            return leftList;
        };
    }

    private List<T> getMinList(List<T> list) {return list.stream()
                .filter(element -> element.compareTo(min.get()) == ZERO)
                .collect(Collectors.toList());
    }

    @Override
    public Function<List<T>, List<T>> finisher() {
        // finisher 用于将之前整合完的后果 R 转换成为 A
        return Function.identity();}

    @Override
    public Set<Characteristics> characteristics() {
        // characteristics 示意以后 Collector 的特征值,这是个不可变 Set
        return CHARACTERISTICS;
    }
}

优雅的空解决

import org.junit.Test;

import java.util.Optional;

/**
 * @author switch
 * @since 2020/8/18
 */
public class OptionalTests {
    @Test
    public void t1() {
        // orElse
        System.out.println(Optional.ofNullable(null).orElse("张三"));
        System.out.println(Optional.ofNullable(null).orElseGet(() -> "李四"));
        System.out.println(Optional.ofNullable("王五").orElseThrow(NullPointerException::new));
    }

    @Test
    public void t2() {
        // isPresent
        Optional<String> name = Optional.ofNullable("张三");
        if (name.isPresent()) {System.out.println(name.get());
        }
    }

    @Test
    public void t3() {
        // map
        Optional<Integer> number = Optional.of("123456").map(Integer::valueOf);
        if (number.isPresent()) {System.out.println(number.get());
        }
    }

    @Test
    public void t4() {
        // flatMap
        Optional<Integer> number = Optional.of("123456").flatMap(s -> Optional.of(Integer.valueOf(s)));
        if (number.isPresent()) {System.out.println(number.get());
        }
    }

    @Test
    public void t5() {
        // 过滤
        String number = "123456";
        String filterNumber = Optional.of(number).filter(s -> !s.equals(number)).orElse("654321");
        System.out.println(filterNumber);
    }
}

新的并发工具类CompletableFuture

单机批处理多线程执行模型

该模型实用于百万级量级的工作。超过千万数据,能够思考分组,多机器并行执行。
根本流程:

  1. 从数据库获取 Id 列表
  2. 拆分成 n 个子 Id 列表
  3. 通过子 Id 列表获取关联数据(留神:都须要提供批量查问接口)
  4. 映射到须要解决的 Model(提交到 CompletableFuture)-> 解决数据 -> 收集成 list)(java 8 流式解决)
  5. 收集的 list 进行 join 操作
  6. 收集 list
模型

模型原理:Stream+CompletableFuture+lambda

简要解释:

  • CompletableFuture 是 java8 提供的一个工具类,次要是用于异步解决流程编排的。
  • Stream 是 java8 提供的一个汇合流式解决工具类,次要用于数据的流水线解决。
  • lambda 在 java 中是基于外部匿名类实现的,能够大幅缩小反复代码。
  • 总结:在该模型中 Stream 用于汇合流水线解决、CompletableFuture 解决异步编排问题(非阻塞)、lambda 简化代码。
  • 数据流动
List<List<String>> -> 
Stream<List<String>> -> 
Stream<List<Model>> -> 
Stream<CompletableFuture<List<Model>>> -> 
Stream<CompletableFuture<List< 映射类型 >>> -> 
List<CompletableFuture<Void>>
案例
  • ThreadPoolUtil
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

public final class ThreadPoolUtil {public static ThreadPoolTaskExecutor getDefaultExecutor(Integer poolSize, Integer maxPoolSize, Integer queueCapacity) {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setAllowCoreThreadTimeOut(true);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setCorePoolSize(poolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}
  • ThreadPoolConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ThreadPoolConfig {
    /**
     * 计算规定:N(thread) = N(cpu) * U(cpu) * (1 + w/c)
     * N(thread):线程池大小
     * N(cpu):处理器核数
     * U(cpu):冀望 CPU 利用率(该值应该介于 0 和 1 之间)* w/c:是等待时间与计算工夫的比率,比如说 IO 操作即为等待时间,计算解决即为计算工夫
     */
    private static final Integer TASK_POOL_SIZE = 50;
    private static final Integer TASK_MAX_POOL_SIZE = 100;
    private static final Integer TASK_QUEUE_CAPACITY = 1000;

    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {return ThreadPoolUtil.getDefaultExecutor(TASK_POOL_SIZE, TASK_MAX_POOL_SIZE, TASK_QUEUE_CAPACITY);
    }
}
  • #getFuturesStream
public Stream<CompletableFuture<List<Model>>> getFuturesStream(List<List<String>> idSubLists) {return idSubLists.stream()
        .map(ids -> 
            CompletableFuture.supplyAsync(() -> modelService.listByIds(ids), taskExecutor)
        );
}
  • #standardisation
public void standardisation() {List<CompletableFuture<Void>> batchFutures = getFuturesStream(idSubLists)
            .map(future -> future.thenApply(this::listByNormalize))
            .map(future -> future.thenAccept(modelService::batchUpdateData))
            .collect(Collectors.toList());
    List<Void> results = batchFutures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
}

调整线程池的大小

《Java 并发编程实战》一书中,Brian Goetz 和合著者们为线程池大小的优化提供了不少中肯的倡议。这十分重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,节约大量的工夫在上下文切换上。反之,如果线程的数目过少,正如你的利用所面临的状况,处理器的一些核可能就无奈充分利用。Brian Goetz 倡议,线程池大小与处理器的利用率之比能够应用上面的公式进行估算:
$$N_{threads} = N_{CPU} * U_{CPU} * (1 + \frac{W}{C})$$

其中:

  • $N_{CPU}$ 是处理器的核的数目,能够通过 Runtime.getRuntime().availableProcessors() 失去
  • $U_{CPU}$ 是冀望的 CPU 利用率(该值应该介于 0 和 1 之间)
  • $\frac{W}{C}$ 是等待时间与计算工夫的比率,比如说 IO 操作即为等待时间,计算解决即为计算工夫

并行——应用流还是 CompletableFutures?

对汇合进行并行计算有两种形式:要么将其转化为并行流,利用 map 这样的操作发展工作,要么枚举出汇合中的每一个元素,创立新的线程,在 CompletableFuture 内对其进行操作。后者提供了更多的灵活性,能够调整线程池的大小,而这能帮忙确保整体的计算不会因为线程都在期待 I / O 而产生阻塞。

应用这些 API 的倡议如下:

  • 如果进行的是计算密集型的操作,并且没有 I /O,那么举荐应用 Stream 接口,因为实现简略,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创立比处理器核数更多的线程)。
  • 反之,如果并行的工作单元还波及期待 I / O 的操作(包含网络连接期待),那么应用 CompletableFuture 灵活性更好,能够根据期待 / 计算,或者 $\frac{W}{C}$ 的比率设定须要应用的线程数。这种状况不应用并行流的另一个起因是,解决流的流水线中如果产生 I / O 期待,流的提早个性很难判断到底什么时候触发了期待。

日期和工夫 API

使用指南:https://www.yuque.com/docs/sh…(明码:gtag)《时区工具类使用指南》

我的项目地址

GitHub:java8-fluent

参考

  • Java 8 实战学习笔记
  • Java 8 函数式编程学习笔记
  • 深刻了解 Java 函数式编程和 Streams API

分享并记录所学所见

正文完
 0