Java并发编程之线程间通讯(下)-生产者与消费者

40次阅读

共计 8664 个字符,预计需要花费 22 分钟才能阅读完成。

前文回顾
上一篇文章重点唠叨了 java 中协调线程间通信的 wait/notify 机制,它有力的保证了线程间通信的安全性以及便利性。本篇将介绍 wait/notify 机制的一个应用以及更多线程间通信的内容。
生产者 - 消费者模式
目光从厕所转到饭馆,一个饭馆里通常都有好多厨师以及好多服务员,这里我们把厨师称为生产者,把服务员称为消费者,厨师和服务员是不直接打交道的,而是在厨师做好菜之后放到窗口,服务员从窗口直接把菜端走给客人就好了,这样会极大的提升工作效率,因为省去了生产者和消费者之间的沟通成本。从 java 的角度看这个事情,每一个厨师就相当于一个生产者线程,每一个服务员都相当于一个消费者线程,而放菜的窗口就相当于一个缓冲队列,生产者线程不断把生产好的东西放到缓冲队列里,消费者线程不断从缓冲队列里取东西,画个图就像是这样:

现实中放菜的窗口能放的菜数量是有限的,我们假设这个窗口只能放 5 个菜。那么厨师在做完菜之后需要看一下窗口是不是满了,如果窗口已经满了的话,就在一旁抽根烟等待,直到有服务员来取菜的时候通知一下厨师窗口有了空闲,可以放菜了,这时厨师再把自己做的菜放到窗口上去炒下一个菜。从服务员的角度来说,如果窗口是空的,那么也去一旁抽根烟等待,直到有厨师把菜做好了放到窗口上,并且通知他们一下,然后再把菜端走。
我们先用 java 抽象一下菜:
public class Food {

private static int counter = 0;

private int i; // 代表生产的第几个菜

public Food() {
i = ++counter;
}

@Override
public String toString() {
return “ 第 ” + i + “ 个菜 ”;
}
}
每次创建 Food 对象,字段 i 的值都会加 1,代表这是创建的第几道菜。
为了故事的顺利进行,我们首先定义一个工具类:
class SleepUtil {

private static Random random = new Random();

public static void randomSleep() {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
SleepUtil 的静态方法 randomSleep 代表当前线程随机休眠一秒内的时间。
然后我们再用 java 定义一下厨师:
public class Cook extends Thread {

private Queue<Food> queue;

public Cook(Queue<Food> queue, String name) {
super(name);
this.queue = queue;
}

@Override
public void run() {

while (true) {
SleepUtil.randomSleep(); // 模拟厨师炒菜时间

Food food = new Food();
System.out.println(getName() + ” 生产了 ” + food);

synchronized (queue) {
while (queue.size() > 4) {
try {
System.out.println(“ 队列元素超过 5 个,为:” + queue.size() + ” ” + getName() + “ 抽根烟等待中 ”);
queue.wait();

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

queue.add(food);
queue.notifyAll();
}
}
}
}
我们说每一个厨师 Cook 都是一个线程,内部维护了一个名叫 queue 的队列。在 run 方法中是一个死循环,代表不断的生产 Food。他每生产一个 Food 后,都要判断 queue 队列中元素的个数是不是大于 4,如果大于 4 的话,就调用 queue.wait() 等待,如果不大于 4 的话,就把创建号的 Food 对象放到 queue 队列中,由于可能多个线程同时访问 queue 的各个方法,所以对这段代码用 queue 对象来加锁保护。当向队列添加完刚创建的 Food 对象之后,就可以通知 queue 这个锁对象关联的等待队列中的服务员线程们可以继续端菜了。
然后我们再用 java 定义一下服务员:
class Waiter extends Thread {

private Queue<Food> queue;

public Waiter(Queue<Food> queue, String name) {
super(name);
this.queue = queue;
}

@Override
public void run() {

while (true) {

Food food;
synchronized (queue) {
while (queue.size() < 1) {
try {
System.out.println(“ 队列元素个数为:” + queue.size() + “,” + getName() + “ 抽根烟等待中 ”);
queue.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

food = queue.remove();
System.out.println(getName() + ” 获取到:” + food);
queue.notifyAll();
}

SleepUtil.randomSleep(); // 模拟服务员端菜时间
}
}
}
每个服务员也是一个线程,和厨师一样,都在内部维护了一个名叫 queue 的队列。在 run 方法中是一个死循环,代表不断的从队列中取走 Food。每次在从 queue 队列中取 Food 对象的时候,都需要判断一下队列中的元素是否小于 1,如果小于 1 的话,就调用 queue.wait() 等待,如果不小于 1 的话,也就是队列里有元素,就从队列里取走一个 Food 对象,并且通知与 queue 这个锁对象关联的等待队列中的厨师线程们可以继续向队列里放入 Food 对象了。
在厨师和服务员线程类都定义好了之后,我们再创建一个 Restaurant 类,来看看在餐馆里真实发生的事情:
public class Restaurant {

public static void main(String[] args) {

Queue<Food> queue = new LinkedList<>();
new Cook(queue, “1 号厨师 ”).start();
new Cook(queue, “2 号厨师 ”).start();
new Cook(queue, “3 号厨师 ”).start();
new Waiter(queue, “1 号服务员 ”).start();
new Waiter(queue, “2 号服务员 ”).start();
new Waiter(queue, “3 号服务员 ”).start();
}
}
我们在 Restaurant 中安排了 3 个厨师和 3 个服务员,大家执行一下这个程序,会发现在如果厨师生产的过快,厨师就会等待,如果服务员端菜速度过快,服务员就会等待。但是整个过程厨师和服务员是没有任何关系的,它们是通过队列 queue 实现了所谓的解耦。
这个过程虽然不是很复杂,但是使用中还是需要注意一些问题:
我们这里的厨师和服务员使用同一个锁 queue。
使用同一个锁是因为对 queue 的操作只能用同一个锁来保护,假设使用不同的锁,厨师线程调用 queue.add 方法,服务员线程调用 queue.remove 方法,这两个方法都不是原子操作,多线程并发执行的时候会出现不可预测的结果,所以我们使用同一个锁来保护对 queue 这个变量的操作,这一点我们在唠叨设计线程安全类的时候已经强调过了。
厨师和服务员线程使用同一个锁 queue 的后果就是厨师线程和服务员线程使用的是同一个等待队列。
但是同一时刻厨师线程和服务员线程不会同时在等待队列中,因为当厨师线程在 wait 的时候,队列里的元素肯定是 5,此时服务员线程肯定是不会 wait 的,但是消费的过程是被锁对象 queue 保护的,所以在一个服务员线程消费了一个 Food 之后,就会调用 notifyAll 来唤醒等待队列中的厨师线程们;当消费者线程在 wait 的时候,队列里的元素肯定是 0,此时厨师线程肯定是不会 wait 的,生产的过程是被锁对象 queue 保护的,所以在一个厨师线程生产了一个 Food 对象之后,就会调用 notifyAll 来唤醒等待队列中的服务员线程们。所以同一时刻厨师线程和服务员线程不会同时在等待队列中。
在生产和消费过程,我们都调用了 SleepUtil.randomSleep();。
我们这里的生产者 - 消费者模型是把实际使用的场景进行了简化,真正的实际场景中生产过程和消费过程一般都会很耗时,这些耗时的操作最好不要放在同步代码块中,这样会造成别的线程的长时间阻塞。如果把生产过程和消费过程都放在同步代码块中,也就是说在一个厨师炒菜的同时不允许别的厨师炒菜,在一个服务员端菜的同时不允许别的程序员端菜,这个显然是不合理的,大家需要注意这一点。
以上就是 wait/notify 机制的一个现实应用:生产者 - 消费者模式的一个简介。
管道输入 / 输出流
还记得在唠叨 I / O 的时候提到的管道流么,这些管道流就是用于在不同线程之间的数据传输,一共有四种管道流:

PipedInputStream:管道输入字节流

PipedOutputStream:管道输出字节流

PipedReader:管道输入字符流

PipedWriter:管道输出字符流

字节流和字符流的用法是差不多的,我们下边以字节流为例来唠叨一下管道流的用法。
一个线程可以持有一个 PipedInputStream 对象,这个 PipedInputStream 对象在内部维护了一个字节数组,默认大小为 1024 字节。它并不能单独使用,需要与另一个线程持有的一个 PipedOutputStream 建立关联,PipedOutputStream 往该字节数组中写数据,PipedInputStream 从该字节数组中读数据,从而实现两个线程的通信。
PipedInputStream
先看一下它的几个构造方法:
它有一个特别重要的方法就是:
PipedOutputStream
看一下它的构造方法:它也有一个连接到管道输入流的方法:
使用示例
管道流的通常使用场景就是一个线程持有一个 PipedInputStream 对象,另一个线程持有一个 PipedOutputStream 对象,然后把这两个输入输出管道流通过 connect 方法建立连接,此后从管道输出流写入的数据就可以通过管道输入流读出,从而实现了两个线程间的数据交换,也就是实现了线程间的通信:
public class PipedDemo {

public static void main(String[] args){

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = new PipedOutputStream();

try {
in.connect(out); // 将输入流和输出流建立关联
} catch (IOException e) {
throw new RuntimeException(e);
}

new ReadThread(in).start();
new WriteThread(out).start();
}
}

class ReadThread extends Thread {

private PipedInputStream in;

public ReadThread(PipedInputStream in) {
this.in = in;
}

@Override
public void run() {

int i = 0;
try {
while ((i=in.read()) != -1) {// 从输入流读取数据
System.out.println(i);
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
in.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

class WriteThread extends Thread {

private PipedOutputStream out;

public WriteThread(PipedOutputStream out) {
this.out = out;
}

@Override
public void run() {
byte[] bytes = {1, 2, 3, 4, 5};
try {
out.write(bytes); // 向输出流写入数据
out.flush();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
out.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
执行结果是:
1
2
3
4
5
join 方法
我们前边说过这个方法,比如有代码是这样:
public static void main(String[] args) {

Thread t = new Thread(new Runnable() {

@Override
public void run() {
// … 线程 t 执行的具体任务
}
}, “t”);

t.start();

t.join();
System.out.println(“t 线程执行完了,继续执行 main 线程 ”);
}
在 main 线程中调用 t.join(),代表 main 线程需要等待 t 线程执行完成后才能继续执行。也就是说,这个 join 方法可以协调各个线程之间的执行顺序。它的实现其实很简单:
public final synchronized void join() throws InterruptedException {
while (isAlive()) {
wait();
}
}
需要注意的是,join 方法是 Thread 类的成员方法。上边例子中在 main 线程中调用 t.join() 的意思就是,使用 Thread 对象 t 作为锁对象,如果 t 线程还活着,就调用 wait(),把 main 线程放到与 t 对象关联的等待队列里,直到 t 线程执行结束,系统会主动调用一下 t.notifyAll(),把与 t 对象关联的等待队列中的线程全部移出,从而 main 线程可以继续执行~
当然它还有两个指定等待时间的重载方法:
java 线程的状态
java 为了方便的管理线程,对底层的操作系统的线程状态做了一些抽象封装,定义了如下的线程状态:
需要注意的是:

对于在操作系统中线程的运行 / 就绪状态,java 语言中统一用 RUNNABLE 状态来表示。
对于在操作系统中线程的阻塞状态,java 语言中用 BLOCKED、WAITING 和 TIME_WAITING 这三个状态分别表示。
也就是对阻塞状态进行了进一步细分。对于因为获取不到锁而产生的阻塞称为 BLOCKED 状态,因为调用 wait 或者 join 方法而产生的阻塞称为 WAITING 状态,因为调用有超时时间的 wait、join 或者 sleep 方法而产生的在有限时间内阻塞称为 TIME_WAITING 状态。

大家可以通过这个图来详细的看一下各个状态之间的转换过程:
java 这么划分线程的状态纯属于方便自己的管理,比如它会给在 WAITING 和 TIMED_WAITING 状态的线程分别建立不同的队列,来方便实施不同的恢复策略~所以大家也不用纠结为啥和操作系统中定义的不一样,其实操作系统中对各个状态的线程仍然有各种细分来方便管理,如果是你去设计一个语言或者一个操作系统,你也可以为了自己的方便来定义一下线程的各种状态。我们作为语言的使用者,首先还是把这些状态记住了再说哈????~
获取线程执行状态
java 中定义了一个 State 枚举类型,来表示线程的状态:
public class Thread implements Runnable {
// … 为节省篇幅,省略其它方法和字段

public enum State {
NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED;
}
}
然后又在 Thread 类里定义了一个成员方法:
public State getState() {
// 省略了具体的实现
}
我们可以通过这个 getState 方法来获取到对应的线程状态,下边来举个例子获取上边列举的 6 种状态,为了故事的顺利发展,我们先定义一个工具类:
public class LockUtil {

public static void sleep(long mill) {
try {
Thread.sleep(mill);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public static void wait(Object obj) {
try {
obj.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
因为每次调用 sleep 和 wait 操作的时候都会有 InterruptedException 的异常说明,我们都需要 try…catch 一下,会导致代码结构会很混乱,所以我们写了个工具类来把 InterruptedException 的异常转为运行时异常。注意,我们这里转为运行时异常只是为了代码结构清晰,真实情况需要认真处理 InterruptedException 的异常说明,具体怎么使用我们后边会详细唠叨。
然后接着写具体的获取状态的代码:
public class ThreadStateDemo {

private static Object lock = new Object(); // 锁对象

public static void main(String[] args) {

Thread t = new Thread(new Runnable() {

@Override
public void run() {

double d = 0.1;
int i = 0;
while (i++ < 100000) {// 模仿一个耗时操作
d = d*0.3d;
}

SleepUtil.sleep(2000L); // 休眠 2 秒钟
synchronized (lock) {
LockUtil.wait(lock);
}
synchronized (lock) {// 尝试获取 lock 锁

}
}
}, “t”);

System.out.println(“ 初始状态:” + t.getState());
t.start();
System.out.println(“ 运行一个耗时操作时的状态:” + t.getState());

SleepUtil.sleep(1000L);

System.out.println(“ 休眠时的状态:” + t.getState());

SleepUtil.sleep(2000L);

System.out.println(“wait 的状态:” + t.getState());
synchronized (lock) {
lock.notifyAll();
}
System.out.println(“ 被 notify 后的状态:” + t.getState());

synchronized (lock) {
SleepUtil.sleep(1000L); // 调用 sleep 方法不会释放锁
System.out.println(“ 因为获取锁而阻塞的状态:” + t.getState());
}
}
}
我们在程序里用了一系列的 sleep 方法来控制程序的执行顺序,这只是为了简单的说明线程的各个状态的产生原因,在真实环境中是不允许使用 sleep 方法来控制线程间的执行顺序的,应该使用同步或者我们上边介绍的一系列线程通信的方式。这个程序的执行结果是:
初始状态:NEW
运行一个耗时操作时的状态:RUNNABLE
休眠时的状态:TIMED_WAITING
wait 的状态:WAITING
被 notify 后的状态:BLOCKED
因为获取锁而阻塞的状态:TERMINATED
线程的各个状态都获取到了哈。
总结

基于 wait/notify 机制的生产者 - 消费者模式很重要,务必认真看几遍~
一个线程可以持有一个 PipedInputStream 对象,这个 PipedInputStream 对象在内部维护了一个字节数组,默认大小为 1024 字节。它并不能单独使用,需要与另一个线程持有的一个 PipedOutputStream 建立关联,PipedOutputStream 往该字节数组中写数据,PipedInputStream 从该字节数组中读数据,从而实现两个线程的通信。
使用 join 方法可以实现一个线程在另一个线程执行完毕后才继续执行的功能。
java 为了方便的管理线程,对底层的操作系统的线程状态做了一些抽象封装,定义了 NEW、RUNNABLE、BLOCKED、WAITING、TIME_WAITING、TERMINATED 这些线程状态,与操作系统中的线程有一些区别:

对于在操作系统中线程的运行 / 就绪状态,java 语言中统一用 RUNNABLE 状态来表示。
对于在操作系统中线程的阻塞状态,java 语言中用 BLOCKED、WAITING 和 TIME_WAITING 这三个状态分别表示。

题外话
写文章挺累的,有时候你觉得阅读挺流畅的,那其实是背后无数次修改的结果。如果你觉得不错请帮忙转发一下,万分感谢~

正文完
 0