关于rabbitmq:Python对RabbitMQ的简单使用

5次阅读

共计 12149 个字符,预计需要花费 31 分钟才能阅读完成。

(一) RabbitMQ 的简介

RabbitMq 是实现了高级音讯队列协定(AMQP)(Advanced Message Queuing Protocol)的开源音讯代理中间件。音讯队列是一种应用程序对应用程序的通行形式,应用程序通过写音讯,将消息传递于队列,由另一应用程序读取 实现通信。而作为中间件的 RabbitMq 无疑是目前最风行的音讯队列之一。目前应用较多的音讯队列有 ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

RabbitMQ 总体架构

PS:生产者和消费者可能在不同的程序或主机中,当然也有可能一个程序有可能既是生产者,也是消费者。

RabbitMq 利用场景宽泛:

  1. 零碎的高可用:日常生活当中各种商城秒杀,高流量,高并发的场景。当服务器接管到如此大量申请解决业务时,有宕机的危险。某些业务可能极其简单,但这部分不是高时效性,不须要立刻反馈给用户,咱们能够将这部分解决申请抛给队列,让程序后置去解决,加重服务器在高并发场景下的压力。
  2. 分布式系统,集成系统,子系统之间的对接,以及架构设计中经常须要思考音讯队列的利用。

(二) RabbitMQ 的装置

这里通过官网下载须要的版本:RabbitMQ 官网网址
鉴于官网拜访下载比较慢,贴一个云盘地址:百度云盘地址

Mac 的话,能够间接应用 brew 装置,brew 相似 ubuntu 下的 apt-get 性能

# brew install rabbitmq

# 启动 rabbitmq:rabbitmq-server start
# 进行 rabbitmq:rabbitmq-server stop
# 重启 rabbitmq:rabbitmq-server restart

启动后,就能够登陆后盾治理页面了,浏览器输出 ip:15672

自带的明码和用户名都是 guest

(三) python 操作 RabbitMQ

python 中应用 pika 操作 RabbitMQ

pip3 install pika

(四) RabbitMQ 简略模式

上代码

# coding=utf-8
### 生产者

import pika
import time

user_info = pika.PlainCredentials('guest', 'guest')# 用户名和明码
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', user_info))# 连贯服务器上的 RabbitMQ 服务

# 创立一个 channel
channel = connection.channel()

# 如果指定的 queue 不存在,则会创立一个 queue,如果曾经存在 则不会做其余动作,官网举荐,每次应用时都能够加上这句
channel.queue_declare(queue='hello')

for i in range(0, 100):
    channel.basic_publish(exchange='',# 以后是一个简略模式,所以这里设置为空字符串就能够了
                          routing_key='hello',# 指定音讯要发送到哪个 queue
                          body='{}'.format(i)# 指定要发送的音讯
                          )
    time.sleep(1)
    
# 敞开连贯
# connection.close()

PS:RabbitMQ 中所有的音讯都要先通过交换机,空字符串示意应用默认的交换机

# coding=utf-8
### 消费者

import pika

user_info = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', user_info))
channel = connection.channel()

# 如果指定的 queue 不存在,则会创立一个 queue,如果曾经存在 则不会做其余动作,生产者和消费者都做这一步的益处是
# 这样生产者和消费者就没有必要的先后启动程序了
channel.queue_declare(queue='hello')


# 回调函数
def callback(ch, method, properties, body):
    print('消费者收到:{}'.format(body))

# channel: 蕴含 channel 的所有属性和办法
# method: 蕴含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通过 properties 传入的参数
# body: basic_publish 发送的音讯


channel.basic_consume(queue='hello',  # 接管指定 queue 的音讯
                      auto_ack=True,  # 指定为 True,示意音讯接管到后主动给音讯发送方回复确认,已收到音讯
                      on_message_callback=callback  # 设置收到音讯的回调函数
                      )

print('Waiting for messages. To exit press CTRL+C')

# 始终处于期待接管音讯的状态,如果没收到音讯就始终处于阻塞状态,收到音讯就调用下面的回调函数
channel.start_consuming()

对于下面的这种模式,有一下两个不好的中央:

一个是在咱们的消费者还没开始生产完队列里的音讯,如果这时 rabbitmq 服务挂了,那么音讯队列里的音讯将会全副失落,解决办法是在申明队列时,申明队列为可长久化存储队列,并且在生产者在将音讯插入到音讯队列时,设置音讯长久化存储,具体如下

# coding=utf-8
### 生产者
import pika
import time

user_info = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', user_info))

# 创立一个 channel
channel = connection.channel()

# 如果指定的 queue 不存在,则会创立一个 queue,如果曾经存在 则不会做其余动作,官网举荐,每次应用时都能够加上这句
channel.queue_declare(queue='durable_queue',durable=True)
#PS:这里不同种队列不容许名字雷同


for i in range(0, 100):
    channel.basic_publish(exchange='',
                          routing_key='durable_queue',
                          body='{}'.format(i),
                          properties=pika.BasicProperties(delivery_mode=2)
                          )


# 敞开连贯
# connection.close()

消费者与下面的消费者没有什么不同,具体的就是生产申明的队列,也要是可长久化的队列,还有就是,即便在生产者插入音讯时,设置以后音讯长久化存储(properties=pika.BasicProperties(delivery_mode=2)),并不能百分百保障音讯真的被长久化,因为 RabbitMQ 挂掉的时候它可能还保留在缓存中,没来得及同步到磁盘中

在生产者插入音讯后,立即进行 rabbitmq,并重新启动,其实咱们在 web 治理页面也可看到未被生产的信息,当然在启动消费者后也胜利接管到了音讯

下面说的第二点不好就是,如果在消费者获取到队列里的音讯后,在回调函数的处理过程中,消费者忽然出错或程序解体等异样,那么就会造成这条音讯并未被理论失常的解决掉。为了解决这个问题,咱们只需在消费者 basic_consume(auto_ack=False), 并在回调函数中设置手动应答即可 ch.basic_ack(delivery_tag=method.delivery_tag),具体如下

# coding=utf-8
### 消费者

import pika
import time

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 如果指定的 queue 不存在,则会创立一个 queue,如果曾经存在 则不会做其余动作,生产者和消费者都做这一步的益处是
# 这样生产者和消费者就没有必要的先后启动程序了
channel.queue_declare(queue='queue')


# 回调函数
def callback(ch, method, properties, body):
    time.sleep(5)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('消费者收到:{}'.format(body.decode('utf-8')))


# channel: 蕴含 channel 的所有属性和办法
# method: 蕴含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通过 properties 传入的参数
# body: basic_publish 发送的音讯


channel.basic_consume(queue='queue',  # 接管指定 queue 的音讯
                      auto_ack=False,  # 指定为 False,示意勾销自动应答,交由回调函数手动应答
                      on_message_callback=callback  # 设置收到音讯的回调函数
                      )

# 应答的实质是通知音讯队列能够将这条音讯销毁了

print('Waiting for messages. To exit press CTRL+C')

# 始终处于期待接管音讯的状态,如果没收到音讯就始终处于阻塞状态,收到音讯就调用下面的回调函数
channel.start_consuming()

这里只须要配置消费者,生产者并不要批改

还有就是在上的应用形式在,都是一个生产者和一个消费者,还有一种状况就是,一个生产者和多个消费者,即多个消费者同时监听一个音讯队列,这时候队列里的音讯就是轮询散发(即如果音讯队列里有 100 条信息,如果有 2 个消费者,那么每个就会收到 50 条信息),然而在某些状况下,不同的消费者解决工作的能力是不同的,这时还依照轮询的形式散发音讯并不是很正当,那么只须要再配合手动应答的形式,设置消费者接管的音讯没有解决完,队列就不要给我放送新的音讯即可,具体配置形式如下:

# coding=utf-8
### 消费者

import pika
import time

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 如果指定的 queue 不存在,则会创立一个 queue,如果曾经存在 则不会做其余动作,生产者和消费者都做这一步的益处是
# 这样生产者和消费者就没有必要的先后启动程序了
channel.queue_declare(queue='queue')


# 回调函数
def callback(ch, method, properties, body):
    time.sleep(0)# 通过设置休眠工夫来模仿不同消费者的解决工夫
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('消费者收到:{}'.format(body.decode('utf-8')))


# prefetch_count 示意接管的音讯数量,当我接管的音讯没有解决完(用 basic_ack 标记音讯已处理完毕)之前不会再接管新的音讯了
channel.basic_qos(prefetch_count=1)  # 还有就是这个设置必须在 basic_consume 之上,否则不失效

channel.basic_consume(queue='queue',  # 接管指定 queue 的音讯
                      auto_ack=False,  # 指定为 False,示意勾销自动应答,交由回调函数手动应答
                      on_message_callback=callback  # 设置收到音讯的回调函数
                      )

# 应答的实质是通知音讯队列能够将这条音讯销毁了

print('Waiting for messages. To exit press CTRL+C')

# 始终处于期待接管音讯的状态,如果没收到音讯就始终处于阻塞状态,收到音讯就调用下面的回调函数
channel.start_consuming()

PS: 这种状况必须敞开自动应答 ack,改成手动应答。应用 basicQos(perfetch=1)限度每次只发送不超过 1 条音讯到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个

(五) RabbitMQ 公布订阅模式

公布订阅会将音讯发送给所有的订阅者,而音讯队列中的数据被生产一次便隐没。所以,RabbitMQ 实现公布和订阅时,会为每一个订阅者创立一个队列,而发布者公布音讯时,会将音讯搁置在所有相干队列中

这个模式中会引入交换机的概念,其实在 RabbitMQ 中,所有的生产者都不会间接把音讯发送到队列中,甚至生产者都不晓得音讯在收回后有没有发送到 queue 中,事实上,生产者只能将音讯发送给交换机,由交换机来决定发送到哪个队列中。

交换机的一端用来从生产者中接管音讯,另一端用来发送音讯到队列,交换机的类型规定了怎么解决接管到的音讯,公布订阅模式应用到的交换机类型为 fanout,这种交换机类型非常简单,就是将接管到的音讯播送给已知的(即绑定到此交换机的)所有消费者。

当然,如果不想应用特定的交换机,能够应用 exchange=’’示意应用默认的交换机,默认的替换机会将音讯发送到 routing_key 指定的 queue,能够参考简略模式。

上代码:

# 生产者
import pika

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

# 创立一个指定名称的交换机,并指定类型为 fanout,用于将接管到的音讯播送到所有 queue 中
channel.exchange_declare(exchange='交换机', exchange_type='fanout')


# 将音讯发送给指定的交换机,在 fanout 类型中,routing_key='' 示意不必发送到指定 queue 中,# 而是将发送到绑定到此交换机的所有 queue
channel.basic_publish(exchange='交换机', routing_key='', body=' 这是一条测试音讯 ')
# 消费者
import pika

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()

channel.exchange_declare(exchange='交换机', exchange_type='fanout')

# 应用 RabbitMQ 给本人生成一个专有的 queue
result = channel.queue_declare(queue='333')
# result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 这里如果设置 exclusive=True 参数,那么该队列就是一个只有队列,在消费者完结后,该专有队列也会主动革除,如果 queue='' 没有设置名字的话,那么就会主动生成一个
# 不会反复的队列名

# 将 queue 绑定到指定交换机
channel.queue_bind(exchange='交换机', queue=queue_name)

print('[*] Waiting for  message.')


def callback(ch, method, properties, body):
    print("消费者收到:{}".format(body.decode('utf-8')))


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

该模式与简略模式的还有一个区别就是,这里的音讯队列都是由消费者申明的,所以如果是生产者先启动,并将音讯发给交换机的画,这里的音讯就会失落,所以咱们也能够在消费者端申明队列并绑定交换机(不能是专有队列),所以认真想想,其实这所谓的公布订阅模式并没有说什么了不起,它不过是让交换机同时推送多条音讯给绑定的队列,咱们当然也能够在简略模式的根底上多进行几次 basic_publish 发送音讯到指定的队列。当然咱们这样做的话,可能就没方法做到由交换机的同时发送了,效率可能也没有一次 basic_publish 的高

(六) RabbitMQ RPC 模式

上面实现由 rpc 近程调用加减运算

客户端

import pika
import uuid
import json


class RPC(object):

    def __init__(self):
        self.call_id = None
        self.response = None
        user_info = pika.PlainCredentials('root', 'root')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
        self.channel = self.connection.channel()

        # 创立一个此客户端专用的 queue,用于接管服务端发过来的音讯
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        # 判断接管到的 response 是否属于对应 request
        if self.call_id == props.correlation_id:
            self.response = json.loads(body.decode('utf-8')).get('result')

    def call(self, func, param):
        self.response = None
        self.call_id = str(uuid.uuid4())  # 为该音讯指定 uuid,相似于申请 id
        self.channel.queue_declare(queue='rpc_queue')
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',  # 将音讯发送到该 queue
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,  # 从该 queue 中取音讯
                correlation_id=self.call_id,  # 为此次音讯指定 uuid
            ),
            body=json.dumps(
                {
                    'func': func,
                    'param': {'a': param[0], 'b': param[1]}
                }
            )
        )
        
        self.connection.process_data_events(time_limit=3)# 与 start_consuming()类似,能够设置超时参数
        return self.response


rpc = RPC()

print("发送音讯到消费者, 期待返回后果")

response = rpc.call(func='del', param=(1, 2))

print("收到来自消费者返回的后果:{}".format(response))

服务端

import pika
import json

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))

channel = connection.channel()

# 指定接管音讯的 queue
channel.queue_declare(queue='rpc_queue')


def add_number(a, b):
    return a + b


def del_num(a, b):
    return a - b


execute_map = {
    'add': add_number,
    'del': del_num
}


def on_request(ch, method, props, body):
    body = json.loads(body.decode('utf-8'))
    func = body.get('func')
    param = body.get('param')
    result = execute_map.get(func)(param.get('a'), param.get('b'))
    print('进行 {} 运算,并将后果返回个消费者'.format(func))

    ch.basic_publish(exchange='',  # 应用默认交换机
                     routing_key=props.reply_to,  # response 发送到该 queue
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),  # 应用 correlation_id 让此 response 与申请音讯对应起来
                     body=json.dumps({'result': result}))

    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
# 从 rpc_queue 中取音讯,而后应用 on_request 进行解决
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print("[x] Awaiting RPC requests")
channel.start_consuming()

(六) RabbitMQ RPC 模式

上面实现由 rpc 近程调用加减运算

客户端

import pika
import uuid
import json


class RPC(object):

    def __init__(self):
        self.call_id = None
        self.response = None
        user_info = pika.PlainCredentials('root', 'root')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
        self.channel = self.connection.channel()

        # 创立一个此客户端专用的 queue,用于接管服务端发过来的音讯
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        # 判断接管到的 response 是否属于对应 request
        if self.call_id == props.correlation_id:
            self.response = json.loads(body.decode('utf-8')).get('result')

    def call(self, func, param):
        self.response = None
        self.call_id = str(uuid.uuid4())  # 为该音讯指定 uuid,相似于申请 id
        self.channel.queue_declare(queue='rpc_queue')
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',  # 将音讯发送到该 queue
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,  # 从该 queue 中取音讯
                correlation_id=self.call_id,  # 为此次音讯指定 uuid
            ),
            body=json.dumps(
                {
                    'func': func,
                    'param': {'a': param[0], 'b': param[1]}
                }
            )
        )
        
        self.connection.process_data_events(time_limit=3)# 与 start_consuming()类似,能够设置超时参数
        return self.response


rpc = RPC()

print("发送音讯到消费者, 期待返回后果")

response = rpc.call(func='del', param=(1, 2))

print("收到来自消费者返回的后果:{}".format(response))

服务端

import pika
import json

user_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))

channel = connection.channel()

# 指定接管音讯的 queue
channel.queue_declare(queue='rpc_queue')


def add_number(a, b):
    return a + b


def del_num(a, b):
    return a - b


execute_map = {
    'add': add_number,
    'del': del_num
}


def on_request(ch, method, props, body):
    body = json.loads(body.decode('utf-8'))
    func = body.get('func')
    param = body.get('param')
    result = execute_map.get(func)(param.get('a'), param.get('b'))
    print('进行 {} 运算,并将后果返回个消费者'.format(func))

    ch.basic_publish(exchange='',  # 应用默认交换机
                     routing_key=props.reply_to,  # response 发送到该 queue
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),  # 应用 correlation_id 让此 response 与申请音讯对应起来
                     body=json.dumps({'result': result}))

    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
# 从 rpc_queue 中取音讯,而后应用 on_request 进行解决
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print("[x] Awaiting RPC requests")
channel.start_consuming()

(七) 结语

对于 rabbitmq 的模式还有 Routing 模式和 Topics 模式等,这里就不复述了,其实 pika 对于 RabbitMQ 的应用还有很多细节和参数值得深究。这篇博客也就是简略的记录下我对 pika 操作 raabbitmq 过程和简略的了解

参考链接:

https://www.cnblogs.com/guyuy…
https://blog.csdn.net/wohu110…

正文完
 0