import pika
class Rabbitmq():
__new = None
__init = True
def __new__(cls, *args, **kwargs):
if cls.__new is None:
cls.__new = object.__new__(cls)
return cls.__new
def __init__(self,queue):
''':param queue: 队列名称'''
self.queue = queue
if Rabbitmq.__init:
#链接 rabbitmq
pika.PlainCredentials(username='用户名', password='明码')
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='IP 地址', port= 端口号))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1) #偏心散发
self.channel.queue_declare(queue=queue) #创立队列
Rabbitmq.__init = False
def basic_publish(self,body):
'''
:param body: 须要插入的数据
:return: 插入数据
'''
self.channel.basic_publish(
exchange='',
routing_key=self.queue,
body=body)
def basic_consume(self,callback):
'''
:return: 确认监听队列
auto_ck: 默认应答形式
'''
self.channel.basic_consume(
queue=self.queue,
auto_ack=True,
on_message_callback=callback)
def consume(self):
''':return: 正式监听'''
self.channel.start_consuming()
def close(self):
''':return: 敞开链接'''
self.connection.close()
if __name__ == '__main__':
queue = 'ceshi3'
rbmq = Rabbitmq(queue)
for i in range(10000):
print(i)
rbmq.basic_publish('holler word hahahah'+str(i))
def callback(ch, method, properties, body):
print("[x]:", body)
rbmq.basic_consume(callback)
rbmq.consume()
rbmq.close()