乐趣区

关于java:Java踩坑记系列之线程池

线程池大家都很相熟,无论是平时的业务开发还是框架中间件都会用到,大部分都是基于 JDK 线程池 ThreadPoolExecutor 做的封装,比方 tomcat 的线程池,当然也有独自开发的,但都会牵涉到这几个外围参数的设置:外围线程数 期待队列 最大线程数 回绝策略 等。

先说下咱们项目组在应用线程池时踩到的坑:

  1. 线程池的参数设置肯定要联合具体的业务场景,辨别 I / O 密集和 CPU 密集,如果是 I / O 密集型业务,外围线程数,workQueue期待队列,最大线程数等参数设置不合理不仅不能施展线程池的作用,反而会影响现有业务
  2. 期待队列 workQueue 填满后,新创建的线程会优先解决新申请进来的工作,而不是去解决队列里的工作,队列里的工作只能等外围线程数忙完了能力被执行。有可能造成队列里的工作长时间期待,导致队列积压,尤其是 I / O 密集场景
  3. 如果须要失去线程池里的线程执行后果,应用 future 的形式,回绝策略不能应用 DiscardPolicy,这种抛弃策略尽管不执行子线程的工作,然而还是会返回future 对象 (其实在这种状况下咱们曾经不须要线程池返回的后果了),而后后续代码即便判断了future!=null 也没用,这样的话还是会走到 future.get() 办法,如果 get 办法没有设置超时工夫会导致始终阻塞上来!

伪代码如下:

// 如果线程池已满,新的申请会间接执行回绝策略
Future<String> future = executor.submit(() -> {
    // 业务逻辑, 比方调用第三方接口等耗时操作放在线程池里执行
    return result;
});

// 主流程调用逻辑
if(future != null) // 如果回绝策略设置不合理还是会走到上面代码
  future.get(超时工夫); // 调用方阻塞期待后果返回,直到超时

上面就结合实际业务状况逐个进行剖析。

当然这些问题一部分是对线程池了解不够导致的,还有一部分是线程池自身的问题。

一. 背景

公司有个接口局部性能应用了线程池,这个性能不依赖外围接口,但有肯定的耗时,所以放在线程池里和主线程并行执行,等线程池里的工作执行完通过 future.get 的形式获取线程池里的线程执行后果,而后合并到主流程的后果里返回给前端,业务场景很简略,大抵流程如下:

初衷也是为了不影响主流程的性能,不减少整体响应工夫。

然而之前应用的线程池 jdk 的 newCachedThreadPool,因为 sonar 扫描提醒说有内存溢出的危险(最大线程数是Integer.MAX_VALUE) 所以过后改成应用原生的ThreadPoolExecutor,通过指定外围线程数和最大线程数,来解决 sonar 问题。

然而改过的线程池并不适宜咱们这种 I / O 密集型的业务场景(大部分业务都是通过调用接口实现的),过后设置的外围线程数是 cpu 核数(线上机器是 4 核),期待队列是 2048,最大线程数是 cpu 核数 *2,从而引发了一系列问题。。。

二. 排查过程

上线后的景象是应用线程池的接口整体响应工夫变长,有的甚至到 10 秒才返回数据,通过线程 dump 剖析发现有大量的线程都阻塞在 future.get 办法上,如下:

future.get办法会阻塞以后主流程,在超时工夫内期待子线程返回后果,如果超时还没后果则完结期待继续执行后续的代码,超时工夫设置的是默认接口超时工夫 10 秒 (前面已改为 200ms),至此能够确定接口总耗时是因为流程都卡在了future.get 这一步了。

但这不是根本原因,future是线程池返回的,伪代码如下:

Future<String> future = executor.submit(() -> {
    // 业务逻辑, 比方调用第三方接口等耗时操作放在线程池里执行
    return result;
});

通过下面的代码可知 future 没有后果的起因是提交到线程池里的工作迟迟没有被执行。

那为什么没有执行呢?持续剖析线程池的 dump 文件发现,线程池里的线程数已达到最大数量,满负荷运行,如图:

SubThread是咱们本人定义的线程池里线程的名字,8 个线程都是 runnable 状态,阐明期待队列里曾经塞满工作了,之前设置的队列长度是 2048,也就是说还有 2048 个工作期待执行,这无疑加剧了整个接口的耗时。

线程池的执行程序是:外围线程数 ->  期待队列 -> 最大线程数 -> 回绝策略

如果对线程 dump 剖析不太理解的能够看下之前的一篇文章:Windows 环境下如何进行线程 dump 剖析,尽管环境不一样但原理相似。

这里根本确定接口耗时变长的次要起因是线程池设置不合理导致的。

另外还有一些偶发问题,就是线上日志显示尽管线程池执行了,然而线程池里的工作却没有记录运行日志,线程池里的工作是调用另外一个服务的接口,和对方接口负责人确认也的确调用了他们的接口,可咱们本人的日志里却没有记录下调用报文,通过进一步查看代码发现过后的线程池回绝策略也被批改过,并不是默认的抛出异样不执行策略 AbortPolicy,而是设置的CallerRunsPolicy 策略,即交给调用方执行!

也就是说当线程池达到最大负荷时执行的回绝策略是 让主流程去执行提交到线程池里的工作,这样除了进一步加剧整个接口的耗时外,还会导致主流程被 hang 死,最要害的是无奈确定是在哪一步执行提交到线程池的工作

剖析日志埋点能够推断出调用的工夫点应该是曾经调用完了记录日志的办法,要返回给前端后果的时才执行线程池里工作,此时记录日志的办法已调用过,不会再去打印日志了,而且子工作返回的后果也无奈合并到主流程后果里,因为合并主流程后果和线程池工作返回后果的办法也在之前调用过,不会回过头来再调用了,大抵流程如下:

其实这种回绝策略并不适宜咱们当初的业务场景,因为线程池里的工作不是外围工作,不应该影响主流程的执行。

三. 改良

  1. 调整线程池参数,外围线程数基于线上接口的 QPS 计算,最大线程数参考线上 tomcat 的最大线程数配置,可能 cover 住顶峰流量,队列设置的尽量小,防止造成工作挤压。对于线程数如何设置会在后续文章中独自解说。
  2. 扩大线程池,封装原生 JDK 线程池ThreadPoolExecutor,减少对线程池各项指标的监控,包含线程池运行状态、外围线程数、最大线程数、工作期待数、已实现工作数、线程池异样敞开等信息,便于实时监控和定位问题。
  3. 重写线程池回绝策略,次要也是记录超出线程池负载状况下的各项指标状况,以及调用线程的堆栈信息,便于排查剖析,通过抛出异样形式中断执行,防止援用的 future 不为 null 的问题。
  4. 正当调整 future.get 超时工夫,避免阻塞主线程工夫过长。

线程池外部流程:

线程池监控和自定义回绝策略的代码如下,大家能够联合本人的业务场景拿去应用:

package com.javakk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.*;

/**
 * 自定义线程池 <p>
 * 1. 监控线程池状态及异样敞开等状况 <p>
 * 2. 监控线程池运行时的各项指标, 比方: 工作期待数、已实现工作数、工作异样信息、外围线程数、最大线程数等 <p>
 * author: 老 K
 */
public class ThreadPoolExt extends ThreadPoolExecutor{private static final Logger log = LoggerFactory.getLogger(ThreadPoolExt.class);

    private TimeUnit timeUnit;

    public ThreadPoolExt(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                         BlockingQueue<Runnable> workQueue,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeUnit = unit;
    }

    @Override
    public void shutdown() {
        // 线程池将要敞开事件, 此办法会期待线程池中正在执行的工作和队列中期待的工作执行结束再敞开
        monitor("ThreadPool will be shutdown:");
        super.shutdown();}

    @Override
    public List<Runnable> shutdownNow() {
        // 线程池立刻敞开事件, 此办法会立刻敞开线程池, 然而会返回队列中期待的工作
        monitor("ThreadPool going to immediately be shutdown:");
        // 记录被抛弃的工作, 临时只记录日志, 后续可依据业务场景做进一步解决
        List<Runnable> dropTasks = null;
        try {dropTasks = super.shutdownNow();
            log.error(MessageFormat.format("ThreadPool discard task count:{0}", dropTasks.size()));
        } catch (Exception e) {log.error("ThreadPool shutdownNow error", e);
        }
        return dropTasks;
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // 监控线程池运行时的各项指标
        monitor("ThreadPool monitor data:");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable ex) {if (ex != null) { // 监控线程池中的线程执行是否异样
            log.error("unknown exception caught in ThreadPool afterExecute:", ex);
        }
    }

    /**
     * 监控线程池运行时的各项指标, 比方: 工作期待数、工作异样信息、已实现工作数、外围线程数、最大线程数等 <p>
     */
    private void monitor(String title){
        try {
            // 线程池监控信息记录, 这里须要留神写 ES 的机会, 尤其是多个子线程的日志合并到主流程的记录形式
            String threadPoolMonitor = MessageFormat.format("{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}," +
                            "task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}," +
                            "thread name:{13}{14}",
                    System.lineSeparator(), title, this.getCorePoolSize(), this.getPoolSize(),
                    this.getQueue().size(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getLargestPoolSize(),
                    this.getMaximumPoolSize(), this.getKeepAliveTime(timeUnit != null ? timeUnit : TimeUnit.SECONDS), this.isShutdown(),
                    this.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());
            log.info(threadPoolMonitor);
        } catch (Exception e) {log.error("ThreadPool monitor error", e);
        }
    }
}

自定义回绝策略代码:

package com.javakk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.management.*;
import java.text.MessageFormat;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 自定义线程池回绝策略:<p>
 * 1. 记录线程池的外围线程数, 沉闷数, 已实现数等信息, 以及调用线程的堆栈信息, 便于排查 <p>
 * 2. 抛出异常中断执行 <p>
 * author: 老 K
 */
public class RejectedPolicyWithReport implements RejectedExecutionHandler {private static final Logger log = LoggerFactory.getLogger(RejectedPolicyWithReport.class);

    private static volatile long lastPrintTime = 0;

    private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;

    private static Semaphore guard = new Semaphore(1);
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            String title = "thread pool execute reject policy!!";
            String msg = MessageFormat.format("{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}," +
                            "task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}," +
                            "thread name:{13}{14}",
                    System.lineSeparator(), title, e.getCorePoolSize(), e.getPoolSize(), e.getQueue().size(), e.getActiveCount(),
                    e.getCompletedTaskCount(), e.getTaskCount(), e.getLargestPoolSize(), e.getMaximumPoolSize(), e.getKeepAliveTime(TimeUnit.SECONDS),
                    e.isShutdown(), e.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());
            log.info(msg);
 threadDump(); // 记录线程堆栈信息包含锁争用信息} catch (Exception ex) {log.error("RejectedPolicyWithReport rejectedExecution error", ex);
        }
        throw new RejectedExecutionException("thread pool execute reject policy!!");
    }

    /**
     * 获取线程 dump 信息 <p>
     * 留神: 该办法默认会记录所有线程和锁信息尽管不便 debug, 应用时最好加开关和距离调用, 否则可能会减少 latency<p>
     * 1. 以后线程的根本信息:id,name,state<p>
     * 2. 堆栈信息 <p>
     * 3. 锁相干信息(能够设置不记录)<p>
     *  默认在 log 记录 <p>
     * @return
     */
    private void threadDump() {long now = System.currentTimeMillis();
        // 每隔 10 分钟 dump 一次
        if (now - lastPrintTime < TEN_MINUTES_MILLS) {return;} 
        if (!guard.tryAcquire()) {return;} 
        // 异步 dump 线程池信息 
        ExecutorService pool = Executors.newSingleThreadExecutor(); 
        pool.execute(() -> {
            try {ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
                StringBuilder sb = new StringBuilder();
                for (ThreadInfo threadInfo : threadMxBean.dumpAllThreads(true, true)) {sb.append(getThreadDumpString(threadInfo));
                }
                log.error("thread dump info:", sb.toString());
            } catch (Exception e) {log.error("thread dump error", e);
            } finally {guard.release();
            }
            lastPrintTime = System.currentTimeMillis();});
        pool.shutdown();}

    @SuppressWarnings("all")
    private String getThreadDumpString(ThreadInfo threadInfo) {StringBuilder sb = new StringBuilder("""+ threadInfo.getThreadName() +""" +
                "Id=" + threadInfo.getThreadId() + " " +
                threadInfo.getThreadState());
        if (threadInfo.getLockName() != null) {sb.append("on" + threadInfo.getLockName());
        }
        if (threadInfo.getLockOwnerName() != null) {sb.append("owned by"" + threadInfo.getLockOwnerName() +
                    ""Id=" + threadInfo.getLockOwnerId());
        }
        if (threadInfo.isSuspended()) {sb.append("(suspended)");
        }
        if (threadInfo.isInNative()) {sb.append("(in native)");
        }
        sb.append('n');
        int i = 0;

        StackTraceElement[] stackTrace = threadInfo.getStackTrace();
        MonitorInfo[] lockedMonitors = threadInfo.getLockedMonitors();
        for (; i < stackTrace.length && i < 32; i++) {StackTraceElement ste = stackTrace[i];
            sb.append("tat" + ste.toString());
            sb.append('n');
            if (i == 0 && threadInfo.getLockInfo() != null) {Thread.State ts = threadInfo.getThreadState();
                switch (ts) {
                    case BLOCKED:
                        sb.append("t-  blocked on" + threadInfo.getLockInfo());
                        sb.append('n');
                        break;
                    case WAITING:
                        sb.append("t-  waiting on" + threadInfo.getLockInfo());
                        sb.append('n');
                        break;
                    case TIMED_WAITING:
                        sb.append("t-  waiting on" + threadInfo.getLockInfo());
                        sb.append('n');
                        break;
                    default:
                }
            }

            for (MonitorInfo mi : lockedMonitors) {if (mi.getLockedStackDepth() == i) {sb.append("t-  locked" + mi);
                    sb.append('n');
                }
            }
        }
        if (i < stackTrace.length) {sb.append("t...");
            sb.append('n');
        }

        LockInfo[] locks = threadInfo.getLockedSynchronizers();
        if (locks.length > 0) {sb.append("ntNumber of locked synchronizers =" + locks.length);
            sb.append('n');
            for (LockInfo li : locks) {sb.append("t-" + li);
                sb.append('n');
            }
        }
        sb.append('n');
        return sb.toString();}
}

文章起源:http://javakk.com/188.html

退出移动版