共计 4575 个字符,预计需要花费 12 分钟才能阅读完成。
前言
随着业务的发展,系统的功能也越来越多,这时候很多业务操作就需要异步执行,提高效率。大多时候,我们都可以采用线程池来实现异步执行的需求,但是,有时候主流程需要等待其他任务执行完以后才能继续执行,又或者是主流程需要知道其他任务执行的结果,这时候,就需要采用线程的合并了。
在讲线程合并之前,我们先来看看平时使用 Thread 是怎么实现主线程等待其他线程的。
join 合并线程
当一个线程需要等待其他线程执行完再继续执行的时候,我们可以使用 join 或者是 CountDownLatch,这里只说 join,CountDownLatch 前面的文章已经说过,这里不再赘述。
当线程 A 调用线程 B 的 join 方法时,线程 A 会阻塞,直到线程 B 执行完以后才继续执行。
下面举个简单例子:下班回家,煮饭、做菜和烧水这些日常操作是怎么用多线程来实现的。下面是具体流程:
主线程需要等待煮饭线程和烧水线程执行完才能继续下面的吃饭喝水操作。下面我们看看代码是怎么实现的
public class JoinDemo {
static class RiceThread extends Thread{
@Override
public void run() {
try {System.out.println(Thread.currentThread().getName() + "开始煮饭");
Thread.sleep(10 * 1000);
System.out.println(Thread.currentThread().getName() + "饭煮好了");
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
static class WaterThread extends Thread{
@Override
public void run() {
try {System.out.println(Thread.currentThread().getName() + "开始烧水");
Thread.sleep(5 * 1000);
System.out.println(Thread.currentThread().getName() + "水烧好了");
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
public static void main(String[] args){
try {System.out.println(Thread.currentThread().getName() + "回到家");
// 启动煮饭线程
RiceThread riceThread = new RiceThread();
riceThread.start();
// 启动烧水线程
WaterThread waterThread = new WaterThread();
waterThread.start();
System.out.println(Thread.currentThread().getName() + "开始做菜");
Thread.sleep(3 * 1000);
System.out.println(Thread.currentThread().getName() + "菜做好了,等着吃饭");
waterThread.join();
riceThread.join();
System.out.println(Thread.currentThread().getName() + "吃饭喝水。。。");
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
// 输出
main 回到家
Thread- 0 开始煮饭
main 开始做菜
Thread- 1 开始烧水
main 菜做好了,等着吃饭
Thread- 1 水烧好了
Thread- 0 饭煮好了
main 吃饭喝水。。。
上面例子看出,使用 join 可以实现主线程等待其他线程完成以后再往下执行,但是这里有个问题,如果其他线程执行出现问题怎么办,这时候主线程是不知道的,因为 Thread 中的 run 方法是没有返回值的。那有什么办法可以异步执行,而且可以获取结果的方法吗?可以用 Future 来解决。
Future
Future 是个接口,它有下面几个方法
public interface Future<V> {boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
主要是通过里面的 get() 方法来获取异步任务的执行结果, 这个方法是阻塞的,直到异步任务执行完成。
Future 只是个接口,我们不能直接使用它,需要它的实现类来完成,这时候我们不用自己来实现这个类,Java 给我们提高了一个现成的实现类 -FutureTask。
下面我们来看看 FutureTask 的构造方法
public FutureTask(Callable<V> callable) {if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
构造 FutureTask 需要传入一个 Callable 对象,那这个 Callable 又是什么呢?
我们都知道新起一个线程,可以实现 Runable 接口来实现,这个 Callable 和 Runable 接口很类似,它也只有一个方法:
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}
但是它和 Runable 的最大不同就在于这个方法是有返回值的,说到这里,大家应该知道了,获取异步执行结果就是通过这个返回值来获取的。
下面还剩最后一个问题,Java 中线程只有 Thread 类,那这个 Callable 是怎么实现启动新线程的呢?这就需要借助上面说到的 FutureTask 了,这个类实现了 RunnableFuture 接口,而这个接口同时继承了 Runable 和 Future 两个接口,Runable 可以作为参数传入 Thread 类,这样就把 Callable 和 Thread 联系在一起了。这样说着,大家听着可能有点迷糊,下面我们通过代码来重写一下上面的煮饭流程。
public class FutureDemo {
static class RiceThread implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {System.out.println(Thread.currentThread().getName() + "开始煮饭");
Thread.sleep(10 * 1000);
// 模拟煮饭出现问题
return false;
// System.out.println(Thread.currentThread().getName() + "饭煮好了");
// return true;
} catch (InterruptedException e) {System.out.println("煮饭出现异常");
return false;
}
}
}
static class WaterThread implements Callable<Boolean> {
@Override
public Boolean call() throws Exception {
try {System.out.println(Thread.currentThread().getName() + "开始烧水");
Thread.sleep(5 * 1000);
System.out.println(Thread.currentThread().getName() + "水烧好了");
return true;
} catch (InterruptedException e) {System.out.println("烧水出现异常");
return false;
}
}
}
public static void main(String[] args) {
try {Callable<Boolean> riceThread = new RiceThread();
FutureTask<Boolean> rickTask = new FutureTask<>(riceThread);
Thread rThread = new Thread(rickTask);
Callable<Boolean> waterThread = new WaterThread();
FutureTask<Boolean> waterTask = new FutureTask<>(waterThread);
Thread wThread = new Thread(waterTask);
System.out.println(Thread.currentThread().getName() + "回到家");
// 启动煮饭线程
rThread.start();
// 启动烧水线程
wThread.start();
System.out.println(Thread.currentThread().getName() + "开始做菜");
Thread.sleep(3 * 1000);
System.out.println(Thread.currentThread().getName() + "菜做好了,等着吃饭");
Boolean wFlag = waterTask.get();
if (!wFlag) {System.out.println(Thread.currentThread().getName() + "烧水出问题了,没水喝了");
return;
}
Boolean rFlag = rickTask.get();
if (!rFlag) {System.out.println(Thread.currentThread().getName() + "煮饭出问题了,没饭吃了");
return;
}
System.out.println(Thread.currentThread().getName() + "吃饭喝水。。。");
} catch (Exception e) {e.printStackTrace();
}
}
}
// 输出
main 回到家
main 开始做菜
Thread- 0 开始煮饭
Thread- 1 开始烧水
main 菜做好了,等着吃饭
Thread- 1 水烧好了
main 煮饭出问题了,没饭吃了
上面模拟了其他线程返回数据,主线程 FutrueTask 的 get() 方法来获取这些数据,然后进行相应的逻辑。
总结
在 Java 中,线程等待其他线程执行完,可以使用 join 来实现;线程等待其他线程而且需要获取结果,可以使用 FutureTask 来实现。但是 join() 和 FutureTask 里的 get() 方法都是阻塞的,那有什么方法可以完成上面需求而且不阻塞的吗?下回再说!