关于java:Java线程池动态配置以及注意点

50次阅读

共计 10368 个字符,预计需要花费 26 分钟才能阅读完成。

原始博文链接

Java 线程池这个货色也算是八股文常客了,然而自己在理论利用监控下发现了之前忽略的问题遂决定再论一番,简略谈谈 Java 线程池的动静配置以及对于线程池应用和了解上容易忽略的问题,置信理清之后会对线程池的优化可能有更精确的考量。首先抛出一个问题,即副标题所言:线程池线程数量超过外围线程数当前过一段时间后会缩小么?如果这个线程池放弃肯定的调用量。

注:本文所述 Java 版本为 1.8。

动静配置

咱们晓得线程池的外围属性和成员(就构造函数的入参)次要有:corePoolSize(外围线程数)、maximumPoolSize(最大线程数)、keepAliveTime(线程闲暇回收工夫)、workQueue(寄存工作的阻塞队列)、threadFactory(线程工厂)、rejectHandler(回绝策略)。查看源码中的 set 办法,可知其中可能动静调整的有:corePoolSize、keepAliveTime、maximumPoolSize、rejectHandler、threadFactory 以及 allowCoreThreadTimeOut。而其中对线程运行状态有影响、有调整价值的次要为 corePoolSize 和 maximumPoolSize 以及 keepAliveTime。这些办法的操作比我本来设想的要简略一些,间接把代码贴出来:

/**
 * Sets the thread factory used to create new threads.
 *
 * @param threadFactory the new thread factory
 * @throws NullPointerException if threadFactory is null
 * @see #getThreadFactory
 */
public void setThreadFactory(ThreadFactory threadFactory) {if (threadFactory == null)
        throw new NullPointerException();
    this.threadFactory = threadFactory;
}

/**
 * Sets a new handler for unexecutable tasks.
 *
 * @param handler the new handler
 * @throws NullPointerException if handler is null
 * @see #getRejectedExecutionHandler
 */
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {if (handler == null)
        throw new NullPointerException();
    this.handler = handler;
}


/**
 * Sets the core number of threads.  This overrides any value set
 * in the constructor.  If the new value is smaller than the
 * current value, excess existing threads will be terminated when
 * they next become idle.  If larger, new threads will, if needed,
 * be started to execute any queued tasks.
 *
 * @param corePoolSize the new core size
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @see #getCorePoolSize
 */
public void setCorePoolSize(int corePoolSize) {if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are"needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {if (workQueue.isEmpty())
                break;
        }
    }
}

/**
 * Sets the policy governing whether core threads may time out and
 * terminate if no tasks arrive within the keep-alive time, being
 * replaced if needed when new tasks arrive. When false, core
 * threads are never terminated due to lack of incoming
 * tasks. When true, the same keep-alive policy applying to
 * non-core threads applies also to core threads. To avoid
 * continual thread replacement, the keep-alive time must be
 * greater than zero when setting {@code true}. This method
 * should in general be called before the pool is actively used.
 *
 * @param value {@code true} if should time out, else {@code false}
 * @throws IllegalArgumentException if value is {@code true}
 *         and the current keep-alive time is not greater than zero
 *
 * @since 1.6
 */
public void allowCoreThreadTimeOut(boolean value) {if (value && keepAliveTime <= 0)
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        if (value)
            interruptIdleWorkers();}
}


/**
 * Sets the maximum allowed number of threads. This overrides any
 * value set in the constructor. If the new value is smaller than
 * the current value, excess existing threads will be
 * terminated when they next become idle.
 *
 * @param maximumPoolSize the new maximum
 * @throws IllegalArgumentException if the new maximum is
 *         less than or equal to zero, or
 *         less than the {@linkplain #getCorePoolSize core pool size}
 * @see #getMaximumPoolSize
 */
public void setMaximumPoolSize(int maximumPoolSize) {if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    this.maximumPoolSize = maximumPoolSize;
    if (workerCountOf(ctl.get()) > maximumPoolSize)
        interruptIdleWorkers();}


/**
 * Sets the time limit for which threads may remain idle before
 * being terminated.  If there are more than the core number of
 * threads currently in the pool, after waiting this amount of
 * time without processing a task, excess threads will be
 * terminated.  This overrides any value set in the constructor.
 *
 * @param time the time to wait.  A time value of zero will cause
 *        excess threads to terminate immediately after executing tasks.
 * @param unit the time unit of the {@code time} argument
 * @throws IllegalArgumentException if {@code time} less than zero or
 *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
 * @see #getKeepAliveTime(TimeUnit)
 */
public void setKeepAliveTime(long time, TimeUnit unit) {if (time < 0)
        throw new IllegalArgumentException();
    if (time == 0 && allowsCoreThreadTimeOut())
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    long keepAliveTime = unit.toNanos(time);
    long delta = keepAliveTime - this.keepAliveTime;
    this.keepAliveTime = keepAliveTime;
    if (delta < 0)
        interruptIdleWorkers();}

能够发现这几个动静配置除了外围线程调大时会判断立即减少一些 worker 之外,次要操作就是赋值而后判断是否须要打断闲暇线程(调用 interruptIdleworkers 办法),所以线程池动静调整的外围就是给闲暇线程发送 interrupt 信号,咱们看一下这个打断办法,也很简略:

/**
 * Common form of interruptIdleWorkers, to avoid having to
 * remember what the boolean argument means.
 */
private void interruptIdleWorkers() {interruptIdleWorkers(false);
}

/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers.  In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {t.interrupt();
                } catch (SecurityException ignore) { } finally {w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {mainLock.unlock();
    }
}

循环所有的 worker,尝试加锁(worker 继承了 AQS),如果可能锁上阐明该 worker 是闲暇的,随即发送 interrupt 信号而后解锁完结。很简略的逻辑,感觉没有什么好探讨,然而这也从侧面阐明了线程池设计的独到之处,简略的操作实现无效地调整。

那么为了搞清楚线程池的状态会如何被动静配置所影响,有必要再明确一下线程池内治理线程工作的 worker 对象的运行流程。其实看下源码,单纯只关注线程的运行其实也没多少货色,就是两个办法:runWorkergetTask,这里间接贴下源码以及美团技术博客里盗来的图,美团的那一篇文章还是不错的,甚至还成为他们 2020 年浏览量最高的文章,然而那里并没有思考到本文将要阐述的问题,我还挺意外。

// 工作线程运行逻辑
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 一句话概括:继续从阻塞队列里取工作,取到了就执行工作,// 取不到 (通常是期待超时) 就完结运行
        while (task != null || (task = getTask()) != null) {w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行工作
                    task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                } finally {afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();}
        }
        completedAbruptly = false;
    } finally {processWorkerExit(w, completedAbruptly);
    }
}

// 从阻塞队列中获取工作的逻辑
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?

    for (;;) {int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 满足 2 个条件将会尝试完结运行
        // 1. 以后线程数超过最大线程数 或者 期待超过 keepalive 工夫
        // 2. 以后线程数大于 1 或者 阻塞队列为空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 典型的循环 CAS 逻辑
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // interrupt 会影响 poll/take 办法
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 超时标记
            timedOut = true;
        } catch (InterruptedException retry) {
            // 被打断不算做超时
            timedOut = false;
        }
    }
}

worker 运行流程图:

感觉也不须要多赘述,写了些正文,看一遍都能看懂的。总结一下:worker 线程就是不停地向工作队列申请获取工作,能取到工作则执行工作,如果取不到则依据各种条件判断是否须要退出执行还是进入下一轮申请;开始执行工作时会加锁,执行完结开释锁,因而能够依据 tryLock 判断线程是否是沉闷状态即正在执行工作。

看完了这两局部,动静配置所造成的影响应该就比拟清晰了,对所有非沉闷的线程发送 interrupt 信号,也就是打断在 getTask 办法的循环中对阻塞队列的 poll/take 办法,触发下一轮逻辑判断是否退出还是持续期待获取 task。

易忽略的问题

当初回过头来看结尾就抛出的问题,依据我的察看肯定会有不少人认为会或者对此心里没有一个必定的数。当然了,要精确的答复这个问题还是须要依据具体场景来分类探讨,间接说会或者不会都是不太得当的,然而很多时候并不是如你所想的那样线程数量会缩小到外围线程数。起因是在常见的应用状况下:

  1. 大多数状况下线程池中 task 的均匀耗时不会很长,尤其是对于实时响应的零碎,不太会超过 1 秒
  2. keepalive 的设置个别会给到 60 秒,这也是 Spring 对线程池封装以及 Executors.newCachedThreadPool 创立时的默认值
  3. 零碎稳固,放弃肯定的调用量,沉闷线程数不会跌零

那么在这种状况下,线程池内的一个线程要饥渴到 60 秒内一个工作都拿不到才会退出,对于一个沉闷的零碎产生这种状况的概率是非常低的,除非线程池的线程数十分十分大。为了有更直观的阐明,我简略做了匀速下的调用试验,通过 http 裸露接口动静批改线程池的参数以及生产生产的速度,一直打印线程池状态,数据如下:

corePoolSize=50,maximumPoolSize=300,先加大生产速度让线程数超过外围线程数,而后降速察看。线程数的降落是绝对十分迟缓的,要通过十多分钟才会稳固

keepalive=60s,producer=20ms/ 次,consumer=500ms/ 次,沉闷线程数 =25,以后线程数 =300
keepalive=5s,producer=20ms/ 次,consumer=500ms/ 次,沉闷线程数 =25,以后线程数 =145
keepalive=3s,producer=20ms/ 次,consumer=500ms/ 次,沉闷线程数 =25,以后线程数 =86
keepalive=2s,producer=20ms/ 次,consumer=500ms/ 次,沉闷线程数 =25,以后线程数 =64

那么这种状况所带来的最大的影响是线程池的行为逻辑将在运行过程中产生扭转。在线程池新创建后经验第一轮调用顶峰时,可能会经验如下过程:

沉闷线程数达到外围线程数 => 阻塞队列塞满 => 沉闷线程数超出外围线程数 => 沉闷线程数达到最大线程数 => 触发工作回绝策略

而尔后调用顶峰完结回归失常,尔后再次遇到调用顶峰线程池的行为将与第一次不同,可能的过程是:

沉闷线程间接达到最大线程数 => 阻塞队列塞满 => 触发工作回绝策略

在这种状况下,通过第一次流量顶峰后相当于这个线程池变成了一个 fixedThreadPool(core=max),那么对于外围线程数和最大线程数的设置很有可能就须要从新考量一番了。实质上来说,这是 keepalivetime 这个配置所造成的影响,而绝大部分文章都没有关注到 keepalivetime 要如何配置,通常探讨的都是外围线程数与最大线程数。

所以,对于这种生产和生产速度都比拟快的利用场景,keepalivetime 也是一个须要认真考量的配置点,至于到底须要配多少自己目前也没有什么太精确的教训,长短都有利弊:工夫配的长了,通过流量顶峰后外围线程数的设置就可能会失去它原来的意义;工夫配的短了,如果呈现抖动,会有频繁的创立、销毁线程的问题。从纯理论角度上来说,如果要彻底贯彻外围线程满且队列溢出状况下再创立额定线程来执行生产的解决逻辑,那么 keepalivetime 能够设的很短甚至是 0,这样可能在流量顶峰过来后迅速销毁多余的线程。

如果线程池内次要执行 IO 调用为主的工作,且上游承压能力无限,我认为齐全能够思考应用 fixedThreadPool,线程数量的设置依据上游最大承受能力来确定就能够,队列的长度保障可能兜住可控范畴内的顶峰流量即可(如果关注提早则能够思考给较小的值或者间接回绝),此时 keepalivetime 配置就不会有任何作用。此观点的考量在于,目前的机器配置状况下多几百个不吃 CPU 的线程并不会有特地大的影响,顶多是锁争抢强烈一些,而 IO 调用为主的状况下,线程次要是发送申请而后期待调用后果返回,只有在上游接受压力范畴内,有多大量就用多少线程,队列满了再减少线程意义不大。当然了,reactive 可能是更高级的解法,但这不在本文探讨范畴内。

如果既要维持绝对较高的 keepalive 工夫,又心愿线程池在失常状况下线程不超过外围线程数,那么也能够应用动静配置来人为重置线程池的状态,只不过这个操作无奈一次到位,你须要先调低最大线程数到外围线程数大小,待以后线程数量降落实现后再把最大线程数调回来。

总结

阐述结束,总结稀释须要留神的几个点:

  1. 线程池内的线程都是一样的,没有某个线程是外围线程这种说法,只是会依据以后线程数量做不同的操作。
  2. 线程池通过工作提交顶峰后线程数量超过外围线程数时,线程超过 keepalivetime 工夫仍未获取到任何 task 才会退出。
  3. 如果 keepalivetime 较大且工作提交速度较快,线程池在经验一轮顶峰后,外围线程数可能会失去它的意义。
  4. 能够动静调整线程池的外围线程数、最大线程数、keepalivetime,这些动静调整操作基本上就是成员赋值而后中断一下闲暇线程期待从队列中取工作,以触发进入下一轮循环从新进行逻辑判断。
  5. 如果以后线程数量稳固维持大于外围线程数,动静调整外围线程数到任意小于以后线程数的值根本都是无用的,因为线程并不会退出,它们仍然能够在 keepalivetime 工夫内从队列中取到 task。只有调到大于以后线程数时才会造成线程数量减少。
  6. 最大线程数是一个比拟强的束缚,把最大线程数调整到以后线程数以下通常能够在很短时间内回收多余的线程,除非所有线程都在执行长耗时工作。

正文完
 0