Emqx配置类:EmqxMqttConfig.java

package com.bbzn.device.client.config;//package com.spring.security.demo.config;import com.bbzn.device.client.enums.TopicName;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationEventPublisher;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;import javax.annotation.Resource;import java.util.Arrays;/** * @Author wulongbo * @Date 2020/12/29 11:38 * @Version 1.0 * * EMQX配置工具类 */@Configuration@IntegrationComponentScan //音讯扫描件@Slf4jpublic class EmqxMqttConfig {    @Resource    private EmqxMqttProperties emqxMqttProperties;    @Bean    public DefaultMqttPahoClientFactory getMqttConnectOptions() {        DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();        defaultMqttPahoClientFactory.setUserName(emqxMqttProperties.getUsername());        defaultMqttPahoClientFactory.setPassword(emqxMqttProperties.getPassword());        defaultMqttPahoClientFactory.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});        // 心跳        
defaultMqttPahoClientFactory.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());        defaultMqttPahoClientFactory.setConnectionTimeout(emqxMqttProperties.getTimeout());        // 保留/清空已经连贯的客户端信息        defaultMqttPahoClientFactory.setCleanSession(false);        // qos        String playload = "设施已断开连接";        // 遗嘱音讯        defaultMqttPahoClientFactory.setWill(new DefaultMqttPahoClientFactory.Will("last_topic", playload.getBytes(), emqxMqttProperties.getQos(), false));        return defaultMqttPahoClientFactory;    }    @Bean    public MessageChannel mqttSubscribeChannel() {        return new DirectChannel();    }    @Bean    public MessageChannel nbMqttSubscribeChannel() {        return new DirectChannel();    }    @Bean    public MessageChannel bridgeMqttSubscribeChannel() {        return new DirectChannel();    }    /**     * 配置client,监听的topic     */    @Bean    public MessageProducer inbound() {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(emqxMqttProperties.getClientId() + "_inbound", getMqttConnectOptions(),                        emqxMqttProperties.getDefaultTopic().split(","));        adapter.setCompletionTimeout(Math.toIntExact(Long.valueOf(emqxMqttProperties.getTimeout())));        adapter.setConverter(new DefaultPahoMessageConverter());        //默认增加TopicName中所有tipic        // 当一个client解决的时候应用遍历批准订阅//        Arrays.stream(TopicName.values()).forEach(topicName -> adapter.addTopic(topicName.getValue(), 2));        // 当应用多个client解决的时候独自解决        adapter.addTopic(TopicName.ROLL_CALL_DEFAULT.getValue(),2);//        Arrays.stream(emqxMqttProperties.getDefaultTopic().split(",")).forEach(topicName -> adapter.addTopic(topicName, 2));//        adapter.addTopic();        adapter.setQos(2);        adapter.setOutputChannel(mqttSubscribeChannel());        return adapter;    }    /**     * 配置nb的client,监听的topic  nb/imei/client     */    @Bean    public MessageProducer nbInbound() {        
MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(emqxMqttProperties.getClientId() + "_nbInbound", getMqttConnectOptions(),                        emqxMqttProperties.getNbTopic().split(","));        adapter.setCompletionTimeout(Math.toIntExact(Long.valueOf(emqxMqttProperties.getTimeout())));        adapter.setConverter(new DefaultPahoMessageConverter());        //默认增加TopicName中所有tipic        // 当一个client解决的时候应用遍历批准订阅//        Arrays.stream(TopicName.values()).forEach(topicName -> adapter.addTopic(topicName.getValue(), 2));        // 当应用多个client解决的时候独自解决        adapter.addTopic(TopicName.NB_TOPIC_DEFAULT.getValue(),2);//        Arrays.stream(emqxMqttProperties.getDefaultTopic().split(",")).forEach(topicName -> adapter.addTopic(topicName, 2));//        adapter.addTopic();        adapter.setQos(2);        adapter.setOutputChannel(nbMqttSubscribeChannel());        return adapter;    }    /**     * 配置桥接的client,监听的topic  bridge/imei/client     */    @Bean    public MessageProducer bridgeInbound() {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(emqxMqttProperties.getClientId() + "_bridgeInbound", getMqttConnectOptions(),                        emqxMqttProperties.getBridgeTopic().split(","));        adapter.setCompletionTimeout(Math.toIntExact(Long.valueOf(emqxMqttProperties.getTimeout())));        adapter.setConverter(new DefaultPahoMessageConverter());        //默认增加TopicName中所有tipic        // 当一个client解决的时候应用遍历批准订阅//        Arrays.stream(TopicName.values()).forEach(topicName -> adapter.addTopic(topicName.getValue(), 2));        // 当应用多个client解决的时候独自解决        adapter.addTopic(TopicName.BRIDGE_TOPIC_DEFAULT.getValue(),2);//        Arrays.stream(emqxMqttProperties.getDefaultTopic().split(",")).forEach(topicName -> adapter.addTopic(topicName, 2));//        adapter.addTopic();        adapter.setQos(2);        adapter.setOutputChannel(bridgeMqttSubscribeChannel());  
      return adapter;    }    /**     * 事件触发     */    @Autowired    private ApplicationEventPublisher eventPublisher;    @Bean    @ServiceActivator(inputChannel = Constants.MQTT_PUBLISH_CHANNEL)    public MessageHandler mqttOutbound() {        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(), getMqttConnectOptions());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());        return messageHandler;    }    @Bean    @ServiceActivator(inputChannel = Constants.MQTT_NB_PUBLISH_CHANNEL)    public MessageHandler nbMqttOutbound() {        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(), getMqttConnectOptions());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic(emqxMqttProperties.getNbTopic());        return messageHandler;    }    @Bean    @ServiceActivator(inputChannel = Constants.MQTT_BRIDGE_PUBLISH_CHANNEL)    public MessageHandler bridgeMqttOutbound() {        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(), getMqttConnectOptions());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic(emqxMqttProperties.getBridgeTopic());        return messageHandler;    }    @Bean    public MessageChannel mqttPublishChannel() {        return new DirectChannel();    }    @Bean    public MessageChannel nbMqttPublishChannel() {        return new DirectChannel();    }    @Bean    public MessageChannel bridgeMqttPublishChannel() {        return new DirectChannel();    }}

controller类: PublishController.java

package com.baba.wlb.publish.controller;

import com.baba.wlb.publish.service.PublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Controller that sends messages through the publishing gateway.
 *
 * @Author wulongbo
 * @Date 2020/12/29 13:58
 * @Version 1.0
 */
@RestController
@RequestMapping("/publish")
public class PublishController {

    /** Publisher gateway the request data is handed to. */
    @Autowired
    private PublishService publishService;

    /**
     * Sends {@code data} to {@code topic} via {@link PublishService#sendToMqtt(String, String)}.
     *
     * @param data  message body
     * @param topic target topic
     * @return the literal {@code "success"} once the gateway call has returned
     */
    @RequestMapping("/emqxPublish")
    public String emqxPublish(String data, String topic) {
        publishService.sendToMqtt(data, topic);
        return "success";
    }
}

service: PublishService.java

package com.baba.wlb.publish.service;

import com.baba.wlb.share.common.Constants;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Messaging gateway whose calls are sent to {@link Constants#MQTT_PUBLISH_CHANNEL}.
 *
 * @Author wulongbo
 * @Date 2020/12/29 14:00
 * @Version 1.0
 */
@MessagingGateway(defaultRequestChannel = Constants.MQTT_PUBLISH_CHANNEL)
@Component
public interface PublishService {

    /**
     * Publishes {@code data} to the given topic.
     *
     * @param data  message payload
     * @param topic target topic, carried in the {@link MqttHeaders#TOPIC} header
     */
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * Publishes {@code data} without a topic header.
     *
     * @param data message payload
     */
    void sendToMqtt(String data);

    /**
     * Publishes {@code data} to the given topic with an explicit QoS.
     *
     * <p>{@code qos} must be mapped to a header: a gateway method may have at most one
     * un-annotated (payload) parameter, and {@code data} is already that parameter.
     *
     * @param topic target topic, carried in the {@link MqttHeaders#TOPIC} header
     * @param qos   quality of service, carried in the {@link MqttHeaders#QOS} header
     * @param data  message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String data);
}

注:必须加@Header(MqttHeaders.TOPIC)注解哈

application启动类: PublishApplication.java

package com.baba.wlb.publish;

import com.baba.wlb.share.properties.EmqxMqttProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**
 * Boot entry point of the EMQX publisher application.
 *
 * @Author wulongbo
 * @Date 2020/12/29 14:04
 * @Version 1.0
 */
@SpringBootApplication
@EnableConfigurationProperties(EmqxMqttProperties.class)
public class PublishApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        new SpringApplication(PublishApplication.class).run(args);
    }
}

注:须加入@EnableConfigurationProperties,才能加载到配置文件

yml文件: application.yml

# HTTP port of the publisher application.
server:
  port: 1001
# Optional: uncomment to activate the "common" profile.
#spring:
#  profiles:
#    active: common

注:这里我们因为把publish模块设置成了主类,所以可以引入common yml,也可以不引入

service业务类: SubscribeService.java

package com.baba.wlb.subscribe.service;import com.baba.wlb.share.common.Constants;import org.springframework.context.annotation.Bean;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;import org.springframework.stereotype.Service;/** * @Author wulongbo * @Date 2020/12/29 14:11 * @Version 1.0 *//** * 订阅者 */@Servicepublic class SubscribeService {    @Bean    @ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)    public MessageHandler messageHandler() {        MessageHandler messageHandler = new MessageHandler() {            @Override            public void handleMessage(Message<?> message) throws MessagingException {                System.out.println("订阅者订阅音讯头是:" + message.getHeaders());                System.out.println("订阅者订阅音讯主体是:" + message.getPayload());            }        };        return messageHandler;    }}

注:我们把MessageHandler放入了专门的service做业务处理,其实放config类也是OK的

service业务类: SubscribeNbService.java

package com.bbzn.device.client.service.emq;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.bbzn.device.client.config.Constants;import com.bbzn.device.client.dataobject.Strategy;import com.bbzn.device.client.service.StrategyService;import com.bbzn.device.client.service.emq.smoke.SmokeStrategy;import com.bbzn.device.client.utils.SpringUtils;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;import org.springframework.stereotype.Service;/** * @Author wulongbo * @Date 2021/7/3 15:28 * @Version 1.0 */@Service@Slf4jpublic class SubscribeNbService {    @Autowired    private StrategyService strategyService;    @Bean    @ServiceActivator(inputChannel = Constants.MQTT_NB_SUBSCRIBE_CHANNEL)    public MessageHandler nbMessageHandler() {        MessageHandler messageHandler = new MessageHandler() {            @SneakyThrows            @Override            public void handleMessage(Message<?> message) throws MessagingException {                log.info("NB-烟感订阅者订阅音讯头是:" + message.getHeaders());                log.info("NB订阅者订阅主体是:" + message.getPayload());                // 辨别主题订阅模式,不便不再须要判断topic//                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();//                String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());//                if("topic1".equalsIgnoreCase(topic)){//                    System.out.println("topic1,"+message.getPayload().toString());//                }else if("topic2".equalsIgnoreCase(topic)){//                    System.out.println("topic2,"+message.getPayload().toString());//                }                //执行业务                
nbHandle((String) message.getPayload());            }        };        return messageHandler;    }    public String nbHandle(String payLoad) {        try {            JSONObject jsonObject = JSON.parseObject(payLoad);            String imei = jsonObject.getString("imei");            String channelCode = jsonObject.getString("code");            //业务判断            if (null == channelCode) {                return "channerlCode不能为空";            }            // 2.依据code查问具体业务实现            Strategy strategy = strategyService.findOneByChannelCode(Long.valueOf(channelCode));            // 2.依据code查问具体业务实现            if (null == strategy) {                return "没有查问到该渠道信息";            }            String strategyBeanId = strategy.getStrategyBeanId();            if (null == strategyBeanId) {                return "没有配置策略BeanId";            }            // 应用beanid从容器获取对象            SmokeStrategy smokeStrategy = SpringUtils.getBean(strategyBeanId, SmokeStrategy.class);            // 执行业务            smokeStrategy.dealAlarm(channelCode, imei);        } catch (Exception e) {            e.printStackTrace();            return "解决失败 : 参数信息" + e.getMessage();        }        return "";    }}

service类: SubscribeNbBridgeService.java

package com.bbzn.device.client.service.emq;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import com.bbzn.device.client.config.Constants;import com.bbzn.device.client.dataobject.Strategy;import com.bbzn.device.client.service.EmqStrategy;import com.bbzn.device.client.service.StrategyService;import com.bbzn.device.client.utils.SpringUtils;import lombok.SneakyThrows;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.messaging.Message;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;import org.springframework.stereotype.Service;/** * @Author: wulongbo * @Date : 2021/4/10 9:14 * @Version 1.0 */@Service@Slf4jpublic class SubscribeNbBridgeService {    @Autowired    private StrategyService strategyService;    @Bean    @ServiceActivator(inputChannel = Constants.MQTT_BRIDGE_SUBSCRIBE_CHANNEL)    public MessageHandler messageHandler() {        MessageHandler messageHandler = new MessageHandler() {            @SneakyThrows            @Override            public void handleMessage(Message<?> message) throws MessagingException {                //执行业务                bridgeHandle((String) message.getPayload());            }        };        return messageHandler;    }    public String bridgeHandle(String payLoad) {        try {            JSONObject jsonObject = JSON.parseObject(payLoad);            Long channelCode = jsonObject.getLong("code");            //业务判断            if (null == channelCode) {                return "channerlCode不能为空";            }            // 2.依据code查问具体业务实现            Strategy strategy = strategyService.findOneByChannelCode(channelCode);            if (null == strategy) {                return "没有查问到该渠道信息";            }            String strategyBeanId = strategy.getStrategyBeanId();   
         if (null == strategyBeanId) {                return "没有配置策略BeanId";            }            // 应用beanid从容器获取对象            EmqStrategy emqStrategy = SpringUtils.getBean(strategyBeanId, EmqStrategy.class);            // 执行业务            return emqStrategy.emqAction(payLoad);        } catch (Exception e) {            e.printStackTrace();            return "解决失败 : 参数信息" + e.getMessage();        }    }}

常量类:Constants

package com.bbzn.device.client.config;

/**
 * System constants: names of the message channels used for MQTT publishing and subscribing.
 */
public class Constants {

    // ---- subscribe channels ----

    /** Subscribe channel for the default topics. */
    public static final String MQTT_SUBSCRIBE_CHANNEL = "mqttSubscribeChannel";

    /** Subscribe channel for the NB topics. */
    public static final String MQTT_NB_SUBSCRIBE_CHANNEL = "nbMqttSubscribeChannel";

    /** Subscribe channel for the bridge topics. */
    public static final String MQTT_BRIDGE_SUBSCRIBE_CHANNEL = "bridgeMqttSubscribeChannel";

    // ---- publish channels ----

    /** Publish channel for the default topics. */
    public static final String MQTT_PUBLISH_CHANNEL = "mqttPublishChannel";

    /** Publish channel for the NB topics. */
    public static final String MQTT_NB_PUBLISH_CHANNEL = "nbMqttPublishChannel";

    /** Publish channel for the bridge topics. */
    public static final String MQTT_BRIDGE_PUBLISH_CHANNEL = "bridgeMqttPublishChannel";
}

application启动类: SubscribeApplication.java

package com.baba.wlb.subscribe;

import com.baba.wlb.share.properties.EmqxMqttProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**
 * Boot entry point of the subscriber application.
 *
 * @Author wulongbo
 * @Date 2020/12/29 14:16
 * @Version 1.0
 */
@SpringBootApplication
@EnableConfigurationProperties(EmqxMqttProperties.class)
public class SubscribeApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        new SpringApplication(SubscribeApplication.class).run(args);
    }
}

至此,我们用Gateway绑定的方式就集成好了MQTT消息推送和消息订阅功能。