Java并发15-CompletableFuture-异步编程

前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化。 // 以下两个方法都是耗时操作doBizA();doBizB();//创建两个子线程去执行就可以了,两个操作已经被异步化了。new Thread(()->doBizA()) .start();new Thread(()->doBizB()) .start(); 异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。Java 在 1.8 版本提供了 CompletableFuture 来支持异步编程。 CompletableFuture 的核心优势为了领略 CompletableFuture 异步编程的优势,这里我们用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序。首先还是需要先完成分工方案,在下面的程序中,我们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工如下图所示。 烧水泡茶分工方案 // 任务 1:洗水壶 -> 烧开水CompletableFuture<Void> f1 = CompletableFuture.runAsync(()->{ System.out.println("T1: 洗水壶..."); sleep(1, TimeUnit.SECONDS); System.out.println("T1: 烧开水..."); sleep(15, TimeUnit.SECONDS);});// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶CompletableFuture<String> f2 = CompletableFuture.supplyAsync(()->{ System.out.println("T2: 洗茶壶..."); sleep(1, TimeUnit.SECONDS); System.out.println("T2: 洗茶杯..."); sleep(2, TimeUnit.SECONDS); System.out.println("T2: 拿茶叶..."); sleep(1, TimeUnit.SECONDS); return " 龙井 ";});// 任务 3:任务 1 和任务 2 完成后执行:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf)->{ System.out.println("T1: 拿到茶叶:" + tf); System.out.println("T1: 泡茶..."); return " 上茶:" + tf; });// 等待任务 3 执行结果System.out.println(f3.join());void sleep(int t, TimeUnit u) { try { u.sleep(t); }catch(InterruptedException e){}}// 一次执行结果:T1: 洗水壶...T2: 洗茶壶...T1: 烧开水...T2: 洗茶杯...T2: 拿茶叶...T1: 拿到茶叶: 龙井T1: 泡茶...上茶: 龙井从整体上来看,我们会发现 ...

June 25, 2019 · 3 min · jiezi

Lambda表达式与Stream流 (终)

package com.java.design.java8;import org.junit.Before;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.time.;import java.time.format.DateTimeFormatter;import java.time.temporal.ChronoField;import java.util.;import java.util.concurrent.CompletableFuture;import java.util.stream.Collectors;import java.util.stream.IntStream;/** * @author 陈杨 * / @SpringBootTest@RunWith(SpringRunner.class)public class LambdaInfo {一、Lambda表达式与Stream流/ A lambda expression can be understood as a concise representation of an anonymous function that can be passed around: it doesn’t have a name, but it has a list of parameters, a body, a return type, and also possibly a list of exceptions that can be thrown. That’s one big definition; let’s break it down: Anonymous: We say anonymous because it doesn’t have an explicit name like a method would normally have: less to write and think about! Function: We say function because a lambda isn’t associated with a particular class like a method is. But like a method, a lambda has a list of parameters, a body, a return type, and a possible list of exceptions that can be thrown. Passed around: A lambda expression can be passed as argument to a method or stored in a variable. Concise: You don’t need to write a lot of boilerplate like you do for anonymous classes.// Stream : A sequence of elements from a source that supports data processing operations. Sequence of elements Source Pipelining Internal iteration Traversable only once Collections: external interation using an interator behind the scenes*/二、初始化测试数据private List<Integer> list;@Beforepublic void init() { list = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList()); list.sort(Collections.reverseOrder());}三、各种API1.allMatch@Testpublic void testLambdaInfo() { System.out.println(">———————Match方法———————-<"); // 一、Match方法 // Returns whether all elements of this stream match the provided predicate. Optional.of(list.stream().mapToInt(Integer::intValue).allMatch(i -> i > 0)) .ifPresent(System.out::println); // Returns whether any elements of this stream match the provided predicate. Optional.of(list.stream().mapToInt(Integer::intValue).anyMatch(i -> i > 0)) .ifPresent(System.out::println); // Returns whether no elements of this stream match the provided predicate.. Optional.of(list.stream().mapToInt(Integer::intValue).noneMatch(i -> i > 0)) .ifPresent(System.out::println);2、findSystem.out.println(">——————–Find方法———————–<");// 二、Find方法// Returns an Optional describing the first element of this stream,// or an empty Optional if the stream is empty.// If the stream has no encounter order, then any element may be returned.list.stream().mapToInt(Integer::intValue).filter(i -> i > 10).findFirst() .ifPresent(System.out::println);// Returns an Optional describing some element of the stream, or an empty Optional if the stream is empty.list.stream().mapToInt(Integer::intValue).filter(i -> i > 10).findAny() .ifPresent(System.out::println);3、reduceSystem.out.println(">———————Reduce方法———————-<");// 三、Reduce方法// Performs a reduction on the elements of this stream, using the provided identity value// and an associative accumulation function, and returns the reduced value.// 求和System.out.println(list.stream().reduce(0, Integer::sum));list.stream().mapToInt(Integer::intValue).reduce(Integer::sum) .ifPresent(System.out::println);// 求最大值System.out.println(list.stream().reduce(0, Integer::max));list.stream().mapToInt(Integer::intValue).reduce(Integer::max) .ifPresent(System.out::println);// 求最小值System.out.println(list.stream().reduce(0, Integer::min));list.stream().mapToInt(Integer::intValue).reduce(Integer::min) .ifPresent(System.out::println); System.out.println(">——————————————-<"); }4、CompletableFuture API@Testpublic void testCompletableFuture() { // 四、CompletableFuture API /* * Returns a new CompletableFuture that is asynchronously completed by a task * running in the given executor with the value obtained by calling the given Supplier. */ CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum, System.out::println); Optional.of(CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum) .complete(55)).ifPresent(System.out::println); // thenAccept 无返回值 Consumer<? super T> action CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum) .thenAccept(System.out::println); // thenApply 有返回值 Function<? super T,? extends U> fn CompletableFuture.supplyAsync(() -> list.stream().mapToInt(Integer::intValue)) .thenApply(IntStream::sum).thenAccept(System.out::println); // 对元素及异常进行处理 BiFunction<? super T, Throwable, ? extends U> fn CompletableFuture.supplyAsync(() -> list.stream().mapToInt(Integer::intValue)) .handle((i, throwable) -> “handle:\t” + i.sum()).thenAccept(System.out::println); // whenCompleteAsync 完成时执行 BiConsumer<? super T, ? super Throwable> action CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum) .whenCompleteAsync((value, throwable) -> System.out.println(“whenCompleteAsync:\t” + value)); // 组合CompletableFuture 将前一个结果作为后一个输入参数 (参照 组合设计模式) CompletableFuture.supplyAsync(() -> list.stream().mapToInt(Integer::intValue)) .thenCompose(i -> CompletableFuture.supplyAsync(i::sum)).thenAccept(System.out::println); // 合并CompletableFuture CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum) .thenCombine(CompletableFuture.supplyAsync(() -> list.stream() .mapToDouble(Double::valueOf).sum()), Double::sum).thenAccept(System.out::println); // 合并CompletableFuture CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue)::sum) .thenAcceptBoth(CompletableFuture.supplyAsync(list.stream() .mapToDouble(Double::valueOf)::sum), (r1, r2) -> System.out.println(“thenAcceptBoth:\t” + r1 + “\t” + r2)); // 2个CompletableFuture运行完毕后运行Runnable CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + “\tis running”); return list.stream().mapToInt(Integer::intValue).sum(); }) .runAfterBoth( CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + “\tis running”); return list.stream().mapToDouble(Double::valueOf).sum(); }), () -> System.out.println(“The 2 method have done”)); // 2个CompletableFuture 有一个运行完就执行Runnable CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + “\tis running”); return list.stream().mapToInt(Integer::intValue).sum(); }) .runAfterEither( CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + “\tis running”); return list.stream().mapToDouble(Double::valueOf).sum(); }), () -> System.out.println(“The 2 method have done”)); // 2个CompletableFuture 有一个运行完就执行Function<? super T, U> fn CompletableFuture.supplyAsync( list.stream().mapToInt(Integer::intValue).max()::getAsInt) .applyToEither( CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue).min()::getAsInt) , v -> v * 10) .thenAccept(System.out::println); // 2个CompletableFuture 有一个运行完就执行Consumer<? super T> action CompletableFuture.supplyAsync( list.stream().mapToInt(Integer::intValue).max()::getAsInt) .acceptEither( CompletableFuture.supplyAsync(list.stream().mapToInt(Integer::intValue).min()::getAsInt) , System.out::println); // 将集合中每一个元素都映射成为CompletableFuture<Integer>对象 List<CompletableFuture<Integer>> collect = list.stream().map(i -> CompletableFuture.supplyAsync(i::intValue)) .collect(ArrayList::new, ArrayList::add, ArrayList::addAll); // 集合转数组 CompletableFuture[] completableFutures = collect.toArray(CompletableFuture[]::new); // 有一个task执行完毕 CompletableFuture.anyOf(completableFutures) .thenRun(() -> System.out.println(“有一个task执行完毕—>first done”)); // 有且仅有所有task执行完毕 CompletableFuture.allOf(completableFutures) .thenRun(() -> System.out.println(“有且仅有所有task执行完毕—>done”));}5、Java.time API @Test public void testLocalDateTime() { // 五、Java.time API LocalDate localDate = LocalDate.of(2019, 12, 1); // 当前时间 Optional.of(LocalDate.now()).ifPresent(System.out::println); // 年份 Optional.of(localDate.getYear()).ifPresent(System.out::println); OptionalInt.of(localDate.get(ChronoField.YEAR)).ifPresent(System.out::println); // 月份 (Jan–>Dec) Optional.of(localDate.getMonth()).ifPresent(System.out::println); // 月份(1–>12) Optional.of(localDate.getMonthValue()).ifPresent(System.out::println); OptionalInt.of(localDate.get(ChronoField.MONTH_OF_YEAR)).ifPresent(System.out::println); // 年中的第几天 Optional.of(localDate.getDayOfYear()).ifPresent(System.out::println); OptionalInt.of(localDate.get(ChronoField.DAY_OF_YEAR)).ifPresent(System.out::println); // 月中的第几天 Optional.of(localDate.getDayOfMonth()).ifPresent(System.out::println); OptionalInt.of(localDate.get(ChronoField.DAY_OF_MONTH)).ifPresent(System.out::println); // 星期几(Mon–>Sun) Optional.of(localDate.getDayOfWeek()).ifPresent(System.out::println); // 星期几(1–>7) OptionalInt.of(localDate.get(ChronoField.DAY_OF_WEEK)).ifPresent(System.out::println); // 时代(公元前、后) CE BCE Optional.of(localDate.getEra()).ifPresent(System.out::println); // 时代(公元前、后) 1—>CE 0—>BCE Optional.of(localDate.getEra().getValue()).ifPresent(System.out::println); OptionalInt.of(localDate.get(ChronoField.ERA)).ifPresent(System.out::println); // ISO年表 Optional.of(localDate.getChronology().getId()).ifPresent(System.out::println); // 当前时间 LocalTime time = LocalTime.now(); // 时 OptionalInt.of(time.getHour()).ifPresent(System.out::println); OptionalInt.of(time.get(ChronoField.HOUR_OF_DAY)).ifPresent(System.out::println); // 分 OptionalInt.of(time.getMinute()).ifPresent(System.out::println); OptionalInt.of(time.get(ChronoField.MINUTE_OF_DAY)).ifPresent(System.out::println); // 秒 OptionalInt.of(time.getSecond()).ifPresent(System.out::println); OptionalInt.of(time.get(ChronoField.SECOND_OF_DAY)).ifPresent(System.out::println); // 纳秒 OptionalInt.of(time.getNano()).ifPresent(System.out::println); OptionalLong.of(time.getLong(ChronoField.NANO_OF_SECOND)).ifPresent(System.out::println); // 中午时间 Optional.of(LocalTime.NOON).ifPresent(System.out::println); // 午夜时间 Optional.of(LocalTime.MIDNIGHT).ifPresent(System.out::println); // 自定义格式化时间 DateTimeFormatter customDateTimeFormatter = DateTimeFormatter.ofPattern(“yyyy-MM-dd HH:mm:ss E”); LocalDateTime localDateTime = LocalDateTime.of(localDate, time); Optional.of(localDateTime.format(customDateTimeFormatter)).ifPresent(System.out::println); // 根据传入的文本匹配自定义指定格式进行解析 Optional.of(LocalDateTime.parse(“2019-12-25 12:30:00 周三”, customDateTimeFormatter)) .ifPresent(System.out::println); // 时间点 Instant Instant start = Instant.now(); try { Thread.sleep(10_000); } catch (InterruptedException e) { e.printStackTrace(); } Instant end = Instant.now(); // Duration 时间段 Duration duration = Duration.between(start, end); OptionalLong.of(duration.toNanos()).ifPresent(System.out::println); // Period 时间段 Period period = Period.between(LocalDate.now(), localDate); OptionalInt.of(period.getYears()).ifPresent(System.out::println); OptionalInt.of(period.getMonths()).ifPresent(System.out::println); OptionalInt.of(period.getDays()).ifPresent(System.out::println); // The Difference Between Duration And Period // Durations and periods differ in their treatment of daylight savings time when added to ZonedDateTime. // A Duration will add an exact number of seconds, thus a duration of one day is always exactly 24 hours. // By contrast, a Period will add a conceptual day, trying to maintain the local time. }} ...

March 11, 2019 · 4 min · jiezi