base64_utils
import os
from PIL import Image
import re
import base64
from io import BytesIO
def image_to_base64(file_path):
"""
file_path 文件门路
将图片转换为 b64encode 编码格局
"""with open(file_path,'rb') as fp:
return base64.b64encode(fp.read()).decode('utf-8')
def pImg_to_base64(pImg):
"""二进制图片转 base64"""
buffered = BytesIO()
pImg.save(buffered, format="JPEG")
# return base64.b64encode(buffered.getvalue())
return base64.b64encode(buffered.getvalue()).decode('utf-8')
# https://www.jb51.net/article/178106.htm
def base64_to_pImg(data):
"""base64 转为二进制图片 不便用 PIL 的 Image.open(binary_img).show()"""
# binary_img = BytesIO(base64.b64decode(data))
binary_img = BytesIO(base64.b64decode(data.encode('utf-8')))
return Image.open(binary_img)
def base64_to_image(filename, data, path=''):"""
:param filename: 转换后的 image 名称
:param data: base64
:param path: 转换后的 image 文件存在门路
:return:
"""
file_path = os.path.join(path, filename)
with open(file_path, "wb") as fp:
fp.write(base64.b64decode(data.encode('utf-8'))) # 转换为 image
image_path = 'D:/slife/4.1/object-detections-yolov4-1/data/img/0234.jpg'
if __name__ == '__main__':
baseData = image_to_base64(image_path)
image = base64_to_pImg(baseData)
basexxx = pImg_to_base64(image)
base64_to_image('aaa.jpg', basexxx)
# byte_img = BytesIO(base64.b64decode(baseData.encode('utf-8')))
# image = Image.open(byte_img)
# image.show()
# print(image.size)
# # print(baseData)
# base64_save_image(baseData)
# # Image.open(image_path).show()
rabbit_utils
import pika
import json
def create_connection(config):
"""
创立 RabbitMQ 连贯
:param config:
:return:
"""
# 性能: 创立连贯时的登录凭证
credentials = pika.PlainCredentials(username=config['username'], password=config['password']) # mq 用户名和明码
# 虚构队列须要指定参数 virtual_host,如果是默认的能够不填。# 性能: 连贯 MQ 的参数设置
param = pika.ConnectionParameters(host=config['host'], port=config['port'],
virtual_host=config['virtual_host'], credentials=credentials)
return pika.BlockingConnection(param)
class Subscriber():
"""音讯订阅者"""
def __init__(self, queueName, bindingKey, config):
self.queueName = queueName
self.bindingKey = bindingKey
self.config = config
self.connection = create_connection(self.config)
def __del__(self):
self.connection.close()
def on_message_callback(self, channel, method, properties, body):
"""
:param channel:
:param method:
:param properties:
:param body:
"""
# 将收到的音讯从 json 转为 string
#message = json.loads(body)
message = body
print("[received] %r : %r" % (method.routing_key, message))
def setup(self,on_message_callback):
# 性能:创立信道
channel = self.connection.channel()
# 申明交换器 exchange,由 exchange 指定音讯在哪个队列传递,如不存在,则创立。# durable = True 代表 exchange 长久化存储,False 非长久化存储
# channel.exchange_declare(exchange=self.config['exchange'],exchange_type='topic', durable=True)
# 性能:申明队列
channel.queue_declare(queue=self.queueName, durable=True)
# 性能:通过路由键将队列和交换器绑定
channel.queue_bind(queue=self.queueName, exchange=self.config['exchange'], routing_key=self.bindingKey)
# 性能:从队列中拿到音讯开始生产
channel.basic_consume(queue=self.queueName,
on_message_callback=on_message_callback, auto_ack=True)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
class Publisher:
"""音讯发布者"""
def __init__(self, queueName, config):
self.queueName = queueName
self.config = config
def publish(self, routing_key, message):
"""
:param routing_key:
:param message:
"""
connection = create_connection(self.config)
channel = connection.channel()
# 申明 exchange,由 exchange 指定音讯在哪个队列传递,如不存在,则创立。# durable = True 代表 exchange 长久化存储,False 非长久化存储
# channel.exchange_declare(exchange=self.config['exchange'], exchange_type='topic', durable=True)
channel.queue_declare(queue=self.queueName, durable=True)
# 性能:将音讯公布到 Rabbitmq 交换器上
channel.basic_publish(exchange=self.config['exchange'], routing_key=routing_key, body=message)
print("[x] Sent message %r for %r" % (message, routing_key))
receive_client.py
from utils.rabbit_utils import Subscriber
from utils.base64_utils import base64_to_pImg
import json
config = {'host': '192.167.113.83',
'port': 5672,
'username': 'guest',
'password': 'guest',
'exchange': 'exchange01',
'virtual_host': '/',
}
def on_message_callback(channel, method, properties, body):
"""
:param channel:
:param method:
:param properties:
:param body:
"""
# 将收到的音讯从 json 转为 string
# message = json.loads(body)
message = body.decode()
json_ = json.loads(message)
name = json_['name']
base64 = json_['base64']
pIg = base64_to_pImg(base64)
# pIg 就是图片
# pIg.show()
print(name)
print(base64)
print("[received] %r : %r" % (method.routing_key, message))
if __name__ == '__main__':
receive = Subscriber('hello', 'rout', config)
receive.setup(on_message_callback=on_message_callback)
send_client.py
from utils.rabbit_utils import Publisher
import os,json,time
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from utils.base64_utils import image_to_base64
config = {'host': '192.167.113.83',
'port': 5672,
'username': 'guest',
'password': 'guest',
'exchange': 'exchange01',
'virtual_host': '/',
}
path = '/Users/apple/Desktop/app/nginx'
sender = Publisher("hello", config)
# pip install watchdog
class MyEventHandler(FileSystemEventHandler):
def on_created(self, event):
_dict = {}
f = event.src_path
f_name = f.rsplit('/')[-1]
base64 = image_to_base64(f)
_dict['name'] = f_name
_dict['base64'] = base64
result = json.dumps(_dict)
sender.publish("rout", result)
os.remove(f)
# def on_closed(self, event):
# print(event.src_path)
if __name__ == '__main__':
event_handler = MyEventHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
time.sleep(1)
finally:
observer.stop()
observer.join()