我的项目目录结构

pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.shining</groupId>
    <artifactId>shinelinker-server-sdk</artifactId>
    <version>final</version>

    <dependencies>
        <!-- Eclipse Paho MQTT v3 client -->
        <!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- Hutool utilities (cn.hutool.json.JSONObject is used by the SDK) -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>4.5.9</version>
        </dependency>

        <!-- SLF4J binding for log4j 1.2 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.23</version>
        </dependency>

        <!-- Alibaba Druid database connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
    </dependencies>
</project>
ImeiBean
package com.shining.serversdk.bean;/** * @Author wulongbo * @Date 2021/2/20 10:05 * @Version 1.0 */public class ImeiBean {    private String imei; public String getImei() {        return imei; }    public void setImei(String imei) {        this.imei = imei; }}
PublishBean
package com.shining.serversdk.bean;

import cn.hutool.json.JSONObject;

/**
 * Wrapper describing one message to be published: target topic, JSON payload and
 * the per-message flags.
 *
 * <p>Defaults: {@code mutable = true}, {@code qos = 2}, {@code retained = false},
 * {@code dup = false}; {@code topic} and {@code payload} start out {@code null}.
 */
public class PublishBean {

    /** Topic the message is published to. */
    private String topic;

    /** JSON body of the message. */
    private JSONObject payload;

    /** Requested quality of service; defaults to 2. */
    private int qos = 2;

    /** Retained flag; defaults to {@code false}. */
    private boolean retained = false;

    /** Duplicate flag; defaults to {@code false}. */
    private boolean dup = false;

    /** Mutable flag; defaults to {@code true}. */
    private boolean mutable = true;

    /** @return the topic, or {@code null} if not set */
    public String getTopic() {
        return this.topic;
    }

    /** @param topic the topic to publish to */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the JSON payload, or {@code null} if not set */
    public JSONObject getPayload() {
        return this.payload;
    }

    /** @param payload the JSON payload to publish */
    public void setPayload(JSONObject payload) {
        this.payload = payload;
    }

    /** @return the quality-of-service value */
    public int getQos() {
        return this.qos;
    }

    /** @param qos the quality-of-service value (not range-checked here) */
    public void setQos(int qos) {
        this.qos = qos;
    }

    /** @return the retained flag */
    public boolean isRetained() {
        return this.retained;
    }

    /** @param retained the retained flag */
    public void setRetained(boolean retained) {
        this.retained = retained;
    }

    /** @return the duplicate flag */
    public boolean isDup() {
        return this.dup;
    }

    /** @param dup the duplicate flag */
    public void setDup(boolean dup) {
        this.dup = dup;
    }

    /** @return the mutable flag */
    public boolean isMutable() {
        return this.mutable;
    }

    /** @param mutable the mutable flag */
    public void setMutable(boolean mutable) {
        this.mutable = mutable;
    }
}
DBConn
package com.shining.serversdk.db;

import java.sql.*;

/**
 * Opens and closes the JDBC connection used by the SDK.
 *
 * <p>{@link #conn()} opens a new connection on every call and remembers it;
 * {@link #close()} closes the most recently opened one. Callers that hold several
 * connections at once must close the earlier ones themselves.
 *
 * <p>Not synchronized: the tracked connection is plain static state.
 *
 * @author wulongbo
 * @version 1.0 (2021/2/20 10:03)
 */
public class DBConn {

    // NOTE(review): the URL and credentials are hard-coded in source; they should be
    // moved to external configuration before this is used outside a demo.
    private static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; // MySQL JDBC driver class
    private static final String URL = "jdbc:mysql://39.102.56.91:3306/mybatis_plus";
    private static final String USERNAME = "root"; // database user name
    private static final String PASSWORD = "babaAdmin"; // database password

    /**
     * Most recently opened connection, or {@code null}. This must not be {@code final}:
     * it is what {@link #close()} closes.
     */
    private static Connection conn = null;

    /**
     * Loads the driver and opens a new database connection.
     *
     * @return the new connection, or {@code null} when the driver class cannot be
     *         loaded or the connection attempt fails (the error is printed to stderr)
     */
    public static Connection conn() {
        Connection opened = null;
        try {
            Class.forName(JDBC_DRIVER); // load the database driver
            opened = DriverManager.getConnection(URL, USERNAME, PASSWORD);
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        if (opened != null) {
            // Remember it so that close() actually has something to close.
            conn = opened;
        }
        return opened;
    }

    /**
     * Closes the connection most recently returned by {@link #conn()}, if any.
     * Calling this repeatedly, or before any connection was opened, does nothing.
     */
    public static void close() {
        Connection current = conn;
        conn = null;
        if (current != null) {
            try {
                current.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
DBUtil
package com.shining.serversdk.db;

import com.shining.serversdk.bean.ImeiBean;

import java.sql.*;

/**
 * JDBC helper that persists {@link ImeiBean} instances.
 *
 * @author wulongbo
 * @version 1.0 (2021/2/20 10:04)
 */
public class DBUtil {

    /** Insert statement; it declares {@link #PLACEHOLDER_COUNT} positional parameters. */
    private static final String INSERT_SQL =
            "INSERT INTO student01 (sno,sname,dname,ssex,cno,mark,type) VALUES(?,?,?,?,?,?,?)";

    /** Number of {@code ?} placeholders in {@link #INSERT_SQL}. */
    private static final int PLACEHOLDER_COUNT = 7;

    /**
     * Inserts one row for the given bean.
     *
     * <p>The IMEI is bound to the first placeholder. The remaining placeholders are bound
     * to SQL NULL, because leaving them unbound makes the driver reject the statement.
     * NOTE(review): the table and columns look like tutorial leftovers; confirm the real
     * schema and adjust the statement and bindings accordingly.
     *
     * <p>SQL errors and a failed connection attempt are reported on stderr and not
     * rethrown. The statement and connection are always closed.
     *
     * @param imeiBean the bean to store; must not be {@code null}
     * @throws NullPointerException if {@code imeiBean} is {@code null}
     */
    public static void Insert(ImeiBean imeiBean) {
        if (imeiBean == null) {
            // Fail before a connection is opened.
            throw new NullPointerException("imeiBean");
        }

        Connection connection = DBConn.conn();
        if (connection == null) {
            // DBConn.conn() returns null when the driver or the connection is unavailable.
            System.err.println("Insert skipped: no database connection available");
            return;
        }

        try (Connection db = connection;
             PreparedStatement statement = db.prepareStatement(INSERT_SQL)) {
            statement.setString(1, imeiBean.getImei());
            for (int index = 2; index <= PLACEHOLDER_COUNT; index++) {
                statement.setNull(index, Types.VARCHAR);
            }
            statement.executeUpdate();
            System.out.println("插入胜利(* ̄︶ ̄)");
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // The connection is already closed by try-with-resources; this only lets
            // DBConn drop its own reference to it.
            DBConn.close();
        }
    }
}
MessageHandler
package com.shining.serversdk.handler;import cn.hutool.json.JSONObject;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttMessage;/** * 音讯处理器 */public abstract class MessageHandler implements MqttCallback {    /** * 异样解决 * * @param throwable */ public void connectionLost(Throwable throwable) {        throwable.printStackTrace(); onError(throwable); }    /** * @param topic mqttTopic * @param mqttMessage Message * @throws Exception */ public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {        JSONObject jsonObject = new JSONObject(); jsonObject.put("topic", topic); jsonObject.put("payload", new String(mqttMessage.getPayload(), "UTF-8")); jsonObject.put("retain", mqttMessage.isRetained()); jsonObject.put("dup", mqttMessage.isDuplicate()); jsonObject.put("messageId", mqttMessage.getId()); messageArrived(jsonObject); }    /** * 后续步骤 * * @param iMqttDeliveryToken MessageID治理类 */ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {        onComplete(iMqttDeliveryToken); }    /** * 音讯达到封装JSON */ public abstract void messageArrived(JSONObject receivedMessage); /** * 出错解决 */ public abstract void onError(Throwable throwable); /** * */ public abstract void onComplete(IMqttDeliveryToken iMqttDeliveryToken); /** * */ public abstract void onConnected();}
PublishHandler
package com.shining.serversdk.handler;public abstract class PublishHandler {    public abstract void onSuccess(); public abstract void onException(Exception e);}
SubscribeHandler
package com.shining.serversdk.handler;/** * 订阅音讯处理器 * @author wwhai * @date 2019/7/31 20:21 * @email:751957846@qq.com 瞅啥瞅?代码拿过去我看看有没有BUG。 */public abstract class SubscribeHandler {    public abstract void onSuccess(String topic,int qos); public abstract void onError(Exception e);}
IServerMqttClient
package com.shining.serversdk.sdkcore;

import cn.hutool.json.JSONObject;
import com.shining.serversdk.bean.PublishBean;
import com.shining.serversdk.handler.PublishHandler;
import com.shining.serversdk.handler.SubscribeHandler;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * Server-side MQTT SDK contract: subscribe, unsubscribe and several flavours of publish.
 *
 * <p>Every operation returns a {@code boolean}; the bundled {@code ServerMqttClient}
 * returns {@code true} on success and {@code false} on failure.
 *
 * @author wwhai
 */
public interface IServerMqttClient {

    /**
     * Subscribes to a topic.
     *
     * @param topic            topic filter to subscribe to
     * @param qos              requested quality of service
     * @param subscribeHandler receives the outcome of the attempt
     * @return the outcome as a boolean
     */
    boolean subscribe(String topic, int qos, SubscribeHandler subscribeHandler);

    /**
     * Cancels a subscription.
     *
     * @param topic topic filter to unsubscribe from
     * @return the outcome as a boolean
     */
    boolean unSubscribe(String topic);

    /**
     * Publishes a ready-made MQTT message.
     *
     * @param topic          target topic
     * @param message        the message to send
     * @param publishHandler receives the outcome of the attempt
     * @return the outcome as a boolean
     */
    boolean publish(String topic, MqttMessage message, PublishHandler publishHandler);

    /**
     * Publishes a JSON object.
     *
     * @param topic          target topic
     * @param message        the JSON body
     * @param publishHandler receives the outcome of the attempt
     * @return the outcome as a boolean
     */
    boolean publishJson(String topic, JSONObject message, PublishHandler publishHandler);

    /**
     * Publishes raw bytes.
     *
     * @param topic          target topic
     * @param bytes          the payload bytes
     * @param publishHandler receives the outcome of the attempt
     * @return the outcome as a boolean
     */
    boolean publishHex(String topic, byte[] bytes, PublishHandler publishHandler);

    /**
     * Publishes the message described by a {@link PublishBean}.
     *
     * @param publishBean    topic, payload and flags of the message
     * @param publishHandler receives the outcome of the attempt
     * @return the outcome as a boolean
     */
    boolean publish(PublishBean publishBean, PublishHandler publishHandler);
}
ServerMqttClient
package com.shining.serversdk.sdkcore;

import cn.hutool.json.JSONObject;
import com.shining.serversdk.bean.PublishBean;
import com.shining.serversdk.handler.MessageHandler;
import com.shining.serversdk.handler.PublishHandler;
import com.shining.serversdk.handler.SubscribeHandler;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.nio.charset.StandardCharsets;

/**
 * {@link IServerMqttClient} implementation backed by an Eclipse Paho {@link MqttClient}.
 *
 * <p>Create the instance, call {@link #connect(MessageHandler)} once, then subscribe and
 * publish. Operations invoked before a client exists report an
 * {@link IllegalStateException} through their handler and return {@code false}.
 * Text payloads are encoded as UTF-8, matching the UTF-8 decoding in
 * {@link MessageHandler}.
 */
public final class ServerMqttClient implements IServerMqttClient {

    private static final String NOT_CONNECTED = "connect() has not been called successfully";

    private final String host;
    private final int port;
    private final String clientId;
    private final String userName;
    private final String passWord;

    /** Underlying Paho client; {@code null} until {@link #connect(MessageHandler)} creates it. */
    private MqttClient client;

    /**
     * @param host     broker host name or IP address
     * @param port     broker TCP port
     * @param clientId MQTT client identifier
     * @param username user name sent to the broker
     * @param password password sent to the broker; {@code null} sends no password
     */
    public ServerMqttClient(String host, int port, String clientId, String username, String password) {
        this.host = host;
        this.port = port;
        this.userName = username;
        this.passWord = password;
        this.clientId = clientId;
    }

    /**
     * Connects to {@code tcp://host:port} with a non-clean session, a 10 s connect timeout
     * and a 20 s keep-alive, then calls {@link MessageHandler#onConnected()}.
     *
     * @param messageHandler callback that receives messages and connection events
     * @return {@code true} if the connection was established, {@code false} if
     *         {@code messageHandler} is {@code null} or any step failed
     */
    public boolean connect(MessageHandler messageHandler) {
        if (messageHandler == null) {
            // Checked up front so that a successful connect is never reported as failed
            // merely because the onConnected() notification could not be delivered.
            System.err.println("connect() rejected: messageHandler is null");
            return false;
        }
        try {
            client = new MqttClient("tcp://" + host + ":" + port, clientId, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(false);
            options.setUserName(userName);
            if (passWord != null) {
                options.setPassword(passWord.toCharArray());
            }
            options.setConnectionTimeout(10); // connect timeout, seconds
            options.setKeepAliveInterval(20); // keep-alive interval, seconds
            client.setCallback(messageHandler);
            client.connect(options);
            messageHandler.onConnected();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Subscribes to a topic.
     *
     * @param topic            topic filter
     * @param qos              requested quality of service
     * @param subscribeHandler notified of success or failure
     * @return {@code true} on success, {@code false} on failure
     */
    @Override
    public boolean subscribe(String topic, int qos, SubscribeHandler subscribeHandler) {
        if (client == null) {
            subscribeHandler.onError(new IllegalStateException(NOT_CONNECTED));
            return false;
        }
        try {
            client.subscribe(topic, qos);
            subscribeHandler.onSuccess(topic, qos);
            return true;
        } catch (MqttException e) {
            e.printStackTrace();
            subscribeHandler.onError(e);
            return false;
        }
    }

    /**
     * Cancels a subscription.
     *
     * @param topic topic filter
     * @return {@code true} on success, {@code false} on failure or when no client exists
     */
    @Override
    public boolean unSubscribe(String topic) {
        if (client == null) {
            return false;
        }
        try {
            client.unsubscribe(topic);
            return true;
        } catch (MqttException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Publishes a ready-made message.
     *
     * @param topic          target topic
     * @param message        message to send
     * @param publishHandler notified of success or failure
     * @return {@code true} on success, {@code false} on failure
     */
    @Override
    public boolean publish(String topic, MqttMessage message, PublishHandler publishHandler) {
        if (client == null) {
            publishHandler.onException(new IllegalStateException(NOT_CONNECTED));
            return false;
        }
        try {
            client.publish(topic, message);
            publishHandler.onSuccess();
            return true;
        } catch (MqttException e) {
            e.printStackTrace();
            publishHandler.onException(e);
            return false;
        }
    }

    /**
     * Publishes a JSON object, serialized with an indent of 1 and encoded as UTF-8.
     *
     * @param topic          target topic
     * @param message        JSON body; {@code null} is reported as a failure
     * @param publishHandler notified of success or failure
     * @return {@code true} on success, {@code false} on failure
     */
    @Override
    public boolean publishJson(String topic, JSONObject message, PublishHandler publishHandler) {
        if (message == null) {
            publishHandler.onException(new IllegalArgumentException("message must not be null"));
            return false;
        }
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(message.toJSONString(1).getBytes(StandardCharsets.UTF_8));
        return this.publish(topic, mqttMessage, publishHandler);
    }

    /**
     * Publishes raw bytes.
     *
     * @param topic          target topic
     * @param bytes          payload bytes
     * @param publishHandler notified of success or failure
     * @return {@code true} on success, {@code false} on failure
     */
    @Override
    public boolean publishHex(String topic, byte[] bytes, PublishHandler publishHandler) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(bytes);
        return this.publish(topic, mqttMessage, publishHandler);
    }

    /**
     * Publishes the message described by a bean. Topic, payload, QoS and the retained flag
     * are used; the bean's {@code dup} and {@code mutable} flags are not applied.
     *
     * @param publishBean    message description; a {@code null} bean or payload is reported
     *                       as a failure
     * @param publishHandler notified of success or failure
     * @return {@code true} on success, {@code false} on failure
     */
    @Override
    public boolean publish(PublishBean publishBean, PublishHandler publishHandler) {
        if (publishBean == null || publishBean.getPayload() == null) {
            publishHandler.onException(
                    new IllegalArgumentException("publishBean and its payload must not be null"));
            return false;
        }
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(publishBean.getQos());
        mqttMessage.setRetained(publishBean.isRetained());
        mqttMessage.setPayload(publishBean.getPayload().toString().getBytes(StandardCharsets.UTF_8));
        return this.publish(publishBean.getTopic(), mqttMessage, publishHandler);
    }

    /**
     * Reports the live connection state of the underlying client rather than a cached
     * flag, so a dropped connection is reflected here.
     *
     * @return {@code true} if a client exists and is currently connected
     */
    public boolean isConnected() {
        MqttClient current = client;
        return current != null && current.isConnected();
    }
}
Main启动类
package com.shining;import cn.hutool.json.JSONObject;import com.shining.serversdk.bean.ImeiBean;import com.shining.serversdk.db.DBUtil;import com.shining.serversdk.handler.MessageHandler;import com.shining.serversdk.handler.PublishHandler;import com.shining.serversdk.handler.SubscribeHandler;import com.shining.serversdk.sdkcore.ServerMqttClient;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Main {    public static Logger logger = LoggerFactory.getLogger(Main.class); private static ServerMqttClient serverMqttClient; public static void main(String[] args) {        serverMqttClient = new ServerMqttClient("39.102.56.91", 1883, "JAVA-EMQ", "wulongbo", "study"); serverMqttClient.connect(new MessageHandler() {            public void messageArrived(JSONObject receivedMessage) {                logger.info("音讯{}达到", receivedMessage.toString()); com.alibaba.fastjson.JSONObject jsonObject = com.alibaba.fastjson.JSON.parseObject(receivedMessage.toString()); String code = jsonObject.getString("code"); if (code != null && code.equals("3002")) {                    String imei = jsonObject.getString("imei"); /** * 上面为手动连贯数据库 */ try {                        ImeiBean imeiBean = new ImeiBean(); imeiBean.setImei(imei); DBUtil.Insert(imeiBean); } catch (Exception e) {                        e.printStackTrace(); }                }            }            public void onError(Throwable throwable) {            }            public void onComplete(IMqttDeliveryToken iMqttDeliveryToken) {            }            public void onConnected() {            }        }); /** * 判断是否在线 */ if (serverMqttClient.isConnected()) {            /** * 订阅 */ serverMqttClient.subscribe("study/java/emq", 2, new SubscribeHandler() {                @Override public void onSuccess(String topic, int qos) {                    logger.info("主題{}公布胜利!", topic); }                @Override public 
void onError(Exception e) {                }            }); /** * 公布 Message */ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(new byte[]{70, 85, 67, 75}); serverMqttClient.publish("publish/test/topic", new MqttMessage(), new PublishHandler() {                @Override public void onSuccess() {                    logger.info("主題公布Message胜利!"); }                @Override public void onException(Exception e) {                }            }); /** * 公布JSON */ JSONObject jsonObject = new JSONObject(); jsonObject.put("action", "1"); jsonObject.put("msgid", System.currentTimeMillis()); jsonObject.put("type", "1"); jsonObject.put("price", "1"); serverMqttClient.publishJson("test", jsonObject, new PublishHandler() {                @Override public void onSuccess() {                    logger.info("主題公布JSON胜利!"); //入库 }                @Override public void onException(Exception e) {                    // }            }); /** * 公布Hex */ serverMqttClient.publishHex("test", new byte[]{97, 98, 99, 100}, new PublishHandler() {                @Override public void onSuccess() {                }                @Override public void onException(Exception e) {                }            }); /** * 应用封装好的Pub Bean *///            PublishBean publishBean = new PublishBean();//            publishBean.setTopic("test");//            publishBean.setPayload(jsonObject);//            publishBean.setDup(true);//            publishBean.setRetained(true);//            publishBean.setMutable(true);////            serverMqttClient.publish(publishBean, new PublishHandler() {//                @Override//                public void onSuccess() {////                }////                @Override//                public void onException(Exception e) {////                }//            }); } else {            System.out.println("连贯失败"); }    }    public static void send() {        for (int i = 0; i < 10; i++) {            JSONObject jsonObject = new JSONObject(); 
jsonObject.put("action", "1"); jsonObject.put("msgid", System.currentTimeMillis()); jsonObject.put("type", "1"); jsonObject.put("price", "1"); send2(jsonObject); }    }    private static void send2(JSONObject jsonObject) {        serverMqttClient.publishJson("test", jsonObject, new PublishHandler() {            @Override public void onSuccess() {                //入库 }            @Override public void onException(Exception e) {                // send2 }        }); }}