共计 4692 个字符,预计需要花费 12 分钟才能阅读完成。
起步
queue 模块提供实用于多线程编程的先进先出(FIFO)数据结构。因为它是线程平安的,所以多个线程很轻松地应用同一个实例。
源码剖析
先从初始化的函数来看:
class Queue: | |
def __init__(self, maxsize=0): | |
# 设置队列的最大容量 | |
self.maxsize = maxsize | |
self._init(maxsize) | |
# 线程锁,互斥变量 | |
self.mutex = threading.Lock() | |
# 由锁衍生出三个条件变量 | |
self.not_empty = threading.Condition(self.mutex) | |
self.not_full = threading.Condition(self.mutex) | |
self.all_tasks_done = threading.Condition(self.mutex) | |
self.unfinished_tasks = 0 | |
def _init(self, maxsize): | |
# 初始化底层数据结构 | |
self.queue = deque() |
从这初始化函数能失去哪些信息呢?首先,队列是能够设置其容量大小的,并且具体的底层寄存元素的它应用了 collections.deque() 双端列表的数据结构,这使得能很不便的做先进先出操作。这里还顺便形象为 _init 函数是为了不便其子类进行笼罩,容许子类应用其余构造来寄存元素(比方优先队列应用了 list)。
而后就是线程锁 self.mutex,对于底层数据结构 self.queue 的操作都要先取得这把锁;再往下是三个条件变量,这三个 Condition 都以 self.mutex 作为参数,也就是说它们共用一把锁;从这能够晓得诸如 with self.mutex 与 with self.not_empty 等都是互斥的。
基于这些锁而做的一些简略的操作:
class Queue: | |
... | |
def qsize(self): | |
# 返回队列中的元素数 | |
with self.mutex: | |
return self._qsize() | |
def empty(self): | |
# 队列是否为空 | |
with self.mutex: | |
return not self._qsize() | |
def full(self): | |
# 队列是否已满 | |
with self.mutex: | |
return 0 < self.maxsize <= self._qsize() | |
def _qsize(self): | |
return len(self.queue) |
这个代码片段挺好了解的,无需剖析。
作为队列,次要得实现入队与出队的操作,首先是入队:
class Queue: | |
... | |
def put(self, item, block=True, timeout=None): | |
with self.not_full: # 获取条件变量 not_full | |
if self.maxsize > 0: | |
if not block: | |
if self._qsize() >= self.maxsize: | |
raise Full # 如果 block 是 False,并且队列已满,那么抛出 Full 异样 | |
elif timeout is None: | |
while self._qsize() >= self.maxsize: | |
self.not_full.wait() # 阻塞直到由残余空间 | |
elif timeout < 0: # 不合格的参数值,抛出 ValueError | |
raise ValueError("'timeout' must be a non-negative number") | |
else: | |
endtime = time() + timeout # 计算期待的完结工夫 | |
while self._qsize() >= self.maxsize: | |
remaining = endtime - time() | |
if remaining <= 0.0: | |
raise Full # 期待期间始终没空间,抛出 Full 异样 | |
self.not_full.wait(remaining) | |
self._put(item) # 往底层数据结构中退出一个元素 | |
self.unfinished_tasks += 1 | |
self.not_empty.notify() | |
def _put(self, item): | |
self.queue.append(item) |
只管只有二十几行的代码,但这里的逻辑还是比较复杂的。它要解决超时与队列残余空间有余的状况,具体几种状况如下:
- 如果 block 是 False,疏忽 timeout 参数
- 若此时队列已满,则抛出 Full 异样;
- 若此时队列未满,则立刻把元素保留到底层数据结构中;
- 如果 block 是 True
- 若 timeout 是 None 时,那么 put 操作可能会阻塞,直到队列中有闲暇的空间(默认);
- 若 timeout 是非正数,则会阻塞相应工夫直到队列中有残余空间,在这个期间,如果队列中始终没有空间,抛出 Full 异样;
解决好参数逻辑后,,将元素保留到底层数据结构中,并递增 unfinished_tasks,同时告诉 not_empty,唤醒在其中期待数据的线程。
出队操作:
class Queue: | |
... | |
def get(self, block=True, timeout=None): | |
with self.not_empty: | |
if not block: | |
if not self._qsize(): | |
raise Empty | |
elif timeout is None: | |
while not self._qsize(): | |
self.not_empty.wait() | |
elif timeout < 0: | |
raise ValueError("'timeout' must be a non-negative number") | |
else: | |
endtime = time() + timeout | |
while not self._qsize(): | |
remaining = endtime - time() | |
if remaining <= 0.0: | |
raise Empty | |
self.not_empty.wait(remaining) | |
item = self._get() | |
self.not_full.notify() | |
return item | |
def _get(self): | |
return self.queue.popleft() |
get() 操作是 put() 相同的操作,代码块也及其类似,get() 是从队列中移除最先插入的元素并将其返回。
- 如果 block 是 False,疏忽 timeout 参数
- 若此时队列没有元素,则抛出 Empty 异样;
- 若此时队列由元素,则立刻把元素保留到底层数据结构中;
- 如果 block 是 True
- 若 timeout 是 None 时,那么 get 操作可能会阻塞,直到队列中有元素(默认);
- 若 timeout 是非正数,则会阻塞相应工夫直到队列中有元素,在这个期间,如果队列中始终没有元素,则抛出 Empty 异样;
最初,通过 self.queue.popleft() 将最早放入队列的元素移除,并告诉 not_full,唤醒在其中期待数据的线程。
这里有个值得注意的中央,在 put() 操作中递增了 self.unfinished_tasks,而 get() 中却没有递加,这是为什么?
这其实是为了留给用户一个生产元素的工夫,get() 仅仅是获取元素,并不代表消费者线程解决的该元素,用户须要调用 task_done() 来告诉队列该工作解决实现了:
class Queue: | |
... | |
def task_done(self): | |
with self.all_tasks_done: | |
unfinished = self.unfinished_tasks - 1 | |
if unfinished <= 0: | |
if unfinished < 0: # 也就是胜利调用 put()的次数小于调用 task_done()的次数时,会抛出异样 | |
raise ValueError('task_done() called too many times') | |
self.all_tasks_done.notify_all() # 当 unfinished 为 0 时,会告诉 all_tasks_done | |
self.unfinished_tasks = unfinished | |
def join(self): | |
with self.all_tasks_done: | |
while self.unfinished_tasks: # 如果有未实现的工作,将调用 wait()办法期待 | |
self.all_tasks_done.wait() |
因为 task_done() 应用方调用的,当 task_done() 次数大于 put() 次数时会抛出异样。
task_done() 操作的作用是唤醒正在阻塞的 join() 操作。join() 办法会始终阻塞,直到队列中所有的元素都被取出,并被解决了(和线程的 join 办法相似)。也就是说 join() 办法必须配合 task_done() 来应用才行。
LIFO 后进先出队列
LifoQueue 应用后进先出程序,与栈构造类似:
class LifoQueue(Queue): | |
'''Variant of Queue that retrieves most recently added entries first.''' | |
def _init(self, maxsize): | |
self.queue = [] | |
def _qsize(self): | |
return len(self.queue) | |
def _put(self, item): | |
self.queue.append(item) | |
def _get(self): | |
return self.queue.pop() |
这就是 LifoQueue 全副代码了,这正是 Queue 设计很棒的一个起因,它将底层的数据操作形象成四个操作函数,自身来解决线程平安的问题,使得其子类只需关注底层的操作。
LifoQueue 底层数据结构改用 list 来寄存,通过 self.queue.pop() 就能将 list 中最初一个元素移除,无需重置索引。
PriorityQueue 优先队列
from heapq import heappush, heappop | |
class PriorityQueue(Queue): | |
'''Variant of Queue that retrieves open entries in priority order (lowest first). | |
Entries are typically tuples of the form: (priority number, data). | |
''' | |
def _init(self, maxsize): | |
self.queue = [] | |
def _qsize(self): | |
return len(self.queue) | |
def _put(self, item): | |
heappush(self.queue, item) | |
def _get(self): | |
return heappop(self.queue) |
优先队列应用了 heapq 模块的构造,也就是最小堆的构造。优先队列更为罕用,队列中我的项目的解决程序须要基于这些我的项目的特色,一个简略的例子:
import queue | |
class A: | |
def __init__(self, priority, value): | |
self.priority = priority | |
self.value = value | |
def __lt__(self, other): | |
return self.priority < other.priority | |
q = queue.PriorityQueue() | |
q.put(A(1, 'a')) | |
q.put(A(0, 'b')) | |
q.put(A(1, 'c')) | |
print(q.get().value) # 'b' |
应用优先队列的时候,须要定义 lt 魔术办法,来定义它们之间如何比拟大小。若元素的 priority 雷同,仍然应用先进先出的程序。
以上就是本次分享的所有内容,想要理解更多 python 常识欢送返回公众号:Python 编程学习圈,发送“J”即可收费获取,每日干货分享