乐趣区

关于并发:自己动手写乞丐版线程池

本人入手写乞丐版线程池

前言

在上篇文章线程池的前世今生当中咱们介绍了实现线程池的原理,在这篇文章当中咱们次要介绍实现一个十分简易版的线程池,深刻的去了解其中的原理,麻雀虽小,五脏俱全。

线程池的具体实现

线程池实现思路

工作保留到哪里?

在上一篇文章线程池的前世今生当中咱们具体去介绍了线程池当中的原理。在线程池当中咱们有很多个线程一直的从工作池(用户在应用线程池的时候一直的应用 execute 办法将工作增加到线程池当中)外面去拿工作而后执行,当初须要思考咱们应该用什么去实现工作池呢?

答案是阻塞队列,因为咱们须要保障在多个线程往工作池外面退出工作的时候并发平安,JDK 曾经给咱们提供了这样的数据结构——BlockingQueue,这个是一个并发平安的阻塞队列,他之所以叫做阻塞队列,是因为咱们能够设置队列当中能够包容数据的个数,当退出到队列当中的数据超过这个值的时候,试图将数据退出到阻塞队列当中的线程就会被挂起。当队列当中为空的时候,试图从队列当中取出数据的线程也会被挂起。

线程的设计

在咱们本人实现的线程池当中咱们定一个 Worker 类去一直的从工作池当中取出工作,而后进行执行。在咱们本人定义的 worker 当中还须要有一个变量 isStopped 示意线程是否进行工作。同时在 worker 当中还须要保留以后是哪个线程在执行工作,因而在咱们本人设计的 woker 类当中还须要有一个 thisThread 变量,保留正在执行工作的线程,因而 worker 的整体设计如下:

package cscore.concurrent.java.threadpool;

import java.util.concurrent.BlockingQueue;

public class Worker implements Runnable {

  private Thread thisThread; // 示意正在执行工作的线程
  private BlockingQueue<Runnable> taskQueue; // 由线程池传递过去的工作队列
  private volatile boolean isStopped; // 示意 worker 是否进行工作 须要应用 volatile 保障线程之间的可见性

  public Worker(BlockingQueue taskQueue) { // 这个构造方法是在线程池的实现当中会被调用
    this.taskQueue = taskQueue;
  }

  // 线程执行的函数
  @Override
  public void run() {thisThread = Thread.currentThread(); // 获取执行工作的线程
    while (!isStopped) { // 当线程没有进行的时候就一直的去工作池当中取出工作
      try {Runnable task = taskQueue.take(); // 从工作池当中取出工作 当没有工作的时候线程会被这个办法阻塞
        task.run(); // 执行工作 工作就是一个 Runnable 对象} catch (InterruptedException e) {
        // do nothing
        // 这个中央很重要 你有没有思考过一个问题当工作池当中没有工作的时候 线程会被阻塞在 take 办法上
        // 如果咱们前面没有工作提交拿他就会始终阻塞 那么咱们该如何唤醒他呢
        // 答案就在上面的函数当中 调用线程的 interruput 办法 那么 take 办法就会产生一个异样 而后咱们
        // 捕捉到一异样 而后线程退出
      }
    }
  }

  public synchronized void stopWorker() {if (isStopped) {throw new RuntimeException("thread has been interrupted");
    }
    isStopped = true;
    thisThread.interrupt(); // 中断线程产生异样}

  public synchronized boolean isStopped() {return isStopped;}
}
线程池的参数

在咱们本人实现的线程池当中,咱们只须要定义两个参数一个是线程的个数,另外一个是阻塞队列(工作池)当中最大的工作个数。在咱们本人实现的线程池当中还须要有一个变量 isStopped 示意线程池是否进行工作了,因而线程池的初步设计大抵如下:

  private BlockingQueue taskQueue; // 工作池
  private volatile boolean isStopped; // 
  private final List<Worker> workers = new ArrayList<>();// 保留所所有的执行工作的线程

  public ThreadPool(int numThreads, int maxTasks) {this.taskQueue = new ArrayBlockingQueue(maxTasks);
    for (int i = 0; i < numThreads; i++) {workers.add(new Worker(this.taskQueue));
    }
    int i = 1;
    // 这里产生线程 而后启动线程
    for (Worker worker : workers) {new Thread(worker, "ThreadPool-" + i + "-thread").start();
      i++;
    }
  }

线程池实现代码

在上文当中咱们大抵设计的线程池的初步构造,从下面的后果能够看出当咱们造一个 ThreadPool 对象的时候会产生指定线程的数目线程并且启动他们去执行工作,当初咱们还须要设计的就是如果敞开线程!咱们在敞开线程的时候还须要保障所有的工作都被执行实现而后才敞开所有的线程,再退出,咱们设计这个办法为shutDown。除此之外咱们还设计一个函数能够强制退出,不必执行所有的工作了,就间接退出,这个办法为stop。整个线程池实现的代码如下:

package cscore.concurrent.java.threadpool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ThreadPool {

  private BlockingQueue taskQueue;
  private volatile boolean isStopped;
  private final List<Worker> workers = new ArrayList<>();

  public ThreadPool(int numThreads, int maxTasks) {this.taskQueue = new ArrayBlockingQueue(maxTasks);
    for (int i = 0; i < numThreads; i++) {workers.add(new Worker(this.taskQueue));
    }
    int i = 1;
    for (Worker worker : workers) {new Thread(worker, "ThreadPool-" + i + "-thread").start();
      i++;
    }
  }

  // 上面这个办法是向线程池提交工作
  public void execute(Runnable runnable) throws InterruptedException {if (isStopped) {
      // 如果线程池曾经停下来了,就不在向工作队列当中提交工作了
      System.err.println("thread pool has been stopped, so quit submitting task");
      return;
    }
    taskQueue.put(runnable);
  }

  // 强制敞开线程池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {worker.stopWorker();
    }
  }

  public synchronized void shutDown() {
    // 先示意敞开线程池 线程就不能再向线程池提交工作
    isStopped = true;
    // 先期待所有的工作执行实现再敞开线程池
    waitForAllTasks();
    stop();}

  private void waitForAllTasks() {
    // 当线程池当中还有工作的时候 就不退出循环
    while (taskQueue.size() > 0)
      Thread.yield();}
}

线程池测试代码

package cscore.concurrent.java.threadpool;

public class TestPool {public static void main(String[] args) throws InterruptedException {ThreadPool pool = new ThreadPool(3, 1024);

    for (int i = 0; i < 10; i++) {
      int tmp = i;
      pool.execute(() -> {System.out.println(Thread.currentThread().getName() + "say hello" + tmp);
      });
    }
    pool.shutDown();}
}

下面的代码输入后果:

ThreadPool-2-thread say hello 1
ThreadPool-2-thread say hello 3
ThreadPool-2-thread say hello 4
ThreadPool-2-thread say hello 5
ThreadPool-2-thread say hello 6
ThreadPool-2-thread say hello 7
ThreadPool-2-thread say hello 8
ThreadPool-2-thread say hello 9
ThreadPool-3-thread say hello 2
ThreadPool-1-thread say hello 0

从下面的后果来看的确实现了线程池的成果。

杂谈

可能你会有疑难,当咱们调用 interrupt的时候是如何产生异样的,咱们认真看一个阻塞队列的实现。在 ArrayBlockingQueue 当中 take 办法实现如下:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {while (count == 0)
                notEmpty.await();
            return dequeue();} finally {lock.unlock();
        }
    }

在这个办法当中调用的是锁的 lock.lockInterruptibly(); 办法,当调用这个办法的时候线程是能够被 interrupt 办法中断的,而后会抛出 InterruptedException 异样。

总结

在本篇文章当中咱们次要实现了一个乞丐版的线程池,这个线程池离 JDK 给咱们提供的线程池还是有一点间隔,JDK 给咱们提供给的线程池还有很多其余的参数,咱们将在后续的几篇文章当中持续向 JDK 给咱们提供的线程池凑近,直至实现一个盗版的 JDK 的线程池。本篇文章的代码在上面的链接当中也能够拜访。


以上就是本篇文章的所有内容了,我是LeHung,咱们下期再见!!!更多精彩内容合集可拜访我的项目:https://github.com/Chang-LeHu…

关注公众号:一无是处的钻研僧,理解更多计算机(Java、Python、计算机系统根底、算法与数据结构)常识。

退出移动版