Preface

Today I ran into a bug: a task submitted to a thread pool seemed never to run, and none of its log lines were printed.

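After debugging locally, I found that the first half of the task logic threw an NPE, and since there was no try-catch around the task body, the exception was silently swallowed.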

The fix itself is simple: wrap the task body in a try-catch. But if the exception is not caught, how does the thread pool handle it internally, and where does it end up?

Out of curiosity, I dug into the thread pool source code and summarized what I found below.

Source Code Analysis

The problematic code in the project looked roughly like this:

```java
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.submit(() -> {
    String pennyStr = null;
    // Double.valueOf(null) throws a NullPointerException
    Double penny = Double.valueOf(pennyStr);
    ...
});
```

First, step into the newFixedThreadPool factory method to see which implementation class it creates. It is ThreadPoolExecutor:

```java
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
```

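As for the class hierarchy: ThreadPoolExecutor extends AbstractExecutorService, which implements the ExecutorService interface (itself a sub-interface of Executor).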

Next, the submit method. It is declared in the ExecutorService interface and implemented in AbstractExecutorService; ThreadPoolExecutor does not override it.

```java
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
```

The corresponding FutureTask constructor:

```java
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // state is volatile, which guarantees visibility across threads
}
```

The Executors.callable factory method that builds the Callable:

```java
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
```

And the RunnableAdapter class with its constructor:

```java
/**
 * A callable that runs given task and returns given result
 */
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
```
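To see what RunnableAdapter does in isolation, here is a minimal sketch that calls the public Executors.callable factory directly (the class name AdapterDemo and its helper methods are made up for illustration). It shows two things: call() returns the fixed result that was passed in, and an exception thrown by the Runnable passes straight through call(), because call() simply invokes task.run() without any try-catch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

public class AdapterDemo {
    // Adapts a no-op Runnable; call() returns the fixed result "done"
    static String callOk() {
        Callable<String> ok = Executors.callable(() -> { }, "done");
        try {
            return ok.call();
        } catch (Exception e) {
            throw new IllegalStateException(e); // not expected for a no-op task
        }
    }

    // Adapts the failing task; returns whatever escaped call(), or null
    static Throwable callFailing() {
        Callable<Object> failing = Executors.callable(() -> {
            String pennyStr = null;
            Double.valueOf(pennyStr); // throws NullPointerException
        });
        try {
            failing.call();
            return null;
        } catch (Exception e) {
            return e; // the NPE propagated unchanged through RunnableAdapter.call()
        }
    }

    public static void main(String[] args) {
        System.out.println(callOk());
        System.out.println(callFailing());
    }
}
```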

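In short, newTaskFor wraps the submitted Runnable in a FutureTask: Executors.callable adapts the Runnable to a Callable through RunnableAdapter, and the FutureTask (which is both a Runnable and a Future) is what actually gets passed to execute().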
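One step the excerpts above skip is FutureTask.run() itself. In the JDK it invokes callable.call() inside a catch (Throwable) block and records the exception with setException() instead of rethrowing it; the exception only resurfaces when someone calls Future.get(), wrapped in an ExecutionException. That is why the task in the opening example looked as if it had never run. A minimal sketch that reproduces the bug and recovers the NPE (SubmitDemo and submitAndGet are illustrative names):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitDemo {
    // Submits the failing task from the opening example and returns the
    // exception recovered through Future.get(), or null if none was thrown
    static Throwable submitAndGet() {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        try {
            Future<?> future = threadPool.submit(() -> {
                String pennyStr = null;
                Double.valueOf(pennyStr); // throws NullPointerException
            });
            // Nothing was printed so far; the stored exception is rethrown here
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause(); // the original NullPointerException
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndGet());
    }
}
```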

Next, the task is handed to the pool for scheduling via execute:

```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
```

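Since we mainly care about how the task is executed and how its exception is thrown and handled, we'll skip the rest of this logic for now. For a task to be executed, it clearly has to go through addWorker, so let's look inside. addWorker is long, so here are just the key parts: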

```java
private boolean addWorker(Runnable firstTask, boolean core) {
    ...
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // create a Worker object
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // As the Worker constructor shows, once this thread starts,
                // it actually runs the Worker object's run()
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// Worker's constructor
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
```

Now let's look at Worker, an inner class of ThreadPoolExecutor:

```java
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    ...
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    ...
}
```

So the task is really executed in the outer class's runWorker method. Let's see how that method consumes tasks on the Worker's thread.

```java
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                // ==== key code start ====
                try {
                    // simple and direct: call the task's run method
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
                // ==== key code end ====
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
```

We've finally reached the bottom. Inside the key try-catch block, runWorker calls the current task's run method:

```java
// ==== key code start ====
try {
    // simple and direct: call the task's run method
    task.run();
} catch (RuntimeException x) {
    thrown = x; throw x;
} catch (Error x) {
    thrown = x; throw x;
} catch (Throwable x) {
    thrown = x; throw new Error(x);
} finally {
    afterExecute(task, thrown);
}
// ==== key code end ====
```

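After catching an exception, runWorker rethrows it, but the finally block first calls afterExecute(), which looks like a place where the exception could be handled. One caveat: for a task passed to submit(), `task` here is the FutureTask wrapper, and FutureTask.run() catches every Throwable itself (storing it for Future.get()), so this catch block never sees the exception and afterExecute receives thrown == null. Let's look at afterExecute: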

```java
protected void afterExecute(Runnable r, Throwable t) { }
```

ThreadPoolExecutor#afterExecute() does nothing at all. It is a hook that users override to customize what happens after each task runs, including exception handling.
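A minimal sketch of such an override, adapted from the pattern shown in the ThreadPoolExecutor#afterExecute Javadoc. For tasks passed to execute(), t is the exception runWorker caught; for tasks passed to submit(), t is null because FutureTask.run() already captured the exception, so the override reads it back from the Future. The class name LoggingThreadPool, the failures list, and runBoth are illustrative; a real project would log with its own logger instead of collecting into a list.

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LoggingThreadPool extends ThreadPoolExecutor {
    // Exceptions reported by afterExecute (a stand-in for a real logger)
    final List<Throwable> failures = new CopyOnWriteArrayList<>();

    LoggingThreadPool(int nThreads) {
        // Same parameters that newFixedThreadPool uses
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // submit(): r is the finished FutureTask and t is null,
        // so get() returns at once with the stored outcome
        if (t == null && r instanceof Future<?>) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            failures.add(t);
        }
    }

    // Runs the NPE task once via submit() and once via execute()
    static List<Throwable> runBoth() {
        LoggingThreadPool pool = new LoggingThreadPool(2);
        Runnable npe = () -> {
            String pennyStr = null;
            Double.valueOf(pennyStr); // throws NullPointerException
        };
        pool.submit(npe);  // exception stored inside the FutureTask
        pool.execute(npe); // exception rethrown by runWorker after afterExecute
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.failures;
    }

    public static void main(String[] args) {
        System.out.println(runBoth());
    }
}
```

Both failures end up in the list. The execute() one additionally escapes the worker thread, so with no handler installed it is also printed to System.err by the path traced below.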

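So where does the exception go once runWorker rethrows it and it escapes the worker thread? In an article by a more experienced developer, I found the HotSpot JVM logic that handles a thread's uncaught exception when the thread exits: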

```cpp
if (!destroy_vm || JDK_Version::is_jdk12x_version()) {
    // JSR-166: change call from from ThreadGroup.uncaughtException to
    // java.lang.Thread.dispatchUncaughtException
    if (uncaught_exception.not_null()) {
      // if there is an uncaught exception
      Handle group(this, java_lang_Thread::threadGroup(threadObj()));
      {
        KlassHandle recvrKlass(THREAD, threadObj->klass());
        CallInfo callinfo;
        KlassHandle thread_klass(THREAD, SystemDictionary::Thread_klass());
        /*
          This works like a method table; it ends up calling
          Thread#dispatchUncaughtException:
          template(dispatchUncaughtException_name,
                   "dispatchUncaughtException")
        */
        LinkResolver::resolve_virtual_call(callinfo, threadObj, recvrKlass, thread_klass,
                                           vmSymbols::dispatchUncaughtException_name(),
                                           vmSymbols::throwable_void_signature(),
                                           KlassHandle(), false, false, THREAD);
        CLEAR_PENDING_EXCEPTION;
        methodHandle method = callinfo.selected_method();
        if (method.not_null()) {
          JavaValue result(T_VOID);
          JavaCalls::call_virtual(&result,
                                  threadObj, thread_klass,
                                  vmSymbols::dispatchUncaughtException_name(),
                                  vmSymbols::throwable_void_signature(),
                                  uncaught_exception,
                                  THREAD);
        } else {
          KlassHandle thread_group(THREAD, SystemDictionary::ThreadGroup_klass());
          JavaValue result(T_VOID);
          JavaCalls::call_virtual(&result,
                                  group, thread_group,
                                  vmSymbols::uncaughtException_name(),
                                  vmSymbols::thread_throwable_void_signature(),
                                  threadObj,           // Arg 1
                                  uncaught_exception,  // Arg 2
                                  THREAD);
        }
        if (HAS_PENDING_EXCEPTION) {
          ResourceMark rm(this);
          jio_fprintf(defaultStream::error_stream(),
                "\nException: %s thrown from the UncaughtExceptionHandler"
                " in thread \"%s\"\n",
                pending_exception()->klass()->external_name(),
                get_thread_name());
          CLEAR_PENDING_EXCEPTION;
        }
      }
    }
```

The code is C++. If you are interested, you can read the full source; the English comments make it reasonably easy to follow:

http://hg.openjdk.java.net/jd...

As you can see, this eventually calls Thread#dispatchUncaughtException:

```java
/**
 * Dispatch an uncaught exception to the handler. This method is
 * intended to be called only by the JVM.
 */
private void dispatchUncaughtException(Throwable e) {
    getUncaughtExceptionHandler().uncaughtException(this, e);
}
```

If the thread has no handler of its own, getUncaughtExceptionHandler() returns the thread's ThreadGroup, so the call lands in ThreadGroup#uncaughtException:

```java
/**
 * Called by the Java Virtual Machine when a thread in this
 * thread group stops because of an uncaught exception, and the thread
 * does not have a specific {@link Thread.UncaughtExceptionHandler}
 * installed.
 */
public void uncaughtException(Thread t, Throwable e) {
    if (parent != null) {
        parent.uncaughtException(t, e);
    } else {
        Thread.UncaughtExceptionHandler ueh =
            Thread.getDefaultUncaughtExceptionHandler();
        if (ueh != null) {
            ueh.uncaughtException(t, e);
        } else if (!(e instanceof ThreadDeath)) {
            // otherwise the stack trace is printed to System.err
            System.err.print("Exception in thread \""
                             + t.getName() + "\" ");
            e.printStackTrace(System.err);
        }
    }
}
```

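The JDK comments spell it out: when a thread terminates because of an uncaught exception, the JVM calls this method. If the thread has no UncaughtExceptionHandler of its own, its ThreadGroup handles the exception: the group delegates to its parent group if it has one; at the top-level group, the default handler registered with Thread.setDefaultUncaughtExceptionHandler() is used if present; otherwise the stack trace is printed to System.err. Note that in a thread pool this path is only reached for tasks passed to execute(). A task passed to submit() never lets its exception escape FutureTask.run(), which is why the original bug printed nothing at all.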
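To watch this dispatch happen, here is a minimal sketch that gives the pool a ThreadFactory installing a per-thread UncaughtExceptionHandler. With execute(), runWorker rethrows the NPE, the worker thread terminates, and the JVM hands the exception to the handler. With submit(), the handler is never called, because FutureTask.run() has already stored the exception. The class HandlerDemo, the seen list, and the one-second CountDownLatch timeout are illustrative choices, not part of the original code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class HandlerDemo {
    // Runs the failing task via execute() or submit() and returns what the
    // thread's UncaughtExceptionHandler received (empty if it never fired)
    static List<Throwable> run(boolean useExecute) {
        List<Throwable> seen = new CopyOnWriteArrayList<>();
        CountDownLatch handled = new CountDownLatch(1);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((t1, e) -> {
                seen.add(e);
                handled.countDown();
            });
            return t;
        };
        ExecutorService pool = Executors.newFixedThreadPool(1, factory);
        Runnable npe = () -> {
            String pennyStr = null;
            Double.valueOf(pennyStr); // throws NullPointerException
        };
        if (useExecute) {
            pool.execute(npe);
        } else {
            pool.submit(npe);
        }
        try {
            // Wait up to 1 second; with submit() the handler never fires
            handled.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("execute(): " + run(true));
        System.out.println("submit():  " + run(false));
    }
}
```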

This IBM article likewise recommends using the uncaughtException handler provided by ThreadGroup to detect threads that terminate abnormally.

https://www.ibm.com/developer...

Summary (Solutions)

Based on the source analysis above, there are several ways to deal with an exception being "swallowed" like this:

  1. Catch it with try-catch inside the task; this is the most common approach.
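  2. Set an UncaughtExceptionHandler on the thread (for a pool, from its ThreadFactory), or override uncaughtException in a ThreadGroup subclass. This only catches tasks passed to execute(); with submit(), FutureTask keeps the exception, and only Future.get() rethrows it, wrapped in an ExecutionException.

      ```java
      ThreadFactory factory = r -> {
          Thread t = new Thread(r);
          // LOGGER: the project's logger (an SLF4J-style error(String, Throwable) is assumed)
          t.setUncaughtExceptionHandler(
                  (t1, e) -> LOGGER.error(t1 + " throws exception", e));
          return t;
      };
      ExecutorService threadPool = Executors.newFixedThreadPool(3, factory);
      ```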
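  3. Override the pool's afterExecute method. For tasks passed to submit(), the Throwable argument is null, so the exception has to be read back from the Future, as the ThreadPoolExecutor#afterExecute Javadoc demonstrates.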
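For solution 1, rather than repeating a try-catch in every task, one option is a small decorator that wraps any Runnable before it is submitted. A minimal sketch; the helper name `catching` is hypothetical, and java.util.logging stands in for whatever logger the project already uses. It only catches RuntimeException, so an Error would still be captured silently by FutureTask.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CatchingTasks {
    private static final Logger LOGGER = Logger.getLogger(CatchingTasks.class.getName());

    // Hypothetical helper: wraps a task so a RuntimeException is logged
    // inside the task instead of being stored silently by FutureTask
    static Runnable catching(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                LOGGER.log(Level.SEVERE, "task failed", e);
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        // The same failing task as in the opening example, now wrapped
        threadPool.submit(catching(() -> {
            String pennyStr = null;
            Double.valueOf(pennyStr); // throws NullPointerException, now logged
        }));
        threadPool.shutdown();
        threadPool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```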

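Although this post offers solutions, its main goal is to analyze the source code and trace the path an exception takes through the whole process. I hope it helps.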

References

  1. https://www.jcp.org/en/jsr/de...
  2. https://www.ibm.com/developer...
  3. http://ifeve.com/%E6%B7%B1%E5...