本文首发自「慕课网」,想理解更多 IT 干货内容,程序员圈内热闻,欢送关注!
作者 | 慕课网精英讲师 朱广蔚
- 简介
生产者和消费者问题是线程模型中的经典问题:
生产者和消费者共享同一个存储空间
生产者往存储空间中增加产品,消费者从存储空间中取走产品
当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞
Python 的内置模块 queue 提供了对生产者和消费者模型的反对,模块 queue 定义了类 Queue,类 Queue 示意一个被生产者和消费者共享的队列,类 Queue 提供如下罕用办法:
办法
性能
get()
从队列中取走数据,如果队列为空,则阻塞
put(item)
向队列中搁置数据,如果队列为慢,则阻塞
join()
如果队列不为空,则期待队列变为空
task_done()
消费者从队列中取走一项数据,当队列变为空时,唤醒调用 join() 的线程
- 实现生产者消费者模型
创立生产者线程和消费者线程,应用一个共享队列连贯这两个线程,代码如下:
import threading
import queue
q = queue.Queue()
代码块 1234
导入 threading 模块和 queue 模块
创立共享队列 q
def produce():
for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:
q.put(item)
print('produce %s' % item)
代码块 1234
创立生产者线程的入口函数 produce
生产者生产 8 个数据
调用 q.put(item) 将生产的数据放入到共享队列 q 中
def consume():
for i in range(8):
item = q.get()
print('consume %s' % item)
代码块 1234
创立消费者线程的入口函数 consume
消费者生产 8 个数据
调用 q.get() 从共享队列 q 中取走数据
producer = threading.Thread(target=produce, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
consumer.start()
producer.join()
consumer.join()
代码块 123456
创立生产者线程 producer,线程入口为 produce
创立消费者线程 consumer,线程入口为 consume
启动生产者线程和消费者线程,并期待它们完结
运行程序,输入后果如下:
produce a
produce b
consume a
produce c
consume b
consume c
produce d
consume d
produce e
consume e
produce f
consume f
produce g
consume g
produce h
consume h
代码块 12345678910111213141516
生产者生产了 8 个数据:a、b、c、d、e、f、g、h
消费者取走了 8 个数据:a、b、c、d、e、f、g、h
- 实现生产者、计算者、消费者模型
创立生产者、计算者、消费者线程:
生产者生产 8 个数据
计算者对生产者输入的数据进行加工,将加工后的数据送往消费者
消费者取走计算者输入的数据
import threading
import queue
q0 = queue.Queue()
q1 = queue.Queue()
代码块 12345
导入模块 threading 和模块 queue
应用两个共享队列连贯这三个线程共享队列 q0 连贯生产者和计算者共享队列 q1 连贯计算者和消费者
def produce():
for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:
q0.put(item)
print('produce %s' % item)
代码块 1234
创立生产者线程的入口函数 produce
生产者生产 8 个数据
调用 q0.put(item) 将生产的数据放入到共享队列 q0 中
def compute():
for i in range(8):
item = q0.get()
item = item.upper()
q1.put(item)
代码块 12345
创立计算者线程的入口函数 compute
调用 q0.get() 读取生产者输入数据,并进行加工
调用 q1.put(item) 将加工后的数据放入到共享队列 q1 中
def consume():
for i in range(8):
item = q1.get()
print('consume %s' % item)
代码块 1234
创立消费者线程的入口函数 consume
消费者生产 8 个数据
调用 q1.get() 从共享队列 q1 中取走数据
producer = threading.Thread(target=produce, args=())
computer = threading.Thread(target=compute, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
computer.start()
consumer.start()
producer.join()
computer.join()
consumer.join()
代码块 12345678910
创立生产者线程 producer,线程入口为 produce
创立计算者线程 computer,线程入口为 compute
创立消费者线程 consumer,线程入口为 consume
启动生产者线程、计算者线程、消费者线程,并期待它们完结
运行程序,输入后果如下:
produce a
produce b
produce c
consume A
produce d
produce e
consume B
produce f
consume C
produce g
consume D
produce h
consume E
consume F
consume G
consume H
代码块 12345678910111213141516
生产者生产了 8 个数据:a、b、c、d、e、f、g、h
计算者将数据加工为:A、B、C、D、E、F、G、H
消费者取走了 8 个数据:A、B、C、D、E、F、G、H
- 同步生产者与消费者的推动速度
在生产者、消费者模型中,可能会存在两者推动速度不匹配的问题:生产者生产数据的速度较快,然而,消费者取走数据的速度较慢。
能够应用 queue 的 task_done() 办法和 join() 办法同步生产者与消费者的推动速度:
生产者调用 join() 办法,期待队列中所有的数据被取走
消费者调用 task_done() 办法,示意取走了队列中的一项数据,当队列为空时,唤醒阻塞在 join() 办法中的生产者
import threading
import queue
q = queue.Queue()
代码块 1234
导入 threading 模块和 queue 模块
创立共享队列 q
def produce():
for item in ['A', 'B', 'C', 'D']:
q.put(item)
print('produce %s' % item)
q.join()
print('------------ q is empty')
for item in ['E', 'F', 'G', 'H']:
q.put(item)
print('produce %s' % item)
q.join()
print('------------ q is empty')
代码块 123456789101112
创立生产者线程的入口函数 produce
首先,生产 4 个数据:A、B、C、D 调用 q.put(item) 将它们放入到队列 q 中调用 q.join() 期待消费者将它们全副取走
而后,生产 4 个数据:E、F、G、G 调用 q.put(item) 将它们放入到队列 q 中调用 q.join() 期待消费者将它们全副取走
def consume():
for i in range(8):
item = q.get()
print('consume %s' % item)
q.task_done()
代码块 12345
创立消费者线程的入口函数 consume
调用 q.get() 从队列 q 中取走一个数据
调用 q.task_done(),示意曾经从队列 q 中取走了一个数据,当队列为空时,唤醒生产者
producer = threading.Thread(target=produce, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
consumer.start()
代码块 1234
创立生产者线程 producer,线程入口为 produce
创立消费者线程 consumer,线程入口为 consume
启动生产者线程和消费者线程,并期待它们完结
运行程序,输入后果如下:
produce A
produce B
consume A
consume B
produce C
consume C
produce D
consume D
———— q is empty
produce E
consume E
produce F
consume F
produce G
produce H
consume G
consume H
———— q is empty
代码块 123456789101112131415161718
生产者生产第一批数据 A、B、C、D,消费者将其取走
当第一批数据齐全被消费者取走后,生产者才开始生产第二批数据
生产者生产第二批数据 E、F、G、H,消费者将其取走
欢送关注「慕课网」,发现更多 IT 圈优质内容,分享干货常识,帮忙你成为更好的程序员!