前提:一个过程一个 amqp 连贯

我原本想用多线程来实现,然而貌似做不到:一个连贯,多个线程生产

然而我又不想一个线程一个连贯,这样太节约连接数了

而后我想到 nameko 是一个过程一个连贯,而后应用 evnetlet 协程来实现,并发生产

所以我写了上面的 demo

from loguru import loggerfrom kombu.transport.pyamqp import Messagefrom kombu import Exchange, Queuefrom kombu import Connection, Consumer, Queuefrom concurrent.futures import ThreadPoolExecutorimport timeimport eventleteventlet.monkey_patch()amqp_uri = 'amqp://pon:[email protected]:5672//'pool = eventlet.GreenPool(10)def handle_message(message: Message):    logger.debug(message)    logger.debug(message.body)    time.sleep(1)    message.ack(multiple=True)refresh_exchange = Exchange('refresh', type='topic')imdb_queue = Queue('refresh_imdb', refresh_exchange,                   routing_key='to_imdb', durable=True)queues: list[Queue] = [imdb_queue]def start_consuming(message: Message):    pool.spawn_n(handle_message, message)with Connection(amqp_uri) as conn:    with conn.channel() as channel:        consumer = Consumer(            channel, queues=queues,            prefetch_count=10, on_message=start_consuming        )        with consumer:            while True:                conn.drain_events()
  • 生产一个工作须要 1 秒
  • 预取 10 个
  • 协程池 size 为 10

从上面的生产数据能够看出,是一秒生产 10 个

所以是实现了并发的