从 v4.1 版本开始,EMQ X MQTT 服务器 提供了专门的多语言反对插件 emqx_extension_hook ,现已反对应用其余编程语言来解决 EMQ X 中的钩子事件,开发者能够应用 Python 或者 Java 疾速开发本人的插件,在官网性能的根底上进行扩大,满足本人的业务场景。例如:

  • 验证某客户端的登录权限:客户端连贯时触发对应函数,通过参数获取客户端信息后通过读取数据库、比对等操作断定是否有登录权限
  • 记录客户端在线状态与高低线历史:客户端状态变动时触发对应函数,通过参数获取客户端信息,改写数据库中客户端在线状态
  • 校验某客户端的 PUB/SUB 的操作权限:公布/订阅时触发对应函数,通过参数获取客户端信息与以后主题,断定客户端是否有对应的操作权限
  • 解决会话 (Sessions) 和 音讯 (Message) 事件,实现订阅关系与音讯解决/存储:音讯公布、状态变动时触发对应函数,获取以后客户端信息、音讯状态与音讯内容,转发到 Kafka 或数据库进行存储。
注:音讯(Message) 类钩子,仅在企业版中反对。

Python 和 Java 驱动基于 Erlang/OTP-Port 过程间通信实现,自身具备十分高的吞吐性能,本文以 Python 拓展为例介绍 EMQ X 跨语言拓展应用形式。

Python 拓展应用示例

要求

  • EMQ X 所在服务器需装置 Python 3.6 以上版本

应用步骤

  1. 通过 pip 装置 Python SDK
  2. 调整 EMQ X 配置,确保相干配置项正确指向 Python 我的项目
  3. 引入 SDK 编写代码

Python 插件装置

通过 pip 命令在本地装置 SDK,确保应用 pip3 进行装置

pip3 install emqx-extension-sdk

批改配置

批改 emqx-extension-hook 插件配置,正确应用拓展:

## Setup the supported drivers#### Value: python2 | python3 | javaexhook.drivers = python3## Search path for scripts/libraryexhook.drivers.python3.path = data/extension/hooks.py## Call timeout#### Value: Duration##exhook.drivers.python3.call_timeout = 5s## Initial module name## Your filename or module nameexhook.drivers.python3.init_module = hooks

编写代码

emqx/data/extension 目录下新建 hooks.py 文件,引入 SDK 编写业务逻辑,示例程序如下:

## data/extension/hooks.pyfrom emqx_extension.hooks import EmqxHookSdk, hooks_handlerfrom emqx_extension.types import EMQX_CLIENTINFO_PARSE_T, EMQX_MESSAGE_PARSE_T# 继承 SDK HookSdk 类class CustomHook(EmqxHookSdk):      # 应用装璜器注册 hooks    @hooks_handler()    def on_client_connect(self,                          conninfo: EMQX_CLIENTINFO_PARSE_T = None,                          props: dict = None,                          state: list = None):        print(f'[Python SDK] [on_client_connect] {conninfo.clientid} connecte')    @hooks_handler()    def on_client_connected(self,                            clientinfo: EMQX_CLIENTINFO_PARSE_T,                            state: list = None):        print(            f'[Python SDK] [on_client_connected] {clientinfo.clientid} connected')    @hooks_handler()    def on_client_check_acl(self, clientinfo: EMQX_CLIENTINFO_PARSE_T,                            pubsub: str,                            topic: str,                            result: bool,                            state: tuple) -> bool:        print(            f'[Python SDK] [on_client_check_acl] {clientinfo.username} check ACL: {pubsub} {topic}')        # 用户名为空时,ACL 验证不通过        if clientinfo.username == '':            return False        return True    @hooks_handler()    def on_client_authenticate(self, clientinfo: EMQX_CLIENTINFO_PARSE_T, authresult,                               state) -> bool:        print(            f'[Python SDK] [on_client_authenticate] {clientinfo.clientid} authenticate')        # clientid 不为空时,验证通过        if clientinfo.clientid != '':            return True        return False    # on_message_* 仅反对企业版    @hooks_handler()    def on_message_publish(self, message: EMQX_MESSAGE_PARSE_T, state):        print(            f'[Python SDK] [on_message_publish] {message.topic} {message.payload}')emqx_hook = CustomHook(hook_module=f'{__name__}.emqx_hook')def init():    return emqx_hook.start()def deinit():    return

启动

启动 emqx_extension_hook 插件,如果配置谬误或代码编写谬误将无奈失常启动。启动后尝试建设 MQTT 连贯并察看业务运行状况。

./bin/emqx_ctl plugins load emqx_extension_hook

进阶开发

目前 EMQ X Python 拓展 SDK 是开源的,如果对可控性、性能要求更高,或须要应用 Python 2.7 版本的运行环境,欢送奉献代码或基于原始示例进行开发:

  • 代码仓库:emqx-extension-python-sdk
  • Python 原始示例,可应用该示例自行封装:[emqx-extension-hook main.py
版权申明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.io/cn/blog/d...