乐趣区

关于并发:彻底了解线程池的原理40行从零开始自己写线程池

彻底理解线程池的原理——40 行从零开始本人写线程池

前言

在咱们的日常的编程当中,并发是始终离不开的主题,而在并发多线程当中,线程池又是一个不可躲避的问题。多线程能够进步咱们并发程序的效率,能够让咱们不去频繁的申请和开释线程,这是一个很大的花销,而在线程池当中就不须要去频繁的申请线程,他的次要原理是申请完线程之后并不中断,而是一直的去队列当中支付工作,而后执行,重复这样的操作。在本篇文章当中咱们次要是介绍线程池的原理,因而咱们会本人写一个十分非常简单的线程池,次要帮忙大家了解线程池的外围原理!!!

线程池给咱们提供的性能

咱们首先来看一个应用线程池的例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo01 {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 100; i++) {pool.execute(new Runnable() {
        @Override
        public void run() {for (int i = 0; i < 100; i++) {System.out.println(Thread.currentThread().getName() + "print" + i);
          }
        }
      });
    }
  }
}

在下面的例子当中,咱们应用 Executors.newFixedThreadPool 去生成来一个固定线程数目的线程池,在下面的代码当中咱们是应用 5 个线程,而后通过 execute 办法一直的去向线程池当中提交工作,大抵流程如下图所示:

线程池通过 execute 函数一直的往线程池当中的工作队列退出工作,而线程池当中的线程会一直的从工作队列当中取出工作,而后进行执行,而后持续取工作,继续执行 ….,线程的执行过程如下:

while (true) {Runnable runnable = taskQueue.take(); // 从工作队列当中取出工作
  runnable.run(); // 执行工作}

依据下面所谈到的内容,当初咱们的需要很清晰了,首先咱们须要有一个队列去存储咱们所须要的工作,而后须要开启多个线程一直的去工作队列当中取出工作,而后进行执行,而后反复取工作执行工作的操作。

工具介绍

在咱们后面提到的线程池实现的原理当中有一个十分重要的数据结构,就是 ArrayBlockingQueue 阻塞队列,它是一个并发平安的数据结构,咱们首先先简略介绍一下这个数据结构的应用办法。(如果你想深刻理解阻塞队列的实现原理,能够参考这篇文章 JDK 数组阻塞队列源码分析)

咱们次要用的是 ArrayBlockingQueue 的上面两个办法:

  • put函数,这个函数是往线程当中退出数据的。咱们须要理解的是,如果一个线程调用了这个函数往队列当中退出数据,如果此时队列曾经满了则线程须要被挂起,如果没有满则须要将数据退出到队列当中,也就是将数据存储到数组当中。
  • take函数,从队列当中取出数据,然而当队列为空的时候须要将调用这个办法的线程阻塞。当队列当中有数据的时候,就能够从队列当中取出数据。
  • 须要留神的是,如果一个线程被下面两个任何一个线程阻塞之后,能够调用对应线程的 interrupt 办法终止线程的执行,同时还会抛出一个异样。

上面是一份测试代码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueTest {public static void main(String[] args) throws InterruptedException {ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5); // 队列的容量为 5
    Thread thread = new Thread(() -> {for (int i = 0; i < 10; i++) {
        try {queue.put(i);
          System.out.println("数据" + i + "被退出到队列当中");
        } catch (InterruptedException e) {System.out.println("呈现了中断异样");
          // 如果呈现中断异样 则退出 线程就不会始终在 put 办法被挂起了
          return;
        }finally {}}
    });
    thread.start();
    TimeUnit.SECONDS.sleep(1);
    thread.interrupt();}
}

下面代码输入后果:

数据 0 被退出到队列当中
数据 1 被退出到队列当中
数据 2 被退出到队列当中
数据 3 被退出到队列当中
数据 4 被退出到队列当中
呈现了中断异样

下面代码的执行程序是:

线程 thread 会将 0 - 4 这 5 个数据退出到队列当中,然而在退出第 6 个数据的时候,阻塞队列曾经满了,因而在退出数据的时候线程 thread 会被阻塞,而后主线程在劳动一秒之后中断了线程 thread,而后线程thread 产生了中断异样,而后被捕捉进入 catch 代码块,而后函数返回,线程 thread 就不会始终被阻塞了,这一点在咱们前面写线程池十分重要!!!

Worker 设计

在前文当中咱们曾经提到了咱们的线程须要一直的去工作队列外面取出工作而后执行,咱们设计一个 Worker 类去做这件事!

  • 首先在类当中必定须要有一个线程池的工作队列,因为 worker 须要一直的从阻塞队列当中取出工作进行执行。
  • 咱们用一个 isStopped 变量示意线程是否须要终止了,也就是线程池是否须要敞开,如果线程池须要敞开了,那么线程也应该进行了。
  • 咱们还须要有一个变量记录执行工作的线程,因为当咱们须要敞开线程池的时候须要期待工作队列当中所有的工作执行实现,那么当所有的工作都执行执行实现的时候,队列必定是空的,而如果这个时候有线程还去取工作,那么必定会被阻塞,后面曾经提到了 ArrayBlockingQueue 的应用办法了,咱们能够应用这个线程的 interrupt 的办法去中断这个线程的执行,这个线程会出现异常,而后这个线程捕捉这个异样就能够退出了,因而咱们须要晓得对那个线程执行 interrupt 办法!

Worker实现的代码如下:

import java.util.concurrent.ArrayBlockingQueue;

public class Worker implements Runnable {

  // 用于保留工作的队列
  private ArrayBlockingQueue<Runnable> tasks;
  // 线程的状态 是否终止
  private volatile boolean isStopped;

  // 保留执行 run 办法的线程
  private volatile Thread thisThread;

  public Worker(ArrayBlockingQueue<Runnable> tasks) {
    // 这个参数是线程池当中传入的
    this.tasks = tasks;
  }

  @Override
  public void run() {thisThread = Thread.currentThread();
    while (!isStopped) {
      try {Runnable task = tasks.take();
        task.run();} catch (InterruptedException e) {// do nothing}
    }
  }
    // 留神是其余线程调用这个办法 同时须要留神是 thisThread 这个线程在执行下面的 run 办法
  // 其余线程调用 thisThread 的 interrupt 办法之后 thisThread 会出现异常 而后就不会始终阻塞了
  // 会判断 isStopped 是否为 true 如果为 true 的话就能够退出 while 循环了
  public void stop() {
    isStopped = true;
    thisThread.interrupt(); // 中断线程 thisThread}

  public boolean isStopped(){return isStopped;}
}

线程池设计

  • 首先线程池须要能够指定有多少个线程,阻塞队列的最大长度,因而咱们须要有这两个参数。
  • 线程池必定须要有一个队列去寄存通过 submit 函数提交的工作。
  • 须要有一个变量存储所有的 woker,因为线程池敞开的时候须要将这些 worker 都停下来,也就是调用 worker 的 stop 办法。
  • 须要有一个 shutDown 函数示意敞开线程池。
  • 须要有一个函数可能进行所有线程的执行,因为敞开线程池就是让所有线程的工作停下来。

线程池实现代码:

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;

public class MyFixedThreadPool {
  // 用于存储工作的阻塞队列
  private ArrayBlockingQueue<Runnable> taskQueue;

  // 保留线程池当中所有的线程
  private ArrayList<Worker> threadLists;

  // 线程池是否敞开
  private boolean isShutDown;

  // 线程池当中的线程数目
  private int numThread;

  public MyFixedThreadPool(int i) {this(Runtime.getRuntime().availableProcessors() + 1, 1024);
  }

  public MyFixedThreadPool(int numThread, int maxTaskNumber) {
    this.numThread = numThread;
    taskQueue = new ArrayBlockingQueue<>(maxTaskNumber); // 创立阻塞队列
    threadLists = new ArrayList<>();
    // 将所有的 worker 都保留下来
    for (int i = 0; i < numThread; i++) {Worker worker = new Worker(taskQueue);
      threadLists.add(worker);
    }
    for (int i = 0; i < threadLists.size(); i++) {new Thread(threadLists.get(i),
              "ThreadPool-Thread-" + i).start(); // 让 worker 开始工作}
  }
    
  // 进行所有的 worker 这个只在线程池要敞开的时候才会调用
  private void stopAllThread() {for (Worker worker : threadLists) {worker.stop(); // 调用 worker 的 stop 办法 让正在执行 worker 当中 run 办法的线程进行执行
    }
  }

  public void shutDown() {
    // 期待工作队列当中的工作执行实现
    while (taskQueue.size() != 0) {
      // 如果队列当中还有工作 则让出 CPU 的使用权
      Thread.yield();}
    // 在所有的工作都被执行实现之后 进行所有线程的执行
    stopAllThread();}

  public void submit(Runnable runnable) {
    try {taskQueue.put(runnable); // 如果工作队列满了,调用这个办法的线程会被阻塞
    } catch (InterruptedException e) {e.printStackTrace();
    }
  }
}

测试代码:

public class Test {public static void main(String[] args) {MyFixedThreadPool pool = new MyFixedThreadPool(5, 1024); // 开启 5 个线程 工作队列当中最多只能存 1024 个工作
    for (int i = 0; i < 1000000; i++) {pool.submit(() -> {System.out.println(Thread.currentThread().getName()); // 提交的工作就是打印线程本人的名字
      });
    }
    pool.shutDown();}
}

下面的代码是能够失常执行并且完结的,这个输入太长了这里只列出局部输入后果:

ThreadPool-Thread-0
ThreadPool-Thread-4
ThreadPool-Thread-0
ThreadPool-Thread-1
ThreadPool-Thread-3
ThreadPool-Thread-1
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-2
ThreadPool-Thread-3
ThreadPool-Thread-2
ThreadPool-Thread-1
ThreadPool-Thread-0
ThreadPool-Thread-0
ThreadPool-Thread-0
ThreadPool-Thread-1
ThreadPool-Thread-4

从下面的输入咱们能够看见线程池当中只有 5 个线程,这 5 个线程在一直从队列当中取出工作而后执行,因为咱们能够看到同一个线程的名字输入了屡次。

总结

在本篇文章当中次要介绍了线程池的原理,以及咱们应该去如何设计一个线程池,同时也介绍了在阻塞队列当中一个十分重要的数据结构 ArrayBlockingQueue 的应用办法。

  • 线程池当中有一个阻塞队列去寄存所有被提交到线程池当中的工作。
  • 所有的 Worker 会一直的从工作队列当中取出工作而后执行。
  • 线程池的 shutDown 办法其实比拟难思考该怎么实现的,首先在咱们真正敞开线程池之前须要将工作队列当中所有的工作执行实现,而后将所有的线程停下来。
  • 在所有的工作执行实现之后,可能有的线程就会阻塞在 take 办法上(从队列当中取数据的办法,如果队列为空线程会阻塞),好在 ArrayBlockingQueue 在实现的时候就思考到了这个问题,只须要其余线程调用了这个被阻塞线程的 interrupt 办法的话,线程就能够通过捕捉异样复原执行,而后判断 isStopped,如果须要进行了就跳出while 循环,这样的话咱们就能够实现所有线程的进行操作了。

以上就是本篇文章的所有内容了,我是LeHung,咱们下期再见!!!更多精彩内容合集可拜访我的项目:https://github.com/Chang-LeHu…

关注公众号:一无是处的钻研僧,理解更多计算机(Java、Python、计算机系统根底、算法与数据结构)常识。

退出移动版