共计 4645 个字符,预计需要花费 12 分钟才能阅读完成。
Fork/Join
框架中两个外围类ForkJoinTask
与ForkJoinPool
, 申明ForkJoinTask
后, 将其退出ForkJoinPool
中, 并返回一个Future
对象。
ForkJoinPool
:ForkJoinTask
须要通过ForkJoinPool
来执行, 工作宰割的子工作会增加到当前工作保护的双端队列中, 进入队列的头部。当一个工作线程的队列里临时没有工作时, 它会随机从其它工作线程的队列尾部获取一个工作。ForkJoinTask
: 咱们须要应用ForkJoin
框架, 首先要创立一个ForkJoin
工作。它提供在工作中执行Fork()
和Join()
操作的机制, 通常状况下不须要间接继承ForkJoinTask
类, 而只须要继承它的子类,Fork/Join
框架提供以下两个子类。RecursiveAction
: 用于没有返回值的工作。RecursizeTask
: 用于有返回值的工作。
Exception
ForkJoinTask
在执行的时候可能会抛出异样, 然而咱们没有方法间接在主线程里捕捉异样, 所以ForkJoinTask
提供了isCompletedAbnormally()
办法来查看工作是否曾经抛出异样或曾经被勾销了, 并且能够通过ForkJoinTask
的getException
办法捕捉异样。
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/** ForkJoinTask 运行状态 */
volatile int status; // 间接被 ForkJoin 池和工作线程拜访
static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
static final int SIGNAL = 0x00010000; // must be >= 1 << 16
static final int SMASK = 0x0000ffff; // short bits for tags
/**
* @Ruturn 工作是否扔出异样或被勾销
*/
public final boolean isCompletedAbnormally() {return status < NORMAL;}
/**
* 如果计算扔出异样,则返回异样
* 如果工作被勾销了则返回 CancellationException。如果工作没有实现或者没有抛出异样则返回 null
*/
public final Throwable getException() {
int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
getThrowableException());
}
}
ForkJoinPool
源码
public class ForkJoinPool extends AbstractExecutorService {
/**
* ForkJoinPool,它同 ThreadPoolExecutor 一样,也实现了 Executor 和 ExecutorService 接口。它应用了
* 一个有限队列来保留须要执行的工作,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希
* 望的线程数量,那么以后计算机可用的 CPU 数量会被设置为线程数量作为默认值。*/
public ForkJoinPool() {this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
// 有多个结构器,这里省略
volatile WorkQueue[] workQueues; // main registry
static final class WorkQueue {
final ForkJoinWorkerThread owner; // 工作线程
ForkJoinTask<?>[] array; // 工作
// 传入的是 ForkJoinPool 与指定的一个工作线程
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
this.owner = owner;
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
}
}
FrokJoinPool
work stealing 算法
ForkJoinPool
保护了一组WorkQueue
, 也就是工作队列, 工作队列中又保护了一个工作线程ForkJoinWorkerThread
与一组工作工作ForkJoinTask
WorkQueue
是一个双端队列Deque(Double Ended Queue)
,Deque
是一种具备队列和栈性质的数据结构, 双端队列中的元素能够从两端弹出, 其限定插入和删除操作在表的两端进行。每个工作线程在运行中产生新的工作 (通常因为调用了
fork()
) 时, 会放在工作队列的对尾, 并且工作线程在解决本人的工作队列时, 应用的是LIFO
, 也就是说每次从队列尾部取工作来执行。每个工作线程在解决本人的工作队列同时,会尝试窃取一个工作(或是来自于刚刚提交到 pool 的工作,或是来自于其它工作线程的工作队列),窃取的工作位于其余线程的工作队列的队首,也就是说工作线程在窃取其余工作线程的工作时,应用的是 FIFO 形式。
在遇到
Join()
时, 如果须要Join
的工作尚未实现, 则会优先解决其它工作, 并期待其实现。在没有本人的工作时, 也没有任何能够窃取时, 则进入休眠。
public class ForkJoinPool extends AbstractExecutorService {public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {}
public <T> ForkJoinTask<T> submit(Callable<T> task) {}
public <T> ForkJoinTask<T> submit(Runnable task, T result) {}
public ForkJoinTask<?> submit(Runnable task) {}}
ForkJoinPool
本身也领有工作队列,这些工作队列的作用是用来接管由内部线程(非ForkJoinThread
线程)提交过去的工作,而这些工作队列被称为submitting queue
。
ForkJoinTask
工作的操作, 重要的是
fork()
和join()
。
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/**
* 在当前任务正在运行的池中异步执行此工作(如果实用)* 或应用 ForkJoinPool.commonPool()(如果不是 ForkJoinWorkerThread 实例)进行异步执行
*/
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();}
}
fork()
做的工作只有一件事, 就是把当前任务推入以后线程的工作队列里。
join()
的工作就比较复杂, 也是join()
能够使的线程免于被阻塞的起因。
- 查看调用
join()
的线程是否是ForkJoinThread
线程。如果不是(例如 main 线程), 则阻塞以后线程, 期待工作实现。如果是, 则不阻塞。 - 查看工作的实现状态, 如果曾经实现, 则间接返回后果。
- 如果工作尚未实现, 然而解决本人的工作队列, 则实现它。
- 如果工作曾经被其它线程偷走, 则这个小偷工作队列的工作以先进先出的形式执行, 帮忙小偷线程尽快实现
join
- 如果偷走工作的小偷也曾经把本人的工作全副做完, 正在期待须要
join
的工作时, 则找到小偷的小偷(递归执行), 帮忙它实现它的工作。
ForkJoinPool.submit
办法
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 生成一个池
ForkJoinPool forkJoinPool=new ForkJoinPool();
ForkJoinTask task=new ForkJoinExample(1, 100000);
ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
Integer sum = submit.get();
System.out.println("最初的后果是:"+sum);
}
每个工作线程本人领有的工作队列以外,
ForkJoinPool
本身也领有工作队列, 这些工作队列的作用是用来接管有内部线程 (非ForkJoinPool
) 提交过去的工作, 而这些工作队列被称为submitting queue
。
submit()
和fork()
没有本质区别, 只是提交对象变成了submitting queue
(还有一些初始化, 同步操作)。submitting queue
和其它work queue
一样, 是工作线程窃取的对象, 因而当其中的工作被一个工作线程胜利窃取时, 也就意味着提交的工作真正开始进入执行阶段。
关注微信公众号:【入门小站】, 解锁更多知识点。