乐趣区

关于前端:可动态调节参数的线程池实现

背景

线程池是一种基于池化思维治理线程的工具,应用线程池能够缩小创立销毁线程的开销,防止线程过多导致系统资源耗尽。在高并发的工作解决场景,线程池的应用是必不可少的。在双 11 主图价格表白我的项目中为了晋升解决性能,很多中央应用到了线程池。随着线程池的应用,逐步发现一个问题,线程池的参数如何设置?

线程池参数中有三个比拟要害的参数,别离是 corePoolSize(外围线程数)、maximumPoolSize(最大线程数)、workQueueSzie(工作队列大小)。依据工作的类型能够辨别为 IO 密集型和 CPU 密集型,对于 CPU 密集型,个别教训是设置 corePoolSize=CPU 核数 +1,对于 IO 密集型须要依据具体的 RT 和流量来设置,没有普适的经验值。然而,咱们个别遇到的状况少数是解决 IO 密集型工作,如果线程池参数不可动静调节,就没方法依据理论状况实时调整处理速度,只能通过公布代码调整参数。

如果线程池参数不合理会导致什么问题呢?上面列举几种可能呈现的场景:

  1. 最大线程数设置偏小,工作队列大小设置偏小,导致服务接口大量抛出 RejectedExecutionException。
  2. 最大线程数设置偏小,工作队列大小设置过大,工作沉积适度,接口响应时长变长。
  3. 最大线程数设置过大,线程调度开销增大,处理速度反而降落。
  4. 外围线程数设置过小,流量突增时须要先创立线程,导致响应时长过大。
  5. 外围线程数设置过大,闲暇线程太多,占用系统资源。

线程池任务调度机制

要明确线程池参数对运行时的影响,就必须了解其中的原理,所以上面先简略总结了线程池的外围原理。

Java 中的线程池外围实现类是 ThreadPoolExecutor,ThreadPoolExecutor 一方面保护本身的生命周期,另一方面同时治理线程和工作,使两者良好的联合从而执行并行任务。用户无需关注如何创立线程,如何调度线程来执行工作,用户只需提供 Runnable 对象,将工作的运行逻辑提交到执行器 (Executor) 中,由 Executor 框架实现线程的调配和工作的执行局部。

ThreadPoolExecutor 是如何运行,如何同时保护线程和执行工作的呢?其运行机制如下图所示:

所有工作的调度都是由 execute 办法实现的,这部分实现的工作是:查看当初线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是间接申请线程执行,或是缓冲到队列中执行,亦或是间接回绝该工作。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是 RUNNING,则间接回绝,线程池要保障在 RUNNING 的状态下执行工作。
  2. 如果 workerCount < corePoolSize,则创立并启动一个线程来执行新提交的工作。
  3. 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将工作增加到该阻塞队列中。
  4. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创立并启动一个线程来执行新提交的工作。
  5. 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则依据回绝策略来解决该工作, 默认的解决形式是间接抛异样。

其执行流程如下图所示:

动静调节线程池参数实现

线程池相干的重要参数有三个,别离是外围线程数、最大线程数和工作队列大小,接下来将论述如何实现动静调节线程池参数。

调节外围和最大线程数的原理

ThreadPoolExecutor 曾经提供了两个办法在运行时设置外围线程数和最大线程数,别离是 ThreadPoolExecutor.setCorePoolSize()ThreadPoolExecutor.setMaximumPoolSize()

setCorePoolSize 办法的执行流程是:首先会笼罩之前构造函数设置的 corePoolSize,而后,如果新的值比原始值要小,当多余的工作线程下次变成闲暇状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创立新的工作线程。流程图如下:

setMaximumPoolSize 办法执行流程是:首先会笼罩之前构造函数设置的 maximumPoolSize,而后,如果新的值比原来的值要小,当多余的工作线程下次变成闲暇状态的时候会被中断并销毁。

调节工作队列大小的原理

线程池中是以生产者消费者模式,通过一个阻塞队列来缓存工作,工作线程从阻塞队列中获取工作。工作队列的接口是阻塞队列(BlockingQueue),在队列为空时,获取元素的线程会期待队列变为非空,当队列满时,存储元素的线程会期待队列可用。

目前 JDK 提供了以下阻塞队列的实现:

然而很可怜,这些阻塞队列的实现都不反对动静调整大小,那么为什么不本人实现一个可动静调整大小的阻塞队列呢。反复造轮子是不可取的,所以我抉择革新轮子。LinkedBlockingQueue是比拟罕用的一个阻塞队列,它无奈批改大小的起因是 capacity 字段设置成了 final private final int capacity;。如果我把 final 去掉,并提供批改 capacity 的办法,是不是就满足咱们的需要呢?事实证明是可行的,文章开端上传了 ResizeLinkedBlockingQueue 的实现。

联合 Diamond 进行实现

Diamond 能够治理咱们的配置,如果能够通过 Diamond 实现线程池参数治理那就再好不过了。接下来就开始上代码了,首先实现一个 Diamond 配置管理类DispatchConfig,而后,实现一个线程池治理的工厂办法StreamExecutorFactory

DispatchConfig类是一个动态类,在初始化的时候获取了对应 Diamond 的内容并设置了监听,应用的时候只须要DispatchConfig.getConfig().getCorePoolSize()

_/**
 * @author moda
 */_
@Slf4j
@Data
public class DispatchConfig {
    public static final String DATA_ID = "com.alibaba.mkt.turbo.DispatchConfig";
    public static final String GROUP_ID = "mkt-turbo";
    private static DispatchConfig config;

    static {
        try {String content = Diamond.getConfig(DATA_ID, GROUP_ID, 3000);
            config = JSON.parseObject(content, DispatchConfig.class);
            Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListenerAdapter() {
                @Override
                public void receiveConfigInfo(String content) {
                    try {config = JSON.parseObject(content, DispatchConfig.class);
                    } catch (Throwable t) {log.error("[DispatchConfig] receiveConfigInfo an exception occurs,", t);
                    }
                }
            });
        } catch (Exception e) {log.error(String.format("[DispatchConfig - init] dataId:%s, groupId:%s", DATA_ID, GROUP_ID), e);
        }
    }

    public static DispatchConfig getConfig() {return config;}

    private int corePoolSize = 10;

    private int maximumPoolSize = 30;

    private int workQueueSize = 1024;

    _/**
     * 商品分批解决每批大小
     */_
    private int itemBatchProcessPageSize = 200;
}

StreamExecutorFactory是一个动态类,保护了一个动态属性 executor,并通过initExecutor() 进行初始化。在初始化的时候,工作队列应用了可调节大小的阻塞队列ResizeLinkedBlockingQueue,并设置了监听 Diamond 变更。Diamond 产生变更的时候通过在 callback 中对比值是否产生扭转,如果产生扭转则调整 workQueueSize、corePoolSize、maximumPoolSize。应用的时候只须要StreamExecutorFactory.getExecutor(),批改 Diamond 配置就能动静批改线程池参数。

_/**
 * @author moda
 */_
@Slf4j
public class StreamExecutorFactory {
    private static final String THREAD_NAME = "mkt-turbo_stream_dispatch";

    private static ThreadPoolExecutor executor = initExecutor();

    private static ThreadPoolExecutor initExecutor() {ThreadFactory nameThreadFactory = new ThreadFactoryBuilder().setNameFormat(THREAD_NAME).build();
        ResizeLinkedBlockingQueue<Runnable> workQueue = new ResizeLinkedBlockingQueue<>(DispatchConfig.getConfig().getWorkQueueSize());
        _// 回绝策略,调用者线程解决_
        RejectedExecutionHandler rejectedExecutionHandler = (r, e) -> {String msg = String.format("[S.E.F - rejectedHandler] Thread pool is EXHAUSTED!" +
                    "Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                    "Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)",
                THREAD_NAME, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());
            log.warn(msg);
            if (!e.isShutdown()) {r.run();
            }
        };
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DispatchConfig.getConfig().getCorePoolSize(),
            DispatchConfig.getConfig().getMaximumPoolSize(),
            10,
            TimeUnit.SECONDS,
            workQueue,
            nameThreadFactory,
            rejectedExecutionHandler
        );

        Diamond.addListener(DispatchConfig.DATA_ID, DispatchConfig.GROUP_ID, new ManagerListenerAdapter() {
            @Override
            public void receiveConfigInfo(String content) {
                try {DispatchConfig config = JSON.parseObject(content, DispatchConfig.class);
                    if (workQueue.getCapacity() != config.getWorkQueueSize()) {workQueue.setCapacity(config.getWorkQueueSize());
                    }
                    if (threadPoolExecutor.getCorePoolSize() != config.getCorePoolSize()) {threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());
                    }
                    if (threadPoolExecutor.getMaximumPoolSize() != config.getMaximumPoolSize()) {threadPoolExecutor.setMaximumPoolSize(config.getMaximumPoolSize());
                    }
                } catch (Throwable t) {log.error("[S.E.F-receiveConfigInfo] an exception occurs,", t);
                }
            }
        });

        return threadPoolExecutor;
    }

    public static Executor getExecutor() {return executor;}
}

原文链接
本文为阿里云原创内容,未经容许不得转载。

退出移动版