一个MQTT客户端能够做到:
1、公布其余客户端可能会订阅的信息
2、订阅其它客户端公布的音讯
3、退订或删除应用程序的音讯
4、断开与服务器连贯
所以在应用Java代码进行搭建时,也要实现这几种性能。
MQTT客户端(发布者)
1、创立一个SendMQTT类,对MQTT客户端进行初始化操作,创立连贯。
public class SendMqtt { public static MqttClient mqttClient = null; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; static { init("happyfan"); } public static void init(String clientId) { //初始化连贯设置对象 mqttConnectOptions = new MqttConnectOptions(); //初始化MqttClient if(null != mqttConnectOptions) {// true能够平安地应用内存持久性作为客户端断开连接时革除的所有状态 mqttConnectOptions.setCleanSession(true);// 设置连贯超时 mqttConnectOptions.setConnectionTimeout(30);// 设置长久化形式 memoryPersistence = new MemoryPersistence(); if(null != memoryPersistence && null != clientId) { try { mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { } }else { System.out.println("mqttConnectOptions对象为空"); } System.out.println(mqttClient.isConnected()); if(null != mqttClient) { if(!mqttClient.isConnected()) { try { System.out.println("创立连贯"); mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { System.out.println("mqttClient为空"); } System.out.println(mqttClient.isConnected()); }}
2、实现与MQTT客户端断开连接以及从新连贯的性能:
public class SendMqtt { public static MqttClient mqttClient = null; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; static { init("happyfan"); } public static void init(String clientId) { //初始化连贯设置对象 mqttConnectOptions = new MqttConnectOptions(); //初始化MqttClient if(null != mqttConnectOptions) {// true能够平安地应用内存持久性作为客户端断开连接时革除的所有状态 mqttConnectOptions.setCleanSession(true);// 设置连贯超时 mqttConnectOptions.setConnectionTimeout(30);// 设置长久化形式 memoryPersistence = new MemoryPersistence(); if(null != memoryPersistence && null != clientId) { try { mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { } }else { System.out.println("mqttConnectOptions对象为空"); } System.out.println(mqttClient.isConnected()); if(null != mqttClient) { if(!mqttClient.isConnected()) { try { System.out.println("创立连贯"); mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { System.out.println("mqttClient为空"); } System.out.println(mqttClient.isConnected()); } //与MQTT服务器断开连接 public void closeConnect() { //敞开存储形式 if(null != memoryPersistence) { try { memoryPersistence.close(); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("memoryPersistence is null"); } //敞开连贯 if(null != mqttClient) { if(mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttClient is not connect"); } }else { System.out.println("mqttClient is null"); } } //从新连贯MQTT服务器 public void reConnect() { if(null != mqttClient) { if(!mqttClient.isConnected()) { if(null != mqttConnectOptions) { try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttConnectOptions is null"); } }else { System.out.println("mqttClient is null or connect"); } }else { init("happyfan");//其实就是从新回到初始化的办法中去尝试连贯服务器 } }}
3、实现订阅、公布、清空主题:
public class SendMqtt { public static MqttClient mqttClient = null; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; static { init("happyfan"); } public static void init(String clientId) { //初始化连贯设置对象 mqttConnectOptions = new MqttConnectOptions(); //初始化MqttClient if(null != mqttConnectOptions) {// true能够平安地应用内存持久性作为客户端断开连接时革除的所有状态 mqttConnectOptions.setCleanSession(true);// 设置连贯超时 mqttConnectOptions.setConnectionTimeout(30);// 设置长久化形式 memoryPersistence = new MemoryPersistence(); if(null != memoryPersistence && null != clientId) { try { mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { } }else { System.out.println("mqttConnectOptions对象为空"); } System.out.println(mqttClient.isConnected()); if(null != mqttClient) { if(!mqttClient.isConnected()) { try { System.out.println("创立连贯"); mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { System.out.println("mqttClient为空"); } System.out.println(mqttClient.isConnected()); } public void closeConnect() { //敞开存储形式 if(null != memoryPersistence) { try { memoryPersistence.close(); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("memoryPersistence is null"); } //敞开连贯 if(null != mqttClient) { if(mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttClient is not connect"); } }else { System.out.println("mqttClient is null"); } } // 从新连贯 public void reConnect() { if(null != mqttClient) { if(!mqttClient.isConnected()) { if(null != mqttConnectOptions) { try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttConnectOptions is null"); } }else { System.out.println("mqttClient is null or connect"); } }else { init("happyfan"); } } // 公布音讯 public void publishMessage(String pubTopic,String message,int qos) { if(null != mqttClient&& mqttClient.isConnected()) { System.out.println("公布音讯 "+mqttClient.isConnected()); System.out.println("id:"+mqttClient.getClientId()); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(message.getBytes()); MqttTopic topic = mqttClient.getTopic(pubTopic); if(null != topic) { try { MqttDeliveryToken publish = topic.publish(mqttMessage); if(!publish.isComplete()) { System.out.println("音讯公布胜利"); } } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { reConnect(); } } // 订阅主题 public void subTopic(String topic) { if(null != mqttClient&& mqttClient.isConnected()) { try { mqttClient.subscribe(topic, 1); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttClient is error"); } } // 清空主题 public void cleanTopic(String topic) { if(null != mqttClient&& !mqttClient.isConnected()) { try { mqttClient.unsubscribe(topic); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttClient is error"); } }}
在应用公布音讯的性能时,只须要调用这个类中的publishMessage()办法即可。指定一个公布的主题78:21:84:B8:EB:0C(轻易本人的),应用MQTTX尝试一下能不能接管到这个主题的音讯(MQTTX也得订阅这个主题)。
能够看到,MQTTX以及胜利的接管到了这个音讯,这样一个MQTT客户端(发布者)就搭建实现了。
MQTT客户端(订阅者)
1、创立一个MqttReceive类,对MQTT客户端进行初始化操作,并且实现接管音讯的性能(把主题订阅上,创立回调函数):
@Servicepublic class MqttReceive { @Autowired private MqttReceiveCallback mqttReceiveCallback; private static int QoS = 1;//通信的品质,最高是2 private static String Host = "tcp://127.0.0.1:1883"; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; private static MqttClient mqttClient = null; public void init(String clientId) { mqttConnectOptions = new MqttConnectOptions(); memoryPersistence = new MemoryPersistence(); if(null != memoryPersistence && null != clientId && null != Host) { try { mqttClient = new MqttClient(Host, clientId, memoryPersistence); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if(null != mqttConnectOptions) { mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(30); mqttConnectOptions.setKeepAliveInterval(45); if(null != mqttClient && !mqttClient.isConnected()) { //这里能够本人new一个回调函数,比方new MqttReceiveCallback()。我这里应用主动拆卸,让Spring容器来治理bean与bean的依赖 mqttClient.setCallback(mqttReceiveCallback); try { System.out.println(mqttReceiveCallback); System.out.println("尝试连贯"); mqttClient.connect(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public void receive(String topic) { int[] Qos = {QoS}; String[] topics = {topic}; if(null != mqttClient && mqttClient.isConnected()) { if(null!=topics && null!=Qos && topics.length>0 && Qos.length>0) { try { System.out.println("订阅主题"); mqttClient.subscribe(topics, Qos); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { System.out.println("初始化"); init("HAPPYFAN"); receive(topic); } }}
在应用时,只有调用这个类中的 receive(String topic)办法即可。当MQTT客户端接管到订阅的主题(topic)的音讯时,传输的信息就会到回调函数中(就是在MqttReceive类中,本人new的那个回调函数,要确保统一类名和new进去的回调函数统一)。
2、创立回调函数
@Servicepublic class MqttReceiveCallback implements MqttCallback { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String msg = new String(message.getPayload()); System.out.println("Client 接管音讯主题 : " + topic); System.out.println("Client 接管音讯Qos : " + message.getQos()); System.out.println("Client 接管音讯内容 : " + msg); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } @Override public void connectionLost(Throwable cause) { }}
调用receive(String topic)办法,指定一个接管音讯的主题(也是随本人的,我这里写的就是receive),尝试能不能接管到这个主题的信息。
能够看到,再idea控制台中胜利的打印出了接管主题的音讯、品质、以及内容。一个MQTT客户端(订阅者)就搭建实现了。