共计 7353 个字符,预计需要花费 19 分钟才能阅读完成。
前言
在并发编程当中咱们最常见的需要就是启动一个线程执行一个函数去实现咱们的需要,而在这种需要当中,咱们经常须要函数有返回值。比方咱们须要同一个十分大的数组当中数据的和,让每一个线程求某一个区间外部的和,最终将这些和加起来,那么每个线程都须要返回对应区间的和。而在 Java 当中给咱们提供了这种机制,去实现这一个成果——FutureTask
。
FutureTask
在本人写 FutureTask
之前咱们首先写一个例子来回顾一下 FutureTask
的编程步骤:
- 写一个类实现
Callable
接口。
@FunctionalInterface | |
public interface Callable<V> { | |
/** | |
* Computes a result, or throws an exception if unable to do so. | |
* | |
* @return computed result | |
* @throws Exception if unable to compute a result | |
*/ | |
V call() throws Exception;} |
实现接口就实现 call
即可,能够看到这个函数是有返回值的,而 FutureTask
返回给咱们的值就是这个函数的返回值。
new
一个FutureTask
对象,并且new
一个第一步写的类,new FutureTask<>(callable 实现类)
。- 最初将刚刚失去的
FutureTask
对象传入Thread
类当中,而后启动线程即可new Thread(futureTask).start();
。 - 而后咱们能够调用
FutureTask
的get
办法失去返回的后果futureTask.get();
。
如果有一个数组
data
,长度为 100000,当初有 10 个线程,第i
个线程求数组[i * 10000, (i + 1) * 10000)
所有数据的和,而后将这十个线程的后果加起来。
import java.lang.reflect.Array; | |
import java.util.Arrays; | |
import java.util.Random; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.FutureTask; | |
public class FutureTaskDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {int[] data = new int[100000]; | |
Random random = new Random(); | |
for (int i = 0; i < 100000; i++) {data[i] = random.nextInt(10000); | |
} | |
@SuppressWarnings("unchecked") | |
FutureTask<Integer>[] tasks = (FutureTask<Integer>[]) Array.newInstance(FutureTask.class, 10); | |
// 设置 10 个 futuretask 工作计算数组当中数据的和 | |
for (int i = 0; i < 10; i++) { | |
int idx = i; | |
tasks[i] = new FutureTask<>(() -> { | |
int sum = 0; | |
for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {sum += data[k]; | |
} | |
return sum; | |
}); | |
} | |
// 开启线程执行 futureTask 工作 | |
for (FutureTask<Integer> futureTask : tasks) {new Thread(futureTask).start();} | |
int threadSum = 0; | |
for (FutureTask<Integer> futureTask : tasks) {threadSum += futureTask.get(); | |
} | |
int sum = Arrays.stream(data).sum(); | |
System.out.println(sum == threadSum); // 后果始终为 true | |
} | |
} |
可能你会对 FutureTask
的应用形式感觉困惑,或者不是很分明,当初咱们来认真捋一下思路。
- 首先启动一个线程要么是继承自
Thread
类,而后重写Thread
类的run
办法,要么是给Thread
类传递一个实现了Runnable
的类对象,当然能够用匿名外部类实现。 - 既然咱们的
FutureTask
对象能够传递给Thread
类,阐明FutureTask
必定是实现了Runnable
接口,咱们当初来看一下FutureTask
的继承体系。
能够发现的是 FutureTask
的确实现了 Runnable
接口,同时还实现了 Future
接口,这个 Future
接口次要提供了前面咱们应用 FutureTask
的一系列函数比方get
。
- 看到这里你应该可能大抵想到在
FutureTask
中的run
办法会调用Callable
当中实现的call
办法,而后将后果保留下来,当调用get
办法的时候再将这个后果返回。
本人实现 FutureTask
工具筹备
通过上文的剖析你可能曾经大抵理解了 FutureTask
的大抵执行过程了,然而须要留神的是,如果你执行 FutureTask
的get
办法是可能阻塞的,因为可能 Callable
的call
办法还没有执行实现。因而在 get
办法当中就须要有阻塞线程的代码,然而当 call
办法执行实现之后须要将这些线程都唤醒。
在本篇文章当中应用锁 ReentrantLock
和条件变量 Condition
进行线程的阻塞和唤醒,在咱们本人入手实现 FutureTask
之前,咱们先相熟一下下面两种工具的应用办法。
-
ReentrantLock
次要有两个办法:lock
对临界区代码块进行加锁。unlock
对临界区代码进行解锁。
-
Condition
次要有三个办法:await
阻塞调用这个办法的线程,期待其余线程唤醒。signal
唤醒一个被await
办法阻塞的线程。signalAll
唤醒所有被await
办法阻塞的线程。
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.ReentrantLock; | |
public class LockDemo { | |
private ReentrantLock lock; | |
private Condition condition; | |
LockDemo() {lock = new ReentrantLock(); | |
condition = lock.newCondition();} | |
public void blocking() {lock.lock(); | |
try {System.out.println(Thread.currentThread() + "筹备期待被其余线程唤醒"); | |
condition.await();} catch (InterruptedException e) {e.printStackTrace(); | |
}finally {lock.unlock(); | |
} | |
} | |
public void inform() throws InterruptedException { | |
// 先休眠两秒 等他其余线程先阻塞 | |
TimeUnit.SECONDS.sleep(2); | |
lock.lock(); | |
try {System.out.println(Thread.currentThread() + "筹备唤醒其余线程"); | |
condition.signal(); // 唤醒一个被 await 办法阻塞的线程 | |
// condition.signalAll(); // 唤醒所有被 await 办法阻塞的线程}finally {lock.unlock(); | |
} | |
} | |
public static void main(String[] args) {LockDemo lockDemo = new LockDemo(); | |
Thread thread = new Thread(() -> {lockDemo.blocking(); // 执行阻塞线程的代码 | |
}, "Blocking-Thread"); | |
Thread thread1 = new Thread(() -> { | |
try {lockDemo.inform(); // 执行唤醒线程的代码 | |
} catch (InterruptedException e) {e.printStackTrace(); | |
} | |
}, "Inform-Thread"); | |
thread.start(); | |
thread1.start();} | |
} |
下面的代码的输入:
Thread[Blocking-Thread,5,main] 筹备期待被其余线程唤醒 | |
Thread[Inform-Thread,5,main] 筹备唤醒其余线程 |
FutureTask 设计与实现
在前文当中咱们曾经谈到了 FutureTask
的实现原理,次要有以下几点:
- 构造函数须要传入一个实现了
Callable
接口的类对象,这个将会在FutureTask
的run
办法执行,而后失去函数的返回值,并且将返回值存储起来。 - 当线程调用
get
办法的时候,如果这个时候Callable
当中的call
曾经执行实现,间接返回call
函数返回的后果就行,如果call
函数还没有执行实现,那么就须要将调用get
办法的线程挂起,这里咱们能够应用condition.await()
将线程挂起。 - 在
call
函数执行实现之后,须要将之前被get
办法挂起的线程唤醒继续执行,这里应用condition.signalAll()
将所有挂起的线程唤醒。 - 因为是咱们本人实现
FutureTask
,性能不会那么齐全,只须要可能满足咱们的次要需要即可,次要是帮忙大家理解FutureTask
原理。
实现代码如下(剖析都在正文当中):
import java.util.concurrent.Callable; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.ReentrantLock; | |
// 这里须要实现 Runnable 接口,因为须要将这个对象放入 Thread 类当中 | |
// 而 Thread 要求传入的对象实现了 Runnable 接口 | |
public class MyFutureTask<V> implements Runnable { | |
private final Callable<V> callable; | |
private Object returnVal; // 这个示意咱们最终的返回值 | |
private final ReentrantLock lock; | |
private final Condition condition; | |
public MyFutureTask(Callable<V> callable) { | |
// 将传入的 callable 对象存储起来 不便在前面的 run 办法当中调用 | |
this.callable = callable; | |
lock = new ReentrantLock(); | |
condition = lock.newCondition();} | |
@SuppressWarnings("unchecked") | |
public V get(long timeout, TimeUnit unit) {if (returnVal != null) // 如果符合条件 阐明 call 函数曾经执行实现 返回值曾经不为 null 了 | |
return (V) returnVal; // 间接将后果返回即可 这样不必竞争锁资源 进步程序执行效率 | |
lock.lock(); | |
try {// 这里须要进行二次判断 (双重查看) | |
// 因为如果一个线程在第一次判断 returnVal 为空 | |
// 而后这个时候它可能因为获取锁而被挂起 | |
// 而在被挂起的这段时间,call 可能曾经执行实现 | |
// 如果这个时候不进行判断间接执行 await 办法 | |
// 那前面这个线程将无奈被唤醒 | |
if (returnVal == null) | |
condition.await(timeout, unit); | |
} catch (InterruptedException e) {e.printStackTrace(); | |
} finally {lock.unlock(); | |
} | |
return (V) returnVal; | |
} | |
@SuppressWarnings("unchecked") | |
public V get() {if (returnVal != null) | |
return (V) returnVal; | |
lock.lock(); | |
try { | |
// 同样的须要进行双重查看 | |
if (returnVal == null) | |
condition.await();} catch (InterruptedException e) {e.printStackTrace(); | |
} finally {lock.unlock(); | |
} | |
return (V) returnVal; | |
} | |
@Override | |
public void run() {if (returnVal != null) | |
return; | |
try { | |
// 在 Runnable 的 run 办法当中 | |
// 执行 Callable 办法的 call 失去返回后果 | |
returnVal = callable.call();} catch (Exception e) {e.printStackTrace(); | |
} | |
lock.lock(); | |
try { | |
// 因为曾经失去了后果 | |
// 因而须要将所有被 await 办法阻塞的线程唤醒 | |
// 让他们从 get 办法返回 | |
condition.signalAll();}finally {lock.unlock(); | |
} | |
} | |
// 上面是测试代码 | |
public static void main(String[] args) {MyFutureTask<Integer> ft = new MyFutureTask<>(() -> {TimeUnit.SECONDS.sleep(2); | |
return 101; | |
}); | |
Thread thread = new Thread(ft); | |
thread.start(); | |
System.out.println(ft.get(100, TimeUnit.MILLISECONDS)); // 输入为 null | |
System.out.println(ft.get()); // 输入为 101 | |
} | |
} |
咱们当初用咱们本人写的 MyFutureTask
去实现在前文当中数组求和的例子:
public static void main(String[] args) throws ExecutionException, InterruptedException {int[] data = new int[100000]; | |
Random random = new Random(); | |
for (int i = 0; i < 100000; i++) {data[i] = random.nextInt(10000); | |
} | |
@SuppressWarnings("unchecked") | |
MyFutureTask<Integer>[] tasks = (MyFutureTask<Integer>[]) Array.newInstance(MyFutureTask.class, 10); | |
for (int i = 0; i < 10; i++) { | |
int idx = i; | |
tasks[i] = new MyFutureTask<>(() -> { | |
int sum = 0; | |
for (int k = idx * 10000; k < (idx + 1) * 10000; k++) {sum += data[k]; | |
} | |
return sum; | |
}); | |
} | |
for (MyFutureTask<Integer> MyFutureTask : tasks) {new Thread(MyFutureTask).start();} | |
int threadSum = 0; | |
for (MyFutureTask<Integer> MyFutureTask : tasks) {threadSum += MyFutureTask.get(); | |
} | |
int sum = Arrays.stream(data).sum(); | |
System.out.println(sum == threadSum); // 输入后果为 true | |
} |
总结
在本篇文章当中次要给大家介绍了 FutureTask
的外部原理,并且咱们本人通过应用 ReentrantLock
和Condition
实现了咱们本人的FutureTask
,本篇文章的次要内容如下:
-
FutureTask
的外部原理:FutureTask
首先会继承Runnable
接口,这样就能够将FutureTask
的对象间接放入Thread
类当中,作为构造函数的参数。- 咱们在应用
FutureTask
的时候须要传入一个Callable
实现类的对象,在函数call
当中实现咱们须要执行的函数,执行实现之后,将call
函数的返回值保留下来,当有线程调用get
办法时候将保留的返回值返回。
-
咱们应用条件变量进行对线程的阻塞和唤醒。
- 当有线程调用
get
办法时,如果call
曾经执行实现,那么能够间接将后果返回,否则须要应用条件变量将线程挂起。 - 当
call
函数执行实现的时候,须要应用条件变量将所有阻塞在get
办法的线程唤醒。
- 当有线程调用
-
双重查看:
- 咱们在
get
办法当中首先判断returnVal
是否为空,如果不为空间接将后果返回,这就能够不必去竞争锁资源了,能够进步程序执行的效率。 - 然而咱们在应用锁爱护的临界区还须要进行判断,判断
returnVal
是否为空,因为如果一个线程在第一次判断returnVal
为空,而后这个时候它可能因为获取锁而被挂起,而在被挂起的这段时间,call 可能曾经执行实现,如果这个时候不进行判断间接执行 await 办法,那前面这个线程将无奈被唤醒,因为在call
函数执行实现之后调用了condition.signalAll()
,如果线程在这之后执行await
办法,那么未来再没有线程去将这些线程唤醒。
- 咱们在
更多精彩内容合集可拜访我的项目:https://github.com/Chang-LeHu…
关注公众号:一无是处的钻研僧,理解更多计算机(Java、Python、计算机系统根底、算法与数据结构)常识。