生产者
1 链接 rabbitmq
2 创立队列
3 向指定的队列插入数据
消费者
1 链接 rabbitmq
2 监听模式
3 确定回调函数
#生产者
import pika
#链接 rabbitmq
creadentials = pika.PlainCredentials(username='用户名',password='明码')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port= 端口))
channel = connection.channel()
#创立队列
channel.queue_declare(queue='ceshi')
#向指定队列插入数据 [exchange: 交换机模式, 简略模式为空 routing_key: 指定队列 body: 要插入的值]
channel.basic_publish(exchange='',routing_key='ceshi',body='hello world!')
print('[x]--')
#消费者
import pika
#创立链接
pika.PlainCredentials(username='用户名',password='明码')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port= 端口))
channel = connection.channel()
#创立队列
channel.queue_declare(queue='ceshi')
#确定回调函数
def callback(ch,method,properties,body):
print("[x]:",body)
#确定监听队列 [auto_ack: 默认应答]
channel.basic_consume(queue='ceshi',auto_ack=True,on_message_callback=callback)
print('[*]:---')
#正式监听
channel.start_consuming()
消费者确定监听队列时的一个参数
auto_ack
True: 默认应答:从队列取走一条数据后队列里这条数据就不存在了, 如果在数据处理过程中程序呈现问题, 会造成数据失落景象
False: 手动应答:从队列取走一条数据后, 这条数据还存在于队列中
ch.basic_ack(delivery_tag=method.delivery_tag) 增加在回调函数最初
最初执行这条命令会通知队列, 我曾经执行实现了, 能够删除这条数据了
手动应答必然会影响效率, 具体依据我的项目需要抉择:是谋求数据安全还是效率
将数据保留到磁盘, 避免程序运行中途 rabbitmq 服务异样导致数据失落
生产者
创立队列时进行申明 durable=True
channel.queue_declare(queue='ceshi',durable=True)
插入数据时申明 properties=pika.BasicProperties(delivery_mode=2)
channel.basic_publish(exchange='',
routing_key='ceshi',
body='hello world!',
properties=pika.BasicProperties(delivery_mode=2)
)
消费者
创立队列时进行申明 durable=True
channel.queue_declare(queue='ceshi2',durable=True)
轮询散发
失常开启多个消费者都是轮询散发, 比方队列有 8 个数据, 每人 4 个
偏心散发
解决快的获取到的数据便越多,, 偏心散发须要应用手动应答形式才能够
消费者增加 channel.basic_qos(prefetch_count=1)
生产者
1 链接 rabbitmq
2 创立一个交换机, 类型为 fanout
3 向交换机内插入数据
消费者
1 链接 rabbitmq
2 创立一个交换机, 类型为 fanout
3 创立队列并绑定交换机
4 监听模式
5 确定回调函数
#生产者
import pika
#链接 rabbitmq
pika.PlainCredentials(username='用户名',password='明码')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port= 端口))
channel = connection.channel()
#申明一个名为 logs 类型为 fanout 的交换机
channel.exchange_declare(exchange='logs',
exchange_type='fanout') #fanout: 公布订阅模式
#向 logs 交换机插入数据 hello world!
channel.basic_publish(exchange='logs',
routing_key='',
body='hello world!')
print('[x]---')
connection.close()
#消费者
import pika
#创立链接
pika.PlainCredentials(username='用户名',password='明码')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port= 端口))
channel = connection.channel()
#申明一个与生产者名称类型雷同的交换机, 防止先启动消费者 - 队列找不到交换机的状况
channel.exchange_declare(exchange='logs',
exchange_type='fanout') #fanout: 公布订阅模式
#创立队列 exclusive: 零碎会创立一个随机惟一的队列名
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
#将指定队列绑定到交换机上
channel.queue_bind(exchange='logs',
queue= queue_name)
#确定回调函数
def callback(ch,method,properties,body):
print("[x]:",body)
#确定监听队列 [auto_ack: 默认应答]
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
print('[*]:---')
#正式监听
channel.start_consuming()
生产者
1 链接 rabbitmq
2 创立一个交换机, 类型为 direct
3 向交换机内插入数据, 插入时增加关键字,routing_key: 想要进入哪一个消费者的队列, 就设置某个队列的关键字
消费者
1 链接 rabbitmq
2 创立一个交换机, 类型为 direct
3 创立队列并绑定交换机, 绑定交换机时增加关键字 routing_key
4 监听模式
5 确定回调函数
#生产者
import pika
#链接 rabbitmq
pika.PlainCredentials(username='用户名',password='明码')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port= 端口))
channel = connection.channel()
#申明一个名为 logs 类型为 direct 的交换机
channel.exchange_declare(exchange='logs',
exchange_type='direct') #direct: 关键字模式
#向 logs 交换机插入数据 hello world! routing_key 为关键字
channel.basic_publish(exchange='logs',
routing_key='info',
body='hello world!')
print('[x]---')
connection.close
#消费者
import pika
#创立链接
pika.PlainCredentials(username='用户名',password='明码')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port= 端口))
channel = connection.channel()
#申明一个与生产者名称类型雷同的交换机, 防止先启动消费者 - 队列找不到交换机的状况
channel.exchange_declare(exchange='logs',
exchange_type='direct') #direct: 关键字模式
#创立队列 exclusive: 零碎会创立一个随机惟一的队列名
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
#将指定队列绑定到交换机上,routing_key: 关键字, 多个关键字绑定多个
channel.queue_bind(exchange='logs',
queue= queue_name,
routing_key='info')
channel.queue_bind(exchange='logs',
queue= queue_name,
routing_key='error')
#确定回调函数
def callback(ch,method,properties,body):
print("[x]:",body)
#确定监听队列 [auto_ack: 默认应答]
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
print('[*]:---')
#正式监听
channel.start_consuming()
生产者
1 链接 rabbitmq
2 创立一个交换机, 类型为 topic
3 向交换机内插入数据, 插入时增加关键字,routing_key: 想要进入哪一个消费者的队列, 就设置某个队列的关键字, 关键字能够应用. 来宰割
消费者
1 链接 rabbitmq
2 创立一个交换机, 类型为 topic
3 创立队列并绑定交换机, 绑定交换机时增加关键字 routing_key 关键字能够应用通配符 [*: 匹配一次,#: 匹配一次或屡次]
4 监听模式
5 确定回调函数
#生产者
import pika
#链接 rabbitmq
pika.PlainCredentials(username='用户名',password='明码')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port= 端口))
channel = connection.channel()
#申明一个名为 logs 类型为 topic 的交换机
channel.exchange_declare(exchange='logs',
exchange_type='topic') #topic: 通配符模式
#向 logs 交换机插入数据 hello world!
channel.basic_publish(exchange='logs3',
routing_key='usa.aaaa',
body='hello 21321!')
print('[x]---')
connection.close()
#消费者
import pika
#创立链接
pika.PlainCredentials(username='用户名',password='明码')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port= 端口))
channel = connection.channel()
#申明一个与生产者名称类型雷同的交换机, 防止先启动消费者 - 队列找不到交换机的状况
channel.exchange_declare(exchange='logs',
exchange_type='topic') #topic: 通配符模式
#创立队列 exclusive: 零碎会创立一个随机惟一的队列名
result = channel.queue_declare(queue='',exclusive=True)
queue_name = result.method.queue
print(queue_name)
#将指定队列绑定到交换机上,routing_key: 关键字, 多个关键字绑定多个
channel.queue_bind(exchange='logs',
queue= queue_name,
routing_key='#.aaaa')
#确定回调函数
def callback(ch,method,properties,body):
print("[x]:",body)
#确定监听队列 [auto_ack: 默认应答]
channel.basic_consume(queue=queue_name,
auto_ack=True,
on_message_callback=callback)
print('[*]:---')
#正式监听
channel.start_consuming()