utils/base64_utils.py
import os
from PIL import Image
import re
import base64
from io import BytesIO

# Image modes that Pillow's JPEG writer accepts as-is. Any other mode
# (e.g. RGBA, LA, P) makes ``Image.save(..., format="JPEG")`` raise, so such
# images are converted to RGB before encoding.
_JPEG_WRITABLE_MODES = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")


def _decode_base64(data):
    """Decode base64 *data*, given either as ``str`` or as ``bytes``."""
    raw = data.encode('utf-8') if isinstance(data, str) else data
    return base64.b64decode(raw)


def image_to_base64(file_path):
    """Read a file and return its raw bytes encoded as a base64 string.

    :param file_path: path of the file to read
    :return: base64 text (``str``) of the file contents
    """
    with open(file_path, 'rb') as fp:
        return base64.b64encode(fp.read()).decode('utf-8')


def pImg_to_base64(pImg):
    """Encode a PIL image as JPEG and return the result as a base64 string.

    Images whose mode the JPEG writer cannot handle (for example RGBA or
    palette images) are converted to RGB first instead of raising.

    :param pImg: ``PIL.Image.Image`` instance
    :return: base64 text (``str``) of the JPEG-encoded image
    """
    if pImg.mode not in _JPEG_WRITABLE_MODES:
        pImg = pImg.convert("RGB")
    buffered = BytesIO()
    pImg.save(buffered, format="JPEG")
    return base64.b64encode(buffered.getvalue()).decode('utf-8')


# Reference: https://www.jb51.net/article/178106.htm
def base64_to_pImg(data):
    """Decode base64 image data into a PIL image.

    The returned image can be inspected directly, e.g. with ``.show()``.

    :param data: base64 image data (``str`` or ``bytes``)
    :return: ``PIL.Image.Image`` opened from the decoded bytes
    """
    return Image.open(BytesIO(_decode_base64(data)))


def base64_to_image(filename, data, path=''):
    """Decode base64 data and write the resulting bytes to a file.

    :param filename: name of the file to create
    :param data: base64 data (``str`` or ``bytes``)
    :param path: directory in which the file is written (default: current directory)
    :return: None
    """
    file_path = os.path.join(path, filename)
    with open(file_path, "wb") as fp:
        fp.write(_decode_base64(data))


# Sample image used by the manual round-trip check below.
image_path = 'D:/slife/4.1/object-detections-yolov4-1/data/img/0234.jpg'

if __name__ == '__main__':
    # Round trip: file -> base64 -> PIL image -> base64 -> file.
    baseData = image_to_base64(image_path)
    image = base64_to_pImg(baseData)
    basexxx = pImg_to_base64(image)
    base64_to_image('aaa.jpg', basexxx)
utils/rabbit_utils.py
import pika
import json


def create_connection(config):
    """Open a blocking RabbitMQ connection.

    :param config: mapping providing 'username', 'password', 'host', 'port'
        and 'virtual_host'
    :return: an open ``pika.BlockingConnection``
    """
    # Login credentials used when the connection is established.
    credentials = pika.PlainCredentials(username=config['username'],
                                        password=config['password'])
    # Connection parameters, including the virtual host to attach to.
    param = pika.ConnectionParameters(host=config['host'],
                                      port=config['port'],
                                      virtual_host=config['virtual_host'],
                                      credentials=credentials)
    return pika.BlockingConnection(param)


class Subscriber():
    """Message subscriber bound to one queue and one binding key."""

    def __init__(self, queueName, bindingKey, config):
        """
        :param queueName: name of the queue to consume from
        :param bindingKey: routing key used to bind the queue to the exchange
        :param config: connection settings (see ``create_connection``); must
            also provide 'exchange'
        """
        self.queueName = queueName
        self.bindingKey = bindingKey
        self.config = config
        self.connection = create_connection(self.config)

    def __del__(self):
        # The attribute is missing when create_connection() raised inside
        # __init__, and closing an already-closed connection raises too, so
        # guard against both.
        connection = getattr(self, 'connection', None)
        if connection is not None and connection.is_open:
            connection.close()

    def on_message_callback(self, channel, method, properties, body):
        """Default message handler: print the routing key and the raw body.

        :param channel: channel the message arrived on
        :param method: delivery metadata (provides ``routing_key``)
        :param properties: message properties
        :param body: raw message body
        """
        message = body
        print(" [received] %r : %r" % (method.routing_key, message))

    def setup(self, on_message_callback):
        """Declare and bind the queue, then consume until interrupted.

        :param on_message_callback: callable invoked as
            ``(channel, method, properties, body)`` for every message
        """
        channel = self.connection.channel()
        # NOTE(review): the exchange is not declared here, so it presumably
        # has to exist on the broker already -- confirm.
        channel.queue_declare(queue=self.queueName, durable=True)
        # Bind the queue to the exchange through the routing key.
        channel.queue_bind(queue=self.queueName,
                           exchange=self.config['exchange'],
                           routing_key=self.bindingKey)
        channel.basic_consume(queue=self.queueName,
                              on_message_callback=on_message_callback,
                              auto_ack=True)
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()


class Publisher:
    """Message publisher that sends to the configured exchange."""

    def __init__(self, queueName, config):
        """
        :param queueName: name of the queue declared before publishing
        :param config: connection settings (see ``create_connection``); must
            also provide 'exchange'
        """
        self.queueName = queueName
        self.config = config

    def publish(self, routing_key, message):
        """Publish *message* to the configured exchange.

        A connection is opened for the call and always closed afterwards,
        so repeated publishing does not leak broker connections.

        :param routing_key: routing key for the message
        :param message: message body
        """
        connection = create_connection(self.config)
        try:
            channel = connection.channel()
            channel.queue_declare(queue=self.queueName, durable=True)
            # Publish the message to the RabbitMQ exchange.
            channel.basic_publish(exchange=self.config['exchange'],
                                  routing_key=routing_key,
                                  body=message)
        finally:
            # Skip close() on a connection the broker already dropped so the
            # original error is not masked.
            if connection.is_open:
                connection.close()
        print("[x] Sent message %r for %r" % (message, routing_key))
receive_client.py
import json

from utils.base64_utils import base64_to_pImg
from utils.rabbit_utils import Subscriber

# RabbitMQ connection settings used by this client.
config = {
    'host': '192.167.113.83',
    'port': 5672,
    'username': 'guest',
    'password': 'guest',
    'exchange': 'exchange01',
    'virtual_host': '/',
}


def on_message_callback(channel, method, properties, body):
    """Handle one incoming message carrying a named, base64-encoded image.

    :param channel: channel the message arrived on
    :param method: delivery metadata (provides ``routing_key``)
    :param properties: message properties
    :param body: raw message body, a JSON document with 'name' and 'base64'
    """
    message = body.decode()
    payload = json.loads(message)
    name, encoded = payload['name'], payload['base64']
    # Decode the payload into a PIL image; the image itself is not used further.
    picture = base64_to_pImg(encoded)
    print(name)
    print(encoded)
    print(" [received] %r : %r" % (method.routing_key, message))


if __name__ == '__main__':
    subscriber = Subscriber('hello', 'rout', config)
    subscriber.setup(on_message_callback=on_message_callback)
send_client.py
from utils.rabbit_utils import Publisher
import os
import json
import time
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from utils.base64_utils import image_to_base64

# RabbitMQ connection settings used by this client.
config = {'host': '192.167.113.83',
          'port': 5672,
          'username': 'guest',
          'password': 'guest',
          'exchange': 'exchange01',
          'virtual_host': '/',
          }

# Directory watched (recursively) for newly created files.
path = '/Users/apple/Desktop/app/nginx'
sender = Publisher("hello", config)


# Requires: pip install watchdog
class MyEventHandler(FileSystemEventHandler):
    """Publish every newly created file as a base64 message, then delete it."""

    def on_created(self, event):
        """Send the created file to the queue and remove it from disk.

        :param event: watchdog event; ``src_path`` is the created path
        """
        # The watch is recursive, so new sub-directories trigger this too;
        # they can be neither read as a file nor removed with os.remove().
        if event.is_directory:
            return
        # NOTE(review): the creation event may arrive before the writer has
        # finished the file -- confirm producers write files atomically.
        f = event.src_path
        result = json.dumps({
            'name': os.path.basename(f),
            'base64': image_to_base64(f),
        })
        sender.publish("rout", result)
        # Only reached when publishing succeeded, so unsent files are kept.
        os.remove(f)


if __name__ == '__main__':
    event_handler = MyEventHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        # Keep the main thread alive; the observer runs in its own thread.
        while True:
            time.sleep(1)
    finally:
        observer.stop()
        observer.join()