关于java:EMQ入门使用

31次阅读

共计 13367 个字符,预计需要花费 34 分钟才能阅读完成。

我的项目目录构造

pom 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.shining</groupId>
 <artifactId>shinelinker-server-sdk</artifactId>
 <version>final</version>
 <dependencies> <!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
 <dependency>
 <groupId>org.eclipse.paho</groupId>
 <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
 <version>1.2.1</version>
 </dependency>
 <dependency> <groupId>cn.hutool</groupId>
 <artifactId>hutool-all</artifactId>
 <version>4.5.9</version>
 </dependency>
 <dependency> <groupId>org.slf4j</groupId>
 <artifactId>slf4j-log4j12</artifactId>
 <version>1.7.25</version>
 </dependency>
 <!-- fastjson -->
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.58</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>druid-spring-boot-starter</artifactId>
 <version>1.1.23</version>
 </dependency>
 <!-- 引入阿里数据库连接池 -->
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>druid</artifactId>
 <version>1.2.1</version>
 </dependency>
 <!-- mysql 驱动 -->
 <dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>8.0.12</version>
 </dependency> </dependencies></project>
ImeiBean
package com.shining.serversdk.bean;
/**
 * @Author wulongbo
 * @Date 2021/2/20 10:05
 * @Version 1.0
 */
public class ImeiBean {
    private String imei;
 public String getImei() {return imei;}
    public void setImei(String imei) {this.imei = imei;}
}
PublishBean
package com.shining.serversdk.bean;
import cn.hutool.json.JSONObject;
/**
 * 公布的音讯封装
 */
public class PublishBean {
    private String topic;
 private boolean mutable = true;
 private JSONObject payload;
 private int qos = 2;
 private boolean retained = false;
 private boolean dup = false;
 public boolean isMutable() {return mutable;}
    public void setMutable(boolean mutable) {this.mutable = mutable;}
    public JSONObject getPayload() {return payload;}
    public void setPayload(JSONObject payload) {this.payload = payload;}
    public int getQos() {return qos;}
    public void setQos(int qos) {this.qos = qos;}
    public boolean isRetained() {return retained;}
    public void setRetained(boolean retained) {this.retained = retained;}
    public boolean isDup() {return dup;}
    public void setDup(boolean dup) {this.dup = dup;}
    public String getTopic() {return topic;}
    public void setTopic(String topic) {this.topic = topic;}
}
DBConn
package com.shining.serversdk.db;
import java.sql.*;
/**
 * @Author wulongbo
 * @Date 2021/2/20 10:03
 * @Version 1.0
 */public class DBConn {
    private static final String JDBCDriver = "com.mysql.cj.jdbc.Driver"; //mysql 驱动
 private static final String url = "jdbc:mysql://39.102.56.91:3306/mybatis_plus";
 private static final String username = "root"; // 数据库用户名
 private static final String password = "babaAdmin"; // 数据库明码
 private static final Connection conn = null;
 /**
 * 连贯数据库
 * @return
 */
 public static Connection conn() {
        Connection conn = null;
 try {Class.forName(JDBCDriver); // 加载数据库驱动
 try {conn = DriverManager.getConnection(url, username, password); // 连贯数据库
 } catch (SQLException e) {e.printStackTrace();
 }
        } catch (ClassNotFoundException e) {e.printStackTrace();
 }
        return conn;
 }
    /**
 * 敞开数据库链接
 * @return
 */
 public static void close() {if(conn != null) {
            try {conn.close(); // 敞开数据库链接
 } catch (SQLException e) {e.printStackTrace();
 }
        }
    }
}
DBUtil
package com.shining.serversdk.db;
import com.shining.serversdk.bean.ImeiBean;
import java.sql.*;
/**
 * @Author wulongbo
 * @Date 2021/2/20 10:04
 * @Version 1.0
 */public class DBUtil {
    private static Connection conn = null;
 private static PreparedStatement ps = null;
 private static ResultSet rs = null;
 private static final CallableStatement cs = null;
 /**
 * Insert 办法封装
 *
 * @param imeiBean 传入参数
 */
 public static void Insert(ImeiBean imeiBean) {conn = DBConn.conn(); // 调用 DBconnection 类的 conn() 办法连贯数据库
 String sql = "INSERT INTO student01 (sno,sname,dname,ssex,cno,mark,type) VALUES(?,?,?,?,?,?,?)"; // 插入 sql 语句
 try {ps = conn.prepareStatement(sql);
 /**
 * 调用实体 imeiBean 类, 获取须要插入的各个字段的值
 * 留神参数占位的地位
 * 通过 set 办法设置参数的地位
 * 通过 get 办法取参数的值
 */
 ps.setString(1, imeiBean.getImei());
 ps.executeUpdate(); // 执行 sql 语句
 System.out.println("插入胜利(*~︶~)");
 } catch (SQLException e) {e.printStackTrace();
 } finally {DBConn.close();
 }
    }
}
MessageHandler
package com.shining.serversdk.handler;
import cn.hutool.json.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * 音讯处理器
 */
public abstract class MessageHandler implements MqttCallback {
    /**
 * 异样解决
 *
 * @param throwable
 */
 public void connectionLost(Throwable throwable) {throwable.printStackTrace();
 onError(throwable);
 }
    /**
 * @param topic mqttTopic
 * @param mqttMessage Message
 * @throws Exception
 */ public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {JSONObject jsonObject = new JSONObject();
 jsonObject.put("topic", topic);
 jsonObject.put("payload", new String(mqttMessage.getPayload(), "UTF-8"));
 jsonObject.put("retain", mqttMessage.isRetained());
 jsonObject.put("dup", mqttMessage.isDuplicate());
 jsonObject.put("messageId", mqttMessage.getId());
 messageArrived(jsonObject);
 }
    /**
 * 后续步骤
 *
 * @param iMqttDeliveryToken MessageID 治理类
 */
 public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {onComplete(iMqttDeliveryToken);
 }
    /**
 * 音讯达到封装 JSON
 */ public abstract void messageArrived(JSONObject receivedMessage);
 /**
 * 出错解决
 */
 public abstract void onError(Throwable throwable);
 /**
 * */ public abstract void onComplete(IMqttDeliveryToken iMqttDeliveryToken);
 /**
 * */ public abstract void onConnected();}
PublishHandler
package com.shining.serversdk.handler;
public abstract class PublishHandler {public abstract void onSuccess();
 public abstract void onException(Exception e);
}
SubscribeHandler
package com.shining.serversdk.handler;
/**
 * 订阅音讯处理器
 * @author wwhai
 * @date 2019/7/31 20:21
 * @email:751957846@qq.com 瞅啥瞅?代码拿过去我看看有没有 BUG。*/
public abstract class SubscribeHandler {public abstract void onSuccess(String topic,int qos);
 public abstract void onError(Exception e);
}
IServerMqttClient
package com.shining.serversdk.sdkcore;
import cn.hutool.json.JSONObject;
import com.shining.serversdk.bean.PublishBean;
import com.shining.serversdk.handler.PublishHandler;
import com.shining.serversdk.handler.SubscribeHandler;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * @author wwhai
 * 服务器端的 SDK
 */public interface IServerMqttClient {
    /**
 * @param topic
 * @param qos
 * @return
 */
 boolean subscribe(String topic, int qos, SubscribeHandler subscribeHandler);
 /**
 * @param topic
 * @return
 */
 boolean unSubscribe(String topic);
 /**
 * @param topic
 * @param message
 * @return
 */
 boolean publish(String topic, MqttMessage message, PublishHandler publishHandler);
 /**
 * @param topic
 * @param message
 * @return
 */
 boolean publishJson(String topic, JSONObject message, PublishHandler publishHandler);
 /**
 * @param topic
 * @param bytes
 * @return
 */
 boolean publishHex(String topic, byte[] bytes, PublishHandler publishHandler);
 /**
 * publish */ boolean publish(PublishBean publishBean, PublishHandler publishHandler);
}
ServerMqttClient
package com.shining.serversdk.sdkcore;
import cn.hutool.json.JSONObject;
import com.shining.serversdk.bean.PublishBean;
import com.shining.serversdk.handler.MessageHandler;
import com.shining.serversdk.handler.PublishHandler;
import com.shining.serversdk.handler.SubscribeHandler;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public final class ServerMqttClient implements IServerMqttClient {
    private boolean isConnected;
 private String host;
 private int port;
 private MqttClient client;
 private String clientId;
 private String userName;
 private String passWord;
 /**
 * @param host
 * @param port
 * @param clientId
 * @param username
 * @param password
 */
 public ServerMqttClient(String host, int port, String clientId, String username, String password) {
        this.host = host;
 this.port = port;
 this.userName = username;
 this.passWord = password;
 this.clientId = clientId;
 }
    public boolean connect(MessageHandler messageHandler) {
        try {client = new MqttClient("tcp://" + host + ":" + port, clientId, new MemoryPersistence());
 MqttConnectOptions options = new MqttConnectOptions();
 options.setCleanSession(false);
 options.setUserName(userName);
 options.setPassword(passWord.toCharArray());
 // 设置超时工夫
 options.setConnectionTimeout(10);
 // 设置会话心跳工夫
 options.setKeepAliveInterval(20);
 client.setCallback(messageHandler);
 client.connect(options);
 isConnected = true;
 messageHandler.onConnected();
 return true;
 } catch (Exception e) {e.printStackTrace();
 isConnected = false;
 return false; }
    }
    /**
 * 订阅
 *
 * @param topic
 * @param qos
 * @param subscribeHandler
 * @return
 */
 public boolean subscribe(String topic, int qos, SubscribeHandler subscribeHandler) {
        try {client.subscribe(topic, qos);
 subscribeHandler.onSuccess(topic, qos);
 return true; } catch (MqttException e) {e.printStackTrace();
 subscribeHandler.onError(e);
 return false; }
    }
    /**
 * 勾销订阅
 *
 * @param topic
 * @return
 */
 public boolean unSubscribe(String topic) {
        try {client.unsubscribe(topic);
 return true; } catch (MqttException e) {e.printStackTrace();
 return false; }
    }
    /**
 * 公布
 *
 * @param topic
 * @param message
 * @return
 */
 public boolean publish(String topic, MqttMessage message, PublishHandler publishHandler) {
        try {client.publish(topic, message);
 publishHandler.onSuccess();
 return true; } catch (MqttException e) {e.printStackTrace();
 publishHandler.onException(e);
 return false; }
    }
    /**
 * 公布 JSON
 * @param topic
 * @param message
 * @param publishHandler
 * @return
 */
 public boolean publishJson(String topic, JSONObject message, PublishHandler publishHandler) {MqttMessage mqttMessage = new MqttMessage();
 mqttMessage.setPayload(message.toJSONString(1).getBytes());
 return this.publish(topic, mqttMessage, publishHandler);
 }
    /**
 * 公布 16 进制
 * @param topic
 * @param bytes
 * @param publishHandler
 * @return
 */
 public boolean publishHex(String topic, byte[] bytes, PublishHandler publishHandler) {MqttMessage mqttMessage = new MqttMessage();
 mqttMessage.setPayload(bytes);
 return this.publish(topic, mqttMessage, publishHandler);
 }
    /**
 * 公布音讯
 * @param publishBean
 * @param publishHandler
 * @return
 */
 public boolean publish(PublishBean publishBean, PublishHandler publishHandler) {MqttMessage mqttMessage = new MqttMessage();
 mqttMessage.setQos(publishBean.getQos());
 mqttMessage.setRetained(publishBean.isRetained());
 mqttMessage.setPayload(publishBean.getPayload().toString().getBytes());
 try {client.publish(publishBean.getTopic(), mqttMessage);
 publishHandler.onSuccess();
 return true; } catch (MqttException e) {e.printStackTrace();
 publishHandler.onException(e);
 return false; }
    }
    /**
 * 是否在线
 * @return
 */
 public boolean isConnected() {return isConnected;}
}
Main 启动类
package com.shining;
import cn.hutool.json.JSONObject;
import com.shining.serversdk.bean.ImeiBean;
import com.shining.serversdk.db.DBUtil;
import com.shining.serversdk.handler.MessageHandler;
import com.shining.serversdk.handler.PublishHandler;
import com.shining.serversdk.handler.SubscribeHandler;
import com.shining.serversdk.sdkcore.ServerMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Main {public static Logger logger = LoggerFactory.getLogger(Main.class);
 private static ServerMqttClient serverMqttClient;
 public static void main(String[] args) {
        serverMqttClient = new ServerMqttClient("39.102.56.91",
 1883,
 "JAVA-EMQ",
 "wulongbo",
 "study");
 serverMqttClient.connect(new MessageHandler() {public void messageArrived(JSONObject receivedMessage) {logger.info("音讯 {} 达到", receivedMessage.toString());
 com.alibaba.fastjson.JSONObject jsonObject = com.alibaba.fastjson.JSON.parseObject(receivedMessage.toString());
 String code = jsonObject.getString("code");
 if (code != null && code.equals("3002")) {String imei = jsonObject.getString("imei");
 /**
 * 上面为手动连贯数据库
 */
 try {ImeiBean imeiBean = new ImeiBean();
 imeiBean.setImei(imei);
 DBUtil.Insert(imeiBean);
 } catch (Exception e) {e.printStackTrace();
 }
                }
            }
            public void onError(Throwable throwable) { }
            public void onComplete(IMqttDeliveryToken iMqttDeliveryToken) { }
            public void onConnected() {}
        });
 /**
 * 判断是否在线
 */
 if (serverMqttClient.isConnected()) {
            /**
 * 订阅
 */
 serverMqttClient.subscribe("study/java/emq", 2, new SubscribeHandler() {
                @Override
 public void onSuccess(String topic, int qos) {logger.info("主題 {} 公布胜利!", topic);
 }
                @Override
 public void onError(Exception e) {}});
 /**
 * 公布 Message
 */ MqttMessage mqttMessage = new MqttMessage();
 mqttMessage.setPayload(new byte[]{70, 85, 67, 75});
 serverMqttClient.publish("publish/test/topic", new MqttMessage(), new PublishHandler() {
                @Override
 public void onSuccess() {logger.info("主題公布 Message 胜利!");
 }
                @Override
 public void onException(Exception e) {}});
 /**
 * 公布 JSON
 */
 JSONObject jsonObject = new JSONObject();
 jsonObject.put("action", "1");
 jsonObject.put("msgid", System.currentTimeMillis());
 jsonObject.put("type", "1");
 jsonObject.put("price", "1");
 serverMqttClient.publishJson("test", jsonObject, new PublishHandler() {
                @Override
 public void onSuccess() {logger.info("主題公布 JSON 胜利!");
 // 入库
 }
                @Override
 public void onException(Exception e) {//}
            });
 /**
 * 公布 Hex
 */ serverMqttClient.publishHex("test", new byte[]{97, 98, 99, 100}, new PublishHandler() {
                @Override
 public void onSuccess() {}
                @Override
 public void onException(Exception e) {}});
 /**
 * 应用封装好的 Pub Bean
 *///            PublishBean publishBean = new PublishBean();
//            publishBean.setTopic("test");
//            publishBean.setPayload(jsonObject);
//            publishBean.setDup(true);
//            publishBean.setRetained(true);
//            publishBean.setMutable(true);
//
//            serverMqttClient.publish(publishBean, new PublishHandler() {
//                @Override
//                public void onSuccess() {
//
//                }
//
//                @Override
//                public void onException(Exception e) {
//
//                }
//            });
 } else {System.out.println("连贯失败");
 }
    }
    public static void send() {for (int i = 0; i < 10; i++) {JSONObject jsonObject = new JSONObject();
 jsonObject.put("action", "1");
 jsonObject.put("msgid", System.currentTimeMillis());
 jsonObject.put("type", "1");
 jsonObject.put("price", "1");
 send2(jsonObject);
 }
    }
    private static void send2(JSONObject jsonObject) {serverMqttClient.publishJson("test", jsonObject, new PublishHandler() {
            @Override
 public void onSuccess() {// 入库}
            @Override
 public void onException(Exception e) {// send2}
        });
 }
}

正文完
 0