关于mqtt:使用Java代码搭建一个MQTT客户端

41次阅读

共计 8761 个字符,预计需要花费 22 分钟才能阅读完成。

一个 MQTT 客户端能够做到:
1、公布其余客户端可能会订阅的信息
2、订阅其它客户端公布的音讯
3、退订或删除应用程序的音讯
4、断开与服务器连贯

所以在应用 Java 代码进行搭建时,也要实现这几种性能。


MQTT 客户端(发布者)

1、创立一个 SendMQTT 类,对 MQTT 客户端进行初始化操作,创立连贯。

public class SendMqtt {
    public static MqttClient mqttClient = null;
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;


    static {init("happyfan");
    }


    public static  void init(String clientId) {
        // 初始化连贯设置对象
        mqttConnectOptions = new MqttConnectOptions();
        // 初始化 MqttClient
        if(null != mqttConnectOptions) {
//            true 能够平安地应用内存持久性作为客户端断开连接时革除的所有状态
            mqttConnectOptions.setCleanSession(true);
//            设置连贯超时
            mqttConnectOptions.setConnectionTimeout(30);
//            设置长久化形式
            memoryPersistence = new MemoryPersistence();
            if(null != memoryPersistence && null != clientId) {
                try {mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}
            }else {}}else {System.out.println("mqttConnectOptions 对象为空");
        }
        System.out.println(mqttClient.isConnected());
        if(null != mqttClient) {if(!mqttClient.isConnected()) {
                try {System.out.println("创立连贯");
                    mqttClient.connect(mqttConnectOptions);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}

            }
        }else {System.out.println("mqttClient 为空");
        }
        System.out.println(mqttClient.isConnected());
    }
}

2、实现与 MQTT 客户端断开连接以及从新连贯的性能:

public class SendMqtt {
    public static MqttClient mqttClient = null;
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;


    static {init("happyfan");
    }


    public static  void init(String clientId) {
        // 初始化连贯设置对象
        mqttConnectOptions = new MqttConnectOptions();
        // 初始化 MqttClient
        if(null != mqttConnectOptions) {
//            true 能够平安地应用内存持久性作为客户端断开连接时革除的所有状态
            mqttConnectOptions.setCleanSession(true);
//            设置连贯超时
            mqttConnectOptions.setConnectionTimeout(30);
//            设置长久化形式
            memoryPersistence = new MemoryPersistence();
            if(null != memoryPersistence && null != clientId) {
                try {mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}
            }else {}}else {System.out.println("mqttConnectOptions 对象为空");
        }
        System.out.println(mqttClient.isConnected());
        if(null != mqttClient) {if(!mqttClient.isConnected()) {
                try {System.out.println("创立连贯");
                    mqttClient.connect(mqttConnectOptions);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}

            }
        }else {System.out.println("mqttClient 为空");
        }
        System.out.println(mqttClient.isConnected());
    }
    // 与 MQTT 服务器断开连接
    public void closeConnect() {
        // 敞开存储形式
        if(null != memoryPersistence) {
            try {memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();}
        }else {System.out.println("memoryPersistence is null");
        }

        // 敞开连贯
        if(null != mqttClient) {if(mqttClient.isConnected()) {
                try {mqttClient.disconnect();
                    mqttClient.close();} catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}
            }else {System.out.println("mqttClient is not connect");
            }
        }else {System.out.println("mqttClient is null");
        }
    }
    // 从新连贯 MQTT 服务器
    public void reConnect() {if(null != mqttClient) {if(!mqttClient.isConnected()) {if(null != mqttConnectOptions) {
                    try {mqttClient.connect(mqttConnectOptions);
                    } catch (MqttException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();}
                }else {System.out.println("mqttConnectOptions is null");
                }
            }else {System.out.println("mqttClient is null or connect");
            }
        }else {init("happyfan");// 其实就是从新回到初始化的办法中去尝试连贯服务器
        }

    }
}

3、实现订阅、公布、清空主题:

public class SendMqtt {
    public static MqttClient mqttClient = null;
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;


    static {init("happyfan");
    }


    public static  void init(String clientId) {
        // 初始化连贯设置对象
        mqttConnectOptions = new MqttConnectOptions();
        // 初始化 MqttClient
        if(null != mqttConnectOptions) {
//            true 能够平安地应用内存持久性作为客户端断开连接时革除的所有状态
            mqttConnectOptions.setCleanSession(true);
//            设置连贯超时
            mqttConnectOptions.setConnectionTimeout(30);
//            设置长久化形式
            memoryPersistence = new MemoryPersistence();
            if(null != memoryPersistence && null != clientId) {
                try {mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}
            }else {}}else {System.out.println("mqttConnectOptions 对象为空");
        }
        System.out.println(mqttClient.isConnected());
        if(null != mqttClient) {if(!mqttClient.isConnected()) {
                try {System.out.println("创立连贯");
                    mqttClient.connect(mqttConnectOptions);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}

            }
        }else {System.out.println("mqttClient 为空");
        }
        System.out.println(mqttClient.isConnected());
    }

    public void closeConnect() {
        // 敞开存储形式
        if(null != memoryPersistence) {
            try {memoryPersistence.close();
            } catch (MqttPersistenceException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();}
        }else {System.out.println("memoryPersistence is null");
        }

        // 敞开连贯
        if(null != mqttClient) {if(mqttClient.isConnected()) {
                try {mqttClient.disconnect();
                    mqttClient.close();} catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}
            }else {System.out.println("mqttClient is not connect");
            }
        }else {System.out.println("mqttClient is null");
        }
    }
    
    //    从新连贯
    public void reConnect() {if(null != mqttClient) {if(!mqttClient.isConnected()) {if(null != mqttConnectOptions) {
                    try {mqttClient.connect(mqttConnectOptions);
                    } catch (MqttException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();}
                }else {System.out.println("mqttConnectOptions is null");
                }
            }else {System.out.println("mqttClient is null or connect");
            }
        }else {init("happyfan");
        }

    }
    //    公布音讯
    public void publishMessage(String pubTopic,String message,int qos) {if(null != mqttClient&& mqttClient.isConnected()) {System.out.println("公布音讯"+mqttClient.isConnected());
            System.out.println("id:"+mqttClient.getClientId());
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(qos);
            mqttMessage.setPayload(message.getBytes());

            MqttTopic topic = mqttClient.getTopic(pubTopic);

            if(null != topic) {
                try {MqttDeliveryToken publish = topic.publish(mqttMessage);
                    if(!publish.isComplete()) {System.out.println("音讯公布胜利");
                    }
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}
            }

        }else {reConnect();
        }

    }
    //    订阅主题
    public void subTopic(String topic) {if(null != mqttClient&& mqttClient.isConnected()) {
            try {mqttClient.subscribe(topic, 1);
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();}
        }else {System.out.println("mqttClient is error");
        }
    }


    //    清空主题
    public void cleanTopic(String topic) {if(null != mqttClient&& !mqttClient.isConnected()) {
            try {mqttClient.unsubscribe(topic);
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();}
        }else {System.out.println("mqttClient is error");
        }
    }

}

在应用公布音讯的性能时,只须要调用这个类中的 publishMessage()办法即可。指定一个公布的主题 78:21:84:B8:EB:0C(轻易本人的),应用 MQTTX 尝试一下能不能接管到这个主题的音讯(MQTTX 也得订阅这个主题)。

能够看到,MQTTX 以及胜利的接管到了这个音讯,这样一个 MQTT 客户端 (发布者) 就搭建实现了。

MQTT 客户端(订阅者)

1、创立一个 MqttReceive 类,对 MQTT 客户端进行初始化操作,并且实现接管音讯的性能(把主题订阅上,创立回调函数):

@Service
public class MqttReceive {
    @Autowired
    private  MqttReceiveCallback mqttReceiveCallback;

    private static int QoS = 1;// 通信的品质,最高是 2
    private static String Host = "tcp://127.0.0.1:1883";
    private static MemoryPersistence memoryPersistence = null;
    private static MqttConnectOptions mqttConnectOptions = null;
    private static MqttClient mqttClient  = null;

    public  void init(String clientId) {mqttConnectOptions = new MqttConnectOptions();
        memoryPersistence = new MemoryPersistence();

        if(null != memoryPersistence && null != clientId && null != Host) {
            try {mqttClient = new MqttClient(Host, clientId, memoryPersistence);
            } catch (MqttException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();}
        }

        if(null != mqttConnectOptions) {mqttConnectOptions.setCleanSession(true);
            mqttConnectOptions.setConnectionTimeout(30);
            mqttConnectOptions.setKeepAliveInterval(45);
            if(null != mqttClient && !mqttClient.isConnected()) {// 这里能够本人 new 一个回调函数,比方 new MqttReceiveCallback()。我这里应用主动拆卸,让 Spring 容器来治理 bean 与 bean 的依赖
               mqttClient.setCallback(mqttReceiveCallback);
                try {System.out.println(mqttReceiveCallback);
                    System.out.println("尝试连贯");
                    mqttClient.connect();} catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}
            }
        }
    }
    public  void receive(String topic) {int[] Qos = {QoS};
        String[] topics = {topic};

        if(null != mqttClient && mqttClient.isConnected()) {if(null!=topics && null!=Qos && topics.length>0 && Qos.length>0) {
                try {System.out.println("订阅主题");

                    mqttClient.subscribe(topics, Qos);
                } catch (MqttException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();}
            }
        }else {System.out.println("初始化");
            init("HAPPYFAN");
            receive(topic);
        }
    }
}

在应用时,只有调用这个类中的 receive(String topic)办法即可。当 MQTT 客户端接管到订阅的主题 (topic) 的音讯时,传输的信息就会到回调函数中(就是在 MqttReceive 类中,本人 new 的那个回调函数,要确保统一类名和 new 进去的回调函数统一)。
2、创立回调函数

@Service
public class MqttReceiveCallback implements MqttCallback {

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {String msg = new String(message.getPayload());
        System.out.println("Client 接管音讯主题 :" + topic);
        System.out.println("Client 接管音讯 Qos :" + message.getQos());
        System.out.println("Client 接管音讯内容 :" + msg);
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) { }
    @Override
    public void connectionLost(Throwable cause) {}}

调用 receive(String topic)办法,指定一个接管音讯的主题(也是随本人的,我这里写的就是 receive),尝试能不能接管到这个主题的信息。

能够看到,再 idea 控制台中胜利的打印出了接管主题的音讯、品质、以及内容。一个 MQTT 客户端 (订阅者) 就搭建实现了。

正文完
 0