简略模式-生产与生产

producer生产者

import pikaimport json credentials = pika.PlainCredentials('guest', 'guest')conn = pika.BlockingConnection(    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials)) channel = conn.channel()result = channel.queue_declare(queue='zy_test') for i in range(100):    message = json.dumps({'OrderId': f'1000{i}'})    channel.basic_publish(exchange='', routing_key='zy_test', body=message)    print(message) conn.close()

consumer消费者

import pikaimport json credentials = pika.PlainCredentials('guest', 'guest')conn = pika.BlockingConnection(    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials)) channel = conn.channel()# durable 长久化 音讯不失落channel.queue_declare(queue='zy_test', durable=True)# 负载平衡:应用basicQos( prefetchCount = 1)办法,来限度RabbitMQ只发不超过1条的音讯给同一个消费者。当音讯处理完毕后,有了反馈,才会进行第二次发送。channel.basic_qos(prefetch_count=1) def callback(ch, method, properties, body):    ch.basic_ack(delivery_tag=method.delivery_tag)    print(body.decode()) channel.basic_consume(queue='zy_test', on_message_callback=callback)channel.start_consuming()

公布与订阅

公布与订阅要借助交换机(EXchange)的原理来实现:

EXchange一共有四种工作模式:fanout,direct,topic,headers(不罕用)

1、fanout模式

fanout模式下,传递到exchange的音讯将会转发到所有与其绑定的queue上。艰深了解为:发送给与exchange绑定的所有queue

  • 不须要指定routing_key,即便指定了也是有效的
  • 须要提前将exchange和queue绑定,多对多关系:一个exchange能够绑定多个queue,一个queue能够绑定多个exchange
  • 须要先启动订阅者,此模式下的队列是consumer随机生成的,发布者仅仅发消息到exchange,由exchange转发音讯至queue。

发布者:

import jsonfrom time import sleep import pika credentials = pika.PlainCredentials('guest', 'guest')conn = pika.BlockingConnection(    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials)) channel = conn.channel()# 申明exchange,由exchange指定音讯在哪个队列传递,如不存在,则创立。durable = True 代表exchange长久化存储,False 非长久化存储channel.exchange_declare(exchange='zy_test', exchange_type='fanout', durable=True) for i in range(100):    message = json.dumps({'OrderId': f'20000{i}'})    # 向队列插入数值 routing_key是队列名。delivery_mode = 2 申明音讯在队列中长久化,delivery_mod = 1 音讯非长久化。routing_key 不须要配置    channel.basic_publish(        exchange='zy_test',        routing_key='',        body=message,        properties=pika.BasicProperties(delivery_mode=2)    )    print(f'Sent: {message}')    sleep(1)conn.close()

订阅者:

import pika credentials = pika.PlainCredentials('guest', 'guest')conn = pika.BlockingConnection(    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials)) channel = conn.channel()channel.exchange_declare(exchange='logs', exchange_type='fanout')# 创立长期队列,队列名传空字符,consumer敞开后,队列主动删除result = channel.queue_declare(queue='', exclusive=True)# 绑定exchange和队列queue,exchange 使咱们可能确切地指定音讯应该到哪个队列去channel.queue_bind(exchange='zy_test', queue=result.method.queue)  # 回调函数,解决音讯内容def callback(ch, method, properties, body):    ch.basic_ack(delivery_tag=method.delivery_tag)    print(body.decode())  # auto_ack设置成 False,在调用callback函数时,未收到确认标识,音讯会重回队列。True,无论调用callback胜利与否,音讯都被生产掉channel.basic_consume(result.method.queue, callback, auto_ack=False)channel.start_consuming()

2、direct模式

direct模式原理:音讯发送至exchange,exchange依据路由键(routing_key)转达到对应的queue上;艰深的了解为:依据routing_key过滤。

  • 能够应用默认exchange='', 也能够自定义exchange
  • direct模式不须要将exchange和queue,routing_key,queue进行绑定,当然能够绑定;
  • 传递或接管音讯时 须要指定 routing_key
  • 须要先启动 订阅者,此模式下的consumer是随机生成的,发布者仅仅公布音讯到exchange,由exchange转发音讯至queue。如果后启动订阅者,则会失落启动前的音讯数据。

发布者:

import jsonfrom time import sleep import pika credentials = pika.PlainCredentials('guest', 'guest')conn = pika.BlockingConnection(    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials)) channel = conn.channel()channel.exchange_declare(exchange='zy_test_d', durable=True, exchange_type='direct')for i in range(100):    message = json.dumps({'OrderId': f'30000{i}'})    # 向队列插入数值 routing_key是队列名。delivery_mode = 2 申明音讯在队列中长久化,delivery_mod = 1 音讯非长久化。routing_key 不须要配置    channel.basic_publish(        exchange='zy_test_d',        routing_key='OrderId',        body=message,        properties=pika.BasicProperties(delivery_mode=2)    )    print(f'Sent: {message}')    sleep(1)conn.close()

订阅者:

import pika credentials = pika.PlainCredentials('guest', 'guest')conn = pika.BlockingConnection(    pika.ConnectionParameters(host='your robbitmq server', port=5672, virtual_host='/', credentials=credentials)) channel = conn.channel()channel.exchange_declare(exchange='zy_test_d', exchange_type='direct', durable=True)# 创立长期队列,队列名传空字符,consumer敞开后,队列主动删除result = channel.queue_declare(queue='', exclusive=True)# 绑定exchange和队列queue,exchange 使咱们可能确切地指定音讯应该到哪个队列去channel.queue_bind(exchange='zy_test_d', queue=result.method.queue, routing_key='OrderId')  # 回调函数,解决音讯内容def callback(ch, method, properties, body):    ch.basic_ack(delivery_tag=method.delivery_tag)    print(body.decode())  # auto_ack设置成 False,在调用callback函数时,未收到确认标识,音讯会重回队列。True,无论调用callback胜利与否,音讯都被生产掉channel.basic_consume(result.method.queue, callback, auto_ack=False)channel.start_consuming()

3、topic模式

topic模式和第二种模式差不多,exchange 也是通过 路由键 routing_key 来转发音讯到指定的 queue 。 _不同点是 routing_key 应用正则表达式反对含糊匹配_,但匹配规定又与惯例的正则表达式不同,比方“#”是匹配全副,“*”是匹配一个词。
举例:routing_key =“#orderid#”,意思是将音讯转发至所有 routing_key 蕴含 “orderid” 字符的队列中。代码和模式二 相似,就不贴出来了。

基于rabbitMQ的RPC

Callback queue 回调队列

一个客户端向服务器发送申请,服务器端解决申请后,将其处理结果保留在一个存储体中。而客户端为了取得处理结果,那么客户在向服务器发送申请时,同时发送一个回调队列地址 reply_to

Correlation id 关联标识

一个客户端可能会发送多个申请给服务器,当服务器解决完后,客户端无奈分别在回调队列中的响应具体和那个申请时对应的。为了解决这种状况,客户端在发送每个申请时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中依据correlation_id字段的值就能够分辨此响应属于哪个申请。

客户端发送申请

客户端在发送RPC申请到RPC申请队列时,客户端至多发送带有reply_to以及correlation_id两个属性的信息

服务端工作流

期待承受客户端发来RPC申请,当申请呈现的时候,服务器从RPC申请队列中取出申请,进行解决后,将响应发送到reply_to指定的回调队列中

客户端承受处理结果

客户端期待回调队列中呈现响应,当响应呈现时,它会依据响应中correlation_id字段的值,将其返回给对应的利用

server端

import pika  def fib(n):    if n == 0:        return 0    elif n == 1:        return 1    else:        return fib(n - 1) + fib(n - 2)  # 解决RPC申请队列中数据,并回调def on_request(ch, method, properties, body):    n = int(body)    print(f'[recv] n={n}')    response = fib(n)     ch.basic_publish(exchange='',                     routing_key=properties.reply_to,                     properties=pika.BasicProperties(correlation_id=properties.correlation_id),                     body=str(response)                     )    ch.basic_ack(delivery_tag=method.delivery_tag) # 鉴权credentials = pika.PlainCredentials('guest', 'guest')# 建设链接conn = pika.BlockingConnection(    pika.ConnectionParameters(host='your rabbitmq host', port=5672, virtual_host='/', credentials=credentials))# 建设会话channel = conn.channel()# 申明RPC申请队列channel.queue_declare(queue='rpc_queue')# 负载平衡channel.basic_qos(prefetch_count=1)# 生产客户端的音讯,并申请回调办法on_requestchannel.basic_consume(queue='rpc_queue', on_message_callback=on_request)channel.start_consuming()

client端

import uuidimport pika  class fibRpcClient:    def __init__(self):        self.__credentials = pika.PlainCredentials('guest', 'guest')        self.conn = pika.BlockingConnection(pika.ConnectionParameters(            host='your rabbitmq server', port=5672, virtual_host='/', credentials=self.__credentials        ))        # 定义回调音讯会话        self.channel = self.conn.channel()        result = self.channel.queue_declare(queue='rpc_back', exclusive=True)        self.callback_queue = result.method.queue        self.correlation_id = str(uuid.uuid4())        self.response = None        # 收到回调音讯,开始生产        self.channel.basic_consume(            on_message_callback=self.on_response,            queue=self.callback_queue,            auto_ack=False        )     # 收到音讯后 数据处理办法    def on_response(self, ch, method, properties, body):        if self.correlation_id == properties.correlation_id:            self.response = body     def call(self, n):        # 发送音讯        self.channel.basic_publish(            exchange='',            routing_key='rpc_queue',            body=str(n),            properties=pika.BasicProperties(                reply_to=self.callback_queue,                correlation_id=self.correlation_id            )        )        print(f'[send] n={n}')        # 没有收到相应,就始终非阻塞式的start_consumer        while self.response is None:            self.conn.process_data_events()        print(f'[recv] result is {int(self.response)}') if __name__ == '__main__':    fib = fibRpcClient()    fib.call(17)

长久化

MQ默认建设的是长期 queue 和 exchange,如果不申明长久化,一旦 rabbitmq 挂掉,queue、exchange 将会全副失落。所以咱们个别在创立 queue 或者 exchange 的时候会申明 长久化。

1、queue 申明长久化

申明音讯队列,音讯将在这个队列传递,如不存在,则创立。
durable = True 代表音讯队列长久化存储,False 非长久化存储

result = channel.queue_declare(queue = 'python-test',durable = True)

2、exchange 申明长久化

申明exchange,由exchange指定音讯在哪个队列传递,如不存在,则创立。
durable = True 代表exchange长久化存储,False 非长久化存储

channel.exchange_declare(exchange = 'python-test', durable = True)

3、音讯长久化

尽管exchange 和 queue 都申明了长久化,但如果音讯只存在内存里,rabbitmq 重启后,内存里的货色还是会失落。所以必须申明音讯也是长久化,从内存转存到硬盘。

向队列插入数值 routing_key是队列名。
delivery_mode = 2 申明音讯在队列中长久化,delivery_mod = 1 音讯非长久化 

channel.basic_publish(exchange = '',routing_key = 'python-test',body = message, properties=pika.BasicProperties(delivery_mode = 2))

4、acknowledgement音讯不失落

消费者(consumer)调用callback函数时,会存在解决音讯失败的危险,如果解决失败,则音讯失落。然而也能够抉择消费者解决失败时,将音讯回退给 rabbitmq ,从新再被消费者生产,这个时候须要设置确认标识。

auto_ack设置成 False,在调用callback函数时,未收到确认标识,音讯会重回队列。
True,无论调用callback胜利与否,音讯都被生产掉 

channel.basic_consume(callback,queue = 'python-test', auto_ack= False)