简略模式
生产者 1 链接rabbitmq 2 创立队列 3 向指定的队列插入数据消费者 1 链接rabbitmq 2 监听模式 3 确定回调函数
示例:
#生产者 import pika #链接rabbitmq creadentials = pika.PlainCredentials(username='用户名',password='明码') connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口)) channel = connection.channel() #创立队列 channel.queue_declare(queue='ceshi') #向指定队列插入数据 [exchange:交换机模式,简略模式为空 routing_key:指定队列 body:要插入的值] channel.basic_publish(exchange='',routing_key='ceshi',body='hello world!') print('[x]--')
#消费者 import pika #创立链接 pika.PlainCredentials(username='用户名',password='明码') connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口)) channel = connection.channel() #创立队列 channel.queue_declare(queue='ceshi') #确定回调函数 def callback(ch,method,properties,body): print("[x]:",body) #确定监听队列 [auto_ack:默认应答] channel.basic_consume(queue='ceshi',auto_ack=True,on_message_callback=callback) print('[*]:---') #正式监听 channel.start_consuming()
参数应用
1.应答参数
消费者确定监听队列时的一个参数auto_ack True:默认应答: 从队列取走一条数据后队列里这条数据就不存在了,如果在数据处理过程中程序呈现问题,会造成数据失落景象 False:手动应答: 从队列取走一条数据后,这条数据还存在于队列中 ch.basic_ack(delivery_tag=method.delivery_tag)增加在回调函数最初 最初执行这条命令会通知队列,我曾经执行实现了,能够删除这条数据了 手动应答必然会影响效率,具体依据我的项目需要抉择:是谋求数据安全还是效率
2.长久化
将数据保留到磁盘,避免程序运行中途rabbitmq服务异样导致数据失落 生产者 创立队列时进行申明 durable=True channel.queue_declare(queue='ceshi',durable=True) 插入数据时申明 properties=pika.BasicProperties(delivery_mode=2) channel.basic_publish(exchange='', routing_key='ceshi', body='hello world!', properties=pika.BasicProperties( delivery_mode=2) ) 消费者 创立队列时进行申明 durable=True channel.queue_declare(queue='ceshi2',durable=True)
3.散发参数
轮询散发 失常开启多个消费者都是轮询散发,比方队列有8个数据,每人4个 偏心散发 解决快的获取到的数据便越多,,偏心散发须要应用手动应答形式才能够 消费者增加 channel.basic_qos(prefetch_count=1)
交换机模式
公布订阅模式
生产者 1 链接rabbitmq 2 创立一个交换机,类型为fanout 3 向交换机内插入数据 消费者 1 链接rabbitmq 2 创立一个交换机,类型为fanout 3 创立队列并绑定交换机 4 监听模式 5 确定回调函数
示例:
#生产者 import pika #链接rabbitmq pika.PlainCredentials(username='用户名',password='明码') connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口)) channel = connection.channel() #申明一个名为logs类型为fanout的交换机 channel.exchange_declare(exchange='logs', exchange_type='fanout') #fanout:公布订阅模式 #向logs交换机插入数据 hello world! channel.basic_publish(exchange='logs', routing_key='', body='hello world!') print('[x]---') connection.close()
#消费者 import pika #创立链接 pika.PlainCredentials(username='用户名',password='明码') connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口)) channel = connection.channel() #申明一个与生产者名称类型雷同的交换机,防止先启动消费者-队列找不到交换机的状况 channel.exchange_declare(exchange='logs', exchange_type='fanout') #fanout:公布订阅模式 #创立队列 exclusive:零碎会创立一个随机惟一的队列名 result = channel.queue_declare(queue='',exclusive=True) queue_name = result.method.queue print(queue_name) #将指定队列绑定到交换机上 channel.queue_bind(exchange='logs', queue= queue_name) #确定回调函数 def callback(ch,method,properties,body): print("[x]:",body) #确定监听队列 [auto_ack:默认应答] channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) print('[*]:---') #正式监听 channel.start_consuming()
关键字模式
生产者 1 链接rabbitmq 2 创立一个交换机,类型为direct 3 向交换机内插入数据,插入时增加关键字,routing_key:想要进入哪一个消费者的队列,就设置某个队列的关键字 消费者 1 链接rabbitmq 2 创立一个交换机,类型为direct 3 创立队列并绑定交换机,绑定交换机时增加关键字 routing_key 4 监听模式 5 确定回调函数
示例:
#生产者 import pika #链接rabbitmq pika.PlainCredentials(username='用户名',password='明码') connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口)) channel = connection.channel() #申明一个名为logs类型为direct的交换机 channel.exchange_declare(exchange='logs', exchange_type='direct') #direct:关键字模式 #向logs交换机插入数据 hello world! routing_key为关键字 channel.basic_publish(exchange='logs', routing_key='info', body='hello world!') print('[x]---') connection.close
#消费者 import pika #创立链接 pika.PlainCredentials(username='用户名',password='明码') connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口)) channel = connection.channel() #申明一个与生产者名称类型雷同的交换机,防止先启动消费者-队列找不到交换机的状况 channel.exchange_declare(exchange='logs', exchange_type='direct') #direct:关键字模式 #创立队列 exclusive:零碎会创立一个随机惟一的队列名 result = channel.queue_declare(queue='',exclusive=True) queue_name = result.method.queue print(queue_name) #将指定队列绑定到交换机上,routing_key:关键字,多个关键字绑定多个 channel.queue_bind(exchange='logs', queue= queue_name, routing_key='info') channel.queue_bind(exchange='logs', queue= queue_name, routing_key='error') #确定回调函数 def callback(ch,method,properties,body): print("[x]:",body) #确定监听队列 [auto_ack:默认应答] channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) print('[*]:---') #正式监听 channel.start_consuming()
通配符模式
生产者 1 链接rabbitmq 2 创立一个交换机,类型为topic 3 向交换机内插入数据,插入时增加关键字,routing_key:想要进入哪一个消费者的队列,就设置某个队列的关键字,关键字能够应用.来宰割 消费者 1 链接rabbitmq 2 创立一个交换机,类型为topic 3 创立队列并绑定交换机,绑定交换机时增加关键字 routing_key 关键字能够应用通配符[*:匹配一次,#:匹配一次或屡次] 4 监听模式 5 确定回调函数
示例:
#生产者 import pika #链接rabbitmq pika.PlainCredentials(username='用户名',password='明码') connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口)) channel = connection.channel() #申明一个名为logs类型为topic的交换机 channel.exchange_declare(exchange='logs', exchange_type='topic') #topic:通配符模式 #向logs交换机插入数据 hello world! channel.basic_publish(exchange='logs3', routing_key='usa.aaaa', body='hello 21321!') print('[x]---') connection.close()
#消费者 import pika #创立链接 pika.PlainCredentials(username='用户名',password='明码') connection = pika.BlockingConnection(pika.ConnectionParameters(host='地址',port=端口)) channel = connection.channel() #申明一个与生产者名称类型雷同的交换机,防止先启动消费者-队列找不到交换机的状况 channel.exchange_declare(exchange='logs', exchange_type='topic') #topic:通配符模式 #创立队列 exclusive:零碎会创立一个随机惟一的队列名 result = channel.queue_declare(queue='',exclusive=True) queue_name = result.method.queue print(queue_name) #将指定队列绑定到交换机上,routing_key:关键字,多个关键字绑定多个 channel.queue_bind(exchange='logs', queue= queue_name, routing_key='#.aaaa') #确定回调函数 def callback(ch,method,properties,body): print("[x]:",body) #确定监听队列 [auto_ack:默认应答] channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=callback) print('[*]:---') #正式监听 channel.start_consuming()