关于python:监听文件目录新增-并发送到rabbitmq-案例1

40次阅读

共计 5126 个字符,预计需要花费 13 分钟才能阅读完成。

utils/base64_utils.py

import os
from PIL import Image
import re
import base64
from io import BytesIO


def image_to_base64(file_path):
    """
    Read a file and return its raw bytes encoded as base64 text.

    :param file_path: path of the image file to encode
    :return: base64 representation of the file contents, as a ``str``
    """
    with open(file_path, 'rb') as fp:
        return base64.b64encode(fp.read()).decode('utf-8')


def pImg_to_base64(pImg):
    """Save the image as JPEG into memory and return those bytes as base64 text."""
    jpeg_stream = BytesIO()
    pImg.save(jpeg_stream, format="JPEG")
    encoded = base64.b64encode(jpeg_stream.getvalue())
    return encoded.decode('utf-8')


# https://www.jb51.net/article/178106.htm
def base64_to_pImg(data):
    """
    Decode base64 text and open the resulting bytes as a PIL image.

    The returned object can be used directly with PIL, e.g. ``.show()``.

    :param data: base64 text (``str``) holding an encoded image file
    :return: the object returned by ``Image.open``
    """
    raw_bytes = base64.b64decode(data.encode('utf-8'))
    return Image.open(BytesIO(raw_bytes))


def base64_to_image(filename, data, path=''):
    """
    Decode base64 text and write the resulting bytes to a file.

    :param filename: name of the image file to create
    :param data: base64 text (``str``) holding the image bytes
    :param path: directory the file is written into; the default ``''``
        makes ``os.path.join`` return ``filename`` unchanged
    :return: None
    """
    file_path = os.path.join(path, filename)
    with open(file_path, "wb") as fp:
        # Write the decoded bytes; the target is opened in binary mode.
        fp.write(base64.b64decode(data.encode('utf-8')))


# Sample image used by the manual round-trip demo below.
image_path = 'D:/slife/4.1/object-detections-yolov4-1/data/img/0234.jpg'


if __name__ == '__main__':
    # Round trip: file -> base64 -> PIL image -> base64 -> 'aaa.jpg' on disk.
    encoded_file = image_to_base64(image_path)
    decoded_image = base64_to_pImg(encoded_file)
    reencoded = pImg_to_base64(decoded_image)
    base64_to_image('aaa.jpg', reencoded)

    # byte_img = BytesIO(base64.b64decode(baseData.encode('utf-8')))
    # image = Image.open(byte_img)
    # image.show()
    # print(image.size)

    # # print(baseData)
    # base64_save_image(baseData)
    # # Image.open(image_path).show()

utils/rabbit_utils.py

import pika
import json


def create_connection(config):
    """
    Open a blocking RabbitMQ connection.

    :param config: mapping providing 'username', 'password', 'host', 'port'
        and 'virtual_host'
    :return: the ``pika.BlockingConnection`` built from those settings
    """
    # Login credentials (broker user name and password).
    login = pika.PlainCredentials(
        username=config['username'],
        password=config['password'],
    )
    # Connection parameters; the virtual host is always read from the config.
    connection_params = pika.ConnectionParameters(
        host=config['host'],
        port=config['port'],
        virtual_host=config['virtual_host'],
        credentials=login,
    )
    return pika.BlockingConnection(connection_params)


class Subscriber():
    """Message subscriber that consumes from a durable queue.

    The connection is opened in ``__init__`` through ``create_connection``
    and closed again when the instance is finalised.
    """

    def __init__(self, queueName, bindingKey, config):
        """
        :param queueName: name of the queue to declare and consume from
        :param bindingKey: routing key used to bind the queue to the exchange
        :param config: connection settings; ``config['exchange']`` is also
            read by ``setup``
        """
        self.queueName = queueName
        self.bindingKey = bindingKey
        self.config = config
        self.connection = create_connection(self.config)

    def __del__(self):
        # __del__ also runs when __init__ raised before ``connection`` was
        # assigned, and may run after the connection has already been
        # closed (pika rejects close() on a closed connection), so guard
        # both situations instead of closing unconditionally.
        connection = getattr(self, 'connection', None)
        if connection is not None and connection.is_open:
            connection.close()

    def on_message_callback(self, channel, method, properties, body):
        """
        Default handler: print the routing key and the raw message body.

        :param channel: channel the message was delivered on (unused)
        :param method: delivery metadata; ``method.routing_key`` is printed
        :param properties: message properties (unused)
        :param body: raw message body, printed as received (not JSON-decoded)
        """
        print("[received] %r : %r" % (method.routing_key, body))

    def setup(self, on_message_callback):
        """
        Declare and bind the queue, then consume until interrupted.

        :param on_message_callback: callable invoked for every delivery with
            ``(channel, method, properties, body)``
        """
        channel = self.connection.channel()
        # The exchange itself is not declared here, so it has to exist
        # already for the bind below to succeed.
        channel.queue_declare(queue=self.queueName, durable=True)
        # Bind the queue to the exchange through the routing key.
        channel.queue_bind(queue=self.queueName, exchange=self.config['exchange'],
                           routing_key=self.bindingKey)
        # auto_ack=True: deliveries are acknowledged automatically.
        channel.basic_consume(queue=self.queueName,
                              on_message_callback=on_message_callback, auto_ack=True)
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            # Ctrl-C stops the consumer loop cleanly.
            channel.stop_consuming()


class Publisher:
    """Message publisher that opens a short-lived connection per message."""

    def __init__(self, queueName, config):
        """
        :param queueName: name of the durable queue declared before publishing
        :param config: connection settings passed to ``create_connection``;
            ``config['exchange']`` names the exchange to publish to
        """
        self.queueName = queueName
        self.config = config

    def publish(self, routing_key, message):
        """
        Publish one message to the configured exchange.

        :param routing_key: routing key attached to the message
        :param message: message body handed to ``basic_publish``
        """
        connection = create_connection(self.config)
        try:
            channel = connection.channel()
            # The exchange is not declared here, so it has to exist already.
            channel.queue_declare(queue=self.queueName, durable=True)
            channel.basic_publish(exchange=self.config['exchange'],
                                  routing_key=routing_key, body=message)
        finally:
            # A new connection is opened on every call; close it so repeated
            # publishes do not pile up open connections on the broker.
            connection.close()
        print("[x] Sent message %r for %r" % (message, routing_key))

receive_client.py

from utils.rabbit_utils import Subscriber
from utils.base64_utils import base64_to_pImg
import json

# Broker settings handed to Subscriber below.
# NOTE(review): RabbitMQ's default 'guest' account is normally restricted to
# localhost connections - confirm the broker permits it for this host.
config = {
    'host': '192.167.113.83',
    'port': 5672,
    'username': 'guest',
    'password': 'guest',
    'exchange': 'exchange01',
    'virtual_host': '/',
}


def on_message_callback(channel, method, properties, body):
    """
    Handle one delivery: parse the JSON payload and rebuild the image.

    :param channel: channel the message arrived on (unused)
    :param method: delivery metadata; ``method.routing_key`` is printed
    :param properties: message properties (unused)
    :param body: raw message bytes containing JSON with the keys 'name'
        and 'base64'
    """
    text = body.decode()
    payload = json.loads(text)
    file_name = payload['name']
    encoded = payload['base64']
    # ``picture`` is the decoded image; call picture.show() to display it.
    picture = base64_to_pImg(encoded)
    print(file_name)
    print(encoded)
    print("[received] %r : %r" % (method.routing_key, text))


if __name__ == '__main__':
    # Consume queue 'hello', bound to the exchange with routing key 'rout'.
    subscriber = Subscriber(queueName='hello', bindingKey='rout', config=config)
    subscriber.setup(on_message_callback)

send_client.py


from utils.rabbit_utils import Publisher
import os,json,time
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from utils.base64_utils import image_to_base64

# Broker settings handed to Publisher below.
# NOTE(review): RabbitMQ's default 'guest' account is normally restricted to
# localhost connections - confirm the broker permits it for this host.
config = {
    'host': '192.167.113.83',
    'port': 5672,
    'username': 'guest',
    'password': 'guest',
    'exchange': 'exchange01',
    'virtual_host': '/',
}

# Directory watched for newly created files.
path = '/Users/apple/Desktop/app/nginx'
# Module-level publisher shared by the event handler; targets queue "hello".
sender = Publisher(queueName="hello", config=config)
# pip install watchdog


class MyEventHandler(FileSystemEventHandler):
    """Publish every newly created file to RabbitMQ, then delete it."""

    def on_created(self, event):
        """
        Encode the created file as base64, publish it as JSON and remove it.

        The published JSON object has the keys 'name' (file name without
        directory) and 'base64' (file contents).

        :param event: file-system event; ``event.src_path`` is the created path
        """
        # Creation events are also reported for directories, which cannot
        # be read as a file, so skip them.
        if event.is_directory:
            return
        src_path = event.src_path
        # NOTE(review): the creation event may arrive before the writer has
        # finished the file - confirm producers write atomically (for
        # example by renaming a finished file into the watched directory).
        payload = {
            # basename honours the platform separator, unlike splitting on '/'.
            'name': os.path.basename(src_path),
            'base64': image_to_base64(src_path),
        }
        sender.publish("rout", json.dumps(payload))
        # Remove the file only after it has been published.
        os.remove(src_path)


if __name__ == '__main__':
    # Watch `path` and its sub-directories until the process is interrupted.
    watcher = Observer()
    watcher.schedule(MyEventHandler(), path, recursive=True)
    watcher.start()
    try:
        # Keep the main thread alive while the observer runs.
        while True:
            time.sleep(1)
    finally:
        # Always stop the observer and wait for it to finish.
        watcher.stop()
        watcher.join()



正文完
 0