前提:一个过程一个 amqp 连贯
我原本想用多线程来实现,然而貌似做不到:一个连贯,多个线程生产
然而我又不想一个线程一个连贯,这样太节约连接数了
而后我想到 nameko 是一个过程一个连贯,而后应用 evnetlet 协程来实现,并发生产
所以我写了上面的 demo
from loguru import loggerfrom kombu.transport.pyamqp import Messagefrom kombu import Exchange, Queuefrom kombu import Connection, Consumer, Queuefrom concurrent.futures import ThreadPoolExecutorimport timeimport eventleteventlet.monkey_patch()amqp_uri = 'amqp://pon:[email protected]:5672//'pool = eventlet.GreenPool(10)def handle_message(message: Message): logger.debug(message) logger.debug(message.body) time.sleep(1) message.ack(multiple=True)refresh_exchange = Exchange('refresh', type='topic')imdb_queue = Queue('refresh_imdb', refresh_exchange, routing_key='to_imdb', durable=True)queues: list[Queue] = [imdb_queue]def start_consuming(message: Message): pool.spawn_n(handle_message, message)with Connection(amqp_uri) as conn: with conn.channel() as channel: consumer = Consumer( channel, queues=queues, prefetch_count=10, on_message=start_consuming ) with consumer: while True: conn.drain_events()
- 生产一个工作须要 1 秒
- 预取 10 个
- 协程池 size 为 10
从上面的生产数据能够看出,是一秒生产 10 个
所以是实现了并发的