RabbitMQ入门

8次阅读

共计 2220 个字符,预计需要花费 6 分钟才能阅读完成。

记得在学习 Celery 的时候, 官方文档就建议我们的 broker 使用 RabbitMQ 或 Redis, 可是当然由于 Redis 使用比较习惯, 并且不用在去另外安装, 因此好长时间都没有去学习这个东西. 首先 RabbitMQ 绝对不像大众所认识的那样, 是 JAVA 或者 Python,GO 写的, 而是一种交换机语言 ERLang, 前段时间这个语言的发明人的离世也是轰动了整个 IT 界, 而作为程序员的我因为时间问题没有学习 RabbitMQ, 很是后悔呀, 所以现在赶紧学习学习, 废话不多说, 请欣赏 …

这是什么东西?

  • RabbitMQ 是一个消息队列的中间件, 通俗点讲就是饮料生产商 (康师傅, 达利园等等) 生产东西放入超市, 然后用户去超市里拿. 这里的超市就是我们的 RabbitMQ, 饮料生产商就是所谓的生产者, 用户就是消费者, 这里生产者可以好多种消息 (茉莉, 红茶等等) 放入到队列中, 消费者也可以根据名称去拿(我只能去买), 这就是其功能.

点击进入安装教程
但是不要以为这样就能开始, 当你运行以下代码就会发现报错:

530, "NOT_ALLOWED - access to vhost'/'refused for user'jim'"

这是怎么回事呢? 我也给大家解决了, 点击查看
__生产者__:

import pika

class Produce:
    def __init__(self, host, port, credentials):
        # 创建连接参数对象
        parameters = pika.ConnectionParameters(host, port, credentials=credentials)
        # 创建连接对象
        self._conn = pika.BlockingConnection(parameters)
        # 创建一个默认 channel
        self._channel = self._conn.channel()

    def create_channel(self, num):
        """创建 channel"""
        self._channel = self._conn.channel(num)

    def create_queue(self, name):
        """创建 queue"""
        self._channel.queue_declare(queue=name)
        self._key = name

    def run(self, body):
        """生产消息"""
        self._channel.basic_publish(exchange='', routing_key=self._key, body=body,)

        print("[x] Sent Message Successful!")

        self._conn.close()


if __name__ == '__main__':
    host = 'localhost'
    port = 5672
    credentials = pika.PlainCredentials('jim', 'adminjim')
    produce = Produce(host, port, credentials)
    produce.create_queue('你傻不傻')
    produce.run('你就是一个垃圾')

__消费者__:

import pika

class Custumer:
    def __init__(self, host, port, credentials):
        # 连接到 rabbitmq 服务器
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host, port, credentials=credentials))
        self.channel = self.conn.channel()

    def create_or_user_queue(self, name):
        # 声明消息队列,消息将在这个队列中进行传递。如果队列不存在,则创建
        self._key = name
        self.channel.queue_declare(queue=name)

    def _callback(self, ch, method, properties, body):
        """定义一个回调函数来处理,这边的回调函数就是将信息打印出来。"""
        print("[x] Received %r" % (body,))

    # 告诉 rabbitmq 使用 callback 来接收信息
    def run(self):
        self.channel.basic_consume(on_message_callback=self._callback, queue=self._key, auto_ack=True)  # no_ack=True 表示在回调函数中不需要发送确认标识

        print('[*] Waiting for messages. To exit press CTRL+C')

        self.channel.start_consuming()  # 开始接收信息,并进入阻塞状态,队列里有信息才会调用 callback 进行处理。if __name__ == '__main__':
    host = 'localhost'
    port = 5672
    credentials = pika.PlainCredentials('jim', 'adminjim')
    custumer = Custumer(host, port, credentials)
    custumer.create_or_user_queue('你傻不傻')
    custumer.run()

有没有学会呢, 学不会或者觉得简单的都不要着急, 我会学深入了再来满足大家.

正文完
 0