生产者消费者模式在咱们日常工作中用得十分多,比方:在 模块解耦、音讯队列、分布式场景 中都很常见。这个模式里有三个角色,他们之间的关系是如下图这样的:
- 生产者线程:生产音讯、数据
- 消费者线程:生产音讯、数据
- 阻塞队列:作数据缓冲、均衡二者能力,避免出现 ” 产能过剩 ” 的状况(生产者生产速度远高于消费者生产速度 or 多个生产者对一个消费者)以及 ” 供不应求 ” 的状况(生产者生产速度远低于消费者生产速度 or 多个消费者对一个生产者)
从图中 3 和 4 能够晓得:无论阻塞队列是满还是空都可能会产生阻塞,阻塞之后就要在适合的时候去唤醒被阻塞的线程。
Q1:那什么时候会唤醒阻塞线程?
- 当消费者判断队列为空时,消费者线程进入期待。这期间生产者一旦往队列中放入数据,就会告诉所有的消费者,唤醒阻塞的消费者线程。
- 反之,当生产者判断队列已满,生产者线程进入期待。这期间消费者一旦生产了数据、队列有空位,就会告诉所有的生产者,唤醒阻塞的生产者线程。
Q2:为什么要用这种模式?
看了下面的 Q1,大家发现没有?生产者不必管消费者的动作,消费者也不必管生产者的动作;它两之间就是通过阻塞队列通信,实现了 解耦 ;阻塞队列的退出, 均衡二者能力 ;生产者只有在队列满或消费者只有在队列空时才会期待,其余工夫谁抢到锁谁工作, 提高效率。以上就是起因~
应用 wait、notify/notifyAll 实现
上篇文章《正确应用 wait、notify/notifyAll》说过,wait 让以后线程期待并开释锁,notify 唤醒任意一个期待同一个锁的线程,notifyAll 则是唤醒所有期待该锁的线程,而后谁抢到锁,谁执行。这就是所谓的 期待唤醒机制
先来看看用期待唤醒机制如何实现生产者、消费者模式的,首先是阻塞队列:
public class MyBlockingQueue {
private int maxSize;
private LinkedList<Integer> queue;
public MyBlockingQueue(int size) {
this.maxSize = size;
queue = new LinkedList<>();}
public synchronized void put() throws InterruptedException {while (queue.size() == maxSize) {System.out.println("队列已满,生产者:" + Thread.currentThread().getName() +"进入期待");
wait();}
Random random = new Random();
int i = random.nextInt();
System.out.println("队列未满,生产者:" +
Thread.currentThread().getName() +"放入数据" + i);
// 队列空才去唤醒消费者,其余工夫自由竞争锁
if (queue.size() == 0) {notifyAll();
}
queue.add(i);
}
public synchronized void take() throws InterruptedException {while (queue.size() == 0) {System.out.println("队列为空,消费者:" + Thread.currentThread().getName() +"进入期待");
wait();}
// 队列满了才去唤醒生产者,其余工夫自由竞争锁
if (queue.size() == maxSize) {notifyAll();
}
System.out.println("队列有数据,消费者:" +
Thread.currentThread().getName() +"取出数据:" + queue.remove());
}
}
次要逻辑在阻塞队列这边:先看 put 办法,while 查看队列是否满?满则进入期待并被动开释锁,不满则生产数据,同时判断放入数据之前队列是否空?空则唤醒消费者(因为队列已有数据,可生产)。
再看 take 办法,while 查看队列是否空?空则进入期待并被动开释锁,不空则生产数据,同时判断取出数据之前队列是否已满?满则唤醒生产者(因为队列已有空位,可生产)。
为什么是 while 不是 if?
大家可能有个疑难。为什么判断队列 size 进入期待状态这里是用 while,不能用 if 吗?就这个 demo 而言,是能够的。因为咱们的生产者和消费者线程都只有一个,然而多线程状况下用 if 就大错特错了。设想以下状况:
- 假如有两个消费者一个生产者。队列为空,消费者一进入期待状态,开释锁。消费者二抢到锁,进入 if(queue.size == 0) 的判断,也进入期待,开释锁。这时生产者抢到锁生产数据,队列有数据了。反过来唤醒两个消费者。
- 消费者一抢到锁执行 wait() 后的逻辑,取完数据开释锁。这时消费者二拿到锁,执行 wait() 后的逻辑取数据,然而此时队列的数据已被消费者一取出,没有数据了,这时就会报异样了。
而用 while 为什么能够?因为不论是消费者一还是二抢到锁,循环体的逻辑之前。依据 while 的语法,它会再一次判断条件是否成立,而 if 不会。这就是用 while 不必 if 的起因。
生产者:
public class Producer implements Runnable {
private MyBlockingQueue myBlockingQueue;
public Producer(MyBlockingQueue myBlockingQueue) {this.myBlockingQueue = myBlockingQueue;}
@Override
public void run() {for (int i = 0; i < 100; i++) {
try {myBlockingQueue.put();
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
}
消费者:
public class Consumer implements Runnable{
private MyBlockingQueue myBlockingQueue;
public Consumer(MyBlockingQueue myBlockingQueue) {this.myBlockingQueue = myBlockingQueue;}
@Override
public void run() {for (int i = 0; i < 100; i++) {
try {myBlockingQueue.take();
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
}
测试类:
public class MyBlockingQueueTest {public static void main(String[] args) {MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
Producer producer = new Producer(myBlockingQueue);
Consumer consumer = new Consumer(myBlockingQueue);
new Thread(producer).start();
new Thread(consumer).start();}
}
应用 Condition 实现
Condition 是一个 多线程间协调通信的工具类,它的 await、sign/signAll 办法正好对应 Object 的 wait、notify/notifyAll 办法 。相比于 Object 的 wait、notify 办法,Condition 的 await、signal 联合的形式实现线程间合作更加平安和高效,所以更举荐这种形式实现线程间合作。 对于 Condition 前面章节会持续钻研,敬请关注
Object 的 wait、notify 形式须要联合 synchronized 关键字实现期待唤醒机制,同样 Condition 也须要联合 Lock 类 -。那么这种形式如何实现生产者、消费者模式?看代码:
public class MyBlockingQueueForCondition {
private Queue<Integer> queue;
private int max = 10;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public MyBlockingQueueForCondition(int size) {
this.max = size;
queue = new LinkedList();}
public void put(Integer i) throws InterruptedException {
// 加锁
lock.lock();
try {
// 队列满了,进入期待
while (queue.size() == max) {System.out.println("队列已满,生产者:" + Thread.currentThread().getName() + "进入期待");
notFull.await();}
// 退出数据之前,队列为空?告诉消费者,能够生产
if (queue.size() == 0) {notEmpty.signalAll();
}
// 否则,持续生产
queue.add(i);
} finally {
// 最初别忘记开释锁
lock.unlock();}
}
public Integer take() throws InterruptedException {
// 加锁
lock.lock();
try {
// 队列无数据,进入期待
while (queue.size() == 0) {System.out.println("队列为空,消费者:" + Thread.currentThread().getName() + "进入期待");
notEmpty.await();}
// 取出数据之前,队列已满?告诉生产者,能够生产
if (queue.size() == max) {notFull.signalAll();
}
// 否则,取出
return queue.remove();} finally {
// 最初别忘记开释锁
lock.unlock();}
}
}
首先,定义了一个队列以及 ReentrantLock 类型的锁,在这根底上还创立 notFull、notEmpty 两个条件,别离代表未满、不为空的条件。最初定义了 take、put 办法。
take 和 put 逻辑差不多,这里只说 put。因为生产生产模式必定用于多线程环境,须要保障同步。这里还是先获取锁,确保同步。之后仍然是判断队列是否已满?满了进入期待并开释锁,不满则持续生产,同时判断队列在生产前是否为空,为空才去唤醒消费者。否则不唤醒,因为当队列为空消费者才进入阻塞。
PS:最初是一个十分重要的细节,在 finally 外面开释锁,否则有可能出现异常无奈开释锁的状况。
生产者:
public class ProducerForCondition implements Runnable {
private MyBlockingQueueForCondition myBlockingQueueForCondition;
public ProducerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {this.myBlockingQueueForCondition = myBlockingQueueForCondition;}
@Override
public void run() {for (int i = 0; i < 100; i++) {
try {myBlockingQueueForCondition.put(i);
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
}
消费者:
public class ConsumerForCondition implements Runnable{
private MyBlockingQueueForCondition myBlockingQueueForCondition;
public ConsumerForCondition(MyBlockingQueueForCondition myBlockingQueueForCondition) {this.myBlockingQueueForCondition = myBlockingQueueForCondition;}
@Override
public void run() {for (int i = 0; i < 100; i++) {
try {System.out.println("消费者取出数据:" + myBlockingQueueForCondition.take());
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
}
测试类:
public class MyBlockingQueueForConditionTest {public static void main(String[] args) {MyBlockingQueueForCondition myBlockingQueueForCondition = new MyBlockingQueueForCondition(10);
ProducerForCondition producerForCondition = new ProducerForCondition(myBlockingQueueForCondition);
ConsumerForCondition consumerForCondition = new ConsumerForCondition(myBlockingQueueForCondition);
new Thread(producerForCondition).start();
new Thread(consumerForCondition).start();}
}
应用 BlockingQueue 实现
看完前两种形式之后,有些小伙伴可能会说,实现个生产者消费者这么烦么?其实次要代码还是在阻塞队列,这点 Java 早就为咱们思考好了,它提供了 BlockingQueue 接口,并有实现类:ArrayBlockingQueue、DelayQueue、LinkedBlockingDeque、LinkedBlockingQueue、等。(对于阻塞队列,狗哥的多线程系列前面也会讲到)
咱们选用最简略的 ArrayBlockingQueue 实现。它的外部也是采取 ReentrantLock 和 Condition 联合的期待唤醒机制。所以,下面的两种形式其实是为这种形式铺垫。不多比比,上代码:
public class ArrayBlockingQueueTest {public static void main(String[] args) {
// 初始化长度为 10 的 ArrayBlockingQueue
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 生产者
Runnable producer = () -> {
try {
// 放入数据
Random random = new Random();
while (true) {queue.put(random.nextInt());
}
} catch (Exception e) {System.out.println("生产数据出错:" + e.getMessage());
}
};
// 开启线程生产数据
new Thread(producer).start();
// 消费者
Runnable consumer = () -> {
try {
// 取出数据
while (true) {System.out.println(queue.take());
}
} catch (Exception e) {System.out.println("生产数据出错:" + e.getMessage());
}
};
// 开启线程生产数据
new Thread(consumer).start();}
}
创立一个 ArrayBlockingQueue 并给定最大长度为 10,创立生产者和消费者。生产者在 while(true) 外面始终生产,与此同时消费者也是一直取数据,有数据就取出来。
看着是不是很简略?但其实背地 ArrayBlockingQueue 曾经为咱们做好了线程间通信的工作了,比方队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。
伟人的肩膀
- https://kaiwu.lagou.com/course/courseInfo.htm?courseId=59#/detail/pc?id=1762
总结
看了这几个例子之后,置信你对生产者消费者模式也有所理解。当前面试官让你手写一个阻塞队列,必定也难不倒你。
小福利
如果看到这里,喜爱这篇文章的话,请帮点个难看。微信搜寻 一个优良的废人 ,关注后回复 电子书 送你 100+ 本编程电子书,不只 Java 哦,详情看下图。回复 1024送你一套残缺的 java 视频教程。