关于java:分析源码学会正确使用-Java-线程池

36次阅读

共计 10068 个字符,预计需要花费 26 分钟才能阅读完成。

在日常的开发工作当中,线程池往往承载着一个利用中最重要的业务逻辑,因而咱们有必要更多地去关注线程池的执行状况,包含异样的解决和剖析等。本文次要聚焦在如何正确应用线程池上,以及提供一些实用的倡议。文中会略微波及到一些线程池实现原理方面的常识,然而不会过多开展。

线程池的异样解决

UncaughtExceptionHandler

咱们都晓得 Runnable 接口中的 run 办法是不容许抛出异样的,因而派生出这个线程的主线程可能无奈间接取得该线程在执行过程中的异样信息。如下例:

    public static void main(String[] args) throws Exception {Thread thread = new Thread(() -> {Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
            System.out.println(1 / 0); // 这行会导致报错!});
        thread.setUncaughtExceptionHandler((t, e) -> {e.printStackTrace(); // 如果你把这一行正文掉,这个程序将不会抛出任何异样.
        });
        thread.start();}

为什么会这样呢?其实咱们看一下 Thread 中的源码就会发现,Thread 在执行过程中如果遇到了异样,会先判断以后线程是否有设置 UncaughtExceptionHandler,如果没有,则会从线程所在的 ThreadGroup 中获取。

留神:每个线程都有本人的 ThreadGroup,即便你没有指定,并且它实现了 UncaughtExceptionHandler 接口。

咱们看下 ThreadGroup 中默认的对 UncaughtExceptionHandler 接口的实现:

    public void uncaughtException(Thread t, Throwable e) {if (parent != null) {parent.uncaughtException(t, e);
        } else {
            Thread.UncaughtExceptionHandler ueh =
                Thread.getDefaultUncaughtExceptionHandler();
            if (ueh != null) {ueh.uncaughtException(t, e);
            } else if (!(e instanceof ThreadDeath)) {
                System.err.print("Exception in thread \""
                                 + t.getName() + "\" ");
                e.printStackTrace(System.err);
            }
        }
    }

这个 ThreadGroup 如果有父 ThreadGroup,则调用父 ThreadGroup 的 uncaughtException,否则调用全局默认的 Thread.DefaultUncaughtExceptionHandler,如果全局的 handler 也没有设置,则只是简略地将异样信息定位到 System.err 中,这就是为什么咱们该当在创立线程的时候,去实现它的 UncaughtExceptionHandler 接口的起因,这么做能够让你更不便地去排查问题。

通过 execute 提交工作给线程池

回到线程池这个话题,如果咱们向线程池提交的工作中,没有对异样进行 try…catch 解决,并且运行的时候呈现了异样,那会对线程池造成什么影响呢?答案是没有影响,线程池仍旧能够失常工作,然而 异样却被吞掉了。这通常来说不是一个好事件,因为咱们须要拿到原始的异样对象去剖析问题。

那么怎样才能拿到原始的异样对象呢?咱们从线程池的源码着手开始钻研这个问题。当然网上对于线程池的源码解析文章有很多,这里限于篇幅,间接给出最相干的局部代码:

    final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {while (task != null || (task = getTask()) != null) {w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {task.run();
                    } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {processWorkerExit(w, completedAbruptly);
        }
    }

这个办法就是真正去执行提交给线程池的工作的代码。

这里咱们略去其中不相干的逻辑,重点关注第 19 行到第 32 行的逻辑,其中第 23 行是真正开始执行提交给线程池的工作,那么第 20 行是干什么的呢?其实就是在执行提交给线程池的工作之前能够做一些前置工作,同样的,咱们看到第 31 行,这个是在执行完提交的工作之后,能够做一些后置工作。

beforeExecute 这个咱们暂且不论,重点关注下 afterExecute 这个办法。咱们能够看到,在执行工作过程中,一旦抛出任何类型的异样,都会提交给 afterExecute 这个办法,然而查看线程池的源代码咱们能够发现,默认的 afterExecute 是个空实现,因而,咱们有必要继承 ThreadPoolExecutor 去实现这个 afterExecute 办法。

看源码咱们能够发现这个 afterExecute 办法是 protected 类型的,从官网正文上也能够看到,这个办法就是举荐子类去实现的。

当然,这个办法不能随便去实现,须要遵循肯定的步骤,具体的官网正文也有讲,这里摘抄如下:

     *  <pre> {@code
     * class ExtendedExecutor extends ThreadPoolExecutor {
     *   // ...
     *   protected void afterExecute(Runnable r, Throwable t) {*     super.afterExecute(r, t);
     *     if (t == null && r instanceof Future<?>) {
     *       try {*         Object result = ((Future<?>) r).get();
     *       } catch (CancellationException ce) {
     *           t = ce;
     *       } catch (ExecutionException ee) {*           t = ee.getCause();
     *       } catch (InterruptedException ie) {*           Thread.currentThread().interrupt(); // ignore/reset
     *       }
     *     }
     *     if (t != null)
     *       System.out.println(t);
     *   }
     * }}</pre>

那么通过这种形式,就能够将原先可能被线程池吞掉的异样胜利捕捉到,从而便于排查问题。

然而这里还有个小问题,咱们留神到在 runWorker 办法中,执行 task.run(); 语句之后,各种类型的异样都被抛出了,那这些被抛出的异样去了哪里?事实上这里的异样对象最终会被传入到 Thread 的 dispatchUncaughtException 办法中,源码如下:

    private void dispatchUncaughtException(Throwable e) {getUncaughtExceptionHandler().uncaughtException(this, e);
    }

能够看到它会去获取 UncaughtExceptionHandler 的实现类,而后调用其中的 uncaughtException 办法,这也就回到了咱们上一大节所剖析的 UncaughtExceptionHandler 实现的具体逻辑。那么为了拿到最原始的异样对象,除了实现 UncaughtExceptionHandler 接口之外,也能够思考实现 afterExecute 办法。

通过 submit 提交工作到线程池

这个同样很简略,咱们还是先回到 submit 办法的源码:

    public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

这里的 execute 办法调用的是 ThreadPoolExecutor 中的 execute 办法,执行逻辑跟通过 execute 提交工作到线程池是一样的。咱们先重点关注这里的 newTaskFor 办法,其源码如下:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
    }

能够看到提交的 Callable 对象用 FutureTask 封装起来了。咱们晓得最终会执行到上述 runWorker 这个办法中,并且最外围的执行逻辑就是 task.run(); 这行代码。咱们晓得这里的 task 其实是 FutureTask 类型,因而咱们有必要看一下 FutureTask 中的 run 办法的实现:

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

能够看到这其中跟异样相干的最要害的代码就在第 17 行,也就是 setException(ex); 这个中央。咱们看一下这个中央的实现:

    protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();}
    }

这里最要害的中央就是将异样对象赋值给了 outcome,outcome 是 FutureTask 中的成员变量,咱们通过调用 submit 办法,拿到一个 Future 对象之后,再调用它的 get 办法,其中最外围的办法就是 report 办法,上面给出每个办法的源码:

首先是 get 办法:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

能够看到最终调用了 report 办法,其源码如下:

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

下面是一些状态判断,如果当前任务不是失常执行结束,或者被勾销的话,那么这里的 x 其实就是原始的异样对象,能够看到会被 ExecutionException 包装。因而在你调用 get 办法时,可能会抛出 ExecutionException 异样,那么调用它的 getCause 办法就能够拿到最原始的异样对象了。

综上所述,针对提交给线程池的工作可能会抛出异样这一问题,次要有以下两种解决思路:

  1. 在提交的工作当中自行 try…catch,但这里有个不好的中央就是如果你会提交多种类型的工作到线程池中,每种类型的工作都须要自行将异样 try…catch 住,比拟繁琐。而且如果你只是 catch(Exception e),可能仍然会漏掉一些包含 Error 类型的异样,那为了保险起见,能够思考 catch(Throwable t)。
  2. 自行实现线程池的 afterExecute 办法,或者实现 Thread 的 UncaughtExceptionHandler 接口。

上面给出我集体创立线程池的一个示例,供大家参考:

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);
    statisticsThreadPool = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,
            60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder()
            .setThreadFactory(new ThreadFactory() {
                private int count = 0;
                private String prefix = "StatisticsTask";

                @Override
                public Thread newThread(Runnable r) {return new Thread(r, prefix + "-" + count++);
                }
            }).setUncaughtExceptionHandler((t, e) -> {String threadName = t.getName();
                logger.error("statisticsThreadPool error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);
            }).build(), (r, executor) -> {if (!executor.isShutdown()) {logger.warn("statisticsThreadPool is too busy! waiting to insert task to queue!");
            Uninterruptibles.putUninterruptibly(executor.getQueue(), r);
        }
    }) {
        @Override
        protected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);
            if (t == null && r instanceof Future<?>) {
                try {Future<?> future = (Future<?>) r;
                    future.get();} catch (CancellationException ce) {t = ce;} catch (ExecutionException ee) {t = ee.getCause();
                } catch (InterruptedException ie) {Thread.currentThread().interrupt(); // ignore/reset}
            }
            if (t != null) {logger.error("statisticsThreadPool error msg: {}", t.getMessage(), t);
            }
        }
    };
    statisticsThreadPool.prestartAllCoreThreads();

线程数的设置

咱们晓得工作个别有两种:CPU 密集型和 IO 密集型。那么面对 CPU 密集型的工作,线程数不宜过多,个别抉择 CPU 外围数 + 1 或者外围数的 2 倍是比拟正当的一个值。因而咱们能够思考将 corePoolSize 设置为 CPU 外围数 +1,maxPoolSize 设置为外围数的 2 倍。

同样的,面对 IO 密集型工作时,咱们能够思考以外围数乘以 4 倍作为外围线程数,而后外围数乘以 5 倍作为最大线程数的形式去设置线程数,这样的设置会比间接拍脑袋设置一个值会更正当一些。

当然总的线程数不宜过多,管制在 100 个线程以内比拟正当,否则线程数过多可能会导致频繁地上下文切换,导致系统性能反不如前。

如何正确敞开一个线程池

说到如何正确去敞开一个线程池,这外面也有点考究。为了实现优雅停机的指标,咱们该当先调用 shutdown 办法,调用这个办法也就意味着,这个线程池不会再接管任何新的工作,然而曾经提交的工作还会继续执行,包含队列中的。所以,之后你还该当调用 awaitTermination 办法,这个办法能够设定线程池在敞开之前的最大超时工夫,如果在超时工夫完结之前线程池可能失常敞开,这个办法会返回 true,否则,一旦超时,就会返回 false。通常来说咱们不可能无限度地期待上来,因而须要咱们当时预估一个正当的超时工夫,而后去应用这个办法。

如果 awaitTermination 办法返回 false,你又心愿尽可能在线程池敞开之后再做其余资源回收工作,能够思考再调用一下 shutdownNow 办法,此时队列中所有尚未被解决的工作都会被抛弃,同时会设置线程池中每个线程的中断标记位。shutdownNow 并不保障肯定能够让正在运行的线程进行工作,除非提交给线程的工作可能正确响应中断。到了这一步,能够思考持续调用 awaitTermination 办法,或者间接放弃,去做接下来要做的事件。

线程池中的其余有用办法

大家可能有留意到,我在创立线程池的时候,还调用了这个办法:prestartAllCoreThreads。这个办法有什么作用呢?咱们晓得一个线程池创立进去之后,在没有给它提交任何工作之前,这个线程池中的线程数为 0。有时候咱们当时晓得会有很多工作会提交给这个线程池,然而等它一个个去创立新线程开销太大,影响零碎性能,因而能够思考在创立线程池的时候就将所有的外围线程全副一次性创立结束,这样零碎起来之后就能够间接应用了。

其实线程池中还提供了其余一些比拟有意思的办法。比方咱们当初构想一个场景,当一个线程池负载很高,快要撑爆导致触发回绝策略时,有没有什么方法能够缓解这一问题?其实是有的,因为线程池提供了设置外围线程数和最大线程数的办法,它们别离是 setCorePoolSize 办法setMaximumPoolSize 办法 。是的, 线程池创立结束之后也是能够更改其线程数的!因而,面对线程池高负荷运行的状况,咱们能够这么解决:

  1. 起一个定时轮询线程(守护类型),定时检测线程池中的线程数,具体来说就是调用 getActiveCount 办法。
  2. 当发现线程数超过了外围线程数大小时,能够思考将 CorePoolSize 和 MaximumPoolSize 的数值同时乘以 2,当然这里不倡议设置很大的线程数,因为并不是线程越多越好的,能够思考设置一个上限值,比方 50、100 之类的。
  3. 同时,去获取队列中的工作数,具体来说是调用 getQueue 办法再调用 size 办法。当队列中的工作数少于队列大小的二分之一时,咱们能够认为当初线程池的负载没有那么高了,因而能够思考在线程池先前有扩容过的状况下,将 CorePoolSize 和 MaximumPoolSize 还原回去,也就是除以 2。

具体来说如下图:


以上是我集体倡议的一种应用线程池的形式。

线程池肯定是最佳计划吗?

线程池并非在任何状况下都是性能最优的计划。如果是一个谋求极致性能的场景,能够思考应用 Disruptor,这是一个高性能队列。排除 Disruptor 不谈,单纯基于 JDK 的话会不会有更好的计划?答案是有的。

咱们晓得在一个线程池中,多个线程是共用一个队列的,因而在工作很多的状况下,须要对这个队列进行频繁读写,为了避免抵触因而须要加锁。事实上在浏览线程池源代码的时候就能够发现,外面充斥着各种加锁的代码,那有没有更好的实现形式呢?

其实咱们能够思考创立一个由单线程线程池形成的列表,每个线程池都应用有界队列这种形式去实现多线程。这么做的益处是,每个线程池中的队列都只会被一个线程去操作,这样就没有竞争的问题。

其实这种用空间换工夫的思路借鉴了 Netty 中 EventLoop 的实现机制。试想,如果线程池的性能真的有那么好,为什么 Netty 不必呢?

其余须要留神的中央

  1. 任何状况下都不应该应用可伸缩线程池(线程的创立和销毁开销是很大的)。
  2. 任何状况下都不应该应用无界队列,单测除外。有界队列罕用的有 ArrayBlockingQueue 和 LinkedBlockingQueue,前者基于数组实现,后者基于链表。从性能体现上来看,LinkedBlockingQueue 的吞吐量更高然而性能并不稳固,理论状况下该当应用哪一种倡议自行测试之后决定。顺便说一句,Executors 的 newFixedThreadPool 采纳的是 LinkedBlockingQueue。
  3. 举荐自行实现 RejectedExecutionHandler,JDK 自带的都不是很好用,你能够在外面实现本人的逻辑。如果须要一些特定的上下文信息,能够在 Runnable 实现类中增加一些本人的货色,这样在 RejectedExecutionHandler 中就能够间接应用了。

怎么做到不丢工作

这里其实指的是一种非凡状况,就是比方忽然遇到了一股流量尖峰,导致线程池负载曾经十分高了,即快要触发回绝策略的时候,咱们能够怎么做来尽量避免提交的工作失落。一般来说当遇到这种状况的时候,该当尽快触发报警告诉研发人员来解决。之后不论是限流也好,还是减少机器也好,甚至是上 Kafka、Redis 甚至是数据库用来暂存工作数据也是能够的,但毕竟远水救不了近火,如果咱们心愿在正式解决这个问题之前,先尽可能地缓解,能够思考怎么做呢?

首先能够思考的就是我后面提到的动静增大线程池中的线程数,然而如果曾经扩容过了,此时不应持续扩容,否则可能导致系统的吞吐量更低。在这种状况下,该当自行实现 RejectedExecutionHandler,具体来说就是在实现类中,独自开一个单线程的线程池,而后调用原线程池的 getQueue 办法的 put 办法,将塞不进去的工作再次尝试塞进去。当然在队列满的时候是塞不进去的,但那至多也只是阻塞了这个独自的线程而已,并不影响主流程。

当然,这种计划是治标不治本的,面对流量激增这种场景其实业界有很多成熟的做法,只是单纯从线程池的角度来看的话,这种形式不失为一种长期无效的解决方案。

起源:https://my.oschina.net/editor…
欢送关注公众号【码农开花】一起学习成长
我会始终分享 Java 干货,也会分享收费的学习材料课程和面试宝典
回复:【计算机】【设计模式】【面试】有惊喜哦

正文完
 0