MQTT 是一种基于公布 / 订阅(publish/subscribe)模式的 ” 轻量级 ” 通信协定,作为一种低开销、低带宽占用的即时通讯协定,曾经成为物联网的重要组成部分
Swoole 也给 PHP 提供了开发物联网我的项目的能力,只须要设置一个 open_mqtt_protocol 选项,启用后就会解析 MQTT 包头,在 Worker 过程的 onReceive 事件每次都会返回一个残缺的 MQTT 数据包
当然其余的也有,例如 Workerman 之前提供的 异步 mqtt 客户端库,还有其余的开源库,这里就不一一介绍了
Simps 的第一个版本 MQTT 库 就是参考了 Workerman 的实现,使其可能应用 Swoole 的协程能力,同时也修复了一些问题
在此也要感激 @walkor 对 PHP 生态作出的奉献
第一个版本的实现是放在了框架当中,限度了一些用户的应用。于是又开始了重构,将 MQTT 独立为一个 library,不便用户应用的同时也丰盛了 PHP 生态,让 PHP 程序员不再局限于 Web 开发
在第一个版本公布之后,Simps 的交换群中也有不少用户询问 MQTT 的问题,Swoole 也修复了一些相干的 Bug,当初应用 PHP + Swoole 去开发物联网相干的我的项目应该是锦上添花
同时第一个版本的 MQTT 库,只反对 MQTT 3.x,不反对 MQTT 5.0,在 GitHub 上也没有找到相干反对的类库,所以在重构了 3.x 版本之后,也反对了一下 MQTT 5.0
兴许这是第一个反对 MQTT v5.0 协定的 PHP library…
反对 MQTT 协定 3.1、3.1.1 和 5.0 版本,反对 QoS 0、QoS 1、QoS 2,那么它来了,应用 composer 来装置
composer require simps/mqtt
装置胜利之后咱们来看一下订阅和公布的应用,以 MQTT5.0 为例
订阅
首先应该是订阅,订阅胜利之后能力收到对应主题的公布音讯,创立一个 subscribe.php
写入以下内容
include __DIR__ . '/vendor/autoload.php';
use Simps\MQTT\Hex\ReasonCode;
use Swoole\Coroutine;
use Simps\MQTT\Client;
use Simps\MQTT\Types;
$config = [
'host' => 'broker.emqx.io',
'port' => 1883,
'time_out' => 5,
'user_name' => 'user001',
'password' => 'hLXQ9ubnZGzkzf',
'client_id' => Client::genClientID(),
'keep_alive' => 10,
'properties' => [
'session_expiry_interval' => 60,
'receive_maximum' => 200,
'topic_alias_maximum' => 200,
],
'protocol_level' => 5,
];
Coroutine\run(function () use ($config) {$client = new Client($config, ['open_mqtt_protocol' => true, 'package_max_length' => 2 * 1024 * 1024]);
while (!$data = $client->connect()) {Coroutine::sleep(3);
$client->connect();}
$topics['simps-mqtt/user001/get'] = [
'qos' => 1,
'no_local' => true,
'retain_as_published' => true,
'retain_handling' => 2,
];
$timeSincePing = time();
$res = $client->subscribe($topics);
// 订阅的后果
var_dump($res);
while (true) {$buffer = $client->recv();
if ($buffer && $buffer !== true) {$timeSincePing = time();
// 收到的数据包
var_dump($buffer);
}
if (isset($config['keep_alive']) && $timeSincePing < (time() - $config['keep_alive'])) {$buffer = $client->ping();
if ($buffer) {
echo 'send ping success' . PHP_EOL;
$timeSincePing = time();} else {$client->close();
break;
}
}
// QoS1 公布回复
if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {
$client->send(
[
'type' => Types::PUBACK,
'message_id' => $buffer['message_id'],
'code' => ReasonCode::SUCCESS
]
);
}
}
});
执行php subscribe.php
,就会失去这样的输入
array(3) {["type"]=>
int(9)
["message_id"]=>
int(1)
["codes"]=>
array(1) {[0]=>
int(1)
}
}
示意订阅胜利,codes 对应的是对应订阅主题的 QoS 等级
公布
订阅胜利之后,创立一个 publish.php
来测试公布
include __DIR__ . '/vendor/autoload.php';
use Swoole\Coroutine;
use Simps\MQTT\Client;
$config = [
'host' => 'broker.emqx.io',
'port' => 1883,
'time_out' => 5,
'user_name' => 'user002',
'password' => 'adIJS1D482sd',
'client_id' => Client::genClientID(),
'keep_alive' => 20,
'properties' => [
'session_expiry_interval' => 60,
'receive_maximum' => 200,
'topic_alias_maximum' => 200,
],
'protocol_level' => 5,
];
Coroutine\run(function () use ($config) {$client = new Client($config, ['open_mqtt_protocol' => true, 'package_max_length' => 2 * 1024 * 1024]);
while (!$client->connect()) {Coroutine::sleep(3);
$client->connect();}
while (true) {
$response = $client->publish(
'simps-mqtt/user001/get',
'{"time":' . time() . '}',
1,
0,
0,
['topic_alias' => 1]
);
var_dump($response);
Coroutine::sleep(3);
}
});
代码的意思是每隔 3 秒给订阅的主题 simps-mqtt/user001/get
公布一次音讯
关上一个新的终端窗口,执行 php publish.php
就会失去输入:
array(4) {["type"]=>
int(4)
["message_id"]=>
int(1)
["code"]=>
int(0)
["message"]=>
string(7) "Success"
}
这里减少了 message,为了用户可读,不须要去查找对应的 code 含意是什么
返回到订阅的窗口,就会看到所打印的公布信息
array(8) {["type"]=>
int(3)
["topic"]=>
string(0) ""["message"]=>
string(19) "{"time":1608017156}"
["dup"]=>
int(1)
["qos"]=>
int(1)
["retain"]=>
int(0)
["message_id"]=>
int(4)
["properties"]=>
array(1) {["topic_alias"]=>
int(1)
}
}
这样一个简略的公布订阅性能就实现了
在这个库中还有一些值得优化和还未实现的局部,如还没有反对 MQTT5 的 Auth
type,以及局部的properties
还未反对
想参加的同学能够提交 PR,如果有问题也能够提交 Issue,让咱们独特去建设 PHP 的生态
仓库地址:simps/mqtt,反对记得点个 Star 哦