关于java:突击并发编程JUC系列阻塞队列-BlockingQueue

3次阅读

共计 7021 个字符,预计需要花费 18 分钟才能阅读完成。

突击并发编程 JUC 系列演示代码地址:
https://github.com/mtcarpenter/JavaTutorial

什么是阻塞队列

阻塞队列(BlockingQueue)是一个反对两个附加操作的队列。这两个附加的操作反对阻塞的插入和移除办法。

  • 反对阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 反对阻塞的移除办法:意思是在队列为空时,获取元素的线程会期待队列变为非空。

阻塞队列罕用于生产者和消费者的场景,生产者是向队列里增加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来寄存元素、消费者用来获取元素的容器。

插入和移除操作的 4 种解决形式

办法 / 解决形式 抛出异样 返回非凡值 始终阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除办法 remove() poll() take() poll(time,unit)
查看办法 element() peek() 不可用 不可用
  • 抛出异样:当队列满时,如果再往队列里插入元素,会抛出 IllegalStateException(”Queue full”)异样。当队列空时,从队列里获取元素会抛出NoSuchElementException 异样。
  • 返回非凡值:当往队列插入元素时,会返回元素是否插入胜利,胜利返回 true。如果是移除办法,则是从队列里取出一个元素,如果没有则返回 null。
  • 始终阻塞:当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会始终阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里 take 元素,队列会阻塞住消费者线程,直到队列不为空。
  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的工夫,生产者线程就会退出。

如果是无界阻塞队列,队列不可能会呈现满的状况,所以应用 put 或 offer 办法永远不会被阻塞,而且应用 offer 办法时,该办法永远返回 true。

ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列依照先进先出(FIFO)的准则对元素进行排序。
默认状况下不保障线程偏心的拜访队列,所谓偏心拜访队列是指阻塞的线程,能够依照阻塞的先后顺序拜访队列,即先阻塞线程先拜访队列。非公平性是对先期待的线程是非偏心的,当队列可用时,阻塞的线程都能够抢夺拜访队列的资格,有可能先阻塞的线程最初才拜访队列。为了保障公平性,通常会升高吞吐量。

阻塞式写办法

ArrayBlockingQueue 中提供了两个阻塞式写办法,别离如下(在该队列中,无论是阻塞式写办法还是非阻塞式写办法,都不容许写入 null)。

  • void put(E e):向队列的尾部插入新的数据,当队列已满时调用该办法的线程会进入阻塞,直到有其余线程对该线程执行了中断操作,或者队列中的元素被其余线程生产。
  • boolean offer(E e, long timeout, TimeUnit unit):向队列尾部写入新的数据,当队列已满时执行该办法的线程在指定的工夫单位内将进入阻塞,直到到了指定的超时工夫后,或者在此期间有其余线程对队列数据进行了生产。

put() 办法示例

public class ArrayBlockingQueueExample1 {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        try {queue.put("class 1");
            queue.put("class 2");
            queue.put("class 3");
            // 超过指定得容量以后线程阻塞
            queue.put("class 4");
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }
}

非阻塞式写办法

当队列已满时写入数据,如果不想使得以后线程进入阻塞,那么就能够应用非阻塞式的写操作方法。

  • boolean add(E e):向队列尾部写入新的数据,当队列已满时不会进入阻塞,然而该办法会抛出队列已满的异样。
  • boolean offer(E e):向队列尾部写入新的数据,当队列已满时不会进入阻塞,并且会立刻返回 false。

add() 办法示例

public class ArrayBlockingQueueExample2 {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.add("class 1");
        queue.add("class 2");
        queue.add("class 3");
        //  超过指定容量 抛出异样
        queue.add("class 4");
    }
}
// 抛出异样

阻塞式读办法

  • E take():从队列头部获取数据,并且该数据会从队列头部移除,当队列为空时执行 take 办法的线程将进入阻塞,直到有其余线程写入新的数据,或者以后线程被执行了中断操作。
  • E poll(long timeout, TimeUnit unit):从队列头部获取数据并且该数据会从队列头部移除,如果队列中没有任何元素时则执行该办法,以后线程会阻塞指定的工夫,直到在此期间有新的数据写入,或者阻塞的以后线程被其余线程中断,当线程因为超时退出阻塞时,返回值为 null。

take() 办法示例

public class ArrayBlockingQueueExample3 {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.add("class 1");
        queue.add("class 2");
        queue.add("class 3");
        try {
            // 取出对头元素
            System.out.println(queue.take());
        } catch (InterruptedException e) {e.printStackTrace();
        }
        // 队列大小 
        System.out.println(queue.size());
    }
}
//class 1
// 2

非阻塞式读办法

  • E poll():从队列头部获取数据并且该数据会从队列头部移除,当队列为空时,该办法不会使得以后线程进入阻塞,而是返回 null 值。
  • E peek():当队列为空时,该办法不会使得以后线程进入阻塞,而是返回 null 值。
public class ArrayBlockingQueueExample4 {public static void main(String[] args) {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        // 队列无元素 间接返回 null
        System.out.println(queue.poll());
        System.out.println(queue.peek());
    }
}
// null
// null

局部源码

    public void put(E e) throws InterruptedException {
        // 查看元素
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lockInterruptibly();
        try {
            // 元素满 始终阻塞,队列非满时,被唤醒
            while (count == items.length)
                notFull.await();
            // 入队
            enqueue(e);
        } finally {lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
         // 获取锁
        lock.lockInterruptibly();
        try {
            // 队列为空 期待
            while (count == 0)
                notEmpty.await();
            // 出队
            return dequeue();} finally {lock.unlock();
        }
    }

LinkedBlockingQueue

LinkedBlockingQueue 是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列依照先进先出的准则对元素进行排序。

PriorityBlockingQueue

PriorityBlockingQueue是一个反对优先级的无界阻塞队列。默认状况下元素采取天然程序升序排列。也能够自定义类实现 compareTo() 办法来指定元素排序规定,或者初始化 PriorityBlockingQueue 时,指定结构参数 Comparator 来对元素进行排序。须要留神的是不能保障同优先级元素的程序。

public class PriorityBlockingQueueExample1 {public static void main(String[] args) {PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue();
        queue.offer(1);
        queue.offer(12);
        queue.offer(21);
        queue.offer(6);
        // 外部排序
        System.out.println(queue.poll()); // 1
        System.out.println(queue.poll()); // 6
        System.out.println(queue.poll()); // 12
        System.out.println(queue.poll()); //21

    }
}

DelayQueue

DelayQueue是一个反对延时获取元素的无界阻塞队列。队列应用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创立元素时能够指定多久能力从队列中获取以后元素。只有在提早期满时能力从队列中提取元素。

DelayQueue十分有用,能够将 DelayQueue 使用在以下利用场景。

  • 缓存零碎的设计:能够用 DelayQueue 保留缓存元素的有效期,应用一个线程循环查问 DelayQueue,一旦能从DelayQueue 中获取元素时,示意缓存有效期到了。
  • 定时任务调度:应用 DelayQueue 保留当天将会执行的工作和执行工夫,一旦从 DelayQueue 中获取到工作就开始执行,比方 TimerQueue 就是应用 DelayQueue 实现的。

DelayQueue队列的元素必须实现 Delayed 接口。咱们能够参考 ScheduledThreadPoolExecutorScheduledFutureTask类的实现。

public class DelayQueueExample1 {public static void main(String[] args) throws InterruptedException {DelayQueue<DelayedEntry> queue = new DelayQueue<>();
        // 延期 3 秒 解决
        queue.put(new DelayedEntry("A", 30000L));
        // 延期 10 秒解决
        queue.add(new DelayedEntry("B", 10000L));
        // 延期 20 秒解决
        queue.add(new DelayedEntry("C", 20000L));
        int size = queue.size();
        System.out.println("以后工夫是:" + LocalDateTime.now());
        // 从延时队列中获取元素,将输入 A,B,C
        for (int i = 0; i < size; i++) {System.out.println(queue.take() + "------" + LocalDateTime.now());
        }
    }
}

/**
 * 继承 Delayed 接口
 */
class DelayedEntry implements Delayed {
    /**
     * 元素数据内容
     */
    private final String value;
    /**
     * 用于计算生效工夫
     */
    private final long exeTime;

    DelayedEntry(String value, long exeTime) {
        this.value = value;
        this.exeTime = exeTime + System.currentTimeMillis();}

    @Override
    public long getDelay(TimeUnit unit) {return exeTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {DelayedEntry t = (DelayedEntry) o;
        if (this.exeTime < t.exeTime) {return -1;} else if (this.exeTime > t.exeTime) {return 1;} else {return 0;}

    }

    @Override
    public String toString() {
        return "DelayedEntry{" +
                "value=" + value +
                ", exeTime=" + exeTime +
                '}';
    }
}

// 以后工夫是:2020-10-15T16:26:37.167
//DelayedEntry{value=B, exeTime=1602750407104} ------ 2020-10-15T16:26:47.117
// DelayedEntry{value=C, exeTime=1602750417104} ------ 2020-10-15T16:26:57.105
//DelayedEntry{value=A, exeTime=1602750427104} ------ 2020-10-15T16:27:07.104

SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个 put 操作必须期待一个 take 操作,否则不能持续增加元素。
它反对偏心拜访队列。默认状况下线程采纳非公平性策略拜访队列。应用以下构造方法能够创立公平性拜访的SynchronousQueue,如果设置为 true,则期待的线程会采纳先进先出的程序拜访队列。

LinkedTransferQueue

LinkedTransferQueue是一个由链表构造组成的无界阻塞 TransferQueue 队列。绝对于其余阻塞队列,LinkedTransferQueue多了 tryTransfertransfer办法。

  • transfer 办法

    如果以后有消费者正在期待接管元素(消费者应用 take() 办法或带工夫限度的 poll()办法时),transfer 办法能够把生产者传入的元素立即 transfer(传输)给消费者。如果没有消费者在期待接管元素,transfer 办法会将元素寄存在队列的 tail 节点,并等到该元素被消费者生产了才返回。transfer 办法的要害代码如下

  • tryTransfer 办法

    tryTransfer办法是用来试探生产者传入的元素是否能间接传给消费者。如果没有消费者期待接管元素,则返回 false。和 transfer 办法的区别是 tryTransfer 办法无论消费者是否接管,办法立刻返回,而 transfer 办法是必须等到消费者生产了才返回。
    对于带有工夫限度的 tryTransfer(E e,long timeout,TimeUnit unit) 办法,试图把生产者传入的元素间接传给消费者,然而如果没有消费者生产该元素则期待指定的工夫再返回,如果超时还没生产元素,则返回 false,如果在超时工夫内生产了元素,则返回 true。

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表构造组成的双向阻塞队列。所谓双向队列指的是能够从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就缩小了一半的竞争。相比其余的阻塞队列,LinkedBlockingDeque多了 addFirstaddLastofferFirstofferLastpeekFirstpeekLast等办法,以 First 单词结尾的办法,示意插入、获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的办法,示意插入、获取或移除双端队列的最初一个元素。另外,插入方法 add 等同于 addLast,移除办法remove 等效于 removeFirst。然而take 办法却等同于 takeFirst,不晓得是不是JDK 的 bug,应用时还是用带有 FirstLast后缀的办法更分明。
在初始化LinkedBlockingDeque 时能够设置容量避免其适度收缩。另外,双向阻塞队列能够使用在“工作窃取”模式中。


欢送关注公众号 山间木匠, 我是小春哥,从事 Java 后端开发,会一点前端、通过继续输入系列技术文章以文会友,如果本文能为您提供帮忙,欢送大家关注、点赞、分享反对,_咱们下期再见!_

正文完
 0