共计 2353 个字符,预计需要花费 6 分钟才能阅读完成。
一 前言
本文算是一次队列的学习笔记,Queue 模块实现了三种类型的队列,它们的区别仅仅是队列中元素被取回的顺序。在 FIFO
队列中,先添加的任务先取回。在 LIFO
队列中,最近被添加的元素先取回(操作类似一个堆栈)。优先级队列中,元素将保持排序(使用 heapq 模块) 并且最小值的条目第一个返回。
值得注意的是 Python 2.X 版本中调用队列需要引用
import Queue
而在 Python 3.X 版本中则需要import queue
二 队列特性
2.1 Queue 的常用函数
Queue 常用的方法:
qsize() 获取队列的元素个数。put(item [,block[, timeout]]): 往 queue 中放一个 item
get(item [,block[, timeout]]): 从 queue 中取出一个 item,并在队列中删除的这个 item
需要特别说明的是:
如果 block 为 True,timeout 为 None(也是默认的选项 ),那么 get()/put() 可能会阻塞,直到队列中出现可用的数据 / 位置。如果 timeout 是正整数,那么函数会阻塞直到超时 N 秒,然后抛出一个异常。
如果 block 为 False,如果队列无数据,调用 get()或者有无空余位置时调用 put(),就立即抛出异常(timeout 将会被忽略)。
task_done(): 表示前面排队的任务已经被完成。被队列的消费者线程使用。每个 get() 被用于获取一个任务,后续调用 task_done() 告诉队列,该任务的处理已经完成。join(): 队列中所有的元素都被接收和处理完毕之前程序一直阻塞。
在应用程序中,如果主程序调用了 join()则当前程序发生阻塞,当队列中所有的元素都被处理后,将解除阻塞 (意味着每个 put() 进队列的条目的 task_done()
都被收到)。如果 task_done()
被调用的次数多于放入队列中的项目数量,将引发 ValueError 异常。
我们通过程序向队列添加元素的时候,未完成任务的计数就会增加。每当消费者线程调用
task_done()
时表示这个元素已经被回收,涉及到该元素的业务逻辑已经完成,未完成计数就会减少。当未完成计数降到零的时候,程序便会解除 join()阻塞。
2.2 实践
我们用一个比较经典的案例 生产者和消费者模型,生产者生产馒头放到队列,消费者去队列里面获取馒头。
# encoding: utf-8
"""
author: yangyi@youzan.com
time: 2019/8/14 11:20 PM
func:
"""
from multiprocessing import Process, JoinableQueue, Lock
import time
import random
thread_lock = Lock()
def lock_print(msg):
with thread_lock:
print (msg)
def consumer(q):
while True:
res = q.get(block=True, timeout=3) # 如果为空 则等待 3 秒超时则报错退出
print('消费者拿到了 %s' % res)
q.task_done()
def producer(q):
for item in range(4):
time.sleep(random.randrange(1, 2))
q.put('馒头{0}'.format(item))
print('生产者做好了 %s' %'馒头{0}'.format(item))
q.join()
lock_print("生产结束")
if __name__ == '__main__':
print('主进程开始')
q = JoinableQueue()
pd = Process(target=producer, args=(q,))
cp = Process(target=consumer, args=(q,))
cp.daemon = True ##
pd.start()
cp.start()
pd.join()
print('主进程结束')
说明
这里生产者生产馒头并将馒头通过 put()
放到全局的队列中,消费者从使用 get()
队列中获取馒头然后调用 task_done()
通知队列中的馒头已经被消费者获取。
设置 cp.daemon = True
表示消费者进程会随主进程一起结束而结束。还有一种写法是
if __name__ == '__main__':
print('主进程开始')
q = JoinableQueue()
pd = Process(target=producer, args=(q,))
cp = Process(target=consumer, args=(q,))
pd.start()
cp.start()
pd.join()
cp.join()
print('主进程结束')
cp.join()
会让消费者进程一直等待生产者往队列放数据直到设置的超时时间。具体的逻辑需要结合自己程序的实际需求来定,是需要一直等待生产者生产数据还是随着主进程结束而结束。
三 总结
本文结合前面文章中介绍的多进程中的 守护进程和 join()方法,学习如何使用队列中的两个函数 task_done
和 join
。其实还有其他比较多的函数用法,需要深入的学习探索,感兴趣的朋友可以动手实践一下。
推荐阅读
https://docs.python.org/zh-cn/3/library/queue.html
https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/12_Thread_communication_using_a_queue.html
本公众号长期关注于数据库技术以及性能优化,故障案例分析,数据库运维技术知识分享,个人成长和自我管理等主题,欢迎扫码关注。