乐趣区

关于java:Gateway绑定MQTT实现发布订阅不分模块多个client订阅模式

Emqx 配置类:EmqxMqttProperties.java

package com.bbzn.device.client.config;//package com.spring.security.demo.config;


import com.bbzn.device.client.enums.TopicName;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import javax.annotation.Resource;
import java.util.Arrays;

/**
 * @Author wulongbo
 * @Date 2020/12/29 11:38
 * @Version 1.0
 *
 * EMQX 配置工具类
 */

@Configuration
@IntegrationComponentScan // 音讯扫描件
@Slf4j
public class EmqxMqttConfig {

    @Resource
    private EmqxMqttProperties emqxMqttProperties;

    @Bean
    public DefaultMqttPahoClientFactory getMqttConnectOptions() {DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
        defaultMqttPahoClientFactory.setUserName(emqxMqttProperties.getUsername());
        defaultMqttPahoClientFactory.setPassword(emqxMqttProperties.getPassword());
        defaultMqttPahoClientFactory.setServerURIs(new String[]{emqxMqttProperties.getHostUrl()});
        // 心跳
        defaultMqttPahoClientFactory.setKeepAliveInterval(emqxMqttProperties.getKeepAlive());
        defaultMqttPahoClientFactory.setConnectionTimeout(emqxMqttProperties.getTimeout());
        // 保留 / 清空已经连贯的客户端信息
        defaultMqttPahoClientFactory.setCleanSession(false);
        // qos
        String playload = "设施已断开连接";
        // 遗嘱音讯
        defaultMqttPahoClientFactory.setWill(new DefaultMqttPahoClientFactory.Will("last_topic", playload.getBytes(), emqxMqttProperties.getQos(), false));
        return defaultMqttPahoClientFactory;
    }

    @Bean
    public MessageChannel mqttSubscribeChannel() {return new DirectChannel();
    }

    @Bean
    public MessageChannel nbMqttSubscribeChannel() {return new DirectChannel();
    }

    @Bean
    public MessageChannel bridgeMqttSubscribeChannel() {return new DirectChannel();
    }

    /**
     * 配置 client, 监听的 topic
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(emqxMqttProperties.getClientId() + "_inbound", getMqttConnectOptions(),
                        emqxMqttProperties.getDefaultTopic().split(","));
        adapter.setCompletionTimeout(Math.toIntExact(Long.valueOf(emqxMqttProperties.getTimeout())));
        adapter.setConverter(new DefaultPahoMessageConverter());
        // 默认增加 TopicName 中所有 tipic
        // 当一个 client 解决的时候应用遍历批准订阅
//        Arrays.stream(TopicName.values()).forEach(topicName -> adapter.addTopic(topicName.getValue(), 2));

        // 当应用多个 client 解决的时候独自解决
        adapter.addTopic(TopicName.ROLL_CALL_DEFAULT.getValue(),2);
//        Arrays.stream(emqxMqttProperties.getDefaultTopic().split(",")).forEach(topicName -> adapter.addTopic(topicName, 2));
//        adapter.addTopic();
        adapter.setQos(2);
        adapter.setOutputChannel(mqttSubscribeChannel());
        return adapter;
    }



    /**
     * 配置 nb 的 client, 监听的 topic  nb/imei/client
     */
    @Bean
    public MessageProducer nbInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(emqxMqttProperties.getClientId() + "_nbInbound", getMqttConnectOptions(),
                        emqxMqttProperties.getNbTopic().split(","));
        adapter.setCompletionTimeout(Math.toIntExact(Long.valueOf(emqxMqttProperties.getTimeout())));
        adapter.setConverter(new DefaultPahoMessageConverter());
        // 默认增加 TopicName 中所有 tipic
        // 当一个 client 解决的时候应用遍历批准订阅
//        Arrays.stream(TopicName.values()).forEach(topicName -> adapter.addTopic(topicName.getValue(), 2));
        // 当应用多个 client 解决的时候独自解决
        adapter.addTopic(TopicName.NB_TOPIC_DEFAULT.getValue(),2);

//        Arrays.stream(emqxMqttProperties.getDefaultTopic().split(",")).forEach(topicName -> adapter.addTopic(topicName, 2));
//        adapter.addTopic();
        adapter.setQos(2);
        adapter.setOutputChannel(nbMqttSubscribeChannel());
        return adapter;
    }


    /**
     * 配置桥接的 client, 监听的 topic  bridge/imei/client
     */
    @Bean
    public MessageProducer bridgeInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(emqxMqttProperties.getClientId() + "_bridgeInbound", getMqttConnectOptions(),
                        emqxMqttProperties.getBridgeTopic().split(","));
        adapter.setCompletionTimeout(Math.toIntExact(Long.valueOf(emqxMqttProperties.getTimeout())));
        adapter.setConverter(new DefaultPahoMessageConverter());
        // 默认增加 TopicName 中所有 tipic
        // 当一个 client 解决的时候应用遍历批准订阅
//        Arrays.stream(TopicName.values()).forEach(topicName -> adapter.addTopic(topicName.getValue(), 2));
        // 当应用多个 client 解决的时候独自解决
        adapter.addTopic(TopicName.BRIDGE_TOPIC_DEFAULT.getValue(),2);

//        Arrays.stream(emqxMqttProperties.getDefaultTopic().split(",")).forEach(topicName -> adapter.addTopic(topicName, 2));
//        adapter.addTopic();
        adapter.setQos(2);
        adapter.setOutputChannel(bridgeMqttSubscribeChannel());
        return adapter;
    }


    /**
     * 事件触发
     */
    @Autowired
    private ApplicationEventPublisher eventPublisher;


    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_PUBLISH_CHANNEL)
    public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(), getMqttConnectOptions());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(emqxMqttProperties.getDefaultTopic());
        return messageHandler;

    }


    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_NB_PUBLISH_CHANNEL)
    public MessageHandler nbMqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(), getMqttConnectOptions());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(emqxMqttProperties.getNbTopic());
        return messageHandler;

    }

    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_BRIDGE_PUBLISH_CHANNEL)
    public MessageHandler bridgeMqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(emqxMqttProperties.getClientId(), getMqttConnectOptions());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(emqxMqttProperties.getBridgeTopic());
        return messageHandler;

    }

    @Bean
    public MessageChannel mqttPublishChannel() {return new DirectChannel();
    }

    @Bean
    public MessageChannel nbMqttPublishChannel() {return new DirectChannel();
    }

    @Bean
    public MessageChannel bridgeMqttPublishChannel() {return new DirectChannel();
    }



}

controller 类:PublishController.java

package com.baba.wlb.publish.controller;


import com.baba.wlb.publish.service.PublishService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author wulongbo
 * @Date 2020/12/29 13:58
 * @Version 1.0
 */

/**
 * 发送音讯的 Controller
 */

@RestController
@RequestMapping("/publish")
public class PublishController {

    /**
     * 注入发布者的 service 服务
     */
    @Autowired
    private PublishService publishService;

    /**
     * 发送音讯
     */
    @RequestMapping("/emqxPublish")
    public String emqxPublish(String data,String topic){publishService.sendToMqtt(data,topic);
        return "success";
    }
}

service: PublishService.java

package com.baba.wlb.publish.service;

import com.baba.wlb.share.common.Constants;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:00
 * @Version 1.0
 */

@MessagingGateway(defaultRequestChannel = Constants.MQTT_PUBLISH_CHANNEL)
@Component
public interface PublishService {void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);

    void sendToMqtt(String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC)String topic, int qos, String data);
}

注:必须加 @Header(MqttHeaders.TOPIC) 注解哈

application 启动类:PublishApplication.java

package com.baba.wlb.publish;

import com.baba.wlb.share.properties.EmqxMqttProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:04
 * @Version 1.0
 */

/**
 * emqx 发布者启动程序
 */
@SpringBootApplication
@EnableConfigurationProperties({EmqxMqttProperties.class})
public class PublishApplication {public static void main(String[] args) {SpringApplication.run(PublishApplication.class,args);
    }
}

注:须退出 @EnableConfigurationProperties,能力加载到配置文件

yml 文件:application.yml

server:
  port: 1001

#spring:
#  profiles:
#    active: common

注:这里咱们因为把 publish 模块设置成为了主类,所以可引入 common yml, 也能够不引入

service 业务类:SubscribeService.java

package com.baba.wlb.subscribe.service;

import com.baba.wlb.share.common.Constants;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:11
 * @Version 1.0
 */

/**
 * 订阅者
 */
@Service
public class SubscribeService {

    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_SUBSCRIBE_CHANNEL)
    public MessageHandler messageHandler() {MessageHandler messageHandler = new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {System.out.println("订阅者订阅音讯头是:" + message.getHeaders());
                System.out.println("订阅者订阅音讯主体是:" + message.getPayload());
            }
        };
        return messageHandler;
    }
}

注:咱们把 MessageHandler 放入了专门的 server 做业务解决,其实放 config 类也是 OK 的

service 业务类:SubscribeNbService.java

package com.bbzn.device.client.service.emq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bbzn.device.client.config.Constants;
import com.bbzn.device.client.dataobject.Strategy;
import com.bbzn.device.client.service.StrategyService;
import com.bbzn.device.client.service.emq.smoke.SmokeStrategy;
import com.bbzn.device.client.utils.SpringUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;

/**
 * @Author wulongbo
 * @Date 2021/7/3 15:28
 * @Version 1.0
 */

@Service
@Slf4j
public class SubscribeNbService {

    @Autowired
    private StrategyService strategyService;

    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_NB_SUBSCRIBE_CHANNEL)
    public MessageHandler nbMessageHandler() {MessageHandler messageHandler = new MessageHandler() {
            @SneakyThrows
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {log.info("NB- 烟感订阅者订阅音讯头是:" + message.getHeaders());
                log.info("NB 订阅者订阅主体是:" + message.getPayload());
                // 辨别主题订阅模式,不便不再须要判断 topic
//                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
//                String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
//                if("topic1".equalsIgnoreCase(topic)){//                    System.out.println("topic1,"+message.getPayload().toString());
//                }else if("topic2".equalsIgnoreCase(topic)){//                    System.out.println("topic2,"+message.getPayload().toString());
//                }
                // 执行业务
                nbHandle((String) message.getPayload());
            }
        };
        return messageHandler;
    }


    public String nbHandle(String payLoad) {
        try {JSONObject jsonObject = JSON.parseObject(payLoad);
            String imei = jsonObject.getString("imei");
            String channelCode = jsonObject.getString("code");
            // 业务判断
            if (null == channelCode) {return "channerlCode 不能为空";}
            // 2. 依据 code 查问具体业务实现
            Strategy strategy = strategyService.findOneByChannelCode(Long.valueOf(channelCode));
            // 2. 依据 code 查问具体业务实现

            if (null == strategy) {return "没有查问到该渠道信息";}
            String strategyBeanId = strategy.getStrategyBeanId();
            if (null == strategyBeanId) {return "没有配置策略 BeanId";}
            // 应用 beanid 从容器获取对象
            SmokeStrategy smokeStrategy = SpringUtils.getBean(strategyBeanId, SmokeStrategy.class);
            // 执行业务
            smokeStrategy.dealAlarm(channelCode, imei);
        } catch (Exception e) {e.printStackTrace();
            return "解决失败 : 参数信息" + e.getMessage();}
        return "";
    }
}

service 类:SubscribeNbBridgeService.java

package com.bbzn.device.client.service.emq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bbzn.device.client.config.Constants;
import com.bbzn.device.client.dataobject.Strategy;
import com.bbzn.device.client.service.EmqStrategy;
import com.bbzn.device.client.service.StrategyService;
import com.bbzn.device.client.utils.SpringUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Service;

/**
 * @Author: wulongbo
 * @Date : 2021/4/10 9:14
 * @Version 1.0
 */
@Service
@Slf4j
public class SubscribeNbBridgeService {
    @Autowired
    private StrategyService strategyService;

    @Bean
    @ServiceActivator(inputChannel = Constants.MQTT_BRIDGE_SUBSCRIBE_CHANNEL)
    public MessageHandler messageHandler() {MessageHandler messageHandler = new MessageHandler() {
            @SneakyThrows
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                // 执行业务
                bridgeHandle((String) message.getPayload());
            }
        };
        return messageHandler;
    }


    public String bridgeHandle(String payLoad) {

        try {JSONObject jsonObject = JSON.parseObject(payLoad);
            Long channelCode = jsonObject.getLong("code");
            // 业务判断
            if (null == channelCode) {return "channerlCode 不能为空";}

            // 2. 依据 code 查问具体业务实现
            Strategy strategy = strategyService.findOneByChannelCode(channelCode);
            if (null == strategy) {return "没有查问到该渠道信息";}
            String strategyBeanId = strategy.getStrategyBeanId();
            if (null == strategyBeanId) {return "没有配置策略 BeanId";}
            // 应用 beanid 从容器获取对象
            EmqStrategy emqStrategy = SpringUtils.getBean(strategyBeanId, EmqStrategy.class);
            // 执行业务

            return emqStrategy.emqAction(payLoad);
        } catch (Exception e) {e.printStackTrace();
            return "解决失败 : 参数信息" + e.getMessage();}
    }

}

常理类:Constants

package com.bbzn.device.client.config;

/**
 * 零碎常量
 */
public class Constants {
    public static final String MQTT_PUBLISH_CHANNEL = "mqttPublishChannel";
    public static final String MQTT_NB_PUBLISH_CHANNEL = "nbMqttPublishChannel";
    public static final String MQTT_BRIDGE_PUBLISH_CHANNEL = "bridgeMqttPublishChannel";
    public static final String MQTT_SUBSCRIBE_CHANNEL = "mqttSubscribeChannel";
    public static final String MQTT_NB_SUBSCRIBE_CHANNEL = "nbMqttSubscribeChannel";
    public static final String MQTT_BRIDGE_SUBSCRIBE_CHANNEL = "bridgeMqttSubscribeChannel";
}

application 启动类:SubscribeApplication.java

package com.baba.wlb.subscribe;

/**
 * @Author wulongbo
 * @Date 2020/12/29 14:16
 * @Version 1.0
 */

import com.baba.wlb.share.properties.EmqxMqttProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;

/**
 * 订阅者启动类
 */
@SpringBootApplication
@EnableConfigurationProperties({EmqxMqttProperties.class})
public class SubscribeApplication {public static void main(String[] args) {SpringApplication.run(SubscribeApplication.class,args);
    }
}

至此,咱们用 Gateway 绑定的形式就集成好了 MQTT 音讯推送和音讯订阅性能。

退出移动版