import json
import sys
引入mqtt包
import paho.mqtt.client as mqtt
应用独立线程运行
from threading import Thread
建设mqtt连贯
def on_connect(client, userdata, flag, rc):
if rc == 0: # 连贯胜利 print("Connection successful")elif rc == 1: # 协定版本谬误 print("Protocol version error")elif rc == 2: # 有效的客户端标识 print("Invalid client identity")elif rc == 3: # 服务器无奈应用 print("server unavailable")elif rc == 4: # 谬误的用户名或明码 print("Wrong user name or password")elif rc == 5: # 未经受权 print("unaccredited")print("Connect with the result code " + str(rc))# 订阅频道# client.subscribe('31765425213673472', qos=2)
当与代理断开连接时调用
def on_disconnect(client, userdata, rc):
# rc == 0回调被调用以响应disconnect()调用# 如果以任何其余值断开连接是意外的,例如可能呈现网络谬误。if rc != 0: print("Unexpected disconnection %s" % rc)
当收到对于客户订阅的主题的音讯时调用。
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))json_msg = json.loads(msg.payload.decode('utf-8'))# 退出集体逻辑pass
当应用应用publish()发送的音讯曾经传输到代理时被调用。
def on_publish(client, obj, mid):
print("on_Publish, mid: " + str(mid))
当代理响应订阅申请时被调用
def on_subscribe(client, userdata, mid, granted_qos):
print("on_Subscribed: " + str(mid) + " " + str(granted_qos))
当代理响应勾销订阅申请时调用。
def on_unsubscribe(client, userdata, mid):
print("on_unsubscribe, mid: " + str(mid))
当客户端有日志信息时调用
def on_log(client, obj, level, string):
print("on_Log:" + string)
mqtt订阅启动函数
def mqtt_subscribe():
global clientclient.loop_forever()
mqtt公布启动函数
def mqtt_publish(sensor_data, topic='xxxxxxxx', qos=2):
global clienttry: client.publish(topic=topic, payload=sensor_data, qos=qos)except KeyboardInterrupt: print("EXIT") # 这是网络循环的阻塞模式,[金融期货](https://www.gendan5.com/futures/ff.html)直到客户端调用disconnect()时才会返回。它会主动解决从新连贯。 client.disconnect() sys.exit(0)
client = mqtt.Client()
启动函数
def mqtt_run():
# 账号密码验证放到最后面client.username_pw_set('user', 'user')# client = mqtt.Client()# 建设mqtt连贯client.on_connect = on_connectclient.on_subscribe = on_subscribeclient.on_message = on_message# 当与代理断开连接时调用client.on_disconnect = on_disconnectclient.on_log = on_log# 绑定 MQTT 服务器地址broker_ip = ''# MQTT服务器的端口号# client.connect(host=broker_ip, port=1883, keepalive=6000)client.connect(host=broker_ip, port=1883)client.reconnect_delay_set(min_delay=1, max_delay=2000)client.subscribe('xxxxxxxx', qos=0)# 创立线程去继续接管订阅信息subscribe_thread = Thread(target=mqtt_subscribe)subscribe_thread.start()
if name == "__main__":
mqtt_run()