乐趣区

关于后端:自己动手写线程池向JDK线程池进发

本人入手写线程池——向 JDK 线程池进发

前言

在后面的文章本人入手写乞丐版线程池中,咱们写了一个非常简单的线程池实现,这个只是一个非常简单的实现,在本篇文章当中咱们将要实现一个和 JDK 外部实现的线程池十分类似的线程池。

JDK 线程池一瞥

咱们首先看一个 JDK 给咱们提供的线程池 ThreadPoolExecutor 的构造函数的参数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

参数解释:

  • corePoolSize:这个参数你能够了解为线程池当中至多须要 corePoolSize 个线程,初始时线程池当中线程的个数为 0,当线程池当中线程的个数小于 corePoolSize 每次提交一个工作都会创立一个线程,并且先执行这个提交的工作,而后再去工作队列外面去获取新的工作,而后再执行。
  • maximumPoolSize:这个参数指的是线程池当中可能容许的最大的线程的数目,当工作队列满了之后如果这个时候有新的工作想要退出队列当中,当发现队列满了之后就创立新的线程去执行工作,然而须要满足最大的线程的个数不可能超过 maximumPoolSize。
  • keepAliveTime 和 unit:这个次要是用于工夫的示意,当队列当中多长时间没有数据的时候线程本人退出,后面谈到了线程池当中工作过多的时候会超过 corePoolSize,当线程池闲下来的时候这些多余的线程就能够退出了。
  • workQueue:这个就是用于保留工作的阻塞队列。
  • threadFactory:这个参数倒不是很重要,线程工厂。
  • handler:这个示意回绝策略,JDK 给咱们提供了四种策略:

    • AbortPolicy:抛出异样。
    • DiscardPolicy:放弃这个工作。
    • CallerRunPolicy:提交工作的线程执行。
    • DiscardOldestPolicy: 放弃等待时间最长的工作。

如果下面的参数你不可能了解,能够先浏览这篇文章本人入手写乞丐版线程池。基于下面谈到的参数,线程池当中提交工作的流程大抵如下图所示:

本人入手实现线程池

依据后面的参数剖析咱们本人实现的线程池须要实现一下性能:

  • 可能提交 Runnable 的工作和 Callable 的工作。
  • 线程池可能本人实现动静的扩容和所容,动静调整线程池当中线程的数目,当工作多的时候可能减少线程的数目,当工作少的时候多进去的线程可能主动退出。
  • 有本人的回绝策略,当工作队列满了,线程数也达到最大的时候,须要回绝提交的工作。

线程池参数介绍

  private AtomicInteger ct = new AtomicInteger(0); // 以后在执行工作的线程个数
  private int corePoolSize;
  private int maximumPoolSize;
  private long keepAliveTime;
  private TimeUnit unit;
  private BlockingQueue<Runnable> taskQueue;
  private RejectPolicy policy;

  private ArrayList<Worker> workers = new ArrayList<>();

  private volatile boolean isStopped;
  private boolean useTimed;

参数解释如下:

  • ct:示意以后线程池当中线程的个数。
  • corePoolSize:线程池当中外围线程的个数,意义和下面谈到的 JDK 的线程池意义统一。
  • maximumPoolSize:线程池当中最大的线程个数,意义和下面谈到的 JDK 的线程池意义统一。
  • keepAliveTime 和 unit:和 JDK 线程池的参数意义统一。
  • taskQueue:工作队列,用不保留提交的工作。
  • policy:回绝策略,次要有一下四种策略:
public enum RejectPolicy {

  ABORT,
  CALLER_RUN,
  DISCARD_OLDEST,
  DISCARD
}
  • workers:用于保留工作线程。
  • isStopped:线程池是否被敞开了。
  • useTimed:次要是用于示意是否应用下面的 keepAliveTime 和 unit,如果应用就是在肯定的工夫内,如果没有从工作队列当中获取到工作,线程就从线程池退出,然而须要保障线程池当中最小的线程个数不小于 corePoolSize。

实现 Runnable

  // 上面这个办法是向线程池提交工作
  public void execute(Runnable runnable) throws InterruptedException {checkPoolState();

    if (addWorker(runnable, false)  // 如果可能退出新的线程执行工作 退出胜利就间接返回
            || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 阐明提交工作失败 工作队列曾经满了
            || addWorker(runnable, true)) // 应用可能应用的最大的线程数 (maximumPoolSize) 看是否可能产生新的线程
      return;

    // 如果工作队列满了而且不可能退出新的线程 则回绝这个工作
    if (!taskQueue.offer(runnable))
      reject(runnable);
  }

在下面的代码当中:

  • checkPoolState 函数是查看线程池的状态,当线程池被停下来之后就不可能在提交工作:
  private void checkPoolState() {if (isStopped) {
      // 如果线程池曾经停下来了,就不在向工作队列当中提交工作了
      throw new RuntimeException("thread pool has been stopped, so quit submitting task");
    }
  }
  • addWorker 函数是往线程池当中提交工作并且产生一个线程,并且这个线程执行的第一个工作就是传递的参数。max 示意线程的最大数目,max == true 的时候示意应用 maximumPoolSize 否则应用 corePoolSize,当返回值等于 true 的时候示意执行胜利,否则示意执行失败。
  /**
   *
   * @param runnable 须要被执行的工作
   * @param max 是否应用 maximumPoolSize
   * @return boolean
   */
  public synchronized boolean addWorker(Runnable runnable, boolean max) {if (ct.get() >= corePoolSize && !max)
      return false;
    if (ct.get() >= maximumPoolSize && max)
      return false;
    Worker worker = new Worker(runnable);
    workers.add(worker);
    Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
    thread.start();
    return true;
  }

实现 Callable

这个函数其实比较简单,只须要将传入的 Callable 对象封装成一个 FutureTask 对象即可,因为 FutureTask 实现了 Callable 和 Runnable 两个接口,而后将这个后果返回即可,失去这个对象,再调用对象的 get 办法就可能失去后果。

  public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {checkPoolState();
    FutureTask<V> futureTask = new FutureTask<>(task);
    execute(futureTask);
    return futureTask;
  }

回绝策略的实现

依据后面提到的各种策略的具体实现形式,具体的代码实现如下所示:

  private void reject(Runnable runnable) throws InterruptedException {switch (policy) {
      case ABORT:
        throw new RuntimeException("task queue is full");
      case CALLER_RUN:
        runnable.run();
      case DISCARD: // 间接放弃这个工作
        return;
      case DISCARD_OLDEST:
        // 放弃等待时间最长的工作 也就是队列当中的第一个工作
        taskQueue.poll();
        execute(runnable); // 从新执行这个工作
    }
  }

线程池敞开实现

一共两种形式实现线程池敞开:

  • 间接敞开线程池,不论工作队列当中的工作是否被全副执行实现。
  • 平安敞开线程池,先期待工作队列当中所有的工作被执行实现,再敞开线程池,然而在这个过程当中不容许持续提交工作了,这一点曾经在函数 checkPoolState 当中实现了。
  // 强制敞开线程池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {worker.stopWorker();
    }
  }

  public synchronized void shutDown() {
    // 先示意敞开线程池 线程就不能再向线程池提交工作
    isStopped = true;
    // 先期待所有的工作执行实现再敞开线程池
    waitForAllTasks();
    stop();}

  private void waitForAllTasks() {
    // 当线程池当中还有工作的时候 就不退出循环
    while (taskQueue.size() > 0) {Thread.yield();
      try {Thread.sleep(1000);
      } catch (InterruptedException e) {e.printStackTrace();
      }
    }
  }

工作线程的工作实现

    @Override
    public void run() {
      // 先执行传递过去的第一个工作 这里是一个小的优化 让线程间接执行第一个工作 不须要
      // 放入工作队列再取出来执行了
      firstTask.run();

      thisThread = Thread.currentThread();
      while (!isStopped) {
        try {
          // 是否应用工夫就在这里显示进去了
          Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
          if (task == null) {
            int i;
            boolean exit = true;
            // 如果以后线程数大于外围线程数 则应用 CAS 去退出 用于保障在线程平安下的退出
            // 且保障线程的个数不小于 corePoolSize 上面这段代码须要仔细分析一下
            if (ct.get() > corePoolSize) {
              do{i = ct.get();
                if (i <= corePoolSize) {
                  exit = false;
                  break;
                }
              }while (!ct.compareAndSet(i, i - 1));
              if (exit) {return;}
            }
          }else {task.run();
          }
        } catch (InterruptedException e) {// do nothing}
      }
    }

咱们当初来仔细分析一下,线程退出线程池的时候是如何保障线程池当中总的线程数是不小于 corePoolSize 的!首先整体的框架是应用 CAS 进行实现,具体代码为 do … while 操作,而后在 while 操作外面应用 CAS 进行测试替换,如果没有胜利再次获取,当线程池当中外围线程的数目小于等于 corePoolSize 的时候也须要退出循环,因为线程池当中线程的个数不能小于 corePoolSize。因而应用 break 跳出循环的线程是不会退出线程池的。

线程池实现的 BUG

在咱们本人实现的线程池当中当线程退出的时候,workers 当中还保留这指向这个线程的对象,然而当线程退出的时候咱们还没有在 workers 当中删除这个对象,因而这个线程对象不会被垃圾回收器收集掉,然而咱们这个只是一个线程池实现的例子而已,并不用于生产环境,只是为了帮忙大家了解线程池的原理。

残缺代码

package cscore.concurrent.java.threadpoolv2;


import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPool {private AtomicInteger ct = new AtomicInteger(0); // 以后在执行工作的线程个数
  private int corePoolSize;
  private int maximumPoolSize;
  private long keepAliveTime;
  private TimeUnit unit;
  private BlockingQueue<Runnable> taskQueue;
  private RejectPolicy policy;

  private ArrayList<Worker> workers = new ArrayList<>();

  private volatile boolean isStopped;
  private boolean useTimed;

  public int getCt() {return ct.get();
  }

  public ThreadPool(int corePoolSize, int maximumPoolSize, TimeUnit unit, long keepAliveTime, RejectPolicy policy
          , int maxTasks) {
    // please add -ea to vm options to make assert keyword enable
    assert corePoolSize > 0;
    assert maximumPoolSize > 0;
    assert keepAliveTime >= 0;
    assert maxTasks > 0;

    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.unit = unit;
    this.policy = policy;
    this.keepAliveTime = keepAliveTime;
    taskQueue = new ArrayBlockingQueue<Runnable>(maxTasks);
    useTimed = keepAliveTime != 0;
  }

  /**
   *
   * @param runnable 须要被执行的工作
   * @param max 是否应用 maximumPoolSize
   * @return boolean
   */
  public synchronized boolean addWorker(Runnable runnable, boolean max) {if (ct.get() >= corePoolSize && !max)
      return false;
    if (ct.get() >= maximumPoolSize && max)
      return false;
    Worker worker = new Worker(runnable);
    workers.add(worker);
    Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
    thread.start();
    return true;
  }

  // 上面这个办法是向线程池提交工作
  public void execute(Runnable runnable) throws InterruptedException {checkPoolState();

    if (addWorker(runnable, false)  // 如果可能退出新的线程执行工作 退出胜利就间接返回
            || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 阐明提交工作失败 工作队列曾经满了
            || addWorker(runnable, true)) // 应用可能应用的最大的线程数 (maximumPoolSize) 看是否可能产生新的线程
      return;

    // 如果工作队列满了而且不可能退出新的线程 则回绝这个工作
    if (!taskQueue.offer(runnable))
      reject(runnable);
  }

  private void reject(Runnable runnable) throws InterruptedException {switch (policy) {
      case ABORT:
        throw new RuntimeException("task queue is full");
      case CALLER_RUN:
        runnable.run();
      case DISCARD:
        return;
      case DISCARD_OLDEST:
        // 放弃等待时间最长的工作
        taskQueue.poll();
        execute(runnable);
    }
  }

  private void checkPoolState() {if (isStopped) {
      // 如果线程池曾经停下来了,就不在向工作队列当中提交工作了
      throw new RuntimeException("thread pool has been stopped, so quit submitting task");
    }
  }

  public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {checkPoolState();
    FutureTask<V> futureTask = new FutureTask<>(task);
    execute(futureTask);
    return futureTask;
  }

  // 强制敞开线程池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {worker.stopWorker();
    }
  }

  public synchronized void shutDown() {
    // 先示意敞开线程池 线程就不能再向线程池提交工作
    isStopped = true;
    // 先期待所有的工作执行实现再敞开线程池
    waitForAllTasks();
    stop();}

  private void waitForAllTasks() {
    // 当线程池当中还有工作的时候 就不退出循环
    while (taskQueue.size() > 0) {Thread.yield();
      try {Thread.sleep(1000);
      } catch (InterruptedException e) {e.printStackTrace();
      }
    }
  }

  class Worker implements Runnable {

    private Thread thisThread;

    private final Runnable firstTask;
    private volatile boolean isStopped;

    public Worker(Runnable firstTask) {this.firstTask = firstTask;}

    @Override
    public void run() {
      // 先执行传递过去的第一个工作 这里是一个小的优化 让线程间接执行第一个工作 不须要
      // 放入工作队列再取出来执行了
      firstTask.run();

      thisThread = Thread.currentThread();
      while (!isStopped) {
        try {Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
          if (task == null) {
            int i;
            boolean exit = true;
            if (ct.get() > corePoolSize) {
              do{i = ct.get();
                if (i <= corePoolSize) {
                  exit = false;
                  break;
                }
              }while (!ct.compareAndSet(i, i - 1));
              if (exit) {return;}
            }
          }else {task.run();
          }
        } catch (InterruptedException e) {// do nothing}
      }
    }

    public synchronized void stopWorker() {if (isStopped) {throw new RuntimeException("thread has been interrupted");
      }
      isStopped = true;
      thisThread.interrupt();}

  }

}

线程池测试

package cscore.concurrent.java.threadpoolv2;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

public class Test {public static void main(String[] args) throws InterruptedException, ExecutionException {var pool = new ThreadPool(2, 5, TimeUnit.SECONDS, 10, RejectPolicy.ABORT, 100000);

    for (int i = 0; i < 10; i++) {RunnableFuture<Integer> submit = pool.submit(() -> {System.out.println(Thread.currentThread().getName() + "output a");
        try {Thread.sleep(10);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return 0;
      });
      System.out.println(submit.get());
    }
    int n = 15;
    while (n-- > 0) {System.out.println("Number Threads =" + pool.getCt());
      Thread.sleep(1000);
    }
    pool.shutDown();}
}

下面测试代码的输入后果如下所示:

ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
Number Threads = 5
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 3
Number Threads = 2
Number Threads = 2
Number Threads = 2
Number Threads = 2

从下面的代码能够看出咱们实现了正确的工作实现后果,同时线程池当中的外围线程数从 2 变到了 5,当线程池当中工作队列全副别执行实现之后,线程的数目从新降下来了,这的确是咱们想要达到的后果。

总结

在本篇文章当中次要给大家介绍了如何实现一个相似于 JDK 中的线程池,外面有十分多的实现细节,大家能够认真捋一下其中的流程,对线程池的了解将会十分有帮忙。


以上就是本篇文章的所有内容了,我是LeHung,咱们下期再见!!!更多精彩内容合集可拜访我的项目:https://github.com/Chang-LeHu…

关注公众号:一无是处的钻研僧,理解更多计算机(Java、Python、计算机系统根底、算法与数据结构)常识。

退出移动版