乐趣区

关于线程池:10问10答你真的了解线程池吗

简介:《Java 开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用 ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成 OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨 ThreadPoolExecutor 的一些技术细节,并且给出几个罕用的最佳实际倡议。

作者 | 风楼
起源 | 阿里技术公众号

《Java 开发手册》中强调,线程资源必须通过线程池提供,而创立线程池必须应用 ThreadPoolExecutor。手册次要强调利用线程池防止两个问题,一是线程过渡切换,二是防止申请过多时造成 OOM。然而如果参数配置谬误,还是会引发下面的两个问题。所以本节咱们次要是探讨 ThreadPoolExecutor 的一些技术细节,并且给出几个罕用的最佳实际倡议。

我在查找材料的过程中,发现有些问题存在争议。前面发现,一部分起因是因为不同 JDK 版本的事实是有差别的。因而,上面的剖析是基于当下最罕用的版本 JDK1.8,并且对于存在争议的问题,咱们剖析源码,源码才是最精确的。

1 corePoolSize= 0 会怎么样

这是一个争议点。我发现大部分博文,不论是国内的还是国外的,都是这样答复这个问题的:

  • 提交工作后,先判断以后池中线程数是否小于 corePoolSize,如果小于,则创立新线程执行这个工作。
  • 否者,判断期待队列是否已满,如果没有满,则增加到期待队列。
  • 否者,判断以后池中线程数是否大于 maximumPoolSize,如果大于则回绝。
  • 否者,创立一个新的线程执行这个工作。

依照下面的形容,如果 corePoolSize=0,则会判断期待队列的容量,如果还有容量,则排队,并且不会创立新的线程。

—— 但其实,这是老版本的实现形式,从 1.6 之后,实现形式就变了。咱们间接看 execute 的源码(submit 也依赖它),我备注出了要害一行:

int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
                return;
            c = ctl.get();}
        if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 留神这一行代码,增加到期待队列胜利后,判断以后池内线程数是否为 0,如果是则创立一个 firstTask 为 null 的 worker,这个 worker 会从期待队列中获取工作并执行。else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
  • 线程池提交工作后,首先判断以后池中线程数是否小于 corePoolSize。
  • 如果小于则尝试创立新的线程执行该工作;否则尝试增加到期待队列。
  • 如果增加队列胜利,判断以后池内线程数是否为 0,如果是则创立一个 firstTask 为 null 的 worker,这个 worker 会从期待队列中获取工作并执行。
  • 如果增加到期待队列失败,个别是队列已满,才会再尝试创立新的线程。
  • 但在创立之前须要与 maximumPoolSize 比拟,如果小于则创立胜利。
  • 否则执行回绝策略。

上述问题需辨别 JDK 版本。在 1.6 版本之后,如果 corePoolSize=0,提交工作时如果线程池为空,则会立刻创立一个线程来执行工作(先排队再获取);如果提交工作的时候,线程池不为空,则先在期待队列中排队,只有队列满了才会创立新线程。

所以,优化在于,在队列没有满的这段时间内,会有一个线程在生产提交的工作;1.6 之前的实现是,必须等队列满了之后,才开始生产。

2 线程池创立之后,会立刻创立外围线程么

之前有人问过我这个问题,因为他发现利用中有些 Bean 创立了线程池,然而这个 Bean 个别状况下用不到,所以征询我是否须要把这个线程池正文掉,以缩小利用运行时的线程数(该利用运行时线程过多。)

不会。从下面的源码能够看出,在刚刚创立 ThreadPoolExecutor 的时候,线程并不会立刻启动,而是要等到有工作提交时才会启动,除非调用了 prestartCoreThread/prestartAllCoreThreads 当时启动外围线程。

  • prestartCoreThread:Starts a core thread, causing it to idly wait for work. This overrides the default policy of starting core threads only when new tasks are executed.
  • prestartAllCoreThreads:Starts all core threads.

3 外围线程永远不会销毁么

这个问题有点 tricky。首先咱们要明确一下概念,尽管在 JavaDoc 中也应用了“core/non-core threads”这样的形容,但其实这是一个动静的概念,JDK 并没有给一部分线程打上“core”的标记,做什么特殊化的解决。这个问题我认为想要探讨的是闲置线程终结策略的问题。

在 JDK1.6 之前,线程池会尽量放弃 corePoolSize 个外围线程,即便这些线程闲置了很长时间。这一点曾被开发者诟病,所以从 JDK1.6 开始,提供了办法 allowsCoreThreadTimeOut,如果传参为 true,则容许闲置的外围线程被终止。

请留神这种策略和 corePoolSize= 0 的区别。我总结的区别是:

  • corePoolSize=0:在个别状况下只应用一个线程生产工作,只有当并发申请特地多、期待队列都满了之后,才开始用多线程。
  • allowsCoreThreadTimeOut=true && corePoolSize>1:在个别状况下就开始应用多线程(corePoolSize 个),当并发申请特地多,期待队列都满了之后,持续加大线程数。然而当申请没有的时候,容许外围线程也终止。

所以 corePoolSize= 0 的成果,根本等同于 allowsCoreThreadTimeOut=true && corePoolSize=1,但实现细节其实不同。

在 JDK1.6 之后,如果 allowsCoreThreadTimeOut=true,外围线程也能够被终止。

4 如何保障线程不被销毁

首先咱们要明确一下线程池模型。线程池有个外部类 Worker,它实现了 Runnable 接口,首先,它本人要 run 起来。而后它会在适合的时候获取咱们提交的 Runnable 工作,而后调用工作的 run()接口。一个 Worker 不终止的话能够一直执行工作。

咱们后面说的“线程池中的线程”,其实就是 Worker;期待队列中的元素,是咱们提交的 Runnable 工作。

每一个 Worker 在创立进去的时候,会调用它自身的 run()办法,实现是 runWorker(this),这个实现的外围是一个 while 循环,这个循环不完结,Worker 线程就不会终止,就是这个根本逻辑。

  • 在这个 while 条件中,有个 getTask()办法是外围中的外围,它所做的事件就是从期待队列中取出工作来执行:
  • 如果没有达到 corePoolSize,则创立的 Worker 在执行完它承接的工作后,会用 workQueue.take()取工作、留神,这个接口是阻塞接口,如果取不到工作,Worker 线程始终阻塞。
  • 如果超过了 corePoolSize,或者 allowCoreThreadTimeOut,一个 Worker 在闲暇了之后,会用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取工作。留神,这个接口只阻塞期待 keepAliveTime 工夫,超过这个工夫返回 null,则 Worker 的 while 循环执行完结,则被终止了。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 看这里,外围逻辑在这里
            while (task != null || (task = getTask()) != null) {w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {task.run();
                    } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                    } finally {afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();}
            }
            completedAbruptly = false;
        } finally {processWorkerExit(w, completedAbruptly);
        }
    }
    private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?

        for (;;) {int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 留神,外围中的外围在这里
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {timedOut = false;}
        }
    }

实现形式十分奇妙,外围线程(Worker)即便始终闲暇也不终止,是通过 workQueue.take()实现的,它会始终阻塞到从期待队列中取到新的工作。非核心线程闲暇指定工夫后终止是通过 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)实现的,一个闲暇的 Worker 只期待 keepAliveTime,如果还没有取到工作则循环终止,线程也就运行完结了。

引申思考

Worker 自身就是个线程,它再调用咱们传入的 Runnable.run(),会启动一个子线程么?如果你还没有答案,再回忆一下 Runnable 和 Thread 的关系。

5 闲暇线程过多会有什么问题

抽象地答复是会占用内存,咱们剖析一下占用了哪些内存。首先,比拟一般的一部分,一个线程的内存模型:

  • 虚拟机栈
  • 本地办法栈
  • 程序计数器

我想额定强调是上面这几个内存占用,须要小心:

  • ThreadLocal:业务代码是否应用了 ThreadLocal?就算没有,Spring 框架中也大量应用了 ThreadLocal,你所在公司的框架可能也是一样。
  • 局部变量:线程处于阻塞状态,必定还有栈帧没有出栈,栈帧中有局部变量表,但凡被局部变量表援用的内存都不能回收。所以如果这个线程创立了比拟大的局部变量,那么这一部分内存无奈 GC。
  • TLAB 机制:如果你的利用线程数处于高位,那么新的线程初始化可能因为 Eden 没有足够的空间调配 TLAB 而触发 YoungGC。

线程池放弃闲暇的外围线程是它的默认配置,一般来讲是没有问题的,因为它占用的内存个别不大。怕的就是业务代码中应用 ThreadLocal 缓存的数据过大又不清理。

如果你的利用线程数处于高位,那么须要察看一下 YoungGC 的状况,估算一下 Eden 大小是否足够。如果不够的话,可能要审慎地创立新线程,并且让闲暇的线程终止;必要的时候,可能须要对 JVM 进行调参。

6 keepAliveTime= 0 会怎么样

这也是个争议点。有的博文说等于 0 示意闲暇线程永远不会终止,有的说示意执行完立即终止。还有的说等于 - 1 示意闲暇线程永远不会终止。其实略微看一下源码晓得了,这里我间接抛出答案。

在 JDK1.8 中,keepAliveTime= 0 示意非核心线程执行完立即终止。

默认状况下,keepAliveTime 小于 0,初始化的时候才会报错;但如果 allowsCoreThreadTimeOut,keepAliveTime 必须大于 0,不然初始化报错。

7 怎么进行异样解决

很多代码的写法,咱们都习惯依照常见范式去编写,而没有去思考为什么。比方:

  • 如果咱们应用 execute()提交工作,咱们个别要在 Runable 工作的代码加上 try-catch 进行异样解决。
  • 如果咱们应用 submit()提交工作,咱们个别要在主线程中,对 Future.get()进行 try-catch 进行异样解决。

—— 然而在下面,我提到过,submit()底层实现依赖 execute(),两者应该对立呀,为什么有差别呢?上面再扒一扒 submit()的源码,它的实现蛮有意思。

首先,ThreadPoolExecutor 中没有 submit 的代码,而是在它的父类 AbstractExecutorService 中,有三个 submit 的重载办法,代码非常简单,要害代码就两行:

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

正是因为这三个重载办法,都调用了 execute,所以我才说 submit 底层依赖 execute。通过查看这里 execute 的实现,咱们不难发现,它就是 ThreadPoolExecutor 中的实现,所以,造成 submit 和 execute 的差异化的代码,不在这。那么造成差别的肯定在 newTaskFor 办法中。这个办法也就 new 了一个 FutureTask 而已,FutureTask 实现 RunnableFuture 接口,RunnableFuture 接口继承 Runnable 接口和 Future 接口。而 Callable 只是 FutureTask 的一个成员变量。

所以讲到这里,就有另一个 Java 根底知识点:Callable 和 Future 的关系。咱们个别用 Callable 编写工作代码,Future 是异步返回对象,通过它的 get 办法,阻塞式地获取后果。FutureTask 的外围代码就是实现了 Future 接口,也就是 get 办法的实现:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        // 外围代码
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 死循环
    for (;;) {if (Thread.interrupted()) {removeWaiter(q);
            throw new InterruptedException();}

        int s = state;
        // 只有工作的状态是’已实现‘,才会跳出死循环
        if (s > COMPLETING) {if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

get 的外围实现是有个 awaitDone 办法,这是一个死循环,只有工作的状态是“已实现”,才会跳出死循环;否则会依赖 UNSAFE 包下的 LockSupport.park 原语进行阻塞,期待 LockSupport.unpark 信号量。而这个信号量只有当运行完结取得后果、或者出现异常的状况下,才会收回来。别离对应办法 set 和 setException。这就是异步执行、阻塞获取的原理,扯得有点远了。

回到最后咱们的疑难,为什么 submit 之后,通过 get 办法能够获取到异样?起因是 FutureTask 有一个 Object 类型的 outcome 成员变量,用来记录执行后果。这个后果能够是传入的泛型,也能够是 Throwable 异样:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

// get 办法中依赖的,报告执行后果

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

FutureTask 的另一个奇妙的中央就是借用 RunnableAdapter 外部类,将 submit 的 Runnable 封装成 Callable。所以就算你 submit 的是 Runnable,一样能够用 get 获取到异样。

  • 不论是用 execute 还是 submit,都能够本人在业务代码上加 try-catch 进行异样解决。我个别喜爱应用这种形式,因为我喜爱对不同业务场景的异样进行差异化解决,至多打不一样的日志吧。
  • 如果是 execute,还能够自定义线程池,继承 ThreadPoolExecutor 并复写其 afterExecute(Runnable r, Throwable t)办法。
  • 或者实现 Thread.UncaughtExceptionHandler 接口,实现 void uncaughtException(Thread t, Throwable e); 办法,并将该 handler 传递给线程池的 ThreadFactory。
  • 然而留神,afterExecute 和 UncaughtExceptionHandler 都不实用 submit。因为通过下面的 FutureTask.run()不难发现,它本人对 Throwable 进行了 try-catch,封装到了 outcome 属性,所以底层办法 execute 的 Worker 是拿不到异样信息的。

8 线程池需不需要敞开

一般来讲,线程池的生命周期追随服务的生命周期。如果一个服务(Service)进行服务了,那么须要调用 shutdown 办法进行敞开。所以 ExecutorService.shutdown 在 Java 以及一些中间件的源码中,是封装在 Service 的 shutdown 办法内的。

如果是 Server 端不重启就不进行提供服务,我认为是不须要非凡解决的。

9 shutdown 和 shutdownNow 的区别

shutdown => 平缓敞开,期待所有已增加到线程池中的工作执行完再敞开。
shutdownNow => 立即敞开,进行正在执行的工作,并返回队列中未执行的工作。
原本想剖析一下两者的源码的,然而发现本文的篇幅曾经过长了,源码也贴了不少。感兴趣的敌人本人看一下即可。

10 Spring 中有哪些和 ThreadPoolExecutor 相似的工具

这里我想着重强调的就是 SimpleAsyncTaskExecutor,Spring 中应用的 @Async 注解,底层就是基于 SimpleAsyncTaskExecutor 去执行工作,只不过它不是线程池,而是每次都新开一个线程。

另外想要强调的是 Executor 接口。Java 初学者容易想当然的认为 Executor 结尾的类就是一个线程池,而下面的都是反例。咱们能够在 JDK 的 execute 办法上看到这个正文:

/**

  • Executes the given command at some time in the future. The command
  • may execute in a new thread, in a pooled thread, or in the calling
  • thread, at the discretion of the {@code Executor} implementation.
    */
    所以,它的职责并不是提供一个线程池的接口,而是提供一个“未来执行命令”的接口。真正能代表线程池意义的,是 ThreadPoolExecutor 类,而不是 Executor 接口。

最佳实际总结
【强制】应用 ThreadPoolExecutor 的构造函数申明线程池,防止应用 Executors 类的 newFixedThreadPool 和 newCachedThreadPool。
【强制】创立线程或线程池时请指定有意义的线程名称,不便出错时回溯。即 threadFactory 参数要结构好。
【倡议】倡议不同类别的业务用不同的线程池。
【倡议】CPU 密集型工作(N+1):这种工作耗费的次要是 CPU 资源,能够将线程数设置为 N(CPU 外围数)+1,比 CPU 外围数多进去的一个线程是为了避免线程偶发的缺页中断,或者其它起因导致的工作暂停而带来的影响。一旦工作暂停,CPU 就会处于闲暇状态,而在这种状况下多进去的一个线程就能够充分利用 CPU 的闲暇工夫。
【倡议】I/ O 密集型工作(2N):这种工作利用起来,零碎会用大部分的工夫来解决 I / O 交互,而线程在解决 I / O 的时间段内不会占用 CPU 来解决,这时就能够将 CPU 交出给其它线程应用。因而在 I / O 密集型工作的利用中,咱们能够多配置一些线程,具体的计算方法是 2N。
【倡议】workQueue 不要应用无界队列,尽量应用有界队列。防止大量工作期待,造成 OOM。
【倡议】如果是资源缓和的利用,应用 allowsCoreThreadTimeOut 能够进步资源利用率。
【倡议】尽管应用线程池有多种异样解决的形式,但在工作代码中,应用 try-catch 最通用,也能给不同工作的异样解决做精细化。
【倡议】对于资源缓和的利用,如果放心线程池资源使用不当,能够利用 ThreadPoolExecutor 的 API 实现简略的监控,而后进行剖析和优化。

线程池初始化示例:

private static final ThreadPoolExecutor pool;

static {ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
    pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
        threadFactory, new ThreadPoolExecutor.AbortPolicy());
    pool.allowCoreThreadTimeOut(true);
}
  • threadFactory:给出带业务语义的线程命名。
  • corePoolSize:疾速启动 4 个线程解决该业务,是足够的。
  • maximumPoolSize:IO 密集型业务,我的服务器是 4C8G 的,所以 4 *2=8。
  • keepAliveTime:服务器资源缓和,让闲暇的线程疾速开释。
  • pool.allowCoreThreadTimeOut(true):也是为了在能够的时候,让线程开释,开释资源。
  • workQueue:一个工作的执行时长在 100~300ms,业务高峰期 8 个线程,依照 10s 超时(曾经很高了)。10s 钟,8 个线程,能够解决 10 1000ms / 200ms 8 = 400 个工作左右,往上再取一点,512 曾经很多了。
  • handler:极其状况下,一些工作只能抛弃,爱护服务端。

原文链接

本文为阿里云原创内容,未经容许不得转载。

退出移动版