Fork/Join框架中两个外围类ForkJoinTaskForkJoinPool,申明ForkJoinTask后,将其退出ForkJoinPool中,并返回一个Future对象。
  • ForkJoinPool:ForkJoinTask须要通过ForkJoinPool来执行,工作宰割的子工作会增加到当前工作保护的双端队列中,进入队列的头部。当一个工作线程的队列里临时没有工作时,它会随机从其它工作线程的队列尾部获取一个工作。
  • ForkJoinTask:咱们须要应用ForkJoin框架,首先要创立一个ForkJoin工作。它提供在工作中执行Fork()Join()操作的机制,通常状况下不须要间接继承ForkJoinTask类,而只须要继承它的子类,Fork/Join框架提供以下两个子类。
  • RecursiveAction:用于没有返回值的工作。
  • RecursizeTask:用于有返回值的工作。

Exception

ForkJoinTask在执行的时候可能会抛出异样,然而咱们没有方法间接在主线程里捕捉异样,所以ForkJoinTask提供了isCompletedAbnormally()办法来查看工作是否曾经抛出异样或曾经被勾销了,并且能够通过ForkJoinTaskgetException办法捕捉异样。
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {    /** ForkJoinTask运行状态 */    volatile int status; // 间接被ForkJoin池和工作线程拜访    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits    static final int NORMAL      = 0xf0000000;  // must be negative    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16    static final int SMASK       = 0x0000ffff;  // short bits for tags        /**     * @Ruturn 工作是否扔出异样或被勾销     */    public final boolean isCompletedAbnormally() {        return status < NORMAL;    }        /**     * 如果计算扔出异样,则返回异样     * 如果工作被勾销了则返回CancellationException。如果工作没有实现或者没有抛出异样则返回null     */    public final Throwable getException() {        int s = status & DONE_MASK;        return ((s >= NORMAL)    ? null :                (s == CANCELLED) ? new CancellationException() :                getThrowableException());    }}

ForkJoinPool源码

public class ForkJoinPool extends AbstractExecutorService {    /**     * ForkJoinPool,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它应用了     * 一个有限队列来保留须要执行的工作,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希     * 望的线程数量,那么以后计算机可用的CPU数量会被设置为线程数量作为默认值。     */    public ForkJoinPool() {        this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),             defaultForkJoinWorkerThreadFactory, null, false);    }    public ForkJoinPool(int parallelism) {        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);    }    //有多个结构器,这里省略        volatile WorkQueue[] workQueues;     // main registry    static final class WorkQueue {        final ForkJoinWorkerThread owner; // 工作线程        ForkJoinTask<?>[] array;   // 工作                //传入的是ForkJoinPool与指定的一个工作线程        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {            this.pool = pool;            this.owner = owner;            // Place indices in the center of array (that is not yet allocated)            base = top = INITIAL_QUEUE_CAPACITY >>> 1;        }    }}

FrokJoinPool work stealing算法

ForkJoinPool保护了一组WorkQueue,也就是工作队列,工作队列中又保护了一个工作线程ForkJoinWorkerThread与一组工作工作ForkJoinTask

WorkQueue是一个双端队列Deque(Double Ended Queue),Deque是一种具备队列和栈性质的数据结构,双端队列中的元素能够从两端弹出,其限定插入和删除操作在表的两端进行。

每个工作线程在运行中产生新的工作(通常因为调用了fork())时,会放在工作队列的对尾,并且工作线程在解决本人的工作队列时,应用的是LIFO,也就是说每次从队列尾部取工作来执行。

每个工作线程在解决本人的工作队列同时,会尝试窃取一个工作(或是来自于刚刚提交到 pool 的工作,或是来自于其它工作线程的工作队列),窃取的工作位于其余线程的工作队列的队首,也就是说工作线程在窃取其余工作线程的工作时,应用的是 FIFO 形式。

在遇到Join()时,如果须要Join的工作尚未实现,则会优先解决其它工作,并期待其实现。

在没有本人的工作时,也没有任何能够窃取时,则进入休眠。

public class ForkJoinPool extends AbstractExecutorService {    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {}    public <T> ForkJoinTask<T> submit(Callable<T> task) {}    public <T> ForkJoinTask<T> submit(Runnable task, T result) {}    public ForkJoinTask<?> submit(Runnable task) {}}
ForkJoinPool本身也领有工作队列,这些工作队列的作用是用来接管由内部线程(非 ForkJoinThread线程)提交过去的工作,而这些工作队列被称为 submitting queue

ForkJoinTask

工作的操作,重要的是fork()join()
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {    /**     * 在当前任务正在运行的池中异步执行此工作(如果实用)     * 或应用ForkJoinPool.commonPool()(如果不是ForkJoinWorkerThread实例)进行异步执行      */    public final ForkJoinTask<V> fork() {        Thread t;        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)            ((ForkJoinWorkerThread)t).workQueue.push(this);        else            ForkJoinPool.common.externalPush(this);        return this;    }        public final V join() {        int s;        if ((s = doJoin() & DONE_MASK) != NORMAL)            reportException(s);        return getRawResult();    }        private int doJoin() {        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;        return (s = status) < 0 ? s :            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?            (w = (wt = (ForkJoinWorkerThread)t).workQueue).            tryUnpush(this) && (s = doExec()) < 0 ? s :            wt.pool.awaitJoin(w, this, 0L) :            externalAwaitDone();    }}
fork()做的工作只有一件事,就是把当前任务推入以后线程的工作队列里。

join()的工作就比较复杂,也是join()能够使的线程免于被阻塞的起因。

  • 查看调用join()的线程是否是ForkJoinThread线程。如果不是(例如main线程),则阻塞以后线程,期待工作实现。如果是,则不阻塞。
  • 查看工作的实现状态,如果曾经实现,则间接返回后果。
  • 如果工作尚未实现,然而解决本人的工作队列,则实现它。
  • 如果工作曾经被其它线程偷走,则这个小偷工作队列的工作以先进先出的形式执行,帮忙小偷线程尽快实现join
  • 如果偷走工作的小偷也曾经把本人的工作全副做完,正在期待须要join的工作时,则找到小偷的小偷(递归执行),帮忙它实现它的工作。

ForkJoinPool.submit办法

public static void main(String[] args) throws ExecutionException, InterruptedException {        //生成一个池        ForkJoinPool forkJoinPool=new ForkJoinPool();        ForkJoinTask task=new ForkJoinExample(1, 100000);        ForkJoinTask<Integer> submit = forkJoinPool.submit(task);        Integer sum = submit.get();        System.out.println("最初的后果是:"+sum);}
每个工作线程本人领有的工作队列以外,ForkJoinPool本身也领有工作队列,这些工作队列的作用是用来接管有内部线程(非ForkJoinPool)提交过去的工作,而这些工作队列被称为submitting queue

submit()fork()没有本质区别,只是提交对象变成了submitting queue(还有一些初始化,同步操作)。submitting queue和其它work queue一样,是工作线程窃取的对象,因而当其中的工作被一个工作线程胜利窃取时,也就意味着提交的工作真正开始进入执行阶段。

关注微信公众号:【入门小站】,解锁更多知识点。