简介

mica-mqtt 基于 t-io 实现的简略低提早高性能 的 mqtt 物联网开源组件。

mica-mqtt server 更加易于集成到已有服务和二次开发,升高自研物联网平台开发成本。

mica-mqtt client 是简略、易用的 java mqtt 客户端,更加容易集成到本人的业务代码中。明天笔者次要要介绍的就是 mica-mqtt client 的应用。

应用

mica-mqtt-client Spring boot starter

增加依赖

<dependency>  <groupId>net.dreamlu</groupId>  <artifactId>mica-mqtt-client-spring-boot-starter</artifactId>  <version>1.3.7</version></dependency>

配置项阐明

mqtt:  client:    enabled: true               # 是否开启客户端,默认:false 应用到的场景无限,非必要请不要启用    ip: 127.0.0.1               # 连贯的服务端 ip ,默认:127.0.0.1    port: 1883                  # 端口:默认:1883    name: Mica-Mqtt-Client      # 名称,默认:Mica-Mqtt-Client    clientId: 000001            # 客户端Id(十分重要,个别为设施 sn,不可反复)    user-name: mica             # 认证的用户名    password: 123456            # 认证的明码    timeout: 5                  # 超时工夫,单位:秒,默认:5秒    reconnect: true             # 是否重连,默认:true    re-interval: 5000           # 重连工夫,默认 5000 毫秒    version: MQTT_5             # mqtt 协定版本,默认:3.1.1    read-buffer-size: 8KB       # 接收数据的 buffer size,默认:8k    max-bytes-in-message: 10MB  # 音讯解析最大 bytes 长度,默认:10M    buffer-allocator: heap      # 堆内存和堆外内存,默认:堆内存    keep-alive-secs: 60         # keep-alive 工夫,单位:秒    clean-session: true         # mqtt clean session,默认:true    use-ssl: false              # 是否启用 ssl,默认:false

连贯状态监听

@Servicepublic class MqttClientConnectListener {    private static final Logger logger = LoggerFactory.getLogger(MqttClientConnectListener.class);    @Autowired    private MqttClientCreator mqttClientCreator;    @EventListener    public void onConnected(MqttConnectedEvent event) {        logger.info("MqttConnectedEvent:{}", event);    }    @EventListener    public void onDisconnect(MqttDisconnectEvent event) {        // 离线时更新重连时的明码,实用于相似阿里云 mqtt clientId 连贯带工夫戳的形式         logger.info("MqttDisconnectEvent:{}", event);        // 在断线时更新 clientId、username、password        mqttClientCreator.clientId("newClient" + System.currentTimeMillis())            .username("newUserName")            .password("newPassword");    }}

自定义配置java(可选)

@Configuration(proxyBeanMethods = false)public class MqttClientCustomizerConfiguration {    @Bean    public MqttClientCustomizer mqttClientCustomizer() {        return new MqttClientCustomizer() {            @Override            public void customize(MqttClientCreator creator) {                // 此处可自定义配置 creator,会笼罩 yml 中的配置                System.out.println("----------------MqttServerCustomizer-----------------");            }        };    }}

订阅示例

@Servicepublic class MqttClientSubscribeListener {    private static final Logger logger = LoggerFactory.getLogger(MqttClientSubscribeListener.class);    @MqttClientSubscribe("/test/#")    public void subQos0(String topic, ByteBuffer payload) {        logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload));    }    @MqttClientSubscribe(value = "/qos1/#", qos = MqttQoS.AT_LEAST_ONCE)    public void subQos1(String topic, ByteBuffer payload) {        logger.info("topic:{} payload:{}", topic, ByteBufferUtil.toString(payload));    }}

MqttClientTemplate 应用示例

@Servicepublic class MainService {    private static final Logger logger = LoggerFactory.getLogger(MainService.class);    @Autowired    private MqttClientTemplate client;    public boolean publish() {        // 公布音讯示例        client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));        return true;    }    public boolean sub() {        // 订阅音讯示例        client.subQos0("/test/#", (topic, payload) -> {            logger.info(topic + '\t' + ByteBufferUtil.toString(payload));        });        return true;    }}

共享订阅 topic 阐明

mica-mqtt client 反对两种共享订阅形式:

  • 共享订阅:订阅前缀 $queue/,多个客户端订阅了 $queue/topic,发布者公布到topic,则只有一个客户端会接管到音讯。
  • 分组订阅:订阅前缀 $share/<group>/,组客户端订阅了$queue/group1/topic$queue/group2/topic..,发布者公布到topic,则音讯会公布到每个group中,然而每个group中只有一个客户端会接管到音讯。

jfinal mica-mqtt client(1.3.7 开始反对)

增加依赖

<dependency>    <groupId>net.dreamlu</groupId>    <artifactId>jfinal-mica-mqtt-client</artifactId>    <version>1.3.7</version></dependency>

删除 jfinal-demo 中的 slf4j-nop 依赖

增加 slf4j-log4j12

<dependency>    <groupId>org.slf4j</groupId>    <artifactId>slf4j-log4j12</artifactId>    <version>1.7.33</version></dependency>

在 jfinal Config configPlugin 中增加 mica-mqtt client 插件

MqttClientPlugin mqttClientPlugin = new MqttClientPlugin();mqttClientPlugin.config(mqttClientCreator -> {    // 设置 mqtt 连贯配置信息    mqttClientCreator            .clientId("clientId") // 按需配置,雷同的会互踢            .ip("mqtt.dreamlu.net")            .port(1883)            .connectListener(Aop.get(MqttClientConnectListener.class));});me.add(mqttClientPlugin);

在 jfinal Config onStart 启动实现之后增加 mqtt 订阅

@Overridepublic void onStart() {    IMqttClientMessageListener clientMessageListener = Aop.get(TestMqttClientMessageListener.class);    MqttClientKit.subQos0("#", clientMessageListener);}

应用 MqttClientKit 发送音讯

MqttClientKit.publish("mica", "hello".getBytes(StandardCharsets.UTF_8));

示例代码 MqttClientConnectListener

public class MqttClientConnectListener implements IMqttClientConnectListener {    @Override    public void onConnected(ChannelContext channelContext, boolean isReconnect) {        if (isReconnect) {            System.out.println("重连 mqtt 服务器重连胜利...");        } else {            System.out.println("连贯 mqtt 服务器胜利...");        }    }    @Override    public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {        System.out.println("mqtt 链接断开 remark:" + remark + " isRemove:" + isRemove);    }}

示例 TestMqttClientMessageListener

public class TestMqttClientMessageListener implements IMqttClientMessageListener {    @Override    public void onMessage(String topic, MqttPublishMessage mqttPublishMessage, ByteBuffer byteBuffer) {        System.out.println("收到音讯 topic:" + topic + "内容:\n" + ByteBufferUtil.toString(byteBuffer));    }}

其它 java 我的项目

增加依赖

<dependency>    <groupId>net.dreamlu</groupId>    <artifactId>mica-mqtt-core</artifactId>    <version>1.3.7</version>    <exclusions>        <exclusion>            <groupId>org.t-io</groupId>            <artifactId>tio-websocket-server</artifactId>        </exclusion>        <exclusion>            <groupId>net.dreamlu</groupId>            <artifactId>mica-mqtt-model</artifactId>        </exclusion>        <exclusion>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>        </exclusion>    </exclusions></dependency>

应用

// 初始化 mqtt 客户端MqttClient client = MqttClient.create()    .ip("127.0.0.1")                // mqtt 服务端 ip 地址    .port(1883)                     // 默认:1883    .username("admin")              // 账号    .password("123456")             // 明码    .version(MqttVersion.MQTT_5)    // 默认:3_1_1    .clientId("xxxxxx")             // 十分重要务必手动设置,个别设施 sn 号,默认:MICA-MQTT- 前缀和 36进制的纳秒数    .bufferAllocator(ByteBufferAllocator.DIRECT) // 堆内存和堆外内存,默认:堆内存    .readBufferSize(512)            // 音讯一起解析的长度,默认:为 8092 (mqtt 音讯最大长度)    .maxBytesInMessage(1024 * 10)   // 最大包体长度,如果包体过大须要设置此参数,默认为: 10M (10*1024*1024)    .keepAliveSecs(120)             // 默认:60s    .timeout(10)                    // 超时工夫,t-io 配置,可为 null,为 null 时,t-io 默认为 5    .reconnect(true)                // 是否重连,默认:true    .reInterval(5000)               // 重连重试工夫,reconnect 为 true 时无效,t-io 默认为:5000    .willMessage(builder -> {        builder.topic("/test/offline").messageText("down");    // 遗嘱音讯    })    .connectListener(new IMqttClientConnectListener() {        @Override        public void onConnected(ChannelContext context, boolean isReconnect) {            logger.info("链接服务器胜利...");        }                @Override        public void onDisconnect(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) {            logger.info("与链接服务器断开连接...");        }    })    .properties()                   // mqtt5 properties    .connect();                     // 异步连贯,还反对同步 connectSync()    // 音讯订阅,同类办法 subxxx    client.subQos0("/test/#", (topic, payload) -> {        logger.info(topic + '\t' + ByteBufferUtil.toString(payload));    });    // 勾销订阅    client.unSubscribe("/test/#");    // 发送音讯    client.publish("/test/client", ByteBuffer.wrap("mica-mqtt 牛皮".getBytes(StandardCharsets.UTF_8)));    // 断开连接    client.disconnect();    // 重连    client.reconnect();

鸣谢

mica-mqtt 从一个试验性的我的项目逐步欠缺,目前 gitee 上已有 800 多颗星。

mica-mqtt 的成长也离不开大伙应用和踊跃反馈,感激 @冷月宫主@willianfu@hjkJOJO@Symous@hongfeng11@胡萝博@杨钊@一醉化千愁@toskeyfine@亡羊补牛 等同学,谢谢大家!!!

应用文档

  • mica-mqtt 疾速开始
  • mica-mqtt-client-spring-boot-starter 应用文档
  • mica-mqtt-server-spring-boot-starter 应用文档
  • jfinal-mica-mqtt-client 应用文档
  • jfinal-mica-mqtt-server 应用文档
  • mica-mqtt 应用文档
  • mica-mqtt http api 文档详见
  • mica-mqtt 应用常见问题汇总
  • mica-mqtt 发行版本