死磕-java线程系列之自己动手写一个线程池续

28次阅读

共计 5983 个字符,预计需要花费 15 分钟才能阅读完成。

(手机横屏看源码更方便)


问题

(1)自己动手写的线程池如何支持带返回值的任务呢?

(2)如果任务执行的过程中抛出异常了该怎么处理呢?

简介

上一章我们自己动手写了一个线程池,但是它是不支持带返回值的任务的,那么,我们自己能否实现呢?必须可以,今天我们就一起来实现带返回值任务的线程池。

前情回顾

首先,让我们先回顾一下上一章写的线程池:

(1)它包含四个要素:核心线程数、最大线程数、任务队列、拒绝策略;

(2)它具有执行无返回值任务的能力;

(3)它无法处理有返回值的任务;

(4)它无法处理任务执行的异常(线程中的异常不会抛出到线程外);

那么,我们能不能在现有的基础上实现其下面两项能力呢?让我们一起来试一试吧!

有返回值和无返回值的任务到底有何不同?

答案很明显,就是一个有返回值,一个无返回值,用伪代码来表示就是下面这样:

    // 无返回值
    threadPool.execute(()->{System.out.println(1);
    });
    // 有返回值,分两步走
    // 1. 提交任务到线程池中
    SomeClass result = threadPool.execute(()->{System.out.println(1);
        return 1;
    });
    // 2. 等待任务的结果返回
    Object value = result.get();
    

无返回值的任务提交了就完事,主线程并不 Care 它到底有没有执行完,并不关心它是不是抛出异常,主线程 Just 提交线程到线程池中,其余什么都不管。

有返回值的任务就不一样了,主线程首先要提交任务到线程池中,它需要使用到任务执行的结果,所以它必须等待任务执行完毕才能拿到任务执行的结果。

那么,为什么不直接在 execute 的时候就等待任务执行完毕呢?这样的话那不就跟串行没啥区别了,还不如直接在主线程执行任务呢,还少了线程切换的资源消耗。

之所以要分成两步,是因为主线程并不一定需要立即获取返回值,在需要用到返回值的时候才去 get,这样就可以在提交任务和获取返回值之间干些其它的事情,提高效率。

所以,提交任务的时候不需要阻塞,get 返回值的时候才可能需要阻塞,如果 get 的时候任务已经执行完毕了,这时候也不需要阻塞,如果 get 的时候任务还未执行完毕,那就要阻塞等待任务执行完毕才能获取到返回值。

实现分析

首先,无返回值的任务我们直接使用的 Runnable 函数式接口,有返回值的任务有没有现成的接口呢?还真有,那就是 Callable 接口,它有个返回值。

@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}

其次,提交任务的时候需要有个返回值,它是在将来用来获取任务执行结果的,实际上它也是新任务的一种能力,可以使用它对任务进行包装,使其具有返回值的能力。

public interface Future<T> {T get();
}

再次,我们需要给现有的线程池增加一种新的能力,根据单一职责原则,我们定义一个新的接口来承载这种能力。

public interface FutureExecutor extends Executor {<T> Future<T> submit(Callable<T> command);
}

然后 ,我们需要一种新的任务,它既具有旧任务的执行能力(run() 方法),又具有新任务的返回值能力(get()方法),所以我们造一个“将来的任务”对提交的任务进行包装,使其具有返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {

    /**
     * 真正的任务
     */
    private Callable<T> task;
    
    public FutureTask(Callable<T> task) {this.task = task;}

    @Override
    public void run() {// 具体实现...}

    @Override
    public T get() {// 具体实现...}
}

最后,我们只要对原有的线程池进行扩展,将提交的任务包装成“将来获取返回值的任务”,还是使用原来的方法去执行,然后返回这个将来的任务即可。

根据开闭原则,【本篇文章由公众号“彤哥读源码”原创】原来的代码我们不做任何修改,扩展新的子类来实现新的能力。

public class MyThreadPoolFutureExecutor extends MyThreadPoolExecutor implements FutureExecutor, Executor {public MyThreadPoolFutureExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {super(name, coreSize, maxSize, taskQueue, rejectPolicy);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // 包装成将来获取返回值的任务
        FutureTask<T> futureTask = new FutureTask<>(task);
        // 还是使用原来的执行能力
        execute(futureTask);
        // 返回将来的任务,只需要返回其 get 返回值的能力即可
        // 所以这里返回的是 Future 而不是 FutureTask 类型
        return futureTask;
    }
}

好了,到这里整体的逻辑我们就已经比较清晰地实现完了,还剩下最关键的部分,这个“将来的任务”的两个能力要如何实现。

将来的任务

将来的任务,具有两个能力:一是执行真正任务的能力,二是将来获取返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {
    @Override
    public void run() {// 具体实现...}

    @Override
    public T get() {// 具体实现...}
}

首先,我们要明确一件事,任务的执行是线程池中,获取返回值是在主线程中,它们是在两个线程中执行的,而且谁先谁后我们无法确定。

其次 ,如果 run() 在 get()之前执行,我们需要告诉 get()任务已经执行完毕了,所以需要一个状态来通知这个事,还需要一个变量来承载任务执行的返回值。

    /**
     * 任务执行的状态,0 未开始,1 正常完成,2 异常完成
     * 也可以使用 volatile+Unsafe 实现 CAS 操作
     */
    private AtomicInteger state = new AtomicInteger(NEW);
    private static final int NEW = 0;
    private static final int FINISHED = 1;
    private static final int EXCEPTION = 2;
    /**
     * 任务执行的结果【本篇文章由公众号“彤哥读源码”原创】* 如果执行正常,返回结果为 T
     * 如果执行异常,返回结果为 Exception
     */
    private Object result;

再次 ,如果 get() 在 run()之前执行,那就需要阻塞等待 run()执行完毕才能拿到返回值,所以需要保存调用者(主线程),get()的时候 park 阻塞住,run()完成了 unpark 唤醒它来拿返回值。

    /**
     * 调用者线程
     * 也可以使用 volatile+Unsafe 实现 CAS 操作
     */
    private AtomicReference<Thread> caller = new AtomicReference<>();

然后 ,我们先来看看 run() 方法的逻辑,它其实就是先执行真正的任务,然后修改状态为完成,并保存任务的返回值,如果保存了主线程,还要唤醒它。

    @Override
    public void run() {
        // 如果状态不是 NEW,说明执行过了,直接返回
        if (state.get() != NEW) {return;}
        try {
            // 执行任务【本篇文章由公众号“彤哥读源码”原创】T r = task.call();
            // CAS 更新 state 的值为 FINISHED
            // 如果更新成功,就把 r 赋值给 result
            // 如果更新失败,说明 state 的值不为 NEW 了,也就是任务已经执行过了
            if (state.compareAndSet(NEW, FINISHED)) {
                this.result = r;
                // finish()必须放在修改 state 里面,见下面的分析
                finish();}
        } catch (Exception e) {
            // 如果 CAS 更新 state 的值为 EXCEPTION 成功,就把 e 赋值给 result
            // 如果 CAS 更新失败,说明 state 的值不为 NEW 了,也就是任务已经执行过了
            if (state.compareAndSet(NEW, EXCEPTION)) {
                this.result = e;
                // finish()必须放在修改 state 里面,见下面的分析
                finish();}
        }
    }

    private void finish() {
        // 检查调用者是否为空,如果不为空,唤醒它
        // 调用者在调用 get()方法的进入阻塞状态
        for (Thread c; (c = caller.get()) != null;) {if (caller.compareAndSet(c, null)) {LockSupport.unpark(c);
            }
        }
    }

最后 ,我们再看看 get() 方法,如果任务还未执行,就阻塞等待任务的执行;如果任务已经执行完毕了,直接拿返回值即可;但是,还有一种情况,get()方法执行的过程中 run()方法也在执行,所以 get()方法中的每一步都要检查状态的值有没有变化。

@Override
    public T get() {int s = state.get();
        // 如果任务还未执行完成,判断当前线程是否要进入阻塞状态
        if (s == NEW) {
            // 标识调用者线程是否被标记过
            boolean marked = false;
            for (;;) {
                // 重新获取 state 的值
                s = state.get();
                // 如果 state 大于 NEW 说明完成了,跳出循环
                if (s > NEW) {
                    break;
                    // 此处必须把 caller 的 CAS 更新和 park()方法分成两步处理,不能把 park()放在 CAS 里面
                } else if (!marked) {
                    // 尝试更新调用者线程
                    // 试想断点停在此处【本篇文章由公众号“彤哥读源码”原创】// 此时 state 为 NEW,让 run()方法执行到底,它不会执行 finish()中的 unpark()方法
                    // 这时打开断点,这里会更新 caller 成功,但是循环从头再执行一遍发现 state 已经变了,// 直接在上面的 if(s>NEW)处跳出循环了,因为 finish()在修改 state 内部
                    marked = caller.compareAndSet(null, Thread.currentThread());
                } else {
                    // 调用者线程更新之后 park 当前线程
                    // 试想断点停在此处
                    // 此时 state 为 NEW,让 run()方法执行到底,因为上面的 caller 已经设置值了,// 所以会执行 finish()方法中的 unpark()方法,// 这时再打开断点,这里不会 park 信
                    // 见 unpark()方法的注释,上面写得很清楚:// 如果线程执行了 park()方法,那么执行 unpark()方法会唤醒那个线程
                    // 如果先执行了 unpark()方法,那么线程下一次执行 park()方法将不会阻塞
                    LockSupport.park();}
            }
        }

        if (s == FINISHED) {return (T) result;
        }
        throw new RuntimeException((Throwable) result);
    }

在我们的实现中,如果任务执行的过程抛出异常了,也是通过 result 返回给主线程,这样主线程就拿到了这个异常,它就可以做相应的处理了。

好了,完整的实现到此结束,不知道你领悟了没有。

测试用例

最后奉上测试代码:

public class MyThreadPoolFutureExecutorTest {public static void main(String[] args) {FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
        List<Future<Integer>> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int num = i;
            Future<Integer> future = threadPool.submit(() -> {Thread.sleep(1000);
                System.out.println("running:" + num);
                return num;
            });
            list.add(future);
        }

        for (Future<Integer> future : list) {System.out.println("runned:" + future.get());
        }
    }
}

运行结果:

thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
... 省略被拒绝的任务【本篇文章由公众号“彤哥读源码”原创】discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
running: 3
running: 5
runned: 2
runned: 3
runned: 4
runned: 5
running: 6
running: 7
runned: 6
runned: 7
runned: 8
runned: 9

总结

(1)有返回值的任务是通过包装成将来的任务来实现的,这个任务既具有基本的执行能力,又具有将来获取返回值的能力;

(2)任务执行的异常跟任务正常的返回值是通过同一个返回值返回到主线程的,主线程根据状态判断是异常还是正常值;

(3)我们的实现中运用了单一职责原则、开闭原则等设计原则,对原有代码没有造成任何的入侵;

彩蛋

手写线程池目前只打算写这两章,后面开始进入 jdk 原生线程池的源码分析,敬请期待。

另外,需要手写线程池 完整源码 的同学请关注我的公众号“彤哥读源码”,在后台回复“MyThreadPool”(不带引号)即可领取手写线程池完整源码,注意大小写不要弄错哦,否则彤哥是不会给你的哈。


欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

正文完
 0