共计 1019 个字符,预计需要花费 3 分钟才能阅读完成。
前提:一个过程一个 amqp 连贯
我原本想用多线程来实现,然而貌似做不到:一个连贯,多个线程生产
然而我又不想一个线程一个连贯,这样太节约连接数了
而后我想到 nameko 是一个过程一个连贯,而后应用 evnetlet 协程来实现,并发生产
所以我写了上面的 demo
from loguru import logger | |
from kombu.transport.pyamqp import Message | |
from kombu import Exchange, Queue | |
from kombu import Connection, Consumer, Queue | |
from concurrent.futures import ThreadPoolExecutor | |
import time | |
import eventlet | |
eventlet.monkey_patch() | |
amqp_uri = 'amqp://pon:[email protected]:5672//' | |
pool = eventlet.GreenPool(10) | |
def handle_message(message: Message): | |
logger.debug(message) | |
logger.debug(message.body) | |
time.sleep(1) | |
message.ack(multiple=True) | |
refresh_exchange = Exchange('refresh', type='topic') | |
imdb_queue = Queue('refresh_imdb', refresh_exchange, | |
routing_key='to_imdb', durable=True) | |
queues: list[Queue] = [imdb_queue] | |
def start_consuming(message: Message): | |
pool.spawn_n(handle_message, message) | |
with Connection(amqp_uri) as conn: | |
with conn.channel() as channel: | |
consumer = Consumer( | |
channel, queues=queues, | |
prefetch_count=10, on_message=start_consuming | |
) | |
with consumer: | |
while True: | |
conn.drain_events() | |
- 生产一个工作须要 1 秒
- 预取 10 个
- 协程池 size 为 10
从上面的生产数据能够看出,是一秒生产 10 个
所以是实现了并发的
正文完