乐趣区

关于java:Gateway绑定MQTT实现发布订阅

前言

实现 MQTT 协定的中间件有很多,本文应用的是企业级 EMQX EnterPrise,不理解的小伙伴能够翻阅之前的博客。这里,次要介绍 SpringBoot2.0 集成 MQTT 实现音讯推送的性能。

创立我的项目

创立父工程

关上 idea 点击 File>New>Project 抉择 Spring Initializr >JDK 版本 >Next 并按下图创立我的项目

点击 next , 开发者工具 Developer Tools咱们勾选前两个,
Web 咱们勾选第一个,平安框架和 SQL 这里临时不须要勾选,Messaging 中间件, 咱们同样勾选第一个就好,Cloud 组件咱们也不必勾选。

顺次点击 next finish创立好我的项目

删除 src ,.gitignore,HELP.md,mvnwmvnw.cmd 目录,本文采纳 Gateway 绑定的形式, 须要引入以下依赖:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

父工程 pom 文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging>
    <modules>
        <module>springboot_emqx_common</module>
        <module>springboot_emqx_publish</module>
        <module>springboot_emqx_subscribe</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.baba.wlb</groupId>
    <artifactId>springboot_emqx</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>springboot_emqx</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
<!--                <configuration>-->
<!--                    <excludes>-->
<!--                        <exclude>-->
<!--                            <groupId>org.projectlombok</groupId>-->
<!--                            <artifactId>lombok</artifactId>-->
<!--                        </exclude>-->
<!--                    </excludes>-->
<!--                </configuration>-->
                <configuration>
                    <mainClass>com.baba.wlb.publish.PublishApplication</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

创立子工程

在父工程中点击New>>module>Next 别离创立三个子工程:
springboot_emqx_common
springboot_emqx_publish
springboot_emqx_subscribe

springboot_emqx_common

在该模块下新建如下 package 包
注:(config 包下临时没放公共配置,因为我试过良久,发现丢进来的配置只有主类 'mainClass' 能力加载到,其余模块加载不到通用配置,不分明是不是我漏了什么注解,望理解这部分的人多多指教!所以只好拆分配置到各个模块中了)

零碎常量:Constants.java

package com.baba.wlb.share.common;

/**
 * @Author wulongbo
 * @Date 2020/12/29 13:50
 * @Version 1.0
 */

/**
 * 零碎常量
 */
public class Constants {

    public static final String MQTT_PUBLISH_CHANNEL = "mqttPublishChannel";
    public static final String MQTT_SUBSCRIBE_CHANNEL = "mqttSubscribeChannel";

}

Emqx 配置类:EmqxMqttProperties.java

package com.baba.wlb.share.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @Author wulongbo
 * @Date 2020/12/29 11:33
 * @Version 1.0
 */

/**
 * 配置文件
 */

@Data
@Component
@ConfigurationProperties("wulongbo.mqtt.emqx")
public class EmqxMqttProperties {
    private String username;
    private String password;
    private String hostUrl;
    private String clientId;
    private String defaultTopic;
    private Integer timeout;
    private Integer keepAlive;
    private Integer qos;
    private Integer version;
}

resource资源目录下新建一个 application-common.yml的 yml 文件。
注:办法一:以 application-*.yml 的模式命名。办法二:模块之间并不必写依赖配置,间接在 common 模块的 resource 目录,增加一个 config 文件夹,在外面创立 application.yml 文件即可
官网是这么介绍的
这里抉择第一种形式。

yml 配置文件:application-common.yml

wulongbo:
  mqtt:
    emqx:
      username: admin
      password: public
      #tcp://ip:port
      host-url: tcp://39.102.56.91:1883
      client-id: wulongbo${random.value}
      default-topic: wulongbo_topic
      #      default-topic: $SYS/brokers/+/clients/#
      timeout: 60
      keep-alive: 60
      # qos:{0: 至少一次的传输 /1: 至多散发一次,可反复 /2: 只散发一次,不可反复}
      qos: 1
      version: 4

注:我本身的 EMQX 是启用了 Mysql 认证登录的,并且敞开了匿名登录的哈,所以须要正确的用户名和明码

springboot_emqx_publish

在该模块下新建如下 package 包

config 类:EmqxMqttConfig.java

package com.baba.wlb.publish.config;

/**
 * @Author wulongbo
 * @Date 2020/12/29 11:38
 * @Version 1.0
 */

import com.baba.wlb.share.common.Constants;
import com.baba.wlb.share.properties.EmqxMqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;

/**
 * EMQX 配置工具类
 */

@Configuration
@IntegrationComponentScan // 音讯扫描件
@Slf4j
public class EmqxMqttConfig {

    @Resource
    private EmqxMqttProperties emqxMqttProperties;

    /**
     * MQTT 的连贯
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        // 设置相干的属性
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());
        mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});
        // 心跳
        mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());
        mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());
        mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());
        // 保留 / 清空已经连贯的客户端信息
        mqttConnectOptions.setCleanSession(false);
        // qos
        String playload = "设施已断开连接";
        // 遗嘱音讯
        mqttConnectOptions.setWill("last_topic", playload.getBytes(), emqxMqttProperties.getQos(), false);
        return mqttConnectOptions;
    }

    /**
     * paho factory,mqtt 自定义的连贯放入 factory 工厂中
     */
    @Bean
    public MqttPahoClientFactory getMqttPahoClientFactory() {DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());
        return defaultMqttPahoClientFactory;
    }

    /**
     * 开启连贯通道
     */
    @Bean(name = Constants.MQTT_PUBLISH_CHANNEL)
    public MessageChannel getMqttPublishMessageChannel() {DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }


//    /**
//     * 开启连贯通道
//     */
//    @Bean(name = Constants.MQTT_SUBSCRIBE_CHANNEL)
//    public MessageChannel getMqttSubscribeMessageChannel() {//        DirectChannel directChannel = new DirectChannel();
//        return directChannel;
//    }
//
//
//
//    /**
//     * 监听 topic. 订阅者,消费者
//     */
//    @Bean
//    public MessageProducer inbound() {
//        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(//                emqxMqttProperties.getClientId() + "_wlb", getMqttPahoClientFactory(), emqxMqttProperties.getDefaultTopic().split(",")
//        );
//        mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());
//        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
//        mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());
//        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttPublishMessageChannel());
//        return mqttPahoMessageDrivenChannelAdapter;
//    }

    /**
     * 订阅者,消费者
     */
    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_PUBLISH_CHANNEL)
    public MessageHandler getMessageHandler() {MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(),getMqttPahoClientFactory());
        mqttPahoMessageHandler.setAsync(true);
        mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());
        mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());
        return mqttPahoMessageHandler;
    }

}

controller 类:PublishController.java

package com.baba.wlb.publish.controller;


import com.baba.wlb.publish.service.PublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author wulongbo
 * @Date 2020/12/29 13:58
 * @Version 1.0
 */

/**
 * 发送音讯的 Controller
 */

@RestController
@RequestMapping("/publish")
public class PublishController {

    /**
     * 注入发布者的 service 服务
     */
    @Autowired
    private PublishService publishService;

    /**
     * 发送音讯
     */
    @RequestMapping("/emqxPublish")
    public String emqxPublish(String data,String topic){publishService.sendToMqtt(data,topic);
        return "success";
    }
}

service: PublishService.java

package com.baba.wlb.publish.service;

import com.baba.wlb.share.common.Constants;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:00
 * @Version 1.0
 */

@MessagingGateway(defaultRequestChannel = Constants.MQTT_PUBLISH_CHANNEL)
@Component
public interface PublishService {void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

    void sendToMqtt(String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC)String topic, int qos, String data);
}

注:必须加 @Header(MqttHeaders.TOPIC)注解哈

application 启动类:PublishApplication.java

package com.baba.wlb.publish;

import com.baba.wlb.share.properties.EmqxMqttProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:04
 * @Version 1.0
 */

/**
 * emqx 发布者启动程序
 */
@SpringBootApplication
@EnableConfigurationProperties({EmqxMqttProperties.class})
public class PublishApplication {public static void main(String[] args) {SpringApplication.run(PublishApplication.class,args);
    }
}

注:须退出 @EnableConfigurationProperties,能力加载到配置文件

yml 文件:application.yml

server:
  port: 1001

#spring:
#  profiles:
#    active: common

注:这里咱们因为把 publish 模块设置成为了主类,所以可引入 common yml, 也能够不引入

springboot_emqx_subscribe

在该模块下新建如下 package 包

config 类:EmqxMqttConfig.java

package com.baba.wlb.subscribe.config;

/**
 * @Author wulongbo
 * @Date 2020/12/29 11:38
 * @Version 1.0
 */

import com.baba.wlb.share.common.Constants;
import com.baba.wlb.share.properties.EmqxMqttProperties;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;

/**
 * EMQX 配置工具类
 */

@Configuration
@IntegrationComponentScan // 音讯扫描件
@Slf4j
public class EmqxMqttConfig {

    @Resource
    private EmqxMqttProperties emqxMqttProperties;

    /**
     * MQTT 的连贯
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        // 设置相干的属性
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setUserName(emqxMqttProperties.getUsername());
        mqttConnectOptions.setPassword(emqxMqttProperties.getPassword().toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});
        // 心跳
        mqttConnectOptions.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());
        mqttConnectOptions.setMqttVersion(emqxMqttProperties.getVersion());
        mqttConnectOptions.setConnectionTimeout(emqxMqttProperties.getTimeout());
        // 保留 / 清空已经连贯的客户端信息
        mqttConnectOptions.setCleanSession(false);
        // qos
        String playload = "设施已断开连接";
        // 遗嘱音讯
        mqttConnectOptions.setWill("last_topic", playload.getBytes(), emqxMqttProperties.getQos(), false);
        return mqttConnectOptions;
    }

    /**
     * paho factory,mqtt 自定义的连贯放入 factory 工厂中
     */
    @Bean
    public MqttPahoClientFactory getMqttPahoClientFactory() {DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());
        return defaultMqttPahoClientFactory;
    }

//    /**
//     * 开启连贯通道
//     */
//    @Bean(name = Constants.MQTT_PUBLISH_CHANNEL)
//    public MessageChannel getMqttPublishMessageChannel() {//        DirectChannel directChannel = new DirectChannel();
//        return directChannel;
//    }


    /**
     * 开启连贯通道
     */
    @Bean(name = Constants.MQTT_SUBSCRIBE_CHANNEL)
    public MessageChannel getMqttSubscribeMessageChannel() {DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }



    /**
     * 监听 topic. 订阅者,消费者
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter = new MqttPahoMessageDrivenChannelAdapter(emqxMqttProperties.getClientId() + "_wlb", getMqttPahoClientFactory(), emqxMqttProperties.getDefaultTopic().split(",")
        );
        mqttPahoMessageDrivenChannelAdapter.setDisconnectCompletionTimeout(emqxMqttProperties.getTimeout());
        mqttPahoMessageDrivenChannelAdapter.setConverter(new DefaultPahoMessageConverter());
        mqttPahoMessageDrivenChannelAdapter.setQos(emqxMqttProperties.getQos());
        mqttPahoMessageDrivenChannelAdapter.setOutputChannel(getMqttSubscribeMessageChannel());
        return mqttPahoMessageDrivenChannelAdapter;
    }

    /**
     * 发布者,生产者
     */
    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)
    public MessageHandler getMessageHandler() {MqttPahoMessageHandler mqttPahoMessageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(),getMqttPahoClientFactory());
        mqttPahoMessageHandler.setAsync(true);
        mqttPahoMessageHandler.setDefaultQos(emqxMqttProperties.getQos());
        mqttPahoMessageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());
        return mqttPahoMessageHandler;
    }

}

service 业务类:SubscribeService.java

package com.baba.wlb.subscribe.service;

import com.baba.wlb.share.common.Constants;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:11
 * @Version 1.0
 */

/**
 * 订阅者
 */
@Service
public class SubscribeService {

    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)
    public MessageHandler messageHandler() {MessageHandler messageHandler = new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {System.out.println("订阅者订阅音讯头是:" + message.getHeaders());
                System.out.println("订阅者订阅音讯主体是:" + message.getPayload());
            }
        };
        return messageHandler;
    }
}

注:咱们把 MessageHandler 放入了专门的 server 做业务解决,其实放 config 类也是 OK 的

application 启动类:SubscribeApplication.java

package com.baba.wlb.subscribe;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:16
 * @Version 1.0
 */

import com.baba.wlb.share.properties.EmqxMqttProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**
 * 订阅者启动类
 */
@SpringBootApplication
@EnableConfigurationProperties({EmqxMqttProperties.class})
public class SubscribeApplication {public static void main(String[] args) {SpringApplication.run(SubscribeApplication.class,args);
    }
}

yml 配置文件:application.yml

server:
  port: 1002

spring:
  profiles:
    include: common

注:当然咱们下面的 publish 和 subscribe 模块都是依赖于 common 模块的咱们须要在各个模块上右击 --Open Model Settings

并按下图顺次来增加模块之间的依赖关系

最初,咱们在别离在 publish 和 subscribe 模块的 pom 文件中引入 common 依赖就 Ok 了

    <dependencies>
        <dependency>
            <groupId>com.baba.wlb</groupId>
            <artifactId>springboot_emqx_common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>


至此,咱们多模块用 Gateway 绑定的形式就集成好了 MQTT 音讯推送和音讯订阅性能。

启动我的项目

别离启动 PublishApplicationSubscribeApplication
端口别离为:1001,1002

PostMan 测试

关上 postman: 发动 Get 申请
localhost:1001/publish/emqxPublish?topic=wulongbo_topic&data= 我是一条音讯
能够看到咱们订阅者订阅到了这条音讯:

至于 service 业务模块对音讯的解决:具体是依据主题来筛选,还是依据 playload 来辨别,看具体的业务场景和设计须要。当然 EMQX 有更解耦的形式就是规定引擎来对各个事件响应动作,也有 HTTP API 供咱们调用,读者灵活运用即可。

退出移动版