关于java并发:从零开始自己动手写阻塞队列

2次阅读

共计 7980 个字符,预计需要花费 20 分钟才能阅读完成。

从零开始本人入手写阻塞队列

前言

在咱们平时编程的时候一个很重要的工具就是容器,在本篇文章当中次要给大家介绍阻塞队列的原理,并且在理解原理之后本人入手实现一个低配版的阻塞队列。

需要剖析

在后面的两篇文章 ArrayDeque(JDK 双端队列)源码深度分析和深刻分析(JDK)ArrayQueue 源码当中咱们认真介绍了队列的原理,如果大家感兴趣能够查看一下!

而在本篇文章所谈到的阻塞队列当中,是在并发的状况下应用的,下面所谈到的是队列是 并发不平安 的,然而阻塞队列在并发下状况是平安的。阻塞队列的次要的需要如下:

  • 队列根底的性能须要有,往队列当中放数据,从队列当中取数据。
  • 所有的队列操作都要是 并发平安 的。
  • 当队列满了之后再往队列当中放数据的时候,线程须要被挂起,当队列当中的数据被取出,让队列当中有空间的时候线程须要被唤醒。
  • 当队列空了之后再往队列当中取数据的时候,线程须要被挂起,当有线程往队列当中退出数据的时候被挂起的线程须要被唤醒。
  • 在咱们实现的队列当中咱们应用数组去存储数据,因而在构造函数当中须要提供数组的初始大小,设置用多大的数组。

阻塞队列实现原理

线程阻塞和唤醒

在下面咱们曾经谈到了阻塞队列是 并发平安 的,而且咱们还有将线程唤醒和阻塞的需要,因而咱们能够抉择可重入锁 ReentrantLock 保障并发平安,然而咱们还须要将线程唤醒和阻塞,因而咱们能够抉择条件变量 Condition 进行线程的唤醒和阻塞操作,在 Condition 当中咱们将会应用到的,次要有以下两个函数:

  • signal用于唤醒线程,当一个线程调用 Conditionsignal函数的时候就能够唤醒一个被 await 函数阻塞的线程。
  • await用于阻塞线程,当一个线程调用 Conditionawait函数的时候这个线程就会阻塞。

数组循环应用

因为队列是一端进一端出,因而队列必定有头有尾。

当咱们往队列当中退出一些数据之后,队列的状况可能如下:

在上图的根底之上咱们在进行四次出队操作,后果如下:

在下面的状态下,咱们持续退出 8 个数据,那么布局状况如下:

咱们晓得上图在退出数据的时候不仅将数组后半局部的空间应用完了,而且能够持续应用前半部分没有应用过的空间,也就是说在队列外部实现了一个循环应用的过程。

为了保障数组的循环应用,咱们须要用一个变量记录队列头在数组当中的地位,用一个变量记录队列尾部在数组当中的地位,还须要有一个变量记录队列当中有多少个数据。

代码实现

成员变量定义

依据下面的剖析咱们能够晓得,在咱们本人实现的类当中咱们须要有如下的类成员变量:

// 用于爱护临界区的锁
private final ReentrantLock lock;
// 用于唤醒取数据的时候被阻塞的线程
private final Condition notEmpty;
// 用于唤醒放数据的时候被阻塞的线程
private final Condition notFull;
// 用于记录从数组当中取数据的地位 也就是队列头部的地位
private int takeIndex;
// 用于记录从数组当中放数据的地位 也就是队列尾部的地位
private int putIndex;
// 记录队列当中有多少个数据
private int count;
// 用于寄存具体数据的数组
private Object[] items;

构造函数

咱们的构造函数也很简略,最外围的就是传入一个数组大小的参数,并且给下面的变量进行初始化赋值。

@SuppressWarnings("unchecked")
public MyArrayBlockingQueue(int size) {this.lock = new ReentrantLock();
  this.notEmpty = lock.newCondition();
  this.notFull = lock.newCondition();
  // 其实能够不必初始化 类会有默认初始化 默认初始化为 0
  takeIndex = 0;
  putIndex = 0;
  count = 0;
  // 数组的长度必定不可能小于 0
  if (size <= 0)
    throw new RuntimeException("size can not be less than 1");
  items = (E[])new Object[size];
}

put 函数

这是一个比拟重要的函数了,在这个函数当中如果队列没有满,则间接将数据放入到数组当中即可,如果数组满了,则须要将线程挂起。

public void put(E x){
  // put 函数可能多个线程调用 然而咱们须要保障在给变量赋值的时候只可能有一个线程
  // 因为如果多个线程同时进行赋值的话 那么可能后一个线程的赋值操作笼罩了前一个线程的赋值操作
  // 因而这里须要上锁
  lock.lock();

  try {
    // 如果队列当中的数据个数等于数组的长度的话 阐明数组曾经满了
    // 这个时候须要将线程挂起
    while (count == items.length)
      notFull.await(); // 将调用 await 的线程挂起
    // 当数组没有满 或者在挂起之后再次唤醒的话阐明数组当中有空间了
    // 这个时候须要将数组入队 
    // 调用入队函数将数据入队
    enqueue(x);
  } catch (InterruptedException e) {e.printStackTrace();
  } finally {
    // 解锁
    lock.unlock();}
}

// 将数据入队
private void enqueue(E x) {this.items[putIndex] = x;
  if (++putIndex == items.length)
    putIndex = 0;
  count++;
  notEmpty.signal(); // 唤醒一个被 take 函数阻塞的线程唤醒}

offer 函数

offer 函数和 put 函数一样,然而与 put 函数不同的是,当数组当中数据填满之后 offer 函数返回false,而不是被阻塞。

public boolean offer(E e) {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    // 如果数组满了 则间接返回 false 而不是被阻塞
    if (count == items.length)
      return false;
    else {
      // 如果数组没有满则间接入队 并且返回 true
      enqueue(e);
      return true;
    }
  } finally {lock.unlock();
  }
}

add 函数

这个函数和下面两个函数作用一样,也是往队列当中退出数据,但当单队列满了之后这个函数会抛出异样。

public boolean add(E e) {if (offer(e))
    return true;
  else
    throw new RuntimeException("Queue full");
}

take 函数

这个函数次要是从队列当中取出一个数据,然而当队列为空的时候,这个函数会阻塞调用该函数的线程:

public E take() throws InterruptedException {
  // 这个函数也是不可能并发的 否则可能不同的线程取出的是同一个地位的数据
  // 进行加锁操作
  lock.lock();
  try {
    // 当 count 等于 0 阐明队列为空
    // 须要将线程挂起期待
    while (count == 0)
      notEmpty.await();
    // 当被唤醒之后进行出队操作
    return dequeue();}finally {lock.unlock();
  }
}

private E  dequeue() {final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  items[takeIndex] = null; // 将对应的地位设置为 null GC 就能够回收了
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--; // 队列当中数据少一个了
  // 因为出队了一个数据 能够唤醒一个被 put 函数阻塞的线程 如果这个时候没有被阻塞的线程
  // 这个函数就不会起作用 也就说在这个函数调用之后被 put 函数挂起的线程也不会被唤醒
  notFull.signal(); // 唤醒一个被 put 函数阻塞的线程
  return x;
}

重写 toString 函数

因为咱们在前面的测试函数当中会打印咱们这个类,而打印这个类的时候会调用对象的 toString 办法失去一个字符串,最初打印这个字符串。

@Override
public String toString() {StringBuilder stringBuilder = new StringBuilder();
  stringBuilder.append("[");
  // 这里须要上锁 因为咱们在打印的时候须要打印所有的数据
  // 打印所有的数据就须要对数组进行遍历操作 而在进行遍历
  // 操作的时候是不能进行插入和删除操作的 因为打印的是某
  // 个时刻的数据
  lock.lock();
  try {if (count == 0)
      stringBuilder.append("]");
    else {
      int cur = 0;
      // 对数据进行遍历 一共遍历 count 次 因为数组当中一共有 count
      // 个数据
      while (cur != count) {
        // 从 takeIndex 地位开始进行遍历 因为数据是从这个地位开始的
        stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ",");
        cur += 1;
      }
      // 删除掉最初一次没用的 ","
      stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
      stringBuilder.append(']');
    }
  }finally {lock.unlock();
  }
  return stringBuilder.toString();}

残缺代码

整个咱们本人实现的阻塞队列的代码如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyArrayBlockingQueue<E> {

  // 用于爱护临界区的锁
  private final ReentrantLock lock;
  // 用于唤醒取数据的时候被阻塞的线程
  private final Condition notEmpty;
  // 用于唤醒放数据的时候被阻塞的线程
  private final Condition notFull;
  // 用于记录从数组当中取数据的地位 也就是队列头部的地位
  private int takeIndex;
  // 用于记录从数组当中放数据的地位 也就是队列尾部的地位
  private int putIndex;
  // 记录队列当中有多少个数据
  private int count;
  // 用于寄存具体数据的数组
  private Object[] items;


  @SuppressWarnings("unchecked")
  public MyArrayBlockingQueue(int size) {this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.notFull = lock.newCondition();
    // 其实能够不必初始化 类会有默认初始化 默认初始化为 0
    takeIndex = 0;
    putIndex = 0;
    count = 0;
    if (size <= 0)
      throw new RuntimeException("size can not be less than 1");
    items = (E[])new Object[size];
  }

  public void put(E x){lock.lock();

    try {while (count == items.length)
        notFull.await();
      enqueue(x);
    } catch (InterruptedException e) {e.printStackTrace();
    } finally {lock.unlock();
    }
  }

  private void enqueue(E x) {this.items[putIndex] = x;
    if (++putIndex == items.length)
      putIndex = 0;
    count++;
    notEmpty.signal();}

  private E  dequeue() {final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
      takeIndex = 0;
    count--;
    notFull.signal();
    return x;
  }

  public boolean add(E e) {if (offer(e))
      return true;
    else
      throw new RuntimeException("Queue full");
  }

  public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {if (count == items.length)
        return false;
      else {enqueue(e);
        return true;
      }
    } finally {lock.unlock();
    }
  }

  public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {return (count == 0) ? null : dequeue();} finally {lock.unlock();
    }
  }

  public E take() throws InterruptedException {lock.lock();
    try {while (count == 0)
        notEmpty.await();
      return dequeue();}finally {lock.unlock();
    }
  }

  @Override
  public String toString() {StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append("[");
    lock.lock();
    try {if (count == 0)
        stringBuilder.append("]");
      else {
        int cur = 0;
        while (cur != count) {stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(",");
          cur += 1;
        }
        stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
        stringBuilder.append(']');
      }
    }finally {lock.unlock();
    }
    return stringBuilder.toString();}

}

当初对下面的代码进行测试:

咱们当初应用阻塞队列模仿一个生产者消费者模型,设置阻塞队列的大小为 5,生产者线程会往队列当中退出数据,数据为 0 - 9 的 10 个数字,消费者线程一共会生产 10 次。

import java.util.concurrent.TimeUnit;

public class Test {public static void main(String[] args) throws InterruptedException {MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5);
    Thread thread = new Thread(() -> {for (int i = 0; i < 10; i++) {System.out.println(Thread.currentThread().getName() + "往队列当中退出数据:" + i);
        queue.put(i);
      }
    }, "生产者");


    Thread thread1 = new Thread(() -> {for (int i = 0; i < 10; i++) {
        try {System.out.println(Thread.currentThread().getName() + "从队列当中取出数据:" + queue.take());
          System.out.println(Thread.currentThread().getName() + "以后队列当中的数据:" + queue);
        } catch (InterruptedException e) {e.printStackTrace();
        }
      }
    }, "消费者");
    thread.start();
    TimeUnit.SECONDS.sleep(3);
    thread1.start();}
}

下面代码的输入如下所示:

生产者 往队列当中退出数据:0
生产者 往队列当中退出数据:1
生产者 往队列当中退出数据:2
生产者 往队列当中退出数据:3
生产者 往队列当中退出数据:4
生产者 往队列当中退出数据:5
消费者 从队列当中取出数据:0
生产者 往队列当中退出数据:6
消费者 以后队列当中的数据:[1, 2, 3, 4, 5]
消费者 从队列当中取出数据:1
消费者 以后队列当中的数据:[2, 3, 4, 5]
消费者 从队列当中取出数据:2
消费者 以后队列当中的数据:[3, 4, 5, 6]
生产者 往队列当中退出数据:7
消费者 从队列当中取出数据:3
消费者 以后队列当中的数据:[4, 5, 6, 7]
消费者 从队列当中取出数据:4
消费者 以后队列当中的数据:[5, 6, 7]
消费者 从队列当中取出数据:5
消费者 以后队列当中的数据:[6, 7]
生产者 往队列当中退出数据:8
消费者 从队列当中取出数据:6
消费者 以后队列当中的数据:[7, 8]
消费者 从队列当中取出数据:7
消费者 以后队列当中的数据:[8]
消费者 从队列当中取出数据:8
消费者 以后队列当中的数据:[]
生产者 往队列当中退出数据:9
消费者 从队列当中取出数据:9
消费者 以后队列当中的数据:[]

从下面的输入后果咱们晓得,生产者线程打印 5 之后被挂起了,因为如果没有被挂起,生产者线程必定能够一次性输入实现,因为消费者线程阻塞了 3 秒。然而他没有输入实现阐明在打印 5 之后,因为阻塞队列满了,因此生产者线程被挂起了。而后消费者开始生产,这样阻塞队列当中就有空间了,生产者线程就能够持续生产了。

总结

在本篇文章当中,次要向大家介绍了阻塞队列的原理并且实现了一个低配版的数组阻塞队列,其实如果你理解数组队列和锁的话,这个代码实现起来还是绝对比较简单的,咱们只须要应用锁去保障咱们的程序并发平安即可。

  • 咱们在实现 put 函数的时候,如果以后队列曾经满了,则以后线程须要调用 await 函数进行阻塞,当线程被唤醒或者队列没有满能够继续执行的时候,咱们在往队列当中退出数据之后须要调用一次 signal 函数,因为这样能够唤醒在调用 take 函数的时候因为队列空而阻塞的线程。
  • 咱们实现 take 函数的时候,如果以后队列曾经空了,则以后线程也须要调用 await 函数进行阻塞,当线程被唤醒或者队列不为空线程能够继续执行,在出队之后须要调用一次 signal 函数,因为这样能够唤醒在调用 put 函数的时候因为队列满而阻塞的线程。

以上就是本篇文章的所有内容了,我是LeHung,咱们下期再见!!!更多精彩内容合集可拜访我的项目:https://github.com/Chang-LeHu…

关注公众号:一无是处的钻研僧,理解更多计算机(Java、Python、计算机系统根底、算法与数据结构)常识。

正文完
 0