关于java:JUCArrayBlockingQueue之Condition应用

32次阅读

共计 1970 个字符,预计需要花费 5 分钟才能阅读完成。

阻塞队列是一种生产者、消费者模式的利用;
ArrayBlockingQueue 从源码角度来看,其 实质是 condition 的一种利用

一、样例及原理

// == 1. 队列初始化
ArrayBlockingQueue queue = new ArrayBlockingQueue(100);

// == 2. 入队
Thread threadA = new Thread(()->{
    try {queue.put(new Object());
    } catch (InterruptedException e) {e.printStackTrace();
    }
});
threadA.start();

// == 3. 出队
Thread threadB = new Thread(()->{
    try {Object object = queue.take();
    } catch (InterruptedException e) {e.printStackTrace();
    }
});
threadB.start();

1. 元素 A 退出队列并被生产流程

创立 ArrayBlockingQueue 时会构建一个数组,用来寄存元素;同时会创立一个 notEmpty 的 condition 条件。
①、生产者生产元素
元素 A 会寄存在数组中,同时会触发 notEmpty 这个 condition 的 signal 办法唤醒被阻塞的消费者。
②、消费者生产元素
此时另一个线程生产,本着 FIFO 准则元素 A 会被移除出数组,当数组元素的 length= 0 时,触发 await 办法阻塞消费者

2. 队列已满元素 N 被生产流程

见下图(逻辑相似,不做剖析了,懒~)

二、源码剖析

1. 初始化

public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)
        throw new IllegalArgumentException();
    // 初始化数组
    this.items = new Object[capacity];
    // fair=false, 非偏心锁
    lock = new ReentrantLock(fair);
    // 两个 condition,不空、不满
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();}

2. 入队 put

public void put(E e) throws InterruptedException {checkNotNull(e);
    final ReentrantLock lock = this.lock;
    // ## 加锁,被中断则抛出异样(抛异样是 doAcquireInterruptibly()与 acquireQueued()的次要区别))lock.lockInterruptibly();
    try {
        // -- 1. 队列满了,await 阻塞
        while (count == items.length){notFull.await();
        }
        // -- 2. 队列未满,入队
        enqueue(e);
    } finally {lock.unlock();
    }
}


// -- 2. 队列未满,入队逻辑
java.util.concurrent.ArrayBlockingQueue#enqueue
private void enqueue(E x) {final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // ## 唤醒 notEmpty 的 condition
    notEmpty.signal();}

3. 出队 take

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lockInterruptibly();
    try {
        // -- 1. 队列空,则阻塞
        while (count == 0)
            notEmpty.await();
        // -- 2. 队列不空,出队
        return dequeue();} finally {lock.unlock();
    }
}

// -- 2. 队列不空,出队
java.util.concurrent.ArrayBlockingQueue#dequeue
private E dequeue() {final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // ## 唤醒 notFull 条件
    notFull.signal();
    return x;
}

正文完
 0