乐趣区

关于python:源码探索nameko-是如何实现协程并发消费的

应用 nameko 的时候,设置 max_workers 能够实现不同并发的生产

我本人用 eventlet+kombu 写了一个消费者,却不能实现并发生产,只能一个一个排队生产

代码如下:

import random
import eventlet
eventlet.monkey_patch()  # noqa (code before rest of imports)
from kombu.mixins import ConsumerMixin
from kombu.messaging import Consumer
from kombu import Connection, Exchange, Queue
from loguru import logger
import time


class MyConsumer(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        print('创立消费者 start')
        queue_name = 'evt-ye.events-take--dna_create_service.auth'
        exchange_name = 'ye.events'
        routing_key = 'take'

        exchange = Exchange(exchange_name, type='topic')
        queue = Queue(
            queue_name, exchange=exchange,
            routing_key=routing_key,
            queue_arguments={'x-max-priority': 10}
        )

        # 创立一个消费者,并设置预取音讯数量为 10
        consumer = Consumer(queues=[queue], callbacks=[self.on_message],
            prefetch_count=10
        )
        print('创立消费者 down')
        return [consumer]

    def on_message(self, body, message):
        logger.debug(f"Received message: {body}")
        logger.debug(f'开始生产')
        time.sleep(3)
        logger.debug(f'完结生产')
        message.ack()  # 发送 ACK 确认音讯接管


with Connection('amqp://pon:pon@192.168.38.191:5672//') as conn:
    consumer = MyConsumer(conn)
    consumer.run()

就很烦,怎么办,我就去翻了半天 nameko 的源码代码,发现问题在这里:

site-packages/nameko/messaging.py 注册回调函数

def handle_message(self, provider, body, message):
    ident = u"{}.handle_message[{}]".format(type(provider).__name__, message.delivery_info['routing_key']
    )
    self.container.spawn_managed_thread(partial(provider.handle_message, body, message), identifier=ident
    )

def get_consumers(self, consumer_cls, channel):
    """ Kombu callback to set up consumers.

    Called after any (re)connection to the broker.
    """_log.debug('setting up consumers %s', self)

    for provider in self._providers:
        callbacks = [partial(self.handle_message, provider)]

        consumer = consumer_cls(queues=[provider.queue],
            callbacks=callbacks,
            accept=self.accept
        )
        consumer.qos(prefetch_count=self.prefetch_count)

        self._consumers[provider] = consumer

    return self._consumers.values()

而后 kombu 在收到音讯之后,会调用之前注册的回调函数
site-packages/kombu/messaging.py

def receive(self, body, message):
    """Method called when a message is received.

    This dispatches to the registered :attr:`callbacks`.

    Arguments:
        body (Any): The decoded message body.
        message (~kombu.Message): The message instance.

    Raises:
        NotImplementedError: If no consumer callbacks have been
            registered.
    """
    callbacks = self.callbacks
    if not callbacks:
        raise NotImplementedError('Consumer does not have any callbacks')
    
    [callback(body, message) for callback in callbacks]

所以,情理很简略,我依照 nameko 的思路,革新了一下我的代码

最初残缺的代码如下:

import random
import eventlet
eventlet.monkey_patch()  # noqa (code before rest of imports)
from kombu.mixins import ConsumerMixin
from kombu.messaging import Consumer
from kombu import Connection, Exchange, Queue
from loguru import logger
import time


class MyConsumer(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        print('创立消费者 start')
        queue_name = 'evt-ye.events-take--dna_create_service.auth'
        exchange_name = 'ye.events'
        routing_key = 'take'

        exchange = Exchange(exchange_name, type='topic')
        queue = Queue(
            queue_name, exchange=exchange,
            routing_key=routing_key,
            queue_arguments={'x-max-priority': 10}
        )

        # 创立一个消费者,并设置预取音讯数量为 10
        consumer = Consumer(queues=[queue], callbacks=[self.on_message],
            prefetch_count=10
        )
        print('创立消费者 down')
        return [consumer]

    def on_message(self, body, message):
        eventlet.spawn(self._on_message, body, message)

    def _on_message(self, body, message):
        logger.debug(f"Received message: {body}")
        logger.debug(f'开始生产')
        time.sleep(3)
        logger.debug(f'完结生产')
        message.ack()  # 发送 ACK 确认音讯接管


with Connection('amqp://pon:pon@192.168.38.191:5672//') as conn:
    consumer = MyConsumer(conn)
    consumer.run()

批改其实很简略,把回调函数革新一下,收到收到音讯之后,立即起一个协程解决,让协程到后盾缓缓解决,放弃回调函数永远不阻塞就好了

退出移动版