在日常的开发工作当中,线程池往往承载着一个利用中最重要的业务逻辑,因而咱们有必要更多地去关注线程池的执行状况,包含异样的解决和剖析等。本文次要聚焦在如何正确应用线程池上,以及提供一些实用的倡议。文中会略微波及到一些线程池实现原理方面的常识,然而不会过多开展。

线程池的异样解决

UncaughtExceptionHandler

咱们都晓得Runnable接口中的run办法是不容许抛出异样的,因而派生出这个线程的主线程可能无奈间接取得该线程在执行过程中的异样信息。如下例:

    public static void main(String[] args) throws Exception {        Thread thread = new Thread(() -> {            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);            System.out.println(1 / 0); // 这行会导致报错!        });        thread.setUncaughtExceptionHandler((t, e) -> {            e.printStackTrace(); //如果你把这一行正文掉,这个程序将不会抛出任何异样.        });        thread.start();    }

为什么会这样呢?其实咱们看一下Thread中的源码就会发现,Thread在执行过程中如果遇到了异样,会先判断以后线程是否有设置UncaughtExceptionHandler,如果没有,则会从线程所在的ThreadGroup中获取。

留神:每个线程都有本人的ThreadGroup,即便你没有指定,并且它实现了UncaughtExceptionHandler接口。

咱们看下ThreadGroup中默认的对UncaughtExceptionHandler接口的实现:

    public void uncaughtException(Thread t, Throwable e) {        if (parent != null) {            parent.uncaughtException(t, e);        } else {            Thread.UncaughtExceptionHandler ueh =                Thread.getDefaultUncaughtExceptionHandler();            if (ueh != null) {                ueh.uncaughtException(t, e);            } else if (!(e instanceof ThreadDeath)) {                System.err.print("Exception in thread \""                                 + t.getName() + "\" ");                e.printStackTrace(System.err);            }        }    }

这个ThreadGroup如果有父ThreadGroup,则调用父ThreadGroup的uncaughtException,否则调用全局默认的Thread.DefaultUncaughtExceptionHandler,如果全局的handler也没有设置,则只是简略地将异样信息定位到System.err中,这就是为什么咱们该当在创立线程的时候,去实现它的UncaughtExceptionHandler接口的起因,这么做能够让你更不便地去排查问题。

通过execute提交工作给线程池

回到线程池这个话题,如果咱们向线程池提交的工作中,没有对异样进行try...catch解决,并且运行的时候呈现了异样,那会对线程池造成什么影响呢?答案是没有影响,线程池仍旧能够失常工作,然而异样却被吞掉了。这通常来说不是一个好事件,因为咱们须要拿到原始的异样对象去剖析问题。

那么怎样才能拿到原始的异样对象呢?咱们从线程池的源码着手开始钻研这个问题。当然网上对于线程池的源码解析文章有很多,这里限于篇幅,间接给出最相干的局部代码:

    final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }

这个办法就是真正去执行提交给线程池的工作的代码。

这里咱们略去其中不相干的逻辑,重点关注第19行到第32行的逻辑,其中第23行是真正开始执行提交给线程池的工作,那么第20行是干什么的呢?其实就是在执行提交给线程池的工作之前能够做一些前置工作,同样的,咱们看到第31行,这个是在执行完提交的工作之后,能够做一些后置工作。

beforeExecute这个咱们暂且不论,重点关注下afterExecute这个办法。咱们能够看到,在执行工作过程中,一旦抛出任何类型的异样,都会提交给afterExecute这个办法,然而查看线程池的源代码咱们能够发现,默认的afterExecute是个空实现,因而,咱们有必要继承ThreadPoolExecutor去实现这个afterExecute办法。

看源码咱们能够发现这个afterExecute办法是protected类型的,从官网正文上也能够看到,这个办法就是举荐子类去实现的。

当然,这个办法不能随便去实现,须要遵循肯定的步骤,具体的官网正文也有讲,这里摘抄如下:

     *  <pre> {@code     * class ExtendedExecutor extends ThreadPoolExecutor {     *   // ...     *   protected void afterExecute(Runnable r, Throwable t) {     *     super.afterExecute(r, t);     *     if (t == null && r instanceof Future<?>) {     *       try {     *         Object result = ((Future<?>) r).get();     *       } catch (CancellationException ce) {     *           t = ce;     *       } catch (ExecutionException ee) {     *           t = ee.getCause();     *       } catch (InterruptedException ie) {     *           Thread.currentThread().interrupt(); // ignore/reset     *       }     *     }     *     if (t != null)     *       System.out.println(t);     *   }     * }}</pre>

那么通过这种形式,就能够将原先可能被线程池吞掉的异样胜利捕捉到,从而便于排查问题。

然而这里还有个小问题,咱们留神到在runWorker办法中,执行task.run();语句之后,各种类型的异样都被抛出了,那这些被抛出的异样去了哪里?事实上这里的异样对象最终会被传入到Thread的dispatchUncaughtException办法中,源码如下:

    private void dispatchUncaughtException(Throwable e) {        getUncaughtExceptionHandler().uncaughtException(this, e);    }

能够看到它会去获取UncaughtExceptionHandler的实现类,而后调用其中的uncaughtException办法,这也就回到了咱们上一大节所剖析的UncaughtExceptionHandler实现的具体逻辑。那么为了拿到最原始的异样对象,除了实现UncaughtExceptionHandler接口之外,也能够思考实现afterExecute办法。

通过submit提交工作到线程池

这个同样很简略,咱们还是先回到submit办法的源码:

    public <T> Future<T> submit(Callable<T> task) {        if (task == null) throw new NullPointerException();        RunnableFuture<T> ftask = newTaskFor(task);        execute(ftask);        return ftask;    }

这里的execute办法调用的是ThreadPoolExecutor中的execute办法,执行逻辑跟通过execute提交工作到线程池是一样的。咱们先重点关注这里的newTaskFor办法,其源码如下:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {        return new FutureTask<T>(callable);    }

能够看到提交的Callable对象用FutureTask封装起来了。咱们晓得最终会执行到上述runWorker这个办法中,并且最外围的执行逻辑就是task.run();这行代码。咱们晓得这里的task其实是FutureTask类型,因而咱们有必要看一下FutureTask中的run办法的实现:

    public void run() {        if (state != NEW ||            !UNSAFE.compareAndSwapObject(this, runnerOffset,                                         null, Thread.currentThread()))            return;        try {            Callable<V> c = callable;            if (c != null && state == NEW) {                V result;                boolean ran;                try {                    result = c.call();                    ran = true;                } catch (Throwable ex) {                    result = null;                    ran = false;                    setException(ex);                }                if (ran)                    set(result);            }        } finally {            // runner must be non-null until state is settled to            // prevent concurrent calls to run()            runner = null;            // state must be re-read after nulling runner to prevent            // leaked interrupts            int s = state;            if (s >= INTERRUPTING)                handlePossibleCancellationInterrupt(s);        }    }

能够看到这其中跟异样相干的最要害的代码就在第17行,也就是setException(ex);这个中央。咱们看一下这个中央的实现:

    protected void setException(Throwable t) {        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {            outcome = t;            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state            finishCompletion();        }    }

这里最要害的中央就是将异样对象赋值给了outcome,outcome是FutureTask中的成员变量,咱们通过调用submit办法,拿到一个Future对象之后,再调用它的get办法,其中最外围的办法就是report办法,上面给出每个办法的源码:

首先是get办法:

    public V get() throws InterruptedException, ExecutionException {        int s = state;        if (s <= COMPLETING)            s = awaitDone(false, 0L);        return report(s);    }

能够看到最终调用了report办法,其源码如下:

    private V report(int s) throws ExecutionException {        Object x = outcome;        if (s == NORMAL)            return (V)x;        if (s >= CANCELLED)            throw new CancellationException();        throw new ExecutionException((Throwable)x);    }

下面是一些状态判断,如果当前任务不是失常执行结束,或者被勾销的话,那么这里的x其实就是原始的异样对象,能够看到会被ExecutionException包装。因而在你调用get办法时,可能会抛出ExecutionException异样,那么调用它的getCause办法就能够拿到最原始的异样对象了。

综上所述,针对提交给线程池的工作可能会抛出异样这一问题,次要有以下两种解决思路:

  1. 在提交的工作当中自行try...catch,但这里有个不好的中央就是如果你会提交多种类型的工作到线程池中,每种类型的工作都须要自行将异样try...catch住,比拟繁琐。而且如果你只是catch(Exception e),可能仍然会漏掉一些包含Error类型的异样,那为了保险起见,能够思考catch(Throwable t)。
  2. 自行实现线程池的afterExecute办法,或者实现Thread的UncaughtExceptionHandler接口。

上面给出我集体创立线程池的一个示例,供大家参考:

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE);    statisticsThreadPool = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_MAX_POOL_SIZE,            60, TimeUnit.SECONDS, queue, new ThreadFactoryBuilder()            .setThreadFactory(new ThreadFactory() {                private int count = 0;                private String prefix = "StatisticsTask";                @Override                public Thread newThread(Runnable r) {                    return new Thread(r, prefix + "-" + count++);                }            }).setUncaughtExceptionHandler((t, e) -> {                String threadName = t.getName();                logger.error("statisticsThreadPool error occurred! threadName: {}, error msg: {}", threadName, e.getMessage(), e);            }).build(), (r, executor) -> {        if (!executor.isShutdown()) {            logger.warn("statisticsThreadPool is too busy! waiting to insert task to queue! ");            Uninterruptibles.putUninterruptibly(executor.getQueue(), r);        }    }) {        @Override        protected void afterExecute(Runnable r, Throwable t) {            super.afterExecute(r, t);            if (t == null && r instanceof Future<?>) {                try {                    Future<?> future = (Future<?>) r;                    future.get();                } catch (CancellationException ce) {                    t = ce;                } catch (ExecutionException ee) {                    t = ee.getCause();                } catch (InterruptedException ie) {                    Thread.currentThread().interrupt(); // ignore/reset                }            }            if (t != null) {                logger.error("statisticsThreadPool error msg: {}", t.getMessage(), t);            }        }    };    statisticsThreadPool.prestartAllCoreThreads();

线程数的设置

咱们晓得工作个别有两种:CPU密集型和IO密集型。那么面对CPU密集型的工作,线程数不宜过多,个别抉择CPU外围数+1或者外围数的2倍是比拟正当的一个值。因而咱们能够思考将corePoolSize设置为CPU外围数+1,maxPoolSize设置为外围数的2倍。

同样的,面对IO密集型工作时,咱们能够思考以外围数乘以4倍作为外围线程数,而后外围数乘以5倍作为最大线程数的形式去设置线程数,这样的设置会比间接拍脑袋设置一个值会更正当一些。

当然总的线程数不宜过多,管制在100个线程以内比拟正当,否则线程数过多可能会导致频繁地上下文切换,导致系统性能反不如前。

如何正确敞开一个线程池

说到如何正确去敞开一个线程池,这外面也有点考究。为了实现优雅停机的指标,咱们该当先调用shutdown办法,调用这个办法也就意味着,这个线程池不会再接管任何新的工作,然而曾经提交的工作还会继续执行,包含队列中的。所以,之后你还该当调用awaitTermination办法,这个办法能够设定线程池在敞开之前的最大超时工夫,如果在超时工夫完结之前线程池可能失常敞开,这个办法会返回true,否则,一旦超时,就会返回false。通常来说咱们不可能无限度地期待上来,因而须要咱们当时预估一个正当的超时工夫,而后去应用这个办法。

如果awaitTermination办法返回false,你又心愿尽可能在线程池敞开之后再做其余资源回收工作,能够思考再调用一下shutdownNow办法,此时队列中所有尚未被解决的工作都会被抛弃,同时会设置线程池中每个线程的中断标记位。shutdownNow并不保障肯定能够让正在运行的线程进行工作,除非提交给线程的工作可能正确响应中断。到了这一步,能够思考持续调用awaitTermination办法,或者间接放弃,去做接下来要做的事件。

线程池中的其余有用办法

大家可能有留意到,我在创立线程池的时候,还调用了这个办法:prestartAllCoreThreads。这个办法有什么作用呢?咱们晓得一个线程池创立进去之后,在没有给它提交任何工作之前,这个线程池中的线程数为0。有时候咱们当时晓得会有很多工作会提交给这个线程池,然而等它一个个去创立新线程开销太大,影响零碎性能,因而能够思考在创立线程池的时候就将所有的外围线程全副一次性创立结束,这样零碎起来之后就能够间接应用了。

其实线程池中还提供了其余一些比拟有意思的办法。比方咱们当初构想一个场景,当一个线程池负载很高,快要撑爆导致触发回绝策略时,有没有什么方法能够缓解这一问题?其实是有的,因为线程池提供了设置外围线程数和最大线程数的办法,它们别离是setCorePoolSize办法setMaximumPoolSize办法。是的,线程池创立结束之后也是能够更改其线程数的!因而,面对线程池高负荷运行的状况,咱们能够这么解决:

  1. 起一个定时轮询线程(守护类型),定时检测线程池中的线程数,具体来说就是调用getActiveCount办法。
  2. 当发现线程数超过了外围线程数大小时,能够思考将CorePoolSize和MaximumPoolSize的数值同时乘以2,当然这里不倡议设置很大的线程数,因为并不是线程越多越好的,能够思考设置一个上限值,比方50、100之类的。
  3. 同时,去获取队列中的工作数,具体来说是调用getQueue办法再调用size办法。当队列中的工作数少于队列大小的二分之一时,咱们能够认为当初线程池的负载没有那么高了,因而能够思考在线程池先前有扩容过的状况下,将CorePoolSize和MaximumPoolSize还原回去,也就是除以2。

具体来说如下图:


以上是我集体倡议的一种应用线程池的形式。

线程池肯定是最佳计划吗?

线程池并非在任何状况下都是性能最优的计划。如果是一个谋求极致性能的场景,能够思考应用Disruptor,这是一个高性能队列。排除Disruptor不谈,单纯基于JDK的话会不会有更好的计划?答案是有的。

咱们晓得在一个线程池中,多个线程是共用一个队列的,因而在工作很多的状况下,须要对这个队列进行频繁读写,为了避免抵触因而须要加锁。事实上在浏览线程池源代码的时候就能够发现,外面充斥着各种加锁的代码,那有没有更好的实现形式呢?

其实咱们能够思考创立一个由单线程线程池形成的列表,每个线程池都应用有界队列这种形式去实现多线程。这么做的益处是,每个线程池中的队列都只会被一个线程去操作,这样就没有竞争的问题。

其实这种用空间换工夫的思路借鉴了Netty中EventLoop的实现机制。试想,如果线程池的性能真的有那么好,为什么Netty不必呢?

其余须要留神的中央

  1. 任何状况下都不应该应用可伸缩线程池(线程的创立和销毁开销是很大的)。
  2. 任何状况下都不应该应用无界队列,单测除外。有界队列罕用的有ArrayBlockingQueue和LinkedBlockingQueue,前者基于数组实现,后者基于链表。从性能体现上来看,LinkedBlockingQueue的吞吐量更高然而性能并不稳固,理论状况下该当应用哪一种倡议自行测试之后决定。顺便说一句,Executors的newFixedThreadPool采纳的是LinkedBlockingQueue。
  3. 举荐自行实现RejectedExecutionHandler,JDK自带的都不是很好用,你能够在外面实现本人的逻辑。如果须要一些特定的上下文信息,能够在Runnable实现类中增加一些本人的货色,这样在RejectedExecutionHandler中就能够间接应用了。

怎么做到不丢工作

这里其实指的是一种非凡状况,就是比方忽然遇到了一股流量尖峰,导致线程池负载曾经十分高了,即快要触发回绝策略的时候,咱们能够怎么做来尽量避免提交的工作失落。一般来说当遇到这种状况的时候,该当尽快触发报警告诉研发人员来解决。之后不论是限流也好,还是减少机器也好,甚至是上Kafka、Redis甚至是数据库用来暂存工作数据也是能够的,但毕竟远水救不了近火,如果咱们心愿在正式解决这个问题之前,先尽可能地缓解,能够思考怎么做呢?

首先能够思考的就是我后面提到的动静增大线程池中的线程数,然而如果曾经扩容过了,此时不应持续扩容,否则可能导致系统的吞吐量更低。在这种状况下,该当自行实现RejectedExecutionHandler,具体来说就是在实现类中,独自开一个单线程的线程池,而后调用原线程池的getQueue办法的put办法,将塞不进去的工作再次尝试塞进去。当然在队列满的时候是塞不进去的,但那至多也只是阻塞了这个独自的线程而已,并不影响主流程。

当然,这种计划是治标不治本的,面对流量激增这种场景其实业界有很多成熟的做法,只是单纯从线程池的角度来看的话,这种形式不失为一种长期无效的解决方案。

起源:https://my.oschina.net/editor...
欢送关注公众号 【码农开花】一起学习成长
我会始终分享Java干货,也会分享收费的学习材料课程和面试宝典
回复:【计算机】【设计模式】【面试】有惊喜哦