前言

实现MQTT协定的中间件有很多,本文应用的是企业级 EMQX EnterPrise,不理解的小伙伴能够翻阅之前的博客。这里,次要介绍SpringBoot2.0集成MQTT实现音讯推送的性能。

创立我的项目

创立父工程

关上 idea 点击 File>New>Project 抉择Spring Initializr >JDK版本>Next 并按下图创立我的项目

点击 next ,开发者工具 Developer Tools咱们勾选前两个,
Web 咱们勾选第一个,平安框架和SQL这里临时不须要勾选,Messaging中间件,咱们同样勾选第一个就好,Cloud组件咱们也不必勾选。

顺次点击 next finish创立好我的项目

删除 src ,.gitignore,HELP.md,mvnwmvnw.cmd 目录,本文采纳Gateway绑定的形式,须要引入以下依赖:

<dependency>    <groupId>org.springframework.integration</groupId>    <artifactId>spring-integration-stream</artifactId></dependency><dependency>    <groupId>org.springframework.integration</groupId>    <artifactId>spring-integration-mqtt</artifactId></dependency>

父工程pom文件:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <packaging>pom</packaging>    <modules>        <module>springboot_emqx_common</module>        <module>springboot_emqx_publish</module>        <module>springboot_emqx_subscribe</module>    </modules>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.4.1</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>com.baba.wlb</groupId>    <artifactId>springboot_emqx</artifactId>    <version>1.0-SNAPSHOT</version>    <name>springboot_emqx</name>    <description>Demo project for Spring Boot</description>    <properties>        <java.version>1.8</java.version>    </properties>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-integration</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.integration</groupId>            <artifactId>spring-integration-stream</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.integration</groupId>            <artifactId>spring-integration-mqtt</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-devtools</artifactId>            <scope>runtime</scope>            <optional>true</optional>        </dependency>        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>            <optional>true</optional>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>        <dependency>            <groupId>org.springframework.integration</groupId>            <artifactId>spring-integration-test</artifactId>            <scope>test</scope>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId><!--                <configuration>--><!--                    <excludes>--><!--                        <exclude>--><!--                            <groupId>org.projectlombok</groupId>--><!--                            <artifactId>lombok</artifactId>--><!--                        </exclude>--><!--                    </excludes>--><!--                </configuration>-->                <configuration>                    <mainClass>com.baba.wlb.publish.PublishApplication</mainClass>                </configuration>            </plugin>        </plugins>    </build></project>

创立子工程

在父工程中点击New>>module>Next 别离创立三个子工程:
springboot_emqx_common
springboot_emqx_publish
springboot_emqx_subscribe

springboot_emqx_common

在该模块下新建如下package包
注:(config包下临时没放公共配置,因为我试过良久,发现丢进来的配置只有主类'mainClass'能力加载到,其余模块加载不到通用配置,不分明是不是我漏了什么注解,望理解这部分的人多多指教!所以只好拆分配置到各个模块中了)

零碎常量:Constants.java

package com.baba.wlb.share.common;/** * @Author wulongbo * @Date 2020/12/29 13:50 * @Version 1.0 *//** * 零碎常量 */public class Constants {    public static final String MQTT_PUBLISH_CHANNEL = "mqttPublishChannel";    public static final String MQTT_SUBSCRIBE_CHANNEL = "mqttSubscribeChannel";}

Emqx配置类:EmqxMqttProperties.java

package com.baba.wlb.share.properties;import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;/** * @Author wulongbo * @Date 2020/12/29 11:33 * @Version 1.0 *//** * 配置文件 */@Data@Component@ConfigurationProperties("wulongbo.mqtt.emqx")public class EmqxMqttProperties {    private String username;    private String password;    private String hostUrl;    private String clientId;    private String defaultTopic;    private Integer timeout;    private Integer keepAlive;    private Integer qos;    private Integer version;}

resource资源目录下新建一个 application-common.yml的yml文件。
注:办法一:以application-*.yml的模式命名。 办法二:模块之间并不必写依赖配置,间接在common模块的resource目录,增加一个config文件夹,在外面创立application.yml文件即可
官网是这么介绍的
这里抉择第一种形式。

yml配置文件: application-common.yml

wulongbo:  mqtt:    emqx:      username: admin      password: public      #tcp://ip:port      host-url: tcp://39.102.56.91:1883      client-id: wulongbo${random.value}      default-topic: wulongbo_topic      #      default-topic: $SYS/brokers/+/clients/#      timeout: 60      keep-alive: 60      # qos:{0:至少一次的传输 /1:至多散发一次,可反复 /2:只散发一次,不可反复}      qos: 1      version: 4

注:我本身的EMQX 是启用了Mysql认证登录的,并且敞开了匿名登录的哈,所以须要正确的用户名和明码

springboot_emqx_publish

在该模块下新建如下package包

config类: EmqxMqttConfig.java

package com.baba.wlb.publish.config;/** * @Author wulongbo * @Date 2020/12/29 11:38 * @Version 1.0 */import com.baba.wlb.share.common.Constants;import com.baba.wlb.share.properties.EmqxMqttProperties;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import javax.annotation.Resource;/** * EMQX配置工具类 */@Configuration@IntegrationComponentScan //音讯扫描件@Slf4jpublic class EmqxMqttConfig {    @Resource    private EmqxMqttProperties emqxMqttProperties;    /**     * MQTT的连贯     */    @Bean    public MqttConnectOptions getMqttConnectOptions() {        // 设置相干的属性        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();        mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());        mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());        mqttConnectOptions.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});        // 心跳        mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());        mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());        mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());        // 保留/清空已经连贯的客户端信息        mqttConnectOptions.setCleanSession(false);        // qos        String playload = "设施已断开连接";        // 遗嘱音讯        mqttConnectOptions.setWill("last_topic", playload.getBytes(), emqxMqttProperties.getQos(), false);        return mqttConnectOptions;    }    /**     * paho factory,mqtt自定义的连贯放入factory工厂中     */    @Bean    public MqttPahoClientFactory getMqttPahoClientFactory() {        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();        defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());        return defaultMqttPahoClientFactory;    }    /**     * 开启连贯通道     */    @Bean(name = Constants.MQTT_PUBLISH_CHANNEL)    public MessageChannel getMqttPublishMessageChannel() {        DirectChannel directChannel = new DirectChannel();        return directChannel;    }//    /**//     * 开启连贯通道//     *///    @Bean(name = Constants.MQTT_SUBSCRIBE_CHANNEL)//    public MessageChannel getMqttSubscribeMessageChannel() {//        DirectChannel directChannel = new DirectChannel();//        return directChannel;//    }////////    /**//     * 监听topic.订阅者,消费者//     *///    @Bean//    public MessageProducer inbound() {//        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(//                emqxMqttProperties.getClientId() + "_wlb", getMqttPahoClientFactory(), emqxMqttProperties.getDefaultTopic().split(",")//        );//        mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());//        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());//        mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());//        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttPublishMessageChannel());//        return mqttPahoMessageDrivenChannelAdapter;//    }    /**     * 订阅者,消费者     */    @Bean    @ServiceActivator(inputChannel = Constants.MQTT_PUBLISH_CHANNEL)    public MessageHandler getMessageHandler() {        MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(),getMqttPahoClientFactory());        mqttPahoMessageHandler.setAsync(true);        mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());        mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());        return mqttPahoMessageHandler;    }}

controller类: PublishController.java

package com.baba.wlb.publish.controller;import com.baba.wlb.publish.service.PublishService;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @Author wulongbo * @Date 2020/12/29 13:58 * @Version 1.0 *//** * 发送音讯的Controller */@RestController@RequestMapping("/publish")public class PublishController {    /**     * 注入发布者的service服务     */    @Autowired    private PublishService publishService;    /**     * 发送音讯     */    @RequestMapping("/emqxPublish")    public String emqxPublish(String data,String topic){        publishService.sendToMqtt(data,topic);        return "success";    }}

service: PublishService.java

package com.baba.wlb.publish.service;import com.baba.wlb.share.common.Constants;import org.springframework.integration.annotation.MessagingGateway;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.stereotype.Component;/** * @Author wulongbo * @Date 2020/12/29 14:00 * @Version 1.0 */@MessagingGateway(defaultRequestChannel = Constants.MQTT_PUBLISH_CHANNEL)@Componentpublic interface PublishService {    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);    void sendToMqtt(String data);    void sendToMqtt(@Header(MqttHeaders.TOPIC)String topic, int qos, String data);}

注:必须加@Header(MqttHeaders.TOPIC)注解哈

application启动类: PublishApplication.java

package com.baba.wlb.publish;import com.baba.wlb.share.properties.EmqxMqttProperties;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.context.properties.EnableConfigurationProperties;/** * @Author wulongbo * @Date 2020/12/29 14:04 * @Version 1.0 *//** * emqx 发布者启动程序 */@SpringBootApplication@EnableConfigurationProperties({EmqxMqttProperties.class})public class PublishApplication {    public static void main(String[] args) {        SpringApplication.run(PublishApplication.class,args);    }}

注:须退出@EnableConfigurationProperties,能力加载到配置文件

yml文件: application.yml

server:  port: 1001#spring:#  profiles:#    active: common

注:这里咱们因为把publish模块设置成为了主类,所以可引入common yml,也能够不引入

springboot_emqx_subscribe

在该模块下新建如下package包

config类: EmqxMqttConfig.java

package com.baba.wlb.subscribe.config;/** * @Author wulongbo * @Date 2020/12/29 11:38 * @Version 1.0 */import com.baba.wlb.share.common.Constants;import com.baba.wlb.share.properties.EmqxMqttProperties;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import javax.annotation.Resource;/** * EMQX配置工具类 */@Configuration@IntegrationComponentScan //音讯扫描件@Slf4jpublic class EmqxMqttConfig {    @Resource    private EmqxMqttProperties emqxMqttProperties;    /**     * MQTT的连贯     */    @Bean    public MqttConnectOptions getMqttConnectOptions() {        // 设置相干的属性        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();        mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());        mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());        mqttConnectOptions.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});        // 心跳        mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());        mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());        mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());        // 保留/清空已经连贯的客户端信息        mqttConnectOptions.setCleanSession(false);        // qos        String playload = "设施已断开连接";        // 遗嘱音讯        mqttConnectOptions.setWill("last_topic", playload.getBytes(), emqxMqttProperties.getQos(), false);        return mqttConnectOptions;    }    /**     * paho factory,mqtt自定义的连贯放入factory工厂中     */    @Bean    public MqttPahoClientFactory getMqttPahoClientFactory() {        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();        defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());        return defaultMqttPahoClientFactory;    }//    /**//     * 开启连贯通道//     *///    @Bean(name = Constants.MQTT_PUBLISH_CHANNEL)//    public MessageChannel getMqttPublishMessageChannel() {//        DirectChannel directChannel = new DirectChannel();//        return directChannel;//    }    /**     * 开启连贯通道     */    @Bean(name = Constants.MQTT_SUBSCRIBE_CHANNEL)    public MessageChannel getMqttSubscribeMessageChannel() {        DirectChannel directChannel = new DirectChannel();        return directChannel;    }    /**     * 监听topic.订阅者,消费者     */    @Bean    public MessageProducer inbound() {        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(                emqxMqttProperties.getClientId() + "_wlb", getMqttPahoClientFactory(), emqxMqttProperties.getDefaultTopic().split(",")        );        mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());        mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttSubscribeMessageChannel());        return mqttPahoMessageDrivenChannelAdapter;    }    /**     * 发布者,生产者     */    @Bean    @ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)    public MessageHandler getMessageHandler() {        MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(),getMqttPahoClientFactory());        mqttPahoMessageHandler.setAsync(true);        mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());        mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());        return mqttPahoMessageHandler;    }}

service业务类: SubscribeService.java

package com.baba.wlb.subscribe.service;import com.baba.wlb.share.common.Constants;import org.springframework.context.annotation.Bean;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;import org.springframework.stereotype.Service;/** * @Author wulongbo * @Date 2020/12/29 14:11 * @Version 1.0 *//** * 订阅者 */@Servicepublic class SubscribeService {    @Bean    @ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)    public MessageHandler messageHandler() {        MessageHandler messageHandler = new MessageHandler() {            @Override            public void handleMessage(Message<?> message) throws MessagingException {                System.out.println("订阅者订阅音讯头是:" + message.getHeaders());                System.out.println("订阅者订阅音讯主体是:" + message.getPayload());            }        };        return messageHandler;    }}

注:咱们把MessageHandler放入了专门的server做业务解决,其实放config类也是OK的

application启动类: SubscribeApplication.java

package com.baba.wlb.subscribe;/** * @Author wulongbo * @Date 2020/12/29 14:16 * @Version 1.0 */import com.baba.wlb.share.properties.EmqxMqttProperties;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.boot.context.properties.EnableConfigurationProperties;/** * 订阅者启动类 */@SpringBootApplication@EnableConfigurationProperties({EmqxMqttProperties.class})public class SubscribeApplication {    public static void main(String[] args) {        SpringApplication.run(SubscribeApplication.class,args);    }}

yml配置文件: application.yml

server:  port: 1002spring:  profiles:    include: common

注:当然咱们下面的publish和subscribe模块都是依赖于common模块的咱们须要在各个模块上右击--Open Model Settings

并按下图顺次来增加模块之间的依赖关系

最初,咱们在别离在 publish和subscribe模块的pom文件中引入common依赖就Ok了

    <dependencies>        <dependency>            <groupId>com.baba.wlb</groupId>            <artifactId>springboot_emqx_common</artifactId>            <version>1.0-SNAPSHOT</version>        </dependency>    </dependencies>


至此,咱们多模块用Gateway绑定的形式就集成好了MQTT音讯推送和音讯订阅性能。

启动我的项目

别离启动PublishApplicationSubscribeApplication
端口别离为:1001,1002

PostMan测试

关上postman:发动Get申请
localhost:1001/publish/emqxPublish?topic=wulongbo_topic&data=我是一条音讯
能够看到咱们订阅者订阅到了这条音讯:

至于service业务模块对音讯的解决:具体是依据主题来筛选,还是依据playload来辨别,看具体的业务场景和设计须要。当然EMQX 有更解耦的形式就是规定引擎来对各个事件响应动作,也有HTTP API供咱们调用,读者灵活运用即可。