一个MQTT客户端能够做到:
1、公布其余客户端可能会订阅的信息
2、订阅其它客户端公布的音讯
3、退订或删除应用程序的音讯
4、断开与服务器连贯

所以在应用Java代码进行搭建时,也要实现这几种性能。


MQTT客户端(发布者)

1、创立一个SendMQTT类,对MQTT客户端进行初始化操作,创立连贯。

public class SendMqtt {    public static MqttClient mqttClient = null;    private static MemoryPersistence memoryPersistence = null;    private static MqttConnectOptions mqttConnectOptions = null;    static {        init("happyfan");    }    public static  void init(String clientId) {        //初始化连贯设置对象        mqttConnectOptions = new MqttConnectOptions();        //初始化MqttClient        if(null != mqttConnectOptions) {//            true能够平安地应用内存持久性作为客户端断开连接时革除的所有状态            mqttConnectOptions.setCleanSession(true);//            设置连贯超时            mqttConnectOptions.setConnectionTimeout(30);//            设置长久化形式            memoryPersistence = new MemoryPersistence();            if(null != memoryPersistence && null != clientId) {                try {                    mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }else {            }        }else {            System.out.println("mqttConnectOptions对象为空");        }        System.out.println(mqttClient.isConnected());        if(null != mqttClient) {            if(!mqttClient.isConnected()) {                try {                    System.out.println("创立连贯");                    mqttClient.connect(mqttConnectOptions);                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }        }else {            System.out.println("mqttClient为空");        }        System.out.println(mqttClient.isConnected());    }}

2、实现与MQTT客户端断开连接以及从新连贯的性能:

public class SendMqtt {    public static MqttClient mqttClient = null;    private static MemoryPersistence memoryPersistence = null;    private static MqttConnectOptions mqttConnectOptions = null;    static {        init("happyfan");    }    public static  void init(String clientId) {        //初始化连贯设置对象        mqttConnectOptions = new MqttConnectOptions();        //初始化MqttClient        if(null != mqttConnectOptions) {//            true能够平安地应用内存持久性作为客户端断开连接时革除的所有状态            mqttConnectOptions.setCleanSession(true);//            设置连贯超时            mqttConnectOptions.setConnectionTimeout(30);//            设置长久化形式            memoryPersistence = new MemoryPersistence();            if(null != memoryPersistence && null != clientId) {                try {                    mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }else {            }        }else {            System.out.println("mqttConnectOptions对象为空");        }        System.out.println(mqttClient.isConnected());        if(null != mqttClient) {            if(!mqttClient.isConnected()) {                try {                    System.out.println("创立连贯");                    mqttClient.connect(mqttConnectOptions);                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }        }else {            System.out.println("mqttClient为空");        }        System.out.println(mqttClient.isConnected());    }    //与MQTT服务器断开连接    public void closeConnect() {        //敞开存储形式        if(null != memoryPersistence) {            try {                memoryPersistence.close();            } catch (MqttPersistenceException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }else {            System.out.println("memoryPersistence is null");        }        //敞开连贯        if(null != mqttClient) {            if(mqttClient.isConnected()) {                try {                    mqttClient.disconnect();                    mqttClient.close();                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }else {                System.out.println("mqttClient is not connect");            }        }else {            System.out.println("mqttClient is null");        }    }    //从新连贯MQTT服务器    public void reConnect() {        if(null != mqttClient) {            if(!mqttClient.isConnected()) {                if(null != mqttConnectOptions) {                    try {                        mqttClient.connect(mqttConnectOptions);                    } catch (MqttException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                }else {                    System.out.println("mqttConnectOptions is null");                }            }else {                System.out.println("mqttClient is null or connect");            }        }else {            init("happyfan");//其实就是从新回到初始化的办法中去尝试连贯服务器        }    }}

3、实现订阅、公布、清空主题:

public class SendMqtt {    public static MqttClient mqttClient = null;    private static MemoryPersistence memoryPersistence = null;    private static MqttConnectOptions mqttConnectOptions = null;    static {        init("happyfan");    }    public static  void init(String clientId) {        //初始化连贯设置对象        mqttConnectOptions = new MqttConnectOptions();        //初始化MqttClient        if(null != mqttConnectOptions) {//            true能够平安地应用内存持久性作为客户端断开连接时革除的所有状态            mqttConnectOptions.setCleanSession(true);//            设置连贯超时            mqttConnectOptions.setConnectionTimeout(30);//            设置长久化形式            memoryPersistence = new MemoryPersistence();            if(null != memoryPersistence && null != clientId) {                try {                    mqttClient = new MqttClient("tcp://127.0.0.1:1883", clientId,memoryPersistence);                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }else {            }        }else {            System.out.println("mqttConnectOptions对象为空");        }        System.out.println(mqttClient.isConnected());        if(null != mqttClient) {            if(!mqttClient.isConnected()) {                try {                    System.out.println("创立连贯");                    mqttClient.connect(mqttConnectOptions);                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }        }else {            System.out.println("mqttClient为空");        }        System.out.println(mqttClient.isConnected());    }    public void closeConnect() {        //敞开存储形式        if(null != memoryPersistence) {            try {                memoryPersistence.close();            } catch (MqttPersistenceException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }else {            System.out.println("memoryPersistence is null");        }        //敞开连贯        if(null != mqttClient) {            if(mqttClient.isConnected()) {                try {                    mqttClient.disconnect();                    mqttClient.close();                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }else {                System.out.println("mqttClient is not connect");            }        }else {            System.out.println("mqttClient is null");        }    }        //    从新连贯    public void reConnect() {        if(null != mqttClient) {            if(!mqttClient.isConnected()) {                if(null != mqttConnectOptions) {                    try {                        mqttClient.connect(mqttConnectOptions);                    } catch (MqttException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                }else {                    System.out.println("mqttConnectOptions is null");                }            }else {                System.out.println("mqttClient is null or connect");            }        }else {            init("happyfan");        }    }    //    公布音讯    public void publishMessage(String pubTopic,String message,int qos) {        if(null != mqttClient&& mqttClient.isConnected()) {            System.out.println("公布音讯   "+mqttClient.isConnected());            System.out.println("id:"+mqttClient.getClientId());            MqttMessage mqttMessage = new MqttMessage();            mqttMessage.setQos(qos);            mqttMessage.setPayload(message.getBytes());            MqttTopic topic = mqttClient.getTopic(pubTopic);            if(null != topic) {                try {                    MqttDeliveryToken publish = topic.publish(mqttMessage);                    if(!publish.isComplete()) {                        System.out.println("音讯公布胜利");                    }                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }        }else {            reConnect();        }    }    //    订阅主题    public void subTopic(String topic) {        if(null != mqttClient&& mqttClient.isConnected()) {            try {                mqttClient.subscribe(topic, 1);            } catch (MqttException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }else {            System.out.println("mqttClient is error");        }    }    //    清空主题    public void cleanTopic(String topic) {        if(null != mqttClient&& !mqttClient.isConnected()) {            try {                mqttClient.unsubscribe(topic);            } catch (MqttException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }else {            System.out.println("mqttClient is error");        }    }}

在应用公布音讯的性能时,只须要调用这个类中的publishMessage()办法即可。指定一个公布的主题78:21:84:B8:EB:0C(轻易本人的),应用MQTTX尝试一下能不能接管到这个主题的音讯(MQTTX也得订阅这个主题)。

能够看到,MQTTX以及胜利的接管到了这个音讯,这样一个MQTT客户端(发布者)就搭建实现了。

MQTT客户端(订阅者)

1、创立一个MqttReceive类,对MQTT客户端进行初始化操作,并且实现接管音讯的性能(把主题订阅上,创立回调函数):

@Servicepublic class MqttReceive {    @Autowired    private  MqttReceiveCallback mqttReceiveCallback;    private static int QoS = 1;//通信的品质,最高是2    private static String Host = "tcp://127.0.0.1:1883";    private static MemoryPersistence memoryPersistence = null;    private static MqttConnectOptions mqttConnectOptions = null;    private static MqttClient mqttClient  = null;    public  void init(String clientId) {        mqttConnectOptions = new MqttConnectOptions();        memoryPersistence = new MemoryPersistence();        if(null != memoryPersistence && null != clientId && null != Host) {            try {                mqttClient = new MqttClient(Host, clientId, memoryPersistence);            } catch (MqttException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }        if(null != mqttConnectOptions) {            mqttConnectOptions.setCleanSession(true);            mqttConnectOptions.setConnectionTimeout(30);            mqttConnectOptions.setKeepAliveInterval(45);            if(null != mqttClient && !mqttClient.isConnected()) {                //这里能够本人new一个回调函数,比方new MqttReceiveCallback()。我这里应用主动拆卸,让Spring容器来治理bean与bean的依赖               mqttClient.setCallback(mqttReceiveCallback);                try {                    System.out.println(mqttReceiveCallback);                    System.out.println("尝试连贯");                    mqttClient.connect();                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }        }    }    public  void receive(String topic) {        int[] Qos = {QoS};        String[] topics = {topic};        if(null != mqttClient && mqttClient.isConnected()) {            if(null!=topics && null!=Qos && topics.length>0 && Qos.length>0) {                try {                    System.out.println("订阅主题");                    mqttClient.subscribe(topics, Qos);                } catch (MqttException e) {                    // TODO Auto-generated catch block                    e.printStackTrace();                }            }        }else {            System.out.println("初始化");            init("HAPPYFAN");            receive(topic);        }    }}

在应用时,只有调用这个类中的 receive(String topic)办法即可。当MQTT客户端接管到订阅的主题(topic)的音讯时,传输的信息就会到回调函数中(就是在MqttReceive类中,本人new的那个回调函数,要确保统一类名和new进去的回调函数统一)。
2、创立回调函数

@Servicepublic class MqttReceiveCallback implements MqttCallback {    @Override    public void messageArrived(String topic, MqttMessage message) throws Exception {        String msg = new String(message.getPayload());        System.out.println("Client 接管音讯主题 : " + topic);        System.out.println("Client 接管音讯Qos : " + message.getQos());        System.out.println("Client 接管音讯内容 : " + msg);    }    @Override    public void deliveryComplete(IMqttDeliveryToken token) {    }    @Override    public void connectionLost(Throwable cause) {    }}

调用receive(String topic)办法,指定一个接管音讯的主题(也是随本人的,我这里写的就是receive),尝试能不能接管到这个主题的信息。

能够看到,再idea控制台中胜利的打印出了接管主题的音讯、品质、以及内容。一个MQTT客户端(订阅者)就搭建实现了。