关于python:Python-MQTT-异步框架-HBMQTT

51次阅读

共计 3869 个字符,预计需要花费 10 分钟才能阅读完成。

什么是异步

CPU 的速度远远快于磁盘、网络等 IO 操作,而在一个线程中,无论 CPU 执行得再快,遇到 IO 操作时,都得停下来期待读写实现,这无疑节约了许多工夫。

为了解决这个问题,Python 退出了异步 IO 的个性。在 Python 3.4 中,正式将 asyncio 纳入规范库中,并在 Python 3.5 中,退出了 async/await 关键字。用户能够很轻松的应用在函数前退出 async 关键字,使函数变成异步函数。

在 Python 的 MQTT 客户端库中,HBMQTT 是最早反对异步 IO 的 Python MQTT 库。

HBMQTT 库

HBMQTT 是基于 Python 编写的开源库,实现了 MQTT 3.1.1 协定,个性如下:

  • 反对 QoS 0, QoS 1 以及 QoS 2 音讯
  • 客户端主动重连
  • 反对 TCP 和 WebSocket
  • 反对 SSL
  • 反对插件零碎

上面咱们将演示如何应用 Python MQTT 异步框架 – HBMQTT,轻松实现一个具备 MQTT 公布、订阅性能的异步 Demo。

我的项目初始化

确定 Python 版本

本我的项目应用 Python 3.6 进行开发测试,读者可用如下命令确认 Python 的版本。

因为须要应用 async 关键字,须要确保 Python 版本不低于 Python 3.5

➜  ~ python3 --version
Python 3.6.7

应用 Pip 装置 HBMQTT 库

Pip 是 Python 的包管理工具,该工具提供了对 Python 包的查找、下载、装置和卸载性能。

pip3 install -i https://pypi.doubanio.com/simple hbmqtt

连贯 MQTT 服务器

本文将应用 EMQ X 提供的收费公共 MQTT 服务器,该服务基于 EMQ X 的 MQTT 物联网云平台创立。服务器接入信息如下:

  • Broker: broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083

首先,导入 MQTT 客户端库。

from hbmqtt.client import MQTTClient

client = MQTTClient()
# 连贯服务器
client.connect('mqtt://broker.emqx.io/')
# 断开连接
client.disconnect()

异步写法如下:

async def test_pub():
    client = MQTTClient()
    await client.connect('mqtt://broker.emqx.io/')
    await client.disconnect()

公布音讯

公布音讯函数为 MQTTClient 类的 publish 函数。

client = MQTTClient()
# 函数的三个参数别离为主题、音讯内容、QoS
client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)

异步写法如下:

async def test_pub():
    client = MQTTClient()
    await Client.connect('mqtt://broker.emqx.io/')
    await asyncio.gather(client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
    )
    logging.info("messages published")
    await Client.disconnect()

在这段代码中,咱们将三个发送音讯函数放进 asyncio 的工作列表里,它们将会顺次被运行。当所有工作都实现后,断开连接。

订阅音讯

定阅音讯函数为 MQTTClient 类中的 subscribe 函数。

client = MQTTClient()
# 订阅
client.subscribe([('topic/0', QOS_0),
  ('topic/1', QOS_1),  
])
# 勾销订阅
client.unsubscribe([('topic/0', QOS_0),
]

异步写法如下:

async def test_sub():
    client = MQTTClient()
    await client.connect('mqtt://broker.emqx.io/')
    await client.subscribe([('a/b', QOS_1),
         ])
    for i in range(0, 10):
        message = await client.deliver_message()
        packet = message.publish_packet
        print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")
    await client.disconnect()

在这段代码中,咱们在接管音讯时设置了 await 期待,当代码执行到如下地位时,CPU 会先去执行其它工作,直到有音讯传播,再将其打印。

message = await client.deliver_message()

最终,程序会期待 10 次音讯接管,而后敞开连贯。

残缺代码

音讯订阅代码

# sub.py
# python 3.6+

import asyncio
import logging

from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1


async def test_sub():
    client = MQTTClient()
    await client.connect('mqtt://broker.emqx.io/')
    await client.subscribe([('a/b', QOS_1),
    ])
    for i in range(0, 10):
        message = await client.deliver_message()
        packet = message.publish_packet
        print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")
    await client.disconnect()


if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=formatter)
    asyncio.run(test_sub())

音讯公布代码

# pub.py
# python 3.6+

import asyncio
import logging

from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2


async def test_pub():
    client = MQTTClient()

    await client.connect('mqtt://broker.emqx.io/')
    await asyncio.gather(client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
    )
    logging.info("messages published")
    await client.disconnect()


if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=formatter)
    asyncio.run(test_pub())

测试

音讯公布

运行 MQTT 音讯公布代码,咱们将看到客户端连贯胜利,并且胜利公布音讯。

如下为 MQTT X 客户端胜利接管到 HBMQTT 客户端公布的音讯:

音讯订阅

运行 MQTT 音讯订阅代码,咱们将看到客户端连贯胜利,此时客户端正在期待音讯进入

应用 MQTT X 客户端连贯 broker.emqx.io,而后向主题 a/b 发送 10 次音讯

回到终端,咱们看到客户端接管并打印消息,并且在收到 10 条音讯后,被动退出了程序。

总结

至此,咱们实现了 HBMQTT 库连贯到公共 MQTT 服务器,并实现了测试客户端与 MQTT 服务器的连贯、音讯公布和订阅。通过应用 Python 异步 IO 执行音讯的发送接管,能够帮忙咱们实现更加高效的 MQTT 客户端。

接下来咱们将会陆续公布更多对于物联网开发及 Python 的相干文章,敬请关注。

版权申明:本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.cn/blog/python-async-mqtt-client-hbmqtt

正文完
 0