前提:一个过程一个 amqp 连贯
我原本想用多线程来实现,然而貌似做不到:一个连贯,多个线程生产
然而我又不想一个线程一个连贯,这样太节约连接数了
而后我想到 nameko 是一个过程一个连贯,而后应用 evnetlet 协程来实现,并发生产
所以我写了上面的 demo
from loguru import logger
from kombu.transport.pyamqp import Message
from kombu import Exchange, Queue
from kombu import Connection, Consumer, Queue
from concurrent.futures import ThreadPoolExecutor
import time
import eventlet
eventlet.monkey_patch()
amqp_uri = 'amqp://pon:[email protected]:5672//'
pool = eventlet.GreenPool(10)
def handle_message(message: Message):
logger.debug(message)
logger.debug(message.body)
time.sleep(1)
message.ack(multiple=True)
refresh_exchange = Exchange('refresh', type='topic')
imdb_queue = Queue('refresh_imdb', refresh_exchange,
routing_key='to_imdb', durable=True)
queues: list[Queue] = [imdb_queue]
def start_consuming(message: Message):
pool.spawn_n(handle_message, message)
with Connection(amqp_uri) as conn:
with conn.channel() as channel:
consumer = Consumer(
channel, queues=queues,
prefetch_count=10, on_message=start_consuming
)
with consumer:
while True:
conn.drain_events()
- 生产一个工作须要 1 秒
- 预取 10 个
- 协程池 size 为 10
从上面的生产数据能够看出,是一秒生产 10 个
所以是实现了并发的