eclipse-mqttclient-性能MQTT32202-正在发布过多的消息

mqttclient性能&MQTT(32202): 正在发布过多的消息org.eclipse.paho.client.mqttv32.2 GHz Intel Core i7 mac系统 publish性能,注意请使用单线程的 mqttclinet 1万条 341毫秒4万条 1163毫秒5万 1450毫秒10万条 2700毫秒 多线程的 mqttclinet MQTT(32202): 正在发布过多的消息 问题 异常信息[15:07:21]: publish failed, message: aaaa正在进行过多的发布 (32202) at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:496) at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:132) at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:156) at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1027) at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:399) at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:171) at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:161) at io.communet.ichater.emq.sub.MqttSendMsgEventSubscribe.onEvent(MqttSendMsgEventSubscribe.java:28) at java.lang.reflect.Method.invoke(Native Method) at java.lang.reflect.Method.invoke(Method.java:372) at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:507) at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:501) at org.greenrobot.eventbus.AsyncPoster.run(AsyncPoster.java:46) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587) at java.lang.Thread.run(Thread.java:818) 解决办法消息发送发送限流用单独的一个线程来完成 MQ 消息的推送 (不用这个MqttAsyncClient ,使用MqttClient 就没有事)options.setMaxInflight(1000) 增加 actualInFlight 的值; 反思笔者出现这个错误是因为使用 EventBus, 之前使用单独线程的 Handler 是没有问题的, 调查发现, 使用 EventBus 是新建线程运行的, 而 Handler 是单独一个线程.所以当发送大量消息的时候, EventBus 几乎是同一个点发出去, 就会造成这个错误 ...

July 15, 2019 · 2 min · jiezi

一个基于 swoole 的异步 mqtt 客户端库,可用于接收或者发送 mqtt 协议的消息

一个基于 swoole 的异步 mqtt 客户端库,可用于接收或者发送 mqtt 协议的消息。支持 QoS 0、QoS 1、QoS 2。支持 MQTT 3.1 和 3.1.1 版本.安装composer require try-to/swoole_mqttExamplesubscribe.php<?phpuse TrytoMqtt\Client;require_once DIR . ‘/vendor/autoload.php’;$options = [ ‘clean_session’ => false, ‘client_id’ => ‘demo-subscribe-123456’, ‘username’ => ‘’, ‘password’ => ‘’,];$mqtt = new Client(‘127.0.0.1’, 1883, $options);$mqtt->onConnect = function ($mqtt) { $mqtt->subscribe(’/World’);};$mqtt->onMessage = function ($topic, $content) { var_dump($topic, $content);};$mqtt->onError = function ($exception) use ($mqtt) { echo “error\n”; // $mqtt->reconnect(1000);};$mqtt->onClose = function () { echo “close\n”;};$mqtt->connect();命令行运行 php subscribe.php 启动publish.php<?phpuse TrytoMqtt\Client;require_once DIR . ‘/../vendor/autoload.php’;$options = [ ‘clean_session’ => false, ‘client_id’ => ‘demo-publish-123456’, ‘username’ => ‘’, ‘password’ => ‘’,];$mqtt = new Client(‘127.0.0.1’, 1883, $options);$mqtt->onConnect = function ($mqtt) { $mqtt->publish(’/World’, ‘hello swoole mqtt’);};$mqtt->onError = function ($exception) { echo “error\n”;};$mqtt->onClose = function () { echo “close\n”;};$mqtt->connect();命令行运行 php publish.php 启动实现的接口Client::__construct()Client::connect()Client::reconnect()Client::publish()Client::subscribe()Client::unsubscribe()Client::disconnect()Client::close()callback onConnectcallback onMessagecallback onErrorcallback onClose地址github地址码云地址 ...

December 20, 2018 · 1 min · jiezi