共计 1970 个字符,预计需要花费 5 分钟才能阅读完成。
阻塞队列是一种生产者、消费者模式的利用;
ArrayBlockingQueue 从源码角度来看,其 实质是 condition 的一种利用
一、样例及原理
// == 1. 队列初始化
ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
// == 2. 入队
Thread threadA = new Thread(()->{
try {queue.put(new Object());
} catch (InterruptedException e) {e.printStackTrace();
}
});
threadA.start();
// == 3. 出队
Thread threadB = new Thread(()->{
try {Object object = queue.take();
} catch (InterruptedException e) {e.printStackTrace();
}
});
threadB.start();
1. 元素 A 退出队列并被生产流程
创立 ArrayBlockingQueue 时会构建一个数组,用来寄存元素;同时会创立一个 notEmpty 的 condition 条件。
①、生产者生产元素
元素 A 会寄存在数组中,同时会触发 notEmpty 这个 condition 的 signal 办法唤醒被阻塞的消费者。
②、消费者生产元素
此时另一个线程生产,本着 FIFO 准则元素 A 会被移除出数组,当数组元素的 length= 0 时,触发 await 办法阻塞消费者
2. 队列已满元素 N 被生产流程
见下图(逻辑相似,不做剖析了,懒~)
二、源码剖析
1. 初始化
public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化数组
this.items = new Object[capacity];
// fair=false, 非偏心锁
lock = new ReentrantLock(fair);
// 两个 condition,不空、不满
notEmpty = lock.newCondition();
notFull = lock.newCondition();}
2. 入队 put
public void put(E e) throws InterruptedException {checkNotNull(e);
final ReentrantLock lock = this.lock;
// ## 加锁,被中断则抛出异样(抛异样是 doAcquireInterruptibly()与 acquireQueued()的次要区别))lock.lockInterruptibly();
try {
// -- 1. 队列满了,await 阻塞
while (count == items.length){notFull.await();
}
// -- 2. 队列未满,入队
enqueue(e);
} finally {lock.unlock();
}
}
// -- 2. 队列未满,入队逻辑
java.util.concurrent.ArrayBlockingQueue#enqueue
private void enqueue(E x) {final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// ## 唤醒 notEmpty 的 condition
notEmpty.signal();}
3. 出队 take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
// -- 1. 队列空,则阻塞
while (count == 0)
notEmpty.await();
// -- 2. 队列不空,出队
return dequeue();} finally {lock.unlock();
}
}
// -- 2. 队列不空,出队
java.util.concurrent.ArrayBlockingQueue#dequeue
private E dequeue() {final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// ## 唤醒 notFull 条件
notFull.signal();
return x;
}
正文完