堕落的人生啊……
如何获取 Thread 返回值?
偶然间看到这个问题,对于标配 jdk1.8
的我们是不是分分钟拍出答案?
答曰:简单,Callable
,完美解决,下一题……
可是,身处 jdk1.4
(甚至更早)的前辈们,要怎么做才能拿到线程返回值呢?或者说,禁用Callable
技能,怎么获取线程返回值?
嗯,这似乎是线程间通信的问题;只有 Runnable
作为武器,有些麻烦,接受挑战!
首先,定义任务 Task
// 任务 Task
class Task implements Runnable{
@Getter
Object result; // 返回值
@Override
public void run() {
try {
// 模拟某耗时逻辑
System.out.println(String.format("[%s] 执行中..",Thread.currentThread().getName()));
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {e.printStackTrace();
}
// 计算得到最终结果
result = Integer.valueOf(9987);
System.out.println(String.format("[%s] 执行完毕..",Thread.currentThread().getName()));
}
}
之后,启动线程
public static void main(String[] args) {
// 使用了内部类,采用如下方式 new
CallbackTest callbackTest = new CallbackTest();
Task task = callbackTest.new Task();
final String threadName = "T-1";
Thread thread = new Thread(task,threadName);
thread.start();}
好,T-1
线程启动了,看样子能很好的执行任务,问题是 main 方法
中怎么获取到 Task
的返回值 result
呢?
以目前的代码运行,效果绝对是 T - 1 线程单飞,和 main 线程没啥联系。
我有一项能力,总是能第一时间相当最简易的方法。
简单方式
main 线程辛苦些,多跑跑腿检查下 result 的状态:
public static void main(String[] args) {CallbackTest callbackTest = new CallbackTest();
Task task = callbackTest.new Task();
final String threadName = "T-1";
Thread thread = new Thread(task,threadName);
thread.start();
// main 线程频繁检查 T - 1 线程
while (true){if(task.getResult()!=null){System.out.println(String.format("结果 task=%s",task.getResult()));
break;
}
// 让 cpu 稍微冷静一下
TimeUnit.MILLISECONDS.sleep(200L);
System.out.println(String.format("[main] 勤劳检查 result 中(result=%s)",task.getResult()));
}
}
运行效果,可能是这样的:
[T-1] 执行中..
[main] 勤劳检查 result 中(result=null)[main] 勤劳检查 result 中(result=null)[main] 勤劳检查 result 中(result=null)[main] 勤劳检查 result 中(result=null)[main] 勤劳检查 result 中(result=null)[main] 勤劳检查 result 中(result=null)[main] 勤劳检查 result 中(result=null)[main] 勤劳检查 result 中(result=null)[main] 勤劳检查 result 中(result=null)[T-1] 执行完毕..
[main] 勤劳检查 result 中(result=9987)结果 task=9987
虽然已加入了对 cpu 而言人性化的休眠方法(sleep),但这依然不是个很好的方案。该方案极大的操劳了 main 线程,需要一遍遍的检查子线程的运行情况——子线程是否将最终结果赋值。
那有没有一种方式,可以在 T - 1 运行完之后,告诉 main 线程呢?
作为一个老派(技术陈旧)的程序员,我首先想到的是wait..notify 组合
wait..notify 组合方式
- wait 部分,检查 result 值,如果为 null 则表示 T - 1 还未执行完,安心等待
public static void main(String[] args){CallbackTest callbackTest = new CallbackTest();
Task task = callbackTest.new Task();
final String threadName = "T-1";
Thread thread = new Thread(task,threadName);
thread.start();
while (true){
// 检查 result 状态,还没有赋值,则等待
if(task.getResult()==null){System.out.println(String.format("[%s] 等待执行..",Thread.currentThread().getName()));
synchronized (task){
try {task.wait();
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
if(task.getResult()!=null){System.out.println(String.format("结果 task=%s",task.getResult()));
break;
}
}
}
- notify 部分,增加唤醒逻辑
class Task implements Runnable{
@Getter
Object result;
@Override
public void run() {
try {System.out.println(String.format("[%s] 执行中..",Thread.currentThread().getName()));
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {e.printStackTrace();
}
result = Integer.valueOf(9987);
// 唤醒 wait 的对象
synchronized (this){this.notify();
}
System.out.println(String.format("[%s] 执行完毕..",Thread.currentThread().getName()));
}
}
改造后,执行效果如下:
[T-1] 执行中..
[main] 等待执行..
[T-1] 执行完毕..
结果 task=9987
LockSupport 实现
其实也可以使用 LockSupport
实现,和 wait / notify 类似,直接贴出完整代码吧:
public class CallbackTest {
class Task implements Runnable{
@Getter
Object result;
// 构造函数传入调用线程(main 线程)Thread runner;
Task(Thread runner){this.runner = runner;}
@Override
public void run() {
try {System.out.println(String.format("[%s] 执行中..",Thread.currentThread().getName()));
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {e.printStackTrace();
}
result = Integer.valueOf(9987);
// 唤醒 main 线程
synchronized (this){LockSupport.unpark(runner);
}
System.out.println(String.format("[%s] 执行完毕..",Thread.currentThread().getName()));
}
}
public static void main(String[] args) {CallbackTest callbackTest = new CallbackTest();
Task task = callbackTest.new Task(Thread.currentThread());
final String threadName = "T-1";
Thread thread = new Thread(task,threadName);
thread.start();
while (true){if(task.getResult()==null){System.out.println(String.format("[%s] 等待执行..",Thread.currentThread().getName()));
LockSupport.park(); //main 线程阻塞}
if(task.getResult()!=null){System.out.println(String.format("结果 task=%s",task.getResult()));
break;
}
}
}
}
Callable 使用
至此,我们相当于可以用自己的方式获取到 Thread 的返回值了,此时回顾下文章开始初的解答:
偶然间看到这个问题,对于标配 `jdk1.8` 的我们是不是分分钟拍出答案?答曰:简单,`Callable`,完美解决,下一题……
当时很自然的就回答了Callable
,先看看它是怎么用的。
public class CallbackTest {
class Task implements Callable<Object> {
@Override
public Object call() {
try {
// 某耗时逻辑
System.out.println(String.format("[%s] 执行中..",Thread.currentThread().getName()));
TimeUnit.SECONDS.sleep(2L);
} catch (InterruptedException e) {e.printStackTrace();
}
return Integer.valueOf(9987);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {CallbackTest callbackTest = new CallbackTest();
Task task = callbackTest.new Task();
ExecutorService es = Executors.newSingleThreadExecutor();
Future<Object> future = es.submit(task);
System.out.println("结果:"+future.get());
es.shutdown();}
}
代码并不复杂,demo 中获取返回值的方式是future.get()
,这是一个阻塞方法;在子线程执行完(return)之前会一直阻塞。没用过的开发兄弟(姐妹?)们自行科普吧,不多解释了。
Callable 源码如下:
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;}
Callable 本身就一接口,没什么玄机,玄机在 Future 或者说 FutureTask
上。
重头戏来了,看看源码是怎么实现从其它线程获取返回值的。
FutureTask 解析
先瞧瞧 FutureTask 的江湖地位:
可以看出,FutureTask
是 Future 接口
和Runnable 接口
的实现类 。
此事留个大概印象,我们来看下 FutureTask
是怎么和 Callable
关联上的?
FutureTask 和 Callable 的关系
(可对照下文,追下源码;如果实在不理解,可直接跳到本章节末尾结论处)
ExecutorService es = Executors.newSingleThreadExecutor();
Future<Object> future = es.submit(task);
以例子中的 ExecutorService 的 submit 方法作为入口,实际的实现方法为 AbstractExecutorService
的submit
:
/* `AbstractExecutorService` */
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task); // 注释 1 - 构建了 FutureTask
execute(ftask); // 注释 2 - 最终会调用 ftask 的 run 方法,也就是调用 ` 步骤 1 构建的 FutureTask 对象的 run 方法
return ftask;
}
...
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable); //1.1- 调用 FutureTask 的构造函数
}
-
注释 1 – 观察
FutureTask
的构造函数:
// callable 是 FutureTask 的成员变量
private Callable<V> callable;
public FutureTask(Callable<V> callable) {if (callable == null)
throw new NullPointerException();
this.callable = callable; // 为成员变量赋值
this.state = NEW; // ensure visibility of callable
}
结论 1
:经 ExecutorService 的穿针引线,Callable 会最终赋值给 FutureTask 的成员变量
- 注释 2 – 再次追踪下执行部分,注意看注释的分析:
/* `AbstractExecutorService` 的 `submit` */
public <T> Future<T> submit(Callable<T> task) {
...
execute(ftask); // 注释 2 - 最终会调用 ftask 的 run 方法,也就是调用 ` 步骤 1 构建的 FutureTask 对象的 run 方法
...
}
↓↓↓↓↓
↓↓↓↓↓
/* ThreadPoolExecutor 的 execute */
public void execute(Runnable command) {
...
addWorker(null, false); // 添加到 workder 中
...
}
↓↓↓↓↓
↓↓↓↓↓
/* ThreadPoolExecutor 的 addWorker */
private boolean addWorker(Runnable firstTask, boolean core) {w = new Worker(firstTask); //`Worker` 封装
final Thread t = w.thread;
...
t.start(); // 注释 3 -worker 中的 thread 执行 start 方法,会调用对应 Runnable 的 run 方法
...
}
↓↓↓↓↓
↓↓↓↓↓
/* 内部类 `Work` */
final Thread thread; // 成员变量
Runnable firstTask; // 成员变量
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask; // 赋值成员变量 thread
this.thread = getThreadFactory().newThread(this); // 创建新线程,并赋值成员变量 firstTask
}
// 3.1-` 注释 3` 处的 start,会执行此处的 run 方法,进而会调用 runWorker 方法
public void run() {runWorker(this);
}
final void runWorker(Worker w) {
...
Runnable task = w.firstTask;
...
task.run(); //##### 注意了,最终会调用到此处 #####
...
}
task.run()
中的 task 又是什么呢,就是在最开始赋值的 FutureTask(注释 1 处),看下它的 run 方法
public void run() {
...
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {result = c.call(); // 会调用 callable 的 call 方法,这个方法中就是我们自己定义的逻辑
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result); // 注释 4 - 赋值动作
}
...
}
结论 2
:调用过程,经过一系列周转,最终会调用 Callable 的 call 方法(也就是我们的自定义逻辑)
- 注释 4 – 看下此处的赋值动作
// 成员变量
private Object outcome;
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 赋值给成员变量
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();}
}
结论就是,FutureTask 包装了 Callable,执行期 call 方法后将返回值赋值给成员变量
接下来探索下返回值的获取,即 Future.get()
的实现。
返回值获取
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 1- 未完成状态,线程阻塞
s = awaitDone(false, 0L);
return report(s); // 2- 已完成状态,直接获取
}
// 1.1- 阻塞
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
...
LockSupport.park(this); // 阻塞
...
// 2.1- 返回了 outcome
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
get() 的逻辑并不复杂:
- 判断状态,如果此时还未执行完,或者说还未给成员变量
outcome
(call() 方法返回值的引用)赋值,阻塞 - 如果此时已经给
outcome
赋值,则将该对象返回
怎么样,是不是有点似曾相识?这和我们自己实现的那一版的逻辑是一致的!
再次看下 set() 方法,找找里面的LockSupport.unpark(Thread t)
方法,作为证据。
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion(); // 这里看上去很可疑}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {if (WAITERS.weakCompareAndSet(this, q, null)) {for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); // 吼吼吼,抓到你了,果然有 LockSupport.unpark(t)
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
果然找到了 park 方法对应的 unpark,证明我们的推断是正确的——FutureTask
的核心实现思路 ,与我们自己的实现方式是一致的(尤其 LockSupport 版本), 即子线程未完成时阻塞,已完成时释放。
主逻辑分析完了,再来两个开胃甜点。
关注点
LockSupport
对比自己和源码的实现,都用 LockSupport,使用的阻塞方法却不相同——park()
vs park(Object blocker)
差别在哪?引用官方文档的解释:
The three forms of park each also support a blocker object parameter.
This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked.
当线程被阻塞时记录此对象,以允许监视和诊断工具识别线程被阻塞的原因。(Such tools may access blockers using method getBlocker(java.lang.Thread).)
The use of these forms rather than the original forms without this parameter is strongly encouraged.
待有参数的 park(Object blocker)是被强烈推荐的
The normal argument to supply as a blocker within a lock implementation is this.
按文档中的意思:传入的 blocker 对象,相当于一个标志对象,线程阻塞时会记录下来。下面的例子能明显看出差别
举例说明:(转自 https://www.jianshu.com/p/835…)
private static void parkVsParkBlocker() {Thread t1 = new Thread(() -> {LockSupport.park();
}, "t1");
t1.start();
Object blocker = new Object();
Thread t2 = new Thread(() -> {LockSupport.park(blocker);
}, "t2");
t2.start();
LockSupport.getBlocker(t2);
unpark(t1, 60);
unpark(t2, 60);
}
Print java stack trace of a given jvm process.
jstack jps -l | grep LockSupport | awk '{print $1}'
VarHandle
FutureTask 作为抽象出的工具类,考虑了多线程环境下的 get()的情况,这不部分在前文故意忽略了。
而并发环境下的数据统一,主要靠下面几个 volatile
关键字 +CAS
来达成。(经典模式)
// 状态,记录子线程执行情况
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
// 记录子线程,运行 Callable.call()的线程
private volatile Thread runner;
// 等待节点,链表
private volatile WaitNode waiters;
volatile 关键字,此处主要用于让其它线程可见(可见性);那CAS
(Compare And Sweep)是做什么的?
本质上,它就是个乐观锁:
-
比较 某内存地址下的某一变量的 当前值 和猜想值 是否一致,如果一致,原子替换 该变量为新值,
return true
- 如果不一致,
return false
jdk 9
之前,主要靠 Unsafe;jdk 9
开始,推出了 VarHandle
, 旨在替代 AtomicXX,以及方便开发人员使用 Unsafe 的部分权能。
以状态 state 的变更为例:
private volatile int state;
/* 声明和赋值 */
private static final VarHandle STATE;
static{
try {MethodHandles.Lookup l = MethodHandles.lookup(); //1 - 通过 MethodHandles.lookup()声明 MethodHandles.Lookup 对象
STATE = l.findVarHandle(FutureTask.class, "state", int.class); //2 - 赋值 VarHandle STATE,此时 STATE 和 state 就建立了某种联系
} catch (ReflectiveOperationException e) {throw new ExceptionInInitializerError(e);
}
}
/* 调用 */
protected void set(V v) {if (STATE.compareAndSet(this, NEW, COMPLETING)) { // 3 - 当前对象,将 state 变量,由 NEW 改成 COMPLETING
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion();}
}