关于java:Java中Future接口详解

7次阅读

共计 11402 个字符,预计需要花费 29 分钟才能阅读完成。

主打一手后果导向;

一、背景

在零碎中,异步执行工作,是很常见的性能逻辑,然而在不同的场景中,又存在很多细节差别;

有的工作只强调「执行过程」,并不需要追溯工作本身的「执行后果」,这里并不是指对系统和业务产生的成果,比方定时工作、音讯队列等场景;

然而有些工作即强调「执行过程」,又须要追溯工作本身的「执行后果」,在流程中依赖某个异步后果,判断流程是否中断,比方「并行」解决;

串行解决】整个流程依照逻辑逐步推进,如果出现异常会导致流程中断;

并行处理】主流程依照逻辑逐步推进,其余「异步」交互的流程执行结束后,将后果返回到主流程,如果「异步」流程异样,会影响局部后果;

此前在《「订单」业务》的内容中,聊过对于「串行」和「并行」的利用比照,即在订单详情的加载过程中,通过「并行」的形式读取:商品、商户、订单、用户等信息,晋升接口的响应工夫;

二、Future 接口

1、入门案例

异步是对流程的解耦,然而有的流程中又依赖异步执行的最终后果,此时就能够应用「Future」接口来达到该目标,先来看一个简略的入门案例;

public class ServerTask implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {Thread.sleep(2000);
        return 3;
    }
}
public class FutureBase01 {public static void main(String[] args) throws Exception {TimeInterval timer = DateUtil.timer();
        // 线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // 批量工作
        List<ServerTask> serverTasks = new ArrayList<>() ;
        for (int i=0;i<3;i++){serverTasks.add(new ServerTask());
        }
        List<Future<Integer>> taskResList = executor.invokeAll(serverTasks) ;
        // 后果输入
        for (Future<Integer> intFuture:taskResList){System.out.println(intFuture.get());
        }
        // 耗时统计
        System.out.println("timer...interval ="+timer.interval());
    }
}

这里模仿一个场景,以线程池批量执行异步工作,在工作内线程休眠 2 秒,以并行的形式最终获取全副后果,只耗时 2 秒多一点,如果串行的话耗时必定超过 6 秒;

2、Future 接口

Future 示意异步计算的后果,提供了用于查看计算是否实现、期待计算实现、以及检索计算结果的办法。

外围办法

  • get():期待工作实现,获取执行后果,如果工作勾销会抛出异样;
  • get(long timeout, TimeUnit unit):指定期待工作实现的工夫,期待超时会抛出异样;
  • isDone():判断工作是否实现;
  • isCancelled():判断工作是否被勾销;
  • cancel(boolean mayInterruptIfRunning):尝试勾销此工作的执行,如果工作曾经实现、曾经勾销或因为其余起因无奈勾销,则此尝试将失败;

根底用法

public class FutureBase02 {public static void main(String[] args) throws Exception {
        // 线程池执行工作
        ExecutorService executor = Executors.newFixedThreadPool(3);
        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {Thread.sleep(3000);
                return "task...OK";
            }
        }) ;
        executor.execute(futureTask);
        // 工作信息获取
        System.out.println("是否实现:"+futureTask.isDone());
        System.out.println("是否勾销:"+futureTask.isCancelled());
        System.out.println("获取后果:"+futureTask.get());
        System.out.println("尝试勾销:"+futureTask.cancel(Boolean.TRUE));
    }
}

FutureTask

Future 接口的根本实现类,提供了计算的启动和勾销、查问计算是否实现以及检索计算结果的办法;

在「FutureTask」类中,能够看到线程异步执行工作时,其中的外围状态转换,以及最终后果写出的形式;

尽管「Future」从设计上,实现了异步计算的后果获取,然而通过下面的案例也能够发现,流程的主线程在执行 get() 办法时会阻塞,直到最终获取后果,显然对于程序来说并不敌对;

JDK1.8 提供「CompletableFuture」类,对「Future」进行优化和扩大;

三、CompletableFuture 类

1、根底阐明

「CompletableFuture」类提供函数编程的能力,能够通过回调的形式解决计算结果,并且反对组合操作,提供很多办法来实现异步编排,升高异步编程的复杂度;

「CompletableFuture」实现「Future」和「CompletionStage」两个接口;

  • Future:示意异步计算的后果;
  • CompletionStage:示意异步计算的一个步骤,当一个阶段计算实现时,可能会触发其余阶段,即步骤可能由其余 CompletionStage 触发;

入门案例

public class CompletableBase01 {public static void main(String[] args) throws Exception {
        // 线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // 工作执行
        CompletableFuture<String> cft = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(3000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "Res...OK";
        }, executor);
        // 后果输入
        System.out.println(cft.get());
    }
}

2、外围办法

2.1 实例办法

public class Completable01 {public static void main(String[] args) throws Exception {
        // 线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 1、创立未实现的 CompletableFuture,通过 complete()办法实现
        CompletableFuture<Integer> cft01 = new CompletableFuture<>() ;
        cft01.complete(99) ;

        // 2、创立曾经实现 CompletableFuture,并且给定后果
        CompletableFuture<String> cft02 = CompletableFuture.completedFuture("given...value");

        // 3、有返回值,默认 ForkJoinPool 线程池
        CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> {return "OK-3";});

        // 4、有返回值,采纳 Executor 自定义线程池
        CompletableFuture<String> cft04 = CompletableFuture.supplyAsync(() -> {return "OK-4";},executor);

        // 5、无返回值,默认 ForkJoinPool 线程池
        CompletableFuture<Void> cft05 = CompletableFuture.runAsync(() -> {});

        // 6、无返回值,采纳 Executor 自定义线程池
        CompletableFuture<Void> cft06 = CompletableFuture.runAsync(()-> {}, executor);
    }
}

2.2 计算方法

public class Completable02 {public static void main(String[] args) throws Exception {
        // 线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(2000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "OK";
        },executor);

        // 1、计算实现后,执行后续解决
        // cft01.whenComplete((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex));

        // 2、触发计算,如果没有实现,则 get 设定的值,如果已实现,则 get 工作返回值
        // boolean completeFlag = cft01.complete("given...value");
        // if (completeFlag){//     System.out.println(cft01.get());
        // } else {//     System.out.println(cft01.get());
        // }

        // 3、开启新 CompletionStage,从新获取线程执行工作
        cft01.whenCompleteAsync((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex),executor);
    }
}

2.3 后果获取办法

public class Completable03 {public static void main(String[] args) throws Exception {
        // 线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(2000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "Res...OK";
        },executor);
        // 1、阻塞直到获取后果
        // System.out.println(cft01.get());

        // 2、设定超时的阻塞获取后果
        // System.out.println(cft01.get(4, TimeUnit.SECONDS));

        // 3、非阻塞获取后果,如果工作曾经实现,则返回后果,如果工作未实现,返回给定的值
        // System.out.println(cft01.getNow("given...value"));

        // 4、get 获取抛查看异样,join 获取非查看异样
        System.out.println(cft01.join());
    }
}

2.4 工作编排办法

public class Completable04 {public static void main(String[] args) throws Exception {
        // 线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(2000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println("OK-1");
            return "OK";
        },executor);

        // 1、cft01 工作执行实现后,执行之后的工作,此处不关注 cft01 的后果
        // cft01.thenRun(() -> System.out.println("task...run")) ;

        // 2、cft01 工作执行实现后,执行之后的工作,能够获取 cft01 的后果
        // cft01.thenAccept((res) -> {//     System.out.println("cft01:"+res);
        //     System.out.println("task...run");
        // });

        // 3、cft01 工作执行实现后,执行之后的工作,获取 cft01 的后果,并且具备返回值
        // CompletableFuture<Integer> cft02 = cft01.thenApply((res) -> {//     System.out.println("cft01:"+res);
        //     return 99 ;
        // });
        // System.out.println(cft02.get());

        // 4、程序执行 cft01、cft02
        // CompletableFuture<String> cft02 = cft01.thenCompose((res) ->  CompletableFuture.supplyAsync(() -> {//     System.out.println("cft01:"+res);
        //     return "OK-2";
        // }));
        // cft02.whenComplete((res,ex) -> System.out.println("Result:"+res+";Exe:"+ex));

        // 5、比照工作的执行效率,因为 cft02 先实现,所以取 cft02 的后果
        // CompletableFuture<String> cft02 = cft01.applyToEither(CompletableFuture.supplyAsync(() -> {//     System.out.println("run...cft02");
        //     try {//         Thread.sleep(3000);
        //     } catch (InterruptedException e) {//         e.printStackTrace();
        //     }
        //     return "OK-2";
        // }),(res) -> {//     System.out.println("either...result:" + res);
        //     return res;
        // });
        // System.out.println("finally...result:" + cft02.get());

        // 6、两组工作执行实现后,对后果进行合并
        // CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> "OK-2") ;
        // String finallyRes = cft01.thenCombine(cft02,(res1,res2) -> {//     System.out.println("res1:"+res1+";res2:"+res2);
        //     return res1+";"+res2 ;
        // }).get();
        // System.out.println(finallyRes);


        CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> {System.out.println("OK-2");
            return  "OK-2";
        }) ;
        CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> {System.out.println("OK-3");
            return "OK-3";
        }) ;
        // 7、期待批量工作执行完返回
        // CompletableFuture.allOf(cft01,cft02,cft03).get();

        // 8、任意一个工作执行完即返回
        System.out.println("Sign:"+CompletableFuture.anyOf(cft01,cft02,cft03).get());
    }
}

2.5 异样解决办法

public class Completable05 {public static void main(String[] args) throws Exception {
        // 线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {if (1 > 0){throw new RuntimeException("task...exception");
            }
            return "OK";
        },executor);

        // 1、捕捉 cft01 的异样信息,并提供返回值
        String finallyRes = cft01.thenApply((res) -> {System.out.println("cft01-res:" + res);
            return res;
        }).exceptionally((ex) -> {System.out.println("cft01-exe:" + ex.getMessage());
            return "error" ;
        }).get();
        System.out.println("finallyRes="+finallyRes);


        CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(1000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "OK-2";
        },executor);
        // 2、如果 cft02 未实现,则 get 时抛出指定异样信息
        boolean exeFlag = cft02.completeExceptionally(new RuntimeException("given...exception"));
        if (exeFlag){System.out.println(cft02.get());
        } else {System.out.println(cft02.get());
        }
    }
}

3、线程池问题

  • 在实践中,通常不应用 ForkJoinPool#commonPool() 公共线程池,会呈现线程竞争问题,从而造成零碎瓶颈;
  • 在工作编排中,如果呈现依赖状况或者父子工作,尽量应用多个线程池,从而防止工作申请同一个线程池,躲避死锁状况产生;

四、CompletableFuture 原理

1、外围构造

在剖析「CompletableFuture」其原理之前,首先看一下波及的外围构造;

CompletableFuture

在该类中有两个要害的字段:「result」存储以后 CF 的后果,「stack」代表栈顶元素,即以后 CF 计算实现后会触发的依赖动作;从下面案例中可知,依赖动作能够没有或者有多个;

Completion

依赖动作的封装类;

UniCompletion

继承 Completion 类,一元依赖的根底类,「executor」指线程池,「dep」指依赖的计算,「src」指源动作;

BiCompletion

继承 UniCompletion 类,二元或者多元依赖的根底类,「snd」指第二个源动作;

2、零依赖

顾名思义,即各个 CF 之间不产生依赖关系;

public class DepZero {public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(()-> "OK-1",executor);
        CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(()-> "OK-2",executor);
        System.out.println(cft1.get()+";"+cft2.get());
    }
}

3、一元依赖

即 CF 之间的单个依赖关系;这里应用「thenApply」办法演示,为了看到成果,使「cft1」长时间休眠,断点查看「stack」构造;

public class DepOne {public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(30000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "OK-1";
        },executor);

        CompletableFuture<String> cft2 = cft1.thenApply(res -> {System.out.println("cft01-res"+res);
            return "OK-2" ;
        });
        System.out.println("cft02-res"+cft2.get());
    }
}

断点截图

原理剖析

观察者 Completion 注册到「cft1」,注册时会查看计算是否实现,未实现则观察者入栈,当「cft1」计算实现会弹栈;已实现则间接触发观察者;

能够调整断点代码,让「cft1」先处于实现状态,再查看其运行时构造,从而剖析残缺的逻辑;

4、二元依赖

即一个 CF 同时依赖两个 CF;这里应用「thenCombine」办法演示;为了看到成果,使「cft1、cft2」长时间休眠,断点查看「stack」构造;

public class DepTwo {public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(30000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "OK-1";
        },executor);
        CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(30000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "OK-2";
        },executor);

        // cft3 依赖 cft1 和 cft2 的计算结果
        CompletableFuture<String> cft3 = cft1.thenCombine(cft2,(res1,res2) -> {System.out.println("cft01-res:"+res1);
            System.out.println("cft02-res:"+res2);
            return "OK-3" ;
        });
        System.out.println("cft03-res:"+cft3.get());
    }
}

断点截图

原理剖析

在「cft1」和「cft2」未实现的状态下,尝试将 BiApply 压入「cft1」和「cft2」两个栈中,任意 CF 实现时,会尝试触发观察者,观察者查看「cft1」和「cft2」是否都实现,如果实现则执行;

5、多元依赖

即一个 CF 同时依赖多个 CF;这里应用「allOf」办法演示;为了看到成果,使「cft1、cft2、cft3」长时间休眠,断点查看「stack」构造;

public class DepMore {public static void main(String[] args) throws Exception {ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(30000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "OK-1";
        },executor);
        CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(30000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "OK-2";
        },executor);

        CompletableFuture<String> cft3 = CompletableFuture.supplyAsync(() -> {
            try {Thread.sleep(30000);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            return "OK-3";
        },executor);

        // cft4 依赖 cft1 和 cft2 和 cft3 的计算结果
        CompletableFuture<Void> cft4 = CompletableFuture.allOf(cft1,cft2,cft3);
        CompletableFuture<String> finallyRes = cft4.thenApply(tm -> {System.out.println("cft01-res:"+cft1.join());
            System.out.println("cft02-res:"+cft2.join());
            System.out.println("cft03-res:"+cft3.join());
            return "OK-4";
        });
        System.out.println("finally-res:"+finallyRes.get());
    }
}

断点截图

原理剖析

多元依赖的回调办法除了「allOf」还有「anyOf」,其实现原理都是将依赖的多个 CF 补全为均衡二叉树,从断点图可知会依照树的层级解决,外围构造参考二元依赖即可;

五、参考源码

编程文档:https://gitee.com/cicadasmile/butte-java-note

利用仓库:https://gitee.com/cicadasmile/butte-flyer-parent
正文完
 0