前言
LinkedBlockingQueue实现了BlockingQueue,它是阻塞队列的一种,可用于线程池中。不同于ConcurrentLinkedQueue的CAS非阻塞算法,它底层是用锁实现的阻塞队列。
实现原理
先来看要害属性:
// 队列容量,最大为Integer.MAX_VALUEprivate final int capacity;// 队列长度private final AtomicInteger count = new AtomicInteger();// 头结点transient Node<E> head;// 尾结点private transient Node<E> last;// 移除操作的锁,take/poll办法用到private final ReentrantLock takeLock = new ReentrantLock();// 移除操作须要期待的条件notEmpty,与takeLock绑定private final Condition notEmpty = takeLock.newCondition();// 入队操作的锁,put/offer办法用到private final ReentrantLock putLock = new ReentrantLock();// 入队操作须要期待的条件notFull,与putLock绑定private final Condition notFull = putLock.newCondition();
能够看到,LinkedBlockingQueue外部是用单向链表实现的,并且它有两把锁:takeLock和putLock,以及对应的两个期待条件:notEmpty和notFull。takeLock管制同一时刻只有一个线程从队列头部获取/移除元素,putLock管制同一时刻只有一个线程在队列尾部增加元素。
再来看要害办法:
1.无参构造函数
public LinkedBlockingQueue() { // 调用有参构造函数,初始化容量capacity为int最大值 this(Integer.MAX_VALUE);}
2.有参构造函数
public LinkedBlockingQueue(int capacity) { // 容量不能小于0,留神也不能等于0,这点与惯例的汇合不同 if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; // 初始化头结点和尾结点为哑节点 last = head = new Node<E>(null);}
3.put()操作
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty();}
4.put()操作
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0;}
5.take()操作
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x;}
6.poll()操作
public E poll() { final AtomicInteger count = this.count; if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x;}
今天更新,先睡啦,晚安全世界!