关于python:Python-Queue-进阶多生产者单消费者问题

54次阅读

共计 3071 个字符,预计需要花费 8 分钟才能阅读完成。

生产者消费者模型
在并发编程中,比方爬虫,有的线程负责爬取数据,有的线程负责对爬取到的数据做解决(荡涤、分类和入库)。如果他们是间接交互的,那么当二者的速度不匹配时势必呈现期待景象,这也就产生了资源的节约。

形象是一种很重要的通用能力,而生产者消费者模型是前人将一系列同类型的具体的问题形象进去的一个统一的最佳解决方案。

该模型有三个重要角色,容器,生产者和消费者,顾名思义,生产者就是负责生产数据或工作的,消费者就是负责生产数据或者工作的(下文统称为工作),容器是二者进行通信的媒介。

在该模型中,生产者和消费者不在间接进行通信,而是通过引入一个第三者容器(通常都是用阻塞队列)来达到解耦的目标。

这样生产者不用在因为消费者速度过慢而期待,间接将工作放入容器即可,消费者也不必因生产者生产速度过慢而期待,间接从容器中获取工作,以此达到了资源的最大利用。

应用该模型能够解决并发编程中的绝大部分并发问题。

简易版
咱们先写一个单生产者和单消费者的简易版生产者消费者模型。

import threading
import time
import queue

def consume(thread_name, q):
    while True:
        time.sleep(2)
        product = q.get()
        print("%s consume %s" % (thread_name, product))

def produce(thread_name, q):
    for i in range(3):
        product = 'product-' + str(i)
        q.put(product)
        print("%s produce %s" % (thread_name, product))
        time.sleep(1)
    
            
q = queue.Queue()
p = threading.Thread(target=produce, args=("producer",q))
c = threading.Thread(target=consume, args=("consumer",q))

p.start()
c.start()

p.join()

# 输入如下
producer produce product-0
producer produce product-1
consumer consume product-0
producer produce product-2
consumer consume product-1
consumer consume product-2
...

以上就是最简略的生产者消费者模型了,生产者生产三个工作供消费者生产。然而下面的写法有个问题,就是生产者将工作生产结束之后就和主线程一起退出了,然而消费者将所有的工作生产完之后还没进行,始终处于阻塞状态。

那可不可以将 while True 的判断改为 while not q.empty()呢,必定是不行的。因为 empty() 返回 False,不保障后续调用的 get()不被阻塞。

同时,如果用 empty() 函数来做判断的话,那么就要保障消费者线程开启之时生产者肯定至多生产了一个工作,否则消费者线程就会因条件不满足间接退出程序;同时如果生产者生产速度比较慢,一旦消费者将工作生产完且下次判断时还没有新的工作入队,那么消费者线程也会因条件不满足间接退出程序。自此以后,生产者生产的工作就永远不会被生产了。

那咱们能够做一个约定,当生产者生产完工作之后,放入一个标记,相似于 q.put(None), 一旦消费者接管到为 None 的工作时就意味着完结,间接退出程序即可。

这种做法在下面的程序中是没有问题的,惟一的毛病就是有 N 个消费者线程就须要放入 N 个 None 标记,这对于多消费者类型的程序显然是很不敌对的。

最佳实际
咱们能够联合队列的内置函数 task_done() 和 join() 来达到咱们的目标。

join() 函数是阻塞的。当消费者通过 get() 从队列获取一项工作并解决实现之后,须要调用且只能够调用一次 task_done(),该办法会给队列发送一个信号,join()函数则在监听这个信号。

能够简略了解为队列外部保护了一个计数器,该计数器标识未实现的工作数,每当增加工作时,计数器会减少,调用 task_done()时计数器则会缩小,直到队列为空。而 join() 就是在监听队列是否为空,一旦条件满足则完结阻塞状态。

import threading
import time
import queue

def consume(thread_name, q):
    while True:
        time.sleep(2)
        product = q.get()
        print("%s consume %s" % (thread_name, product))
        q.task_done()

def produce(thread_name, q):
    for i in range(3):
        product = 'product-' + str(i)
        q.put(product)
        print("%s produce %s" % (thread_name, product))
        time.sleep(1)
    q.join()
            
q = queue.Queue()
p = threading.Thread(target=produce, args=("producer",q))
c = threading.Thread(target=consume, args=("consumer",q))
c1 = threading.Thread(target=consume, args=("consumer-1",q))

c.setDaemon(True)
c1.setDaemon(True)
p.start()
c.start()
c1.start()

p.join()

# 输入如下
producer produce product-0
producer produce product-1
consumer-1 consume product-0
consumer consume product-1
producer produce product-2
consumer consume product-2

上述示例中,咱们将消费者线程设置为守护线程,这样当主线程完结时消费者线程也会一并完结。而后主线程最初一句 p.join() 又示意主线程必须期待生产者线程完结后才能够完结。

再细看生产者线程的主函数 produce(),该函数中呈现了咱们下面说过的 q.join() 函数。而 task_done 则是在消费者线程的主函数中调用的。

故当生产者线程生产完所有工作后就会被阻塞,只有当消费者线程解决完所有工作后生产者才会阻塞完结。随着生产者线程的完结,主线程也一并完结,守护线程消费者线程也一并完结,自此所有线程均平安退出。

Queue 总结
本章节介绍了队列的高级利用,从简易版的示例到最佳实际,介绍了生产者消费者模型的根本用法,在该模型中,队列表演了十分重要的角色,起到理解耦的目标。

本模型有固定的步骤,其中最重要的就是通过 task_done() 和 join() 来相互通信。task_done() 仅仅用来告诉队列消费者已实现一个工作,至于工作是什么它毫不关心,它只关怀队列中未实现的工作数量。

留神:task_done() 不能够在 put() 之前调用,否则会引发 ValueError: task_done() called too many times。同时在解决完工作后只能够调用一次该函数,否则队列将不能精确计算未实现工作数量。

最近整顿了几百 G 的 Python 学习材料,蕴含新手入门电子书、教程、源码等等,收费分享给大家!想要的返回公~豪“Python 编程学习圈”,发送“J”即可收费取得

正文完
 0