共计 4382 个字符,预计需要花费 11 分钟才能阅读完成。
生产者 – 消费者模型 Producer-consumer problem
是一个十分经典的多线程并发合作的模型,在分布式系统里十分常见。也是面试中无论中美大厂都十分爱考的一个问题,对应届生问的要少一些,然而对于有工作教训的工程师来说,十分爱考。
这个问题有十分多的版本和解决形式,在本文我重点是和大家壹齐理清思路,由浅入深的思考问题,保障大家看完了都能有所播种。
问题背景
简略来说,这个模型是由两类线程形成:
- 生产者线程:“生产”产品,并把产品放到一个队列里;
- 消费者线程:“生产”产品。
有了这个队列,生产者就只须要关注生产,而不必管消费者的消费行为,更不必期待消费者线程执行完;消费者也只管生产,不必管生产者是怎么生产的,更不必等着生产者生产。
所以该模型实现了生产者和消费者之间的 解藕 和异步。
什么是异步呢?
比如说你和你女朋友打电话,就得等她接了电话你们能力谈话,这是同步。
然而如果你跟她发微信,并不需要等她回复,她也不须要立即回复,而是等她有空了再回,这就是异步。
然而呢,生产者和消费者之间也不能齐全没有分割的。
- 如果队列里的产品曾经满了,生产者就不能持续生产;
- 如果队列里的产品从无到有,生产者就得告诉一下消费者,通知它能够来生产了;
- 如果队列里曾经没有产品了,消费者也无奈持续生产;
- 如果队列里的产品从满到不满,消费者也得去告诉下生产者,说你能够来生产了。
所以它们之间还须要有合作,最经典的就是应用 Object
类里自带的 wait()
和 notify()
或者 notifyAll()
的音讯告诉机制。
上述形容中的 等着,其实就是用 wait()
来实现的;
而 告诉,就是 notify()
或者 notifyAll()
。
那么基于这种音讯告诉机制,咱们还可能 均衡生产者和消费者之间的速度差别。
如果生产者的生产速度很慢,然而消费者生产的很快,就像是咱们每月工资就发两次,然而每天都要花钱,也就是 1:15.
那么咱们就须要调整生产者(发工资)为 15 个线程,消费者放弃 1 个线程,这样是不是很爽~
** 总结下该模型的三大长处:
解藕,异步,均衡速度差别。**
wait()/notify()
接下来咱们须要重点看下这个告诉机制。
wait()
和 notify()
都是 Java 中的 Object
类自带的办法,能够用来实现线程间的通信。
在上一节讲的 11 个 APIs 里我也提到了它,咱们这里再开展讲一下。
wait()
办法是用来让以后线程期待,直到有别的线程调用 notify()
将它唤醒,或者咱们能够设定一个工夫让它主动昏迷。
调用该办法之前,线程必须要取得该对象的对象监视器锁,也就是只能用在加锁的办法下。
而调用该办法之后,以后线程会开释锁。(提醒:这里很重要,也是下文代码中用 while
而非 if
的起因。)
notify()
办法只能告诉一个线程,如果多个线程在期待,那就唤醒任意一个。
notifyAll()
办法是能够唤醒所有期待线程,而后退出同步队列。
这里咱们用到了 2 个队列:
- 同步队列:对应于咱们上一节讲的线程状态中的
Runnable
,也就是线程准备就绪,就等着抢资源了。 - 期待队列:对应于咱们上一节讲的线程状态中的
Waiting
,也就是期待状态。
这里须要留神,从期待状态线程无奈间接进入 Q2,而是要先重新加入同步队列,再次期待拿锁,拿到了锁能力进去 Q2;一旦出了 Q2,锁就丢了。
在 Q2
里,其实只有一个线程,因为这里咱们必须要加锁能力进行操作。
实现
这里我首先建了一个简略的 Product
类,用来示意生产和生产的产品,大家能够自行添加更多的 fields
。
public class Product {
private String name;
public Product(String name) {this.name = name;}
public String getName() {return name;}
public void setName(String name) {this.name = name;}
}
主函数里我设定了两类线程,并且这里抉择用一般的 ArrayDeque
来实现 Queue
,更简略的形式是间接用 Java 中的 BlockingQueue
来实现。
BlockingQueue
是阻塞队列,它有一系列的办法能够让线程实现主动阻塞,罕用的 BlockingQueue
有很多,前面会独自出一篇文章来讲。
这里为了更好的了解并发协同的这个过程,咱们先本人解决。
public class Test {public static void main(String[] args) {Queue<Product> queue = new ArrayDeque<>();
for (int i = 0; i < 100; i++) {new Thread(new Producer(queue, 100)).start();
new Thread(new Consumer(queue, 100)).start();}
}
}
而后就是 Producer
和 Consumer
了。
public class Producer implements Runnable{
private Queue<Product> queue;
private int maxCapacity;
public Producer(Queue queue, int maxCapacity) {
this.queue = queue;
this.maxCapacity = maxCapacity;
}
@Override
public void run() {synchronized (queue) {while (queue.size() == maxCapacity) { // 肯定要用 while,而不是 if,下文解释
try {System.out.println("生产者" + Thread.currentThread().getName() + "期待中... Queue 已达到最大容量,无奈生产");
wait();
System.out.println("生产者" + Thread.currentThread().getName() + "退出期待");
} catch (InterruptedException e) {e.printStackTrace();
}
}
if (queue.size() == 0) { // 队列里的产品从无到有,须要告诉在期待的消费者
queue.notifyAll();}
Random random = new Random();
Integer i = random.nextInt();
queue.offer(new Product("产品" + i.toString()));
System.out.println("生产者" + Thread.currentThread().getName() + "生产了产品:" + i.toString());
}
}
}
其实它的主逻辑很简略,我这里为了不便演示加了很多打印语句才显得有点简单。
咱们把次要逻辑拎进去看:
public void run() {synchronized (queue) {while (queue.size() == maxCapacity) { // 肯定要用 while,而不是 if,下文解释
try {wait();
} catch (InterruptedException e) {e.printStackTrace();
}
}
if (queue.size() == 0) {queue.notifyAll();
}
queue.offer(new Product("产品" + i.toString()));
}
}
}
这里有 3 块内容,再对照这个过程来看:
- 生产者线程拿到锁后,其实就是进入了
Q2
阶段。首先查看队列是否容量已满,如果满了,那就要去Q3
期待; - 如果不满,先检查一下队列本来是否为空,如果原来是空的,那就须要告诉消费者;
- 最初生产产品。
这里有个问题,为什么只能用 while
而不是 if
?
其实在这一小段,生产者线程经验了几个过程:
- 如果队列已满,它就没法生产,那也不能占着地位不做事,所以要把锁让进去,去
Q3 - 期待队列
等着; - 在期待队列里被唤醒之后,不能间接夺过锁来,而是要先退出
Q1 - 同步队列
期待资源; - 一旦抢到资源,关门上锁,能力来到
Q2
继续执行wait()
之后的活,然而,此时这个队列有可能又满了,所以退出wait()
之后,还须要再次查看queue.size() == maxCapacity
这个条件,所以要用while
。
那么为什么可能又满了呢?
因为线程没有始终拿着锁,在被唤醒之后,到拿到锁之间的这段时间里,有可能其余的生产者线程先拿到了锁进行了生产,所以队列又经验了一个从不满到满的过程。
总结:在应用线程的期待告诉机制时,个别都要在 while
循环中调用 wait()
办法。
消费者线程是齐全对称的,咱们来看代码。
public class Consumer implements Runnable{
private Queue<Product> queue;
private int maxCapacity;
public Consumer(Queue queue, int maxCapacity) {
this.queue = queue;
this.maxCapacity = maxCapacity;
}
@Override
public void run() {synchronized (queue) {while (queue.isEmpty()) {
try {System.out.println("消费者" + Thread.currentThread().getName() + "期待中... Queue 已缺货,无奈生产");
wait();
System.out.println("消费者" + Thread.currentThread().getName() + "退出期待");
} catch (InterruptedException e) {e.printStackTrace();
}
}
if (queue.size() == maxCapacity) {queue.notifyAll();
}
Product product = queue.poll();
System.out.println("消费者" + Thread.currentThread().getName() + "生产了:" + product.getName());
}
}
}
后果如下:
小结
生产者 – 消费者问题是面试中常常会遇到的题目,本文首先讲了该模型的三大长处:解藕,异步,均衡速度差别,而后解说了期待 / 告诉的音讯机制以及在该模型中的利用,最初进行了代码实现。
文中所有代码曾经放到了我的 Github 上:https://github.com/xiaoqi6666/NYCSDE
。
这个 Github 汇总了我所有的文章和材料,之后也会始终更新和保护,还心愿大家帮忙点个 Star
,你们的反对和认可,就是我创作的最大能源,咱们下篇文章见!
我是小齐,纽约程序媛,终生学习者,每天晚上 9 点,云自习室里不见不散!