第一版地址:https://blog.csdn.net/wandou9527/article/details/107769598

第二版优化点:

  • 线程实现提早创立,对内存更敌对
  • 自定义线程池实现java.util.concurrent.Executor 接口,更加符合规范
  • submit办法改为execute办法,语义更贴切
  • 退出对工作(command)的非空判断

优化后又离完满线程池近了一步。

如果想玩转 Java 的多线程与高并发,线程池是你永远也绕不过的山。既然绕不过,咱们就啃他,吃透线程池,玩转高并发。
浏览Jdk线程池源码发现,Jdk里的线程池实现的十分欠缺,有很多简单的逻辑解决,所以造成代码较长,而且代码格局也不标准(ps. 请原谅我指点江山,人家可能有人家的理论起因),eg. if 后没有大括号;很多变量命名都是单字母,比方c、w等。。。

本文,精简了线程池的一些简单逻辑,从骨干性能登程,实现骨干性能,我置信更有助于咱们了解线程池,而后再一步步深刻。

Jdk里的线程池

次要属性

private volatile int corePoolSize; //外围线程数private volatile int maximumPoolSize; //最大线程数private volatile long keepAliveTime; //存活工夫private final BlockingQueue<Runnable> workQueue; //工作期待队列private volatile ThreadFactory threadFactory; //线程工厂private volatile RejectedExecutionHandler handler; //回绝策略

上面介绍一下线程池执行工作的流程,了解各个属性的意义。当一个线程池初始化,向线程池提交工作,线程池新建线程执行工作,随着线程创立,线程数逐步增多,当达到 corePoolSize 线程池将不再新建线程,而是将工作放入工作期待队列 workQueue 。再继续向线程池提交工作,当期待队列满了,这时会持续新建线程,直到达到最大线程数 maximumPoolSize,如果还持续有工作到来,线程池无奈解决,这时就启动回绝策略。

这个过程咱们能够以生存中的例子比喻一下。大抵咱们把线程池了解为理发店。那么流程就是:来了顾客开始理发,比方只有4个理发师4个座位,相当于外围线程。那么来了过多的顾客,理发师忙不过来就会先让你去等待区稍等排队期待,后面有理完发的会叫你,相当于期待队列。期待区满了呢?事实中理发店必定不会回绝顾客的啊,他可能让你先在里面等。但如果期待区每天都爆满,那么老板可能会思考扩充店面,裁减理发师团队了。所以,这只是个大抵的比喻。

自定义手写线程池

废话不说,咱们上代码。

package com.wandou.demo.thread.post.threapool;import java.util.HashSet;import java.util.Objects;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Executor;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.atomic.AtomicInteger;/** * @author liming * @date 2020/09 * @description 自定义线程池 * new性能  * - 线程提早创立(来工作才创立)  * 欠缺性能:  * - 队列满了后持续创立线程,直到达到最大线程数  * - 回绝策略  * - 线程超时销毁  */ public class MyThreadPool implements Executor {    /**     * 外围线程数(外围理发师数量)      */     private volatile int corePoolSize;    /**    * 工作期待队列(期待区座位数)     */     private final BlockingQueue<Runnable> workQueue;    /**    * 线程容器(理发师作业区)     */     private final HashSet<MyThreadPool.Worker> workers = new HashSet<>();     private final AtomicInteger workerCount = new AtomicInteger(0);     public MyThreadPool(int corePoolSize, BlockingQueue<Runnable> workQueue) {        this.corePoolSize = corePoolSize;        this.workQueue = workQueue;    }       /**    * 运行,来顾客了,安顿     *     * @param command    * @return    */    @Override    public void execute(Runnable command) {        if (command == null) {            throw new NullPointerException();        }        int count = workerCount.get();        // 如果小于外围线程数,能够建设新线程        if (count < corePoolSize) {            if (addWorker(command, true)) {                return;            }        }        if (workQueue.offer(command)) {            int recheck = workerCount.get();            if (recheck == 0) {                addWorker(null, false);            }            return;        }        //回绝        throw new RejectedExecutionException("Task " +                   command.toString() +                " rejected from " +                this.toString());    }        private boolean addWorker(Runnable command, boolean core) {        for (; ; ) {            int count = workerCount.get();            if (count >= corePoolSize) {                return false;            }            // 线程数+1, 胜利向下走            if (workerCount.compareAndSet(count, count + 1)) {                break;            }        }        Worker worker = new Worker(command);        final Thread thread = worker.thread;        workers.add(worker);        thread.start();        return true;     }            // ----------------        /**     * 线程(理发师)      */     private class Worker implements Runnable {            /**         * 工作者运行的线程          */         final Thread thread;        /**         * 初始运行的工作,可能为 null          */         Runnable firstTask;        /**         * 创立一个工作者          *          * @param firstTask 第一个工作,能够为 null         */         Worker(Runnable firstTask) {            this.firstTask = firstTask;            this.thread = new Thread(this);        }            @Override        public void run() {            System.out.println("run");            runWorker(this);        }        @Override        public boolean equals(Object o) {            if (this == o) return true;            if (o == null || getClass() != o.getClass()) return false;            Worker worker = (Worker) o;            return Objects.equals(thread, worker.thread) &&                        Objects.equals(firstTask, worker.firstTask);        }        @Override        public int hashCode() {            return Objects.hash(thread, firstTask);        }    }            final void runWorker(Worker worker) {        Runnable task = worker.firstTask;        worker.firstTask = null;        while (task != null || (task = getTask()) != null) {            try {                task.run();            } catch (Exception e) {                e.printStackTrace();            } finally {                // 运行结束                task = null;            }        }    }            private Runnable getTask() {        for (; ; ) {            try {                //阻塞拿工作                return workQueue.take();            } catch (InterruptedException e) {                System.out.println("InterruptedException!!!");            }        }    }    }

测试代码:

package com.wandou.demo.thread.post.threapool;import org.junit.Test;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.atomic.AtomicInteger;/** * @author liming * @date 2020/9/17 * @description */public class MyThreadPoolDemo {    private BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();    private MyThreadPool myThreadPool = new MyThreadPool(3, workQueue);    @Test    public void t1() throws Exception {        AtomicInteger atomicInteger = new AtomicInteger(0);        Runnable task = new Runnable() {            @Override            public void run() {                try {                    System.out.println(atomicInteger.incrementAndGet()                            + "号顾客来理发,为其理发的理发师是:"                            + Thread.currentThread().getName());                } catch (Exception e) {                    e.printStackTrace();                }            }        };                for (int i = 0; i < 30; i++) {            myThreadPool.execute(task);        }        Thread.sleep(5000);        System.out.println("================================================");                for (int i = 0; i < 30; i++) {            myThreadPool.execute(task);        }        //让主线程阻塞期待        System.in.read();    }    }

测试后果: