乐趣区

关于java:通俗易懂的JUC源码剖析LinkedBlockingQueue

前言

LinkedBlockingQueue 实现了 BlockingQueue,它是阻塞队列的一种,可用于线程池中。不同于 ConcurrentLinkedQueue 的 CAS 非阻塞算法,它底层是用锁实现的阻塞队列。

实现原理

先来看要害属性:

// 队列容量,最大为 Integer.MAX_VALUE
private final int capacity;
// 队列长度
private final AtomicInteger count = new AtomicInteger();
// 头结点
transient Node<E> head;
// 尾结点
private transient Node<E> last;
// 移除操作的锁,take/poll 办法用到
private final ReentrantLock takeLock = new ReentrantLock();
// 移除操作须要期待的条件 notEmpty,与 takeLock 绑定
private final Condition notEmpty = takeLock.newCondition();
// 入队操作的锁,put/offer 办法用到
private final ReentrantLock putLock = new ReentrantLock();
// 入队操作须要期待的条件 notFull,与 putLock 绑定
private final Condition notFull = putLock.newCondition();

能够看到,LinkedBlockingQueue 外部是用单向链表实现的,并且它有两把锁:takeLock 和 putLock,以及对应的两个期待条件:notEmpty 和 notFull。takeLock 管制同一时刻只有一个线程从队列头部获取 / 移除元素,putLock 管制同一时刻只有一个线程在队列尾部增加元素。

再来看要害办法:

1. 无参构造函数

public LinkedBlockingQueue() {
    // 调用有参构造函数,初始化容量 capacity 为 int 最大值
    this(Integer.MAX_VALUE);
}

2. 有参构造函数

public LinkedBlockingQueue(int capacity) {
    // 容量不能小于 0,留神也不能等于 0,这点与惯例的汇合不同
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    // 初始化头结点和尾结点为哑节点
    last = head = new Node<E>(null);
}

3.put() 操作

public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException(); 
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {while (count.get() == capacity) {notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();} finally {putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();}

4.put() 操作

public boolean offer(E e) {if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
           return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {if (count.get() < capacity) {enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();}
    } finally {putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

5.take() 操作

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {while (count.get() == 0) {notEmpty.await();
           }
           x = dequeue();
           c = count.getAndDecrement();
           if (c > 1)
               notEmpty.signal();} finally {takeLock.unlock();
     }
     if (c == capacity)
         signalNotFull();
     return x;
}

6.poll() 操作

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {if (count.get() > 0) {x = dequeue();
             c = count.getAndDecrement();
             if (c > 1)
                notEmpty.signal();}
    } finally {takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

今天更新,先睡啦,晚安全世界!

退出移动版