使用 nameko 的时候,设置 max_workers 就能实现不同并发度的消费。
我自己用 eventlet+kombu 写了一个消费者,却不能实现并发消费,只能一条一条排队消费。
代码如下:
import random
import eventlet
eventlet.monkey_patch() # noqa (code before rest of imports)
from kombu.mixins import ConsumerMixin
from kombu.messaging import Consumer
from kombu import Connection, Exchange, Queue
from loguru import logger
import time
class MyConsumer(ConsumerMixin):
    """Consumer that processes every message inline in the kombu callback.

    ``on_message`` does its (simulated) work and acknowledges the message
    before returning, so the callback does not return until the message has
    been fully handled.
    """

    def __init__(self, connection):
        # Kept on the instance for the ConsumerMixin base class.
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Create the consumer for the priority queue bound to ``ye.events``.

        Args:
            Consumer: consumer factory passed in by the caller; invoked with
                ``queues``, ``callbacks`` and ``prefetch_count``.
            channel: not used by this implementation.

        Returns:
            A one-element list holding the configured consumer.
        """
        print('创立消费者 start')
        topic_exchange = Exchange('ye.events', type='topic')
        bound_queue = Queue(
            'evt-ye.events-take--dna_create_service.auth',
            exchange=topic_exchange,
            routing_key='take',
            queue_arguments={'x-max-priority': 10},
        )
        # One consumer on the queue, with the prefetch count set to 10.
        consumers = [
            Consumer(
                queues=[bound_queue],
                callbacks=[self.on_message],
                prefetch_count=10,
            )
        ]
        print('创立消费者 down')
        return consumers

    def on_message(self, body, message):
        """Handle one message synchronously, then acknowledge it.

        Args:
            body: decoded message payload; only logged here.
            message: message object; ``ack()`` is called on it at the end.
        """
        logger.debug(f"Received message: {body}")
        logger.debug(f'开始生产')
        # Stand-in for real work; runs inside the callback itself.
        time.sleep(3)
        logger.debug(f'完结生产')
        # Send the ACK to confirm the message was received.
        message.ack()
# Script entry: open the broker connection, build the consumer on it and
# call its ``run()``; the ``with`` block closes the connection on exit.
with Connection('amqp://pon:pon@192.168.38.191:5672//') as conn:
    consumer = MyConsumer(conn)
    consumer.run()
就很烦,怎么办?我去翻了半天 nameko 的源码,发现关键在这里:
site-packages/nameko/messaging.py
注册回调函数
def handle_message(self, provider, body, message):
    """Hand a received message to ``provider`` via a managed thread.

    The call to ``provider.handle_message(body, message)`` is not made
    here; it is wrapped in a ``partial`` and passed to
    ``self.container.spawn_managed_thread`` (presumably so it runs
    concurrently with the consumer loop; confirm against the container).

    Args:
        provider: object exposing ``handle_message(body, message)``; its
            type name is used in the thread identifier.
        body: message payload, passed through unchanged.
        message: message object; ``delivery_info['routing_key']`` is read
            to build the thread identifier.
    """
    ident = u"{}.handle_message[{}]".format(
        type(provider).__name__,
        message.delivery_info['routing_key'],
    )
    self.container.spawn_managed_thread(
        partial(provider.handle_message, body, message),
        identifier=ident,
    )
def get_consumers(self, consumer_cls, channel):
    """ Kombu callback to set up consumers.

    Called after any (re)connection to the broker.

    Args:
        consumer_cls: consumer factory; called with ``queues``,
            ``callbacks`` and ``accept``.
        channel: not used by this implementation.

    Returns:
        The values view of ``self._consumers`` (one consumer per provider).
    """
    _log.debug('setting up consumers %s', self)

    for provider in self._providers:
        # Pre-bind the provider; the consumer then only has to supply the
        # remaining arguments when it invokes the callback.
        callbacks = [partial(self.handle_message, provider)]

        consumer = consumer_cls(
            queues=[provider.queue],
            callbacks=callbacks,
            accept=self.accept,
        )
        consumer.qos(prefetch_count=self.prefetch_count)

        self._consumers[provider] = consumer

    return self._consumers.values()
然后 kombu 在收到消息之后,会调用之前注册的回调函数(site-packages/kombu/messaging.py):
def receive(self, body, message):
    """Method called when a message is received.

    This dispatches to the registered :attr:`callbacks`, calling each one
    in order with ``(body, message)``. The callbacks run inline, one after
    another, so this method returns only after the last one has returned.

    Arguments:
        body (Any): The decoded message body.
        message (~kombu.Message): The message instance.

    Raises:
        NotImplementedError: If no consumer callbacks have been
            registered.
    """
    callbacks = self.callbacks
    if not callbacks:
        raise NotImplementedError('Consumer does not have any callbacks')
    # Plain loop: the callbacks are invoked for their side effects only,
    # so there is no reason to build a throwaway list of return values.
    for callback in callbacks:
        callback(body, message)
所以,道理很简单,我按照 nameko 的思路,改造了一下我的代码。
最后完整的代码如下:
import random
import eventlet
eventlet.monkey_patch() # noqa (code before rest of imports)
from kombu.mixins import ConsumerMixin
from kombu.messaging import Consumer
from kombu import Connection, Exchange, Queue
from loguru import logger
import time
class MyConsumer(ConsumerMixin):
    """Consumer that processes messages concurrently in eventlet greenlets.

    The kombu callback (``on_message``) only spawns a greenlet and returns,
    so the callback itself never waits for the work; the actual handling
    happens in ``_on_message``.
    """

    def __init__(self, connection):
        # Kept on the instance for the ConsumerMixin base class.
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Create the consumer for the priority queue bound to ``ye.events``.

        Args:
            Consumer: consumer factory passed in by the caller; invoked with
                ``queues``, ``callbacks`` and ``prefetch_count``.
            channel: not used by this implementation.

        Returns:
            A one-element list holding the configured consumer.
        """
        print('创立消费者 start')
        queue_name = 'evt-ye.events-take--dna_create_service.auth'
        exchange_name = 'ye.events'
        routing_key = 'take'
        exchange = Exchange(exchange_name, type='topic')
        queue = Queue(
            queue_name,
            exchange=exchange,
            routing_key=routing_key,
            queue_arguments={'x-max-priority': 10},
        )
        # Create one consumer with the prefetch count set to 10. Since every
        # message is acked or requeued only when its greenlet finishes, this
        # presumably also caps the number of messages in flight at 10.
        consumer = Consumer(
            queues=[queue],
            callbacks=[self.on_message],
            prefetch_count=10,
        )
        print('创立消费者 down')
        return [consumer]

    def on_message(self, body, message):
        """Kombu callback: hand the message to a new greenlet and return.

        Args:
            body: decoded message payload.
            message: message object, settled later by ``_on_message``.
        """
        eventlet.spawn(self._on_message, body, message)

    def _on_message(self, body, message):
        """Process one message inside its own greenlet and settle it.

        On success the message is acked. If processing raises, the error is
        logged and the message is requeued; otherwise it would stay
        unacknowledged and keep occupying one of the prefetch slots.

        Args:
            body: decoded message payload; only logged here.
            message: message object to ack or requeue.
        """
        try:
            logger.debug(f"Received message: {body}")
            logger.debug('开始生产')
            # Stand-in for real work; time.sleep was patched by
            # eventlet.monkey_patch(), so it yields to other greenlets.
            time.sleep(3)
            logger.debug('完结生产')
        except Exception:
            # Top-level boundary of the greenlet: nothing above us would
            # settle the message, so log and give it back to the broker.
            logger.exception('Message handling failed; requeueing message')
            message.requeue()
            return
        # Send the ACK to confirm the message was received.
        message.ack()
# Script entry: open the broker connection, build the consumer on it and
# call its ``run()``; the ``with`` block closes the connection on exit.
with Connection('amqp://pon:pon@192.168.38.191:5672//') as conn:
    consumer = MyConsumer(conn)
    consumer.run()
修改其实很简单:把回调函数改造一下,收到消息之后,立即起一个协程去处理,让协程到后台慢慢处理,保持回调函数永远不阻塞就好了。