简介
mica-mqtt 基于 t-io 实现的简略、低提早、高性能 的 mqtt 物联网开源组件。
mica-mqtt server 更加易于集成到已有服务和二次开发,升高自研物联网平台开发成本。
mica-mqtt client 是简略、易用的 java mqtt 客户端,更加容易集成到本人的业务代码中。明天笔者次要要介绍的就是 mica-mqtt client 的应用。
应用
mica-mqtt-client Spring boot starter
增加依赖
<dependency> <groupId>net.dreamlu</groupId> <artifactId>mica-mqtt-client-spring-boot-starter</artifactId> <version>1.3.7</version></dependency>
配置项阐明
mqtt: client: enabled: true # 是否开启客户端,默认:false 应用到的场景无限,非必要请不要启用 ip: 127.0.0.1 # 连贯的服务端 ip ,默认:127.0.0.1 port: 1883 # 端口:默认:1883 name: Mica-Mqtt-Client # 名称,默认:Mica-Mqtt-Client clientId: 000001 # 客户端Id(十分重要,个别为设施 sn,不可反复) user-name: mica # 认证的用户名 password: 123456 # 认证的明码 timeout: 5 # 超时工夫,单位:秒,默认:5秒 reconnect: true # 是否重连,默认:true re-interval: 5000 # 重连工夫,默认 5000 毫秒 version: MQTT_5 # mqtt 协定版本,默认:3.1.1 read-buffer-size: 8KB # 接收数据的 buffer size,默认:8k max-bytes-in-message: 10MB # 音讯解析最大 bytes 长度,默认:10M buffer-allocator: heap # 堆内存和堆外内存,默认:堆内存 keep-alive-secs: 60 # keep-alive 工夫,单位:秒 clean-session: true # mqtt clean session,默认:true use-ssl: false # 是否启用 ssl,默认:false
连贯状态监听
@Servicepublic class MqttClientConnectListener { private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class); @Autowired private MqttClientCreator mqttClientCreator; @EventListener public void onConnected(MqttConnectedEvent event) { logger.info("MqttConnectedEvent:{}", event); } @EventListener public void onDisconnect(MqttDisconnectEvent event) { // 离线时更新重连时的明码,实用于相似阿里云 mqtt clientId 连贯带工夫戳的形式 logger.info("MqttDisconnectEvent:{}", event); // 在断线时更新 clientId、username、password mqttClientCreator.clientId("newClient" + System.currentTimeMillis()) .username("newUserName") .password("newPassword"); }}
自定义配置java(可选)
@Configuration(proxyBeanMethods = false)public class MqttClientCustomizerConfiguration { @Bean public MqttClientCustomizer mqttClientCustomizer() { return new MqttClientCustomizer() { @Override public void customize(MqttClientCreator creator) { // 此处可自定义配置 creator,会笼罩 yml 中的配置 System.out.println("----------------MqttServerCustomizer-----------------"); } }; }}
订阅示例
@Servicepublic class MqttClientSubscribeListener { private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class); @MqttClientSubscribe("/test/#") public void subQos0(String topic, ByteBuffer payload) { logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload)); } @MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.AT_LEAST_ONCE) public void subQos1(String topic, ByteBuffer payload) { logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload)); }}
MqttClientTemplate 应用示例
@Servicepublic class MainService { private static final Logger logger = LoggerFactory.getLogger(MainService.class); @Autowired private MqttClientTemplate client; public boolean publish() { // 公布音讯示例 client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8))); return true; } public boolean sub() { // 订阅音讯示例 client.subQos0("/test/#", (topic, payload) -> { logger.info(topic + '\t' + ByteBufferUtil.toString(payload)); }); return true; }}
共享订阅 topic 阐明
mica-mqtt client 反对两种共享订阅形式:
- 共享订阅:订阅前缀
$queue/
,多个客户端订阅了$queue/topic
,发布者公布到topic,则只有一个客户端会接管到音讯。 - 分组订阅:订阅前缀
$share/<group>/
,组客户端订阅了$queue/group1/topic
、$queue/group2/topic
..,发布者公布到topic,则音讯会公布到每个group中,然而每个group中只有一个客户端会接管到音讯。
jfinal mica-mqtt client(1.3.7 开始反对)
增加依赖
<dependency> <groupId>net.dreamlu</groupId> <artifactId>jfinal-mica-mqtt-client</artifactId> <version>1.3.7</version></dependency>
删除 jfinal-demo 中的 slf4j-nop 依赖
增加 slf4j-log4j12
<dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.33</version></dependency>
在 jfinal Config configPlugin 中增加 mica-mqtt client 插件
MqttClientPlugin mqttClientPlugin = new MqttClientPlugin();mqttClientPlugin.config(mqttClientCreator -> { // 设置 mqtt 连贯配置信息 mqttClientCreator .clientId("clientId") // 按需配置,雷同的会互踢 .ip("mqtt.dreamlu.net") .port(1883) .connectListener(Aop.get(MqttClientConnectListener.class));});me.add(mqttClientPlugin);
在 jfinal Config onStart 启动实现之后增加 mqtt 订阅
@Overridepublic void onStart() { IMqttClientMessageListener clientMessageListener = Aop.get(TestMqttClientMessageListener.class); MqttClientKit.subQos0("#", clientMessageListener);}
应用 MqttClientKit 发送音讯
MqttClientKit.publish("mica", "hello".getBytes(StandardCharsets.UTF_8));
示例代码 MqttClientConnectListener
public class MqttClientConnectListener implements IMqttClientConnectListener { @Override public void onConnected(ChannelContext channelContext, boolean isReconnect) { if (isReconnect) { System.out.println("重连 mqtt 服务器重连胜利..."); } else { System.out.println("连贯 mqtt 服务器胜利..."); } } @Override public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) { System.out.println("mqtt 链接断开 remark:" + remark + " isRemove:" + isRemove); }}
示例 TestMqttClientMessageListener
public class TestMqttClientMessageListener implements IMqttClientMessageListener { @Override public void onMessage(String topic, MqttPublishMessage mqttPublishMessage, ByteBuffer byteBuffer) { System.out.println("收到音讯 topic:" + topic + "内容:\n" + ByteBufferUtil.toString(byteBuffer)); }}
其它 java 我的项目
增加依赖
<dependency> <groupId>net.dreamlu</groupId> <artifactId>mica-mqtt-core</artifactId> <version>1.3.7</version> <exclusions> <exclusion> <groupId>org.t-io</groupId> <artifactId>tio-websocket-server</artifactId> </exclusion> <exclusion> <groupId>net.dreamlu</groupId> <artifactId>mica-mqtt-model</artifactId> </exclusion> <exclusion> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </exclusion> </exclusions></dependency>
应用
// 初始化 mqtt 客户端MqttClient client = MqttClient.create() .ip("127.0.0.1") // mqtt 服务端 ip 地址 .port(1883) // 默认:1883 .username("admin") // 账号 .password("123456") // 明码 .version(MqttVersion.MQTT_5) // 默认:3_1_1 .clientId("xxxxxx") // 十分重要务必手动设置,个别设施 sn 号,默认:MICA-MQTT- 前缀和 36进制的纳秒数 .bufferAllocator(ByteBufferAllocator.DIRECT) // 堆内存和堆外内存,默认:堆内存 .readBufferSize(512) // 音讯一起解析的长度,默认:为 8092 (mqtt 音讯最大长度) .maxBytesInMessage(1024 * 10) // 最大包体长度,如果包体过大须要设置此参数,默认为: 10M (10*1024*1024) .keepAliveSecs(120) // 默认:60s .timeout(10) // 超时工夫,t-io 配置,可为 null,为 null 时,t-io 默认为 5 .reconnect(true) // 是否重连,默认:true .reInterval(5000) // 重连重试工夫,reconnect 为 true 时无效,t-io 默认为:5000 .willMessage(builder -> { builder.topic("/test/offline").messageText("down"); // 遗嘱音讯 }) .connectListener(new IMqttClientConnectListener() { @Override public void onConnected(ChannelContext context, boolean isReconnect) { logger.info("链接服务器胜利..."); } @Override public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) { logger.info("与链接服务器断开连接..."); } }) .properties() // mqtt5 properties .connect(); // 异步连贯,还反对同步 connectSync() // 音讯订阅,同类办法 subxxx client.subQos0("/test/#", (topic, payload) -> { logger.info(topic + '\t' + ByteBufferUtil.toString(payload)); }); // 勾销订阅 client.unSubscribe("/test/#"); // 发送音讯 client.publish("/test/client", ByteBuffer.wrap("mica-mqtt 牛皮".getBytes(StandardCharsets.UTF_8))); // 断开连接 client.disconnect(); // 重连 client.reconnect();
鸣谢
mica-mqtt 从一个试验性的我的项目逐步欠缺,目前 gitee 上已有 800 多颗星。
mica-mqtt 的成长也离不开大伙应用和踊跃反馈,感激 @冷月宫主
、@willianfu
、@hjkJOJO
、@Symous
、@hongfeng11
、@胡萝博
、@杨钊
、@一醉化千愁
、@toskeyfine
、@亡羊补牛
等同学,谢谢大家!!!
应用文档
- mica-mqtt 疾速开始
- mica-mqtt-client-spring-boot-starter 应用文档
- mica-mqtt-server-spring-boot-starter 应用文档
- jfinal-mica-mqtt-client 应用文档
- jfinal-mica-mqtt-server 应用文档
- mica-mqtt 应用文档
- mica-mqtt http api 文档详见
- mica-mqtt 应用常见问题汇总
- mica-mqtt 发行版本