乐趣区

关于java8:CompletableFuture让你的代码免受阻塞之苦

前言

当初大部分的 CPU 都是多核,咱们都晓得想要晋升咱们应用程序的运行效率,就必须得充分利用多核 CPU 的计算能力;Java 早曾经为咱们提供了多线程的 API,然而实现形式稍微麻烦,明天咱们就来看看 Java8 在这方面提供的改善。


假如场景

当初你须要为在线教育平台提供一个查问用户详情的 API,该接口须要返回用户的根本信息,标签信息,这两个信息寄存在不同地位,须要近程调用来获取这两个信息;为了模仿近程调用,咱们须要在代码外面提早 1s;

public interface RemoteLoader {String load();

    default void delay() {
        try {Thread.sleep(1000L);
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

public class CustomerInfoService implements RemoteLoader {public String load() {this.delay();
        return "根本信息";
    }

}

public class LearnRecordService implements RemoteLoader {public String load() {this.delay();
        return "学习信息";
    }

}

同步形式实现版本

如果咱们采纳同步的形式来实现这个 API 接口,咱们的实现代码:

@Test
public void testSync() {long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<String> customerDetail = remoteLoaders.stream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共破费工夫:" + (end - start));
}

不出所料,因为调用的两个接口都是提早了 1s,所以后果大于 2 秒


Future 实现的版本

接下来咱们把这个例子用 Java7 提供的 Future 来实现异步的版本,看下成果如何呢?代码如下:

@Test
public void testFuture() {long start = System.currentTimeMillis();
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<Future<String>> futures = remoteLoaders.stream()
            .map(remoteLoader -> executorService.submit(remoteLoader::load))
            .collect(toList());

    List<String> customerDetail = futures.stream()
            .map(future -> {
                try {return future.get();
                } catch (InterruptedException | ExecutionException e) {e.printStackTrace();
                }
                return null;
            })
            .filter(Objects::nonNull)
            .collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共破费工夫:" + (end - start));
}

这次咱们采纳多线程的形式来革新了咱们这个例子,后果还是比较满意的,工夫大略破费了 1s 多一点

留神:这里我分成了两个 Stream,如何合在一起用同一个 Stream,那么在用 future.get() 的时候会导致阻塞,相当于提交一个工作执行完后才提交下一个工作,这样达不到异步的成果

这里咱们能够看到尽管 Future 达到了咱们预期的成果,然而如果须要实现将两个异步的后果进行合并解决就略微麻一些,这里就不细说,前面次要看下 CompletableFuture 在这方面的改良


Java8 并行流

以上咱们用的是 Java8 之前提供的办法来实现,接下来咱们来看下 Java8 中提供的并行流来实习咱们这个例子成果怎么呢?

@Test
public void testParallelStream() {long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService());
    List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共破费工夫:" + (end - start));
}

运行的后果还是相当的称心,破费工夫 1s 多点

和 Java8 之前的实现比照,咱们发现整个代码会更加的简洁;

接下来咱们把咱们的例子扭转一下,查问用户详情的接口还须要返回视频观看记录,用户的标签信息,购买订单

public class WatchRecordService implements RemoteLoader {
    @Override
    public String load() {this.delay();
        return "观看记录";
    }
}

public class OrderService implements RemoteLoader {
    @Override
    public String load() {this.delay();
        return "订单信息";
    }
}

public class LabelService implements RemoteLoader {
    @Override
    public String load() {this.delay();
        return "标签信息";
    }
}

咱们持续应用 Java8 提供的并行流来实现,看下运行的后果是否现实

@Test
public void testParallelStream2() {long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    List<String> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList());
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共破费工夫:" + (end - start));
}

然而这次运行的后果不是太现实,破费工夫超过了 2 秒


CompletableFuture

根本的用法
@Test
public void testCompletableFuture() {CompletableFuture<String> future = new CompletableFuture<>();
    new Thread(() -> {doSomething();
        future.complete("Finish");          // 工作执行实现后 设置返回的后果
    }).start();
    System.out.println(future.join());      // 获取工作线程返回的后果
}

private void doSomething() {System.out.println("doSomething...");
}

这种用法还有个问题,就是工作呈现了异样,主线程会无感知,工作线程不会把异样给抛出来;这会导致主线程会始终期待,通常咱们也须要晓得呈现了什么异样,做出对应的响应;改良的形式是在工作中 try-catch 所有的异样,而后调用future.completeExceptionally(e),代码如下:

@Test
public void testCompletableFuture() throws ExecutionException, InterruptedException {CompletableFuture<String> future = new CompletableFuture<>();
    new Thread(() -> {
        try {doSomething();
            future.complete("Finish");
        } catch (Exception e) {future.completeExceptionally(e);
        }
    }).start();
    System.out.println(future.get());
}

private void doSomething() {System.out.println("doSomething...");
    throw new RuntimeException("Test Exception");
}

从当初来看 CompletableFuture 的应用过程须要解决的事件很多,不太简洁,你会感觉看起来很麻烦;然而这只是表象,Java8 其实对这个过程进行了封装,提供了很多简洁的操作形式;接下来咱们看下如何革新下面的代码

@Test
public void testCompletableFuture2() throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {doSomething();
        return "Finish";
    });
    System.out.println(future.get());
}

这里咱们采纳了supplyAsync,这下看起来简洁了许多,世界都亮堂了; Java8 不仅提供容许工作返回后果的supplyAsync,还提供了没有返回值的runAsync;让咱们能够更加的关注业务的开发,不须要解决异样谬误的治理


CompletableFuture 异样解决

如果说主线程须要关怀工作到底产生了什么异样,须要对其作出相应操作,这个时候就须要用到exceptionally

@Test
public void testCompletableFuture2() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {doSomething();
                return "Finish";
            })
            .exceptionally(throwable -> "Throwable exception message:" + throwable.getMessage());
    System.out.println(future.get());
}

应用 CompletableFuture 来实现咱们查问用户详情的 API 接口
@Test
public void testCompletableFuture3() throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    List<CompletableFuture<String>> completableFutures = remoteLoaders
            .stream()
            .map(loader -> CompletableFuture.supplyAsync(loader::load))
            .collect(toList());

    List<String> customerDetail = completableFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(toList());
    
    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共破费工夫:" + (end - start));
}

这里仍然是采纳的两个 Stream 来实现的,执行的后果如下:

这个后果不太称心,和并行流的后果差不多,耗费工夫 2 秒多点;在这种场景下咱们用 CompletableFuture 做了这么多工作,然而成果不现实,难道就有没有其余的形式能够让它在快一点吗?

为了解决这个问题,咱们必须深刻理解下并行流和 CompletableFuture 的实现原理,它们底层应用的线程池的大小都是 CPU 的核数Runtime.getRuntime().availableProcessors();那么咱们来尝试一下批改线程池的大小,看看成果如何?


自定义线程池,优化 CompletableFuture

应用并行流无奈自定义线程池,然而 CompletableFuture 能够

@Test
public void testCompletableFuture4() throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();
    List<RemoteLoader> remoteLoaders = Arrays.asList(new CustomerInfoService(),
            new LearnRecordService(),
            new LabelService(),
            new OrderService(),
            new WatchRecordService());
    
    ExecutorService executorService = Executors.newFixedThreadPool(Math.min(remoteLoaders.size(), 50));
    
    List<CompletableFuture<String>> completableFutures = remoteLoaders
            .stream()
            .map(loader -> CompletableFuture.supplyAsync(loader::load, executorService))
            .collect(toList());

    List<String> customerDetail = completableFutures
            .stream()
            .map(CompletableFuture::join)
            .collect(toList());

    System.out.println(customerDetail);
    long end = System.currentTimeMillis();
    System.out.println("总共破费工夫:" + (end - start));
}

咱们应用自定义线程池,设置最大的线程池数量 50,来看下执行的后果

这下执行的后果比较满意了,1 秒多点;实践上来说这个后果能够始终继续,直到达到线程池的大小 50


并行流和 CompletableFuture 两者该如何抉择

这两者如何抉择次要看工作类型,倡议

  1. 如果你的工作是计算密集型的,并且没有 I / O 操作的话,那么举荐你抉择 Stream 的并行流,实现简略并行效率也是最高的
  2. 如果你的工作是有频繁的 I / O 或者网络连接等操作,那么举荐应用CompletableFuture,采纳自定义线程池的形式,依据服务器的状况设置线程池的大小,尽可能的让 CPU 繁忙起来

CompletableFuture的其余罕用办法
  1. thenApply、thenApplyAsync: 如果工作执行实现后,还须要后续的操作,比方返回后果的解析等等;能够通过这两个办法来实现
  2. thenCompose、thenComposeAsync: 容许你对两个异步操作进行流水线的操作,当第一个操作实现后,将其后果传入到第二个操作中
  3. thenCombine、thenCombineAsync:容许你把两个异步的操作整合;比方把第一个和第二个操作返回的后果做字符串的连贯操作

总结

  1. Java8 并行流的应用形式
  2. CompletableFuture 的应用形式、异样解决机制,让咱们有机会治理工作执行中发送的异样
  3. Java8 并行流和 CompletableFuture 两者该如何抉择
  4. CompletableFuture的罕用办法

原创不易 转载请注明出处:https://silently9527.cn/archi…

退出移动版