maven的pom.xml引入包
<!--mqtt--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> <version>2.3.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> <version>5.3.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.3.4.RELEASE</version> </dependency>
mqtt.yml配置文件
spring: mqtt: username: admin password: beyond_2021 url: tcp://192.168.3.100:1883 client-id: data-clientId server-id: data-serverId data-topic: data/# will-topic: data-will will-content: data server offline completion-timeout: 10000
初始化MQTT配置bean
package com.beyond.config;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.event.EventListener;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;import org.springframework.integration.mqtt.event.MqttMessageSentEvent;import org.springframework.integration.mqtt.event.MqttSubscribedEvent;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException;import java.security.SecureRandom;import java.util.Date;@Configuration@IntegrationComponentScanpublic class MqttConfig { private static final Logger log = LoggerFactory.getLogger(MqttConfig.class); @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client-id}") private String clientId; @Value("${spring.mqtt.server-id}") private String serverId; @Value("${spring.mqtt.data-topic:data/#}") private String dataTopic; @Value("${spring.mqtt.will-topic}") private String willTopic; @Value("${spring.mqtt.will-content}") private String willContent; /** * @desc 连贯超时 */ @Value("${spring.mqtt.completion-timeout}") private int completionTimeout ; @Bean public MqttConnectOptions getMqttConnectOptions(){ // MQTT的连贯设置 MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); // 设置连贯的用户名 mqttConnectOptions.setUserName(username); // 设置连贯的明码 mqttConnectOptions.setPassword(password.toCharArray()); // 设置是否清空session,这里如果设置为false示意服务器会保留客户端的连贯记录, // 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会革除session, // 当重连后能够接管之前订阅主题的音讯。当客户端上线后会承受到它离线的这段时间的音讯 mqttConnectOptions.setCleanSession(true); // 设置公布端地址,多个用逗号分隔, 如:tcp://111:1883,tcp://222:1883 // 当第一个111连贯上后,222不会在连,如果111挂掉后,重试连111几次失败后,会主动去连贯222 mqttConnectOptions.setServerURIs(hostUrl.split(",")); // 设置会话心跳工夫 单位为秒 服务器会每隔1.5*20秒的工夫向客户端发送个音讯判断客户端是否在线,但这个办法并没有重连的机制 mqttConnectOptions.setKeepAliveInterval(20); mqttConnectOptions.setAutomaticReconnect(true); // 设置“遗嘱”音讯的话题,若客户端与服务器之间的连贯意外中断,服务器将公布客户端的“遗嘱”音讯。 mqttConnectOptions.setWill(willTopic, willContent.getBytes(), 2, false); mqttConnectOptions.setMaxInflight(1000000); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } /** * @desc 发送通道配置 默认主题 * @date 2021/3/16 */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { //clientId每个连贯必须惟一,否则,两个雷同的clientId互相挤掉线 String clientIdStr = clientId + new SecureRandom().nextInt(10); MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdStr, mqttClientFactory()); //async如果为true,则调用方不会阻塞。而是在发送音讯时期待传递确认。默认值为false(发送将阻塞,直到确认发送) messageHandler.setAsync(true); messageHandler.setAsyncEvents(true); messageHandler.setDefaultTopic(dataTopic); messageHandler.setDefaultQos(1); return messageHandler; } /** * @desc 发送通道 * @date 2021/3/16 */ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * @desc 接管通道 * @date 2021/3/16 */ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * @desc 配置监听的 topic 反对通配符 * @date 2021/3/16 */ @Bean public MessageProducer inbound() { //clientId每个连贯必须惟一,否则,两个雷同的clientId互相挤掉线 String serverIdStr = serverId + new SecureRandom().nextInt(10); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(serverIdStr, mqttClientFactory(), dataTopic); adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * @desc 通过通道获取数据 订阅的数据 * @date 2021/3/16 */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { String payload = message.getPayload().toString(); String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); //////////////////解决订阅topic:(data/#)到的所有的数据 } }; } /** * @desc mqtt连贯失败或者订阅失败时,触发MqttConnectionFailedEvent事件 * @date 2021/7/22 *@param event * @return void */ @EventListener(MqttConnectionFailedEvent.class) public void mqttConnectionFailedEvent(MqttConnectionFailedEvent event) { log.error("mqttConnectionFailedEvent连贯mqtt失败: " + "date={}, hostUrl={}, username={}, error={}", new Date(), hostUrl, username, event.getCause().getMessage()); } /** * @desc 当async和async事件(async-events)都为true时,将收回MqttMessageSentEvent * 它蕴含音讯、主题、客户端库生成的音讯id、clientId和clientInstance(每次连贯客户端时递增) * @date 2021/7/22 *@param event * @return void */ @EventListener(MqttMessageSentEvent.class) public void mqttMessageSentEvent(MqttMessageSentEvent event) { log.info("mqttMessageSentEvent发送信息: date={}, info={}", new Date(), event.toString()); } /** * @desc 当async和async事件(async-events)都为true时,将收回MqttMessageDeliveredEvent * 当客户端库确认传递时,将收回MqttMessageDeliveredEvent。它蕴含messageId、clientId和clientInstance,使传递与发送相干。 * @date 2021/7/22 *@param event * @return void */ @EventListener(MqttMessageDeliveredEvent.class) public void mqttMessageDeliveredEvent(MqttMessageDeliveredEvent event) { log.info("mqttMessageDeliveredEvent发送胜利信息: date={}, info={}", new Date(), event.toString()); } /** * @desc 胜利订阅到主题,MqttSubscribedEvent事件就会被触发(多个主题,屡次触发) * @date 2021/7/22 *@param event * @return void */ @EventListener(MqttSubscribedEvent.class) public void mqttSubscribedEvent(MqttSubscribedEvent event) { log.info("mqttSubscribedEvent订阅胜利信息: date={}, info={}", new Date(), event.toString()); }}
mqtt发送数据网关配置
package com.beyond.data.component;import org.springframework.integration.annotation.MessagingGateway;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.stereotype.Component;/** * @desc MQTT发送网关 * @date 2021/3/12 */@Component@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGatewayComponent { void sendToMqtt(String data); void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
发送数据到mqtt伪代码
@Autowiredprivate MqttGatewayComponent mqttGatewayComponent;//发送字符串或json字符串,到指定的topicmqttGatewayComponent.sendToMqtt("json string", "data/abcd");
参考链接:
https://blog.csdn.net/sinat_2...
https://blog.csdn.net/qq_2946...
https://blog.csdn.net/myinser...