SpringBoot集成MQTT

68次阅读

共计 8358 个字符,预计需要花费 21 分钟才能阅读完成。

SpringBoot 集成 MQTT
MQTT
MQTT(消息队列遥测传输)是 ISO 标准(ISO/IEC PRF 20922)下基于发布 / 订阅范式的消息协议。它工作在 TCP/IP 协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布 / 订阅型消息协议。国内很多企业都广泛使用 MQTT 作为 Android 手机客户端与服务器端推送消息的协议。
特点
MQTT 协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

使用发布 / 订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
对负载内容屏蔽的消息传输;
使用 TCP/IP 提供网络连接;

有三种消息发布服务质量;

至多一次:消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。

至少一次:确保消息到达,但消息重复可能会发生。

只有一次:确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

Apache-Apollo
Apache Apollo 是一个代理服务器,其是在 ActiveMQ 基础上发展而来的,可以支持 STOMP, AMQP, MQTT, Openwire, SSL, WebSockets 等多种协议。原理:服务器端创建一个唯一订阅号,发送者可以向这个订阅号中发东西,然后接受者(即订阅了这个订阅号的人)都会收到这个订阅号发出来的消息。以此来完成消息的推送。服务器其实是一个消息中转站。
下载
下载地址:http://activemq.apache.org/ap…
配置与启动

需要安装 JDK 环境
在命令行模式下进入 bin,执行 apollo create mybroker d:\apache-apollo\broker,创建一个名为 mybroker 虚拟主机(Virtual Host)。需要特别注意的是,生成的目录就是以后真正启动程序的位置。
在命令行模式下进入 d:\apache-apollo\broker\bin,执行 apollo-broker run,也可以用 apollo-broker-service.exe 配置服务。
访问 http://127.0.0.1:61680 打开 web 管理界面。(密码查看 broker/etc/users.properties)
启动端口,看 cmd 输出。

SpringBoot2 的开发
添加依赖
<!–
spring-boot 版本 2.1.0.RELEASE
–>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
自定义配置
# src/main/resources/config/mqtt.properties
##################
# MQTT 配置
##################
# 用户名
mqtt.username=admin
# 密码
mqtt.password=password
# 推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613
mqtt.url=tcp://127.0.0.1:61613
##################
# MQTT 生产者
##################
# 连接服务器默认客户端 ID
mqtt.producer.clientId=mqttProducer
# 默认的推送主题,实际可在调用接口时指定
mqtt.producer.defaultTopic=topic1
##################
# MQTT 消费者
##################
# 连接服务器默认客户端 ID
mqtt.consumer.clientId=mqttConsumer
# 默认的接收主题,可以订阅多个 Topic,逗号分隔
mqtt.consumer.defaultTopic=topic1
配置 MQTT 发布和订阅
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
* MQTT 配置,生产者
*
* @author BBF
*/
@Configuration
public class MqttConfig {

private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);

private static final byte[] WILL_DATA;

static {
WILL_DATA = “offline”.getBytes();
}

/**
* 订阅的 bean 名称
*/
public static final String CHANNEL_NAME_IN = “mqttInboundChannel”;
/**
* 发布的 bean 名称
*/
public static final String CHANNEL_NAME_OUT = “mqttOutboundChannel”;

@Value(“${mqtt.username}”)
private String username;

@Value(“${mqtt.password}”)
private String password;

@Value(“${mqtt.url}”)
private String url;

@Value(“${mqtt.producer.clientId}”)
private String producerClientId;

@Value(“${mqtt.producer.defaultTopic}”)
private String producerDefaultTopic;

@Value(“${mqtt.consumer.clientId}”)
private String consumerClientId;

@Value(“${mqtt.consumer.defaultTopic}”)
private String consumerDefaultTopic;

/**
* MQTT 连接器选项
*
* @return {@link org.eclipse.paho.client.mqttv3.MqttConnectOptions}
*/
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空 session, 这里如果设置为 false 表示服务器会保留客户端的连接记录,
// 这里设置为 true 表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(username);
// 设置连接的密码
options.setPassword(password.toCharArray());
options.setServerURIs(StringUtils.split(url, “,”));
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔 1.5*20 秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill(“willTopic”, WILL_DATA, 2, false);
return options;
}

/**
* MQTT 客户端
*
* @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}

/**
* MQTT 信息通道(生产者)
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}

/**
* MQTT 消息处理器(生产者)
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
producerClientId,
mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(producerDefaultTopic);
return messageHandler;
}

/**
* MQTT 消息订阅绑定(消费者)
*
* @return {@link org.springframework.integration.core.MessageProducer}
*/
@Bean
public MessageProducer inbound() {
// 可以同时消费(订阅)多个 Topic
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(
consumerClientId, mqttClientFactory(),
StringUtils.split(consumerDefaultTopic, “,”));
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInboundChannel());
return adapter;
}

/**
* MQTT 信息通道(消费者)
*
* @return {@link org.springframework.messaging.MessageChannel}
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInboundChannel() {
return new DirectChannel();
}

/**
* MQTT 消息处理器(消费者)
*
* @return {@link org.springframework.messaging.MessageHandler}
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
LOGGER.error(“===================={}============”, message.getPayload());
}
};
}
}

消息发布器
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
* MQTT 生产者消息发送接口
* <p>MessagingGateway 要指定生产者的通道名称 </p>
* @author BBF
*/
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface IMqttSender {

/**
* 发送信息到 MQTT 服务器
*
* @param data 发送的文本
*/
void sendToMqtt(String data);

/**
* 发送信息到 MQTT 服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
String payload);

/**
* 发送信息到 MQTT 服务器
*
* @param topic 主题
* @param qos 对消息处理的几种机制。<br> 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
String payload);
}
发送消息
/**
* MQTT 消息发送
*
* @author BBF
*/
@Controller
@RequestMapping(value = “/”)
public class MqttController {

/**
* 注入发送 MQTT 的 Bean
*/
@Resource
private IMqttSender iMqttSender;

/**
* 发送 MQTT 消息
* @param message 消息内容
* @return 返回
*/
@ResponseBody
@GetMapping(value = “/mqtt”, produces =”text/html”)
public ResponseEntity<String> sendMqtt(@RequestParam(value = “msg”) String message) {
iMqttSender.sendToMqtt(message);
return new ResponseEntity<>(“OK”, HttpStatus.OK);
}
}
入口类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.context.annotation.PropertySource;

/**
* SpringBoot 入口类
*
* @author BBF
*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class,
HibernateJpaAutoConfiguration.class})
@PropertySource(encoding = “UTF-8”, value = {“classpath:config/mqtt.properties”})
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

正文完
 0