ThreadPoolExecutor

11次阅读

共计 3263 个字符,预计需要花费 9 分钟才能阅读完成。

关于 ThreadPoolExecutor 中的 corePoolSize 和 maximumPoolSize 官方文档中有说明

ThreadPoolExecutor 会自动根据设定的 corePoolSize 和 maximumPoolSize 调整 pool 的大小。

当一个新的 task 通过方法 execute 提交
<1> 当运行的线程数少于 corePoolSize,即便已经生成的线程是空闲的状态也会新生成一个新的线程去处理请求。
<2> 当有超过 corePoolSize 但是少于 maximumPoolSize 数量的线程正在运行,只有当 queue 满了时才会生成新的线程。
<3> 将 corePoolSize 和 maximumPoolSize 设为同样的值,可以生成固定大小的线程池。

Keep-alive times
如果 pool 中当前有超过 corePoolSize 的线程,超过的线程在 idle 时长超过 keepAliveTime 将会被终止。

Queuing
任何 BlockingQueue 可以被用来转换和持有提交的任务。Queue 的使用与 pool 的大小相互作用

  • 如果有少于 corePoolSize 个线程正在运行,Executor 会倾向于添加新的线程而不是加入 queue
  • 如果有大于等于 corePoolSize 个线程正在运行,Executor 会倾向于将请求加入队列而不是添加新的任务
  • 如果请求不能被加入队列,如果运行的线程数超过了 maximumPoolSize 任务会被拒绝,否则会生成一个新的线程

对于加入队列有三个一般的策略

  • 直接移交(Direct handoffs)对于 work queue,SynchronousQueue 是缺省的选择,直接将任务交给 thread 不持有任务。如果没有能立即获得的线程入队列操作会失败,所以一个新的线程会被构造。这个策略避免了当处理一系列有内部依赖请求的锁。直接移交一般要求无限定的 maximumPoolSize 避免提交新的任务被拒绝。当大于能够处理的量的请求持续到达时线程可能会无限制增长。
  • 无限队列(Unbounded queues)使用无限队列(例如 LinkedBlockingQueue 没有预定义容量)当所有 corePoolSize 数量的线程正在运行时会导致新的任务在队列中等待。所以最多只有 corePoolSize 指定数量的线程会被生成,maximumPoolSize 不会生效。这适用于每一个任务完全独立与其他任务,不会影响其他任何任务的执行。
  • 有限队列(Bounded queues)一个有限队列(例如 ArrayBlockingQueue)有助于阻止使用 maximumPoolSizes 导致资源耗尽。Queue sizes 和 maximum pool size 是相互制衡的。使用大的 queues,小的 pool size 会降低资源消耗,但也会导致认为的低输出。使用小的 queues,大的 pool size 会保持 CPU 繁忙

测试代码

Queue 使用 LinkedBlockingQueue

public class ThreadPoolExecutorTest {

    private final static int MIN_THREAD_COUNT = 5;
    private final static int MAX_THREAD_COUNT = 15;


    public static void main(String[] args) throws Exception {BlockingQueue<Runnable> consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
        //ArrayBlockingQueue<Runnable> consumeRequestQueue = new ArrayBlockingQueue<Runnable>(10);
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
                MIN_THREAD_COUNT,
                MAX_THREAD_COUNT,
                1000 * 10,
                TimeUnit.MILLISECONDS,
                consumeRequestQueue,
                new ThreadFactoryImpl("ConsumeMessageThread_"));

        class MyThread implements Runnable {

            private int myID;
            MyThread(int i) {myID = i;}

            @Override
            public void run() {System.out.println("Thread" + myID);
                try {sleep(8000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }
        }

        for(int i=0;i<100;i++) {tpe.execute(new MyThread(i));

            System.out.println(i + "Pool size:"+ tpe.getPoolSize() + ", corepoolsize:" + tpe.getCorePoolSize() + ", activeCount:" + tpe.getActiveCount() + ", Queue size:" + consumeRequestQueue.size());
        }
    }
}

测试结果

可以看到 Pool size 始终是 corePoolSize 指定的 5,maximumPoolSize 指定的 15 未生效。

Queue 使用 ArrayBlockingQueue。
当启动任务数超过 corePoolSize 后新的 task 会先添加到 Queue,当 Queue 满了后会启动新的线程接受 task。当 Queue 填满,启动线程超过 maximumPoolSize 再有新的任务添加会抛例外

public class ThreadPoolExecutorTest {

    private final static int MIN_THREAD_COUNT = 5;
    private final static int MAX_THREAD_COUNT = 15;


    public static void main(String[] args) throws Exception {//BlockingQueue<Runnable> consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
        ArrayBlockingQueue<Runnable> consumeRequestQueue = new ArrayBlockingQueue<Runnable>(10);
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(
                MIN_THREAD_COUNT,
                MAX_THREAD_COUNT,
                1000 * 10,
                TimeUnit.MILLISECONDS,
                consumeRequestQueue,
                new ThreadFactoryImpl("ConsumeMessageThread_"));

        class MyThread implements Runnable {

            private int myID;
            MyThread(int i) {myID = i;}

            @Override
            public void run() {System.out.println("Thread" + myID);
                try {sleep(8000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }
        }

        for(int i=0;i<100;i++) {tpe.execute(new MyThread(i));

            System.out.println(i + "Pool size:"+ tpe.getPoolSize() + ", corepoolsize:" + tpe.getCorePoolSize() + ", activeCount:" + tpe.getActiveCount() + ", Queue size:" + consumeRequestQueue.size());
        }
    }
}

执行结果如下

正文完
 0