深度分析面试阿里字节跳动美团几乎都会被问到的阻塞队列

45次阅读

共计 7350 个字符,预计需要花费 19 分钟才能阅读完成。

基本概念

阻塞队列(BlockingQueue)是一个反对两个附加操作的队列。这两个附加的操作反对阻塞的插入和移除办法。
1)反对阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。

2)反对阻塞的移除办法:意思是在队列为空时,获取元素的线程会期待队列变为非空

阻塞队列一共有 7 种,咱们着重讲一下
ArrayBlockingQueue,
LinkedBlockingQueue,
DelayQueue,
SynchronousQueue
这四种阻塞队列

ArrayBlockingQueue

基于数组实现有界的阻塞队列(循环数组)

类的继承

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable

次要成员变量

private static final long serialVersionUID = -817911632652898426L;

   final Object[] items; // 底层存储元素的数组

   int takeIndex; // 进行取操作时的下标

   int putIndex;// 进行放操作时的下标
   
   int count;// 队列中元素的数量
   
   final ReentrantLock lock;// 阻塞时用的锁

   private final Condition notEmpty;// 满时的 condition 队列

   private final Condition notFull;// 空时的 condition 队列

结构器

参数有容量和全局锁是否是偏心锁

 public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();}

不必确定是否是偏心锁,默认是非偏心锁

public ArrayBlockingQueue(int capacity) {this(capacity, false);
    }

在第一个结构器的前提下,将整个汇合移入阻塞队列

public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {for (E e : c) {checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {lock.unlock();
        }
    }

次要办法

put()

public void put(E e) throws InterruptedException {checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {lock.unlock();
        }
    }

1. 首先判断增加的是否非空,是空的会抛出异样。
2. 给 put 办法上锁
3. 当汇合元素数量和汇合的长度相等时,调用 put 办法的线程将会被放入 notFull 队列上期待。
4. 如果不相等,则 enqueue(),也就是让 e 进入队列。
咱们再看看 enqueue()办法(入队办法)

 private void enqueue(E x) {// assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();}

其实就是让该元素入队,并且唤醒因为汇合空而期待的线程。

take()办法同理。

LinkedBlockingQueue

LinkedBlockingQueue 底层是基于链表实现的,所以其根本成员变量和 LinkedList 差不多。

类的继承关系

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable 

结构器

无参结构器,默认容量为最大容量

public LinkedBlockingQueue() {this(Integer.MAX_VALUE);
    }

手动设定容量

public LinkedBlockingQueue(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

将整个汇合挪入队列中,默认容量同样是最大容量。

public LinkedBlockingQueue(Collection<? extends E> c) {this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {putLock.unlock();
        }
    }

次要成员变量

链表就肯定会有节点
外部节点类
和 ArrayBlockingQueue 不同的是,它有两个全局锁,一个负责放元素,一个负责取元素。

static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) {item = x;}
    }

除了节点之外。

private transient Node<E> last;// 尾节点

transient Node<E> head;// 头节点

private final AtomicInteger count = new AtomicInteger();// 计算以后阻塞队列中的元素个数 

private final int capacity;// 容量
 
 // 获取并移除元素时应用的锁,如 take, poll, etc
private final ReentrantLock takeLock = new ReentrantLock();

//notEmpty 条件对象,当队列没有数据时用于挂起执行删除的线程
private final Condition notEmpty = takeLock.newCondition();

// 增加元素时应用的锁如 put, offer, etc
private final ReentrantLock putLock = new ReentrantLock();

//notFull 条件对象,当队列数据已满时用于挂起执行增加的线程
private final Condition notFull = putLock.newCondition();

次要办法

put()办法

 public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();
       // Note: convention in all put/take/etc is to preset local var
       // holding count negative to indicate failure unless set.
       int c = -1;
       Node<E> node = new Node<E>(e);
       final ReentrantLock putLock = this.putLock;
       final AtomicInteger count = this.count;
       putLock.lockInterruptibly();
       try {while (count.get() == capacity) {notFull.await();
           }
           enqueue(node);
           c = count.getAndIncrement();
           if (c + 1 < capacity)
               notFull.signal();} finally {putLock.unlock();
       }
       if (c == 0)
           signalNotEmpty();}

根本和 ArrayBlockingQueue 一样,只是锁的数量不同,导致有一些轻微的区别。

代码示例

public class TestDemo16 {private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
    public static void main(String[] args) {new Thread("put"){
            @Override
            public void run() {
                // 增加元素
                for(int i=0; i<10; i++){System.out.println("put:"+i);
                    try {queue.put(i);
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                }
            }
        }.start();

        new Thread("take"){
            @Override
            public void run() {
                // 获取元素
                for(int i=0; i<10; i++){
                    try {System.out.println("take:"+queue.take());
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                }
            }
        }.start();}
}

DelayQueue

基于 PriorityQueue 延时阻塞队列,DelayQueue 中的元素只有当其延时工夫达到,才可能去以后队列中获取到该元素,DelayQueue 是一个无界队列。次要用于缓存零碎的设计、定时工作零碎的设计。
实现 DelayQueue 的三个步骤

第一步:继承 Delayed 接口

第二步:实现 getDelay(TimeUnit unit),该办法返回以后元素还须要延时多长时间,单位是纳秒

第三步:实现 compareTo 办法来指定元素的程序
例如;

class Test implements Delayed {
    private long time; //Test 实例延时工夫

    public Test(long time, TimeUnit unit){this.time = System.currentTimeMillis() + unit.toMillis(time);
    }

    @Override
    public long getDelay(TimeUnit unit) {return this.time - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {long diff = this.time - ((Test)o).time;
        if(diff <= 0){return -1;}else{return 1;}
    }
}
        DelayQueue<Test> queue = new DelayQueue<>();
        queue.put(new Test(5, TimeUnit.SECONDS));
        queue.put(new Test(10, TimeUnit.SECONDS));
        queue.put(new Test(15, TimeUnit.SECONDS));

        System.out.println("begin time:"+ LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        for(int i=0; i<3; i++){
            try {Test test = queue.take();
            } catch (InterruptedException e) {e.printStackTrace();
            }
            System.out.println("time:"+LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        }

SynchronousQueue

SynchronousQueue 是一个不存储元素的阻塞队列。每一个 put 操作必须期待一个 take 操作,否则不能持续增加元素。它反对偏心拜访队列。默认状况下线程采纳非公平性策略拜访队列。应用以下构造方法能够创立公平性拜访的 SynchronousQueue,如果设置为 true,则期待的线程会采纳先进先出的程序拜访队列

SynchronousQueue 能够看成是一个传球手,负责把生产者线程解决的数据间接传递给消费者线程。队列自身并不存储任何元素,非常适合传递性场景。SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue.

 public static void main(String[] args) throws InterruptedException {SynchronousQueue queue=new SynchronousQueue();
        LinkedBlockingQueue
        new Thread("put"){
            @Override
            public void run() {System.out.println("put has started");
                for(int i=0;i<5;i++){System.out.println("put after takeThread");
                    try {queue.put((int)((Math.random() * 100) + 1));
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                }
                System.out.println("put has ended");
            }
        }.start();
        new Thread("take"){
            @Override
            public void run() {System.out.println("take has started");
                for(int i=0;i<5;i++){
                    try {System.out.println("take from putThread"+queue.take());
                    } catch (InterruptedException e) {e.printStackTrace();
                    }
                }
                System.out.println("put has ended");
            }
        }.start();}

总结

1:ArrayBlockingQueue 和 LinkedBlockingQueue 的区别和分割?
1)数据存储容器不一样,ArrayBlockingQueue 采纳数组去存储数据、LinkedBlockingQueue 采纳链表去存储数据。
2)ArrayBlockingQueue(循环数组)采纳数组去存储数据,不会产生额定的对象实例;LinkedBlockingQueue 采纳链表去存储数据,在插入和删除元素只与一个节点无关,须要去生成一个额定的 Node 对象,这可能长时间内须要并发解决大批量的数据,对于性能和前期 GC 会产生影响。
3)ArrayBlockingQueue 是有界的,初始化时必须要指定容量;LinkedBlockingQueue 默认是无界的,Integer.MAX_VALUE, 当增加速度大于删除速度、有可能造成内存溢出。
4) ArrayBlockingQueue 在读和写应用的锁是一样的,即增加操作和删除操作应用的是同一个 ReentrantLock,没有实现锁拆散;LinkedBlockingQueue 实现了锁拆散,增加的时候采纳 putLock、删除的时候采纳 takeLock, 这样能进步队列的吞吐量。
2:ArrayBlockingQueue 能够应用两把锁提高效率吗?
不能,次要起因是 ArrayBlockingQueue 底层循环数组来存储数据,LinkedBlockingQueue 底层 链表来存储数据,链表队列的增加和删除,只是和某一个节点无关,为了避免 head 和 last 相互影响,就须要有一个原子性的计数器,每个增加操作先退出队列,计数器 +1,这样是为了保障队列在移除的时候,长度是大于等于计数器的,通过原子性的计数器,使得以后增加和移除互不烦扰。对于循环数据来说,当咱们走到最初一个地位须要返回到第一个地位,这样的操作是无奈原子化,只能应用同一把锁来解决。

正文完
 0