base64_utils

import osfrom PIL import Imageimport reimport base64from io import BytesIOdef image_to_base64(file_path):    """    file_path 文件门路    将图片转换为b64encode编码格局    """    with open(file_path, 'rb') as fp:        return base64.b64encode(fp.read()).decode('utf-8')def pImg_to_base64(pImg):    """    二进制图片转base64    """    buffered = BytesIO()    pImg.save(buffered, format="JPEG")    # return base64.b64encode(buffered.getvalue())    return base64.b64encode(buffered.getvalue()).decode('utf-8')# https://www.jb51.net/article/178106.htmdef base64_to_pImg(data):    """    base64 转为二进制图片 不便用 PIL 的  Image.open(binary_img).show()    """    # binary_img = BytesIO(base64.b64decode(data))    binary_img = BytesIO(base64.b64decode(data.encode('utf-8')))    return Image.open(binary_img)def base64_to_image(filename, data, path=''):    """    :param filename: 转换后的image名称    :param data: base64    :param path: 转换后的image文件存在门路    :return:    """    file_path = os.path.join(path, filename)    with open(file_path, "wb") as fp:        fp.write(base64.b64decode(data.encode('utf-8')))  # 转换为imageimage_path = 'D:/slife/4.1/object-detections-yolov4-1/data/img/0234.jpg'if __name__ == '__main__':    baseData = image_to_base64(image_path)    image = base64_to_pImg(baseData)    basexxx = pImg_to_base64(image)    base64_to_image('aaa.jpg', basexxx)    # byte_img = BytesIO(base64.b64decode(baseData.encode('utf-8')))    # image = Image.open(byte_img)    # image.show()    # print(image.size)    # # print(baseData)    # base64_save_image(baseData)    # # Image.open(image_path).show()

rabbit_utils

import pikaimport jsondef create_connection(config):    """    创立RabbitMQ连贯    :param config:    :return:    """    # 性能: 创立连贯时的登录凭证    credentials = pika.PlainCredentials(username=config['username'], password=config['password'])  # mq用户名和明码    # 虚构队列须要指定参数 virtual_host,如果是默认的能够不填。    # 性能: 连贯 MQ 的参数设置    param = pika.ConnectionParameters(host=config['host'], port=config['port'],                                      virtual_host=config['virtual_host'], credentials=credentials)    return pika.BlockingConnection(param)class Subscriber():    """        音讯订阅者    """    def __init__(self, queueName, bindingKey, config):        self.queueName = queueName        self.bindingKey = bindingKey        self.config = config        self.connection = create_connection(self.config)    def __del__(self):        self.connection.close()    def on_message_callback(self, channel, method, properties, body):        """        :param channel:        :param method:        :param properties:        :param body:        """        # 将收到的音讯从json转为string        #message = json.loads(body)        message = body        print(" [received] %r : %r" % (method.routing_key, message))    def setup(self,on_message_callback):        # 性能:创立信道        channel = self.connection.channel()        # 申明交换器exchange,由exchange指定音讯在哪个队列传递,如不存在,则创立。        # durable = True 代表exchange长久化存储,False 非长久化存储        # channel.exchange_declare(exchange=self.config['exchange'],exchange_type='topic', durable=True)        # 性能:申明队列        channel.queue_declare(queue=self.queueName, durable=True)        # 性能:通过路由键将队列和交换器绑定        channel.queue_bind(queue=self.queueName, exchange=self.config['exchange'], routing_key=self.bindingKey)        # 性能:从队列中拿到音讯开始生产        channel.basic_consume(queue=self.queueName,                              on_message_callback=on_message_callback, auto_ack=True)        try:            channel.start_consuming()        except KeyboardInterrupt:            channel.stop_consuming()class Publisher:    """        音讯发布者    """    def __init__(self, queueName, config):        self.queueName = queueName        self.config = config    def publish(self, routing_key, message):        """        :param routing_key:        :param message:        """        connection = create_connection(self.config)        channel = connection.channel()        # 申明exchange,由exchange指定音讯在哪个队列传递,如不存在,则创立。        # durable = True 代表exchange长久化存储,False 非长久化存储        # channel.exchange_declare(exchange=self.config['exchange'], exchange_type='topic', durable=True)        channel.queue_declare(queue=self.queueName, durable=True)        # 性能:将音讯公布到Rabbitmq交换器上        channel.basic_publish(exchange=self.config['exchange'], routing_key=routing_key, body=message)        print("[x] Sent message %r for %r" % (message, routing_key))

receive_client.py

from utils.rabbit_utils import Subscriberfrom utils.base64_utils import base64_to_pImgimport jsonconfig = {'host': '192.167.113.83',          'port': 5672,          'username': 'guest',          'password': 'guest',          'exchange': 'exchange01',          'virtual_host': '/',          }def on_message_callback(channel, method, properties, body):    """    :param channel:    :param method:    :param properties:    :param body:    """    # 将收到的音讯从json转为string    # message = json.loads(body)    message = body.decode()    json_ = json.loads(message)    name = json_['name']    base64 = json_['base64']    pIg = base64_to_pImg(base64)    # pIg 就是图片    # pIg.show()    print(name)    print(base64)    print(" [received] %r : %r" % (method.routing_key, message))if __name__ == '__main__':    receive = Subscriber('hello', 'rout', config)    receive.setup(on_message_callback=on_message_callback)

send_client.py

from utils.rabbit_utils import Publisherimport os,json,timefrom watchdog.events import FileSystemEventHandlerfrom watchdog.observers import Observerfrom utils.base64_utils import image_to_base64config = {'host': '192.167.113.83',          'port': 5672,          'username': 'guest',          'password': 'guest',          'exchange': 'exchange01',          'virtual_host': '/',          }path = '/Users/apple/Desktop/app/nginx'sender = Publisher("hello", config)# pip install watchdogclass MyEventHandler(FileSystemEventHandler):    def on_created(self, event):        _dict = {}        f = event.src_path        f_name = f.rsplit('/')[-1]        base64 = image_to_base64(f)        _dict['name'] = f_name        _dict['base64'] = base64        result = json.dumps(_dict)        sender.publish("rout", result)        os.remove(f)    # def on_closed(self, event):    #     print(event.src_path)if __name__ == '__main__':    event_handler = MyEventHandler()    observer = Observer()    observer.schedule(event_handler, path, recursive=True)    observer.start()    try:        while True:            time.sleep(1)    finally:        observer.stop()        observer.join()