我的项目目录构造
pom依赖
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.shining</groupId> <artifactId>shinelinker-server-sdk</artifactId> <version>final</version> <dependencies> <!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>4.5.9</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.23</version> </dependency> <!-- 引入阿里数据库连接池 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.2.1</version> </dependency> <!-- mysql驱动 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.12</version> </dependency> </dependencies></project>
ImeiBean
package com.shining.serversdk.bean;/** * @Author wulongbo * @Date 2021/2/20 10:05 * @Version 1.0 */public class ImeiBean { private String imei; public String getImei() { return imei; } public void setImei(String imei) { this.imei = imei; }}
PublishBean
package com.shining.serversdk.bean;import cn.hutool.json.JSONObject;/** * 公布的音讯封装 */public class PublishBean { private String topic; private boolean mutable = true; private JSONObject payload; private int qos = 2; private boolean retained = false; private boolean dup = false; public boolean isMutable() { return mutable; } public void setMutable(boolean mutable) { this.mutable = mutable; } public JSONObject getPayload() { return payload; } public void setPayload(JSONObject payload) { this.payload = payload; } public int getQos() { return qos; } public void setQos(int qos) { this.qos = qos; } public boolean isRetained() { return retained; } public void setRetained(boolean retained) { this.retained = retained; } public boolean isDup() { return dup; } public void setDup(boolean dup) { this.dup = dup; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; }}
DBConn
package com.shining.serversdk.db;import java.sql.*;/** * @Author wulongbo * @Date 2021/2/20 10:03 * @Version 1.0 */public class DBConn { private static final String JDBCDriver = "com.mysql.cj.jdbc.Driver"; //mysql驱动 private static final String url = "jdbc:mysql://39.102.56.91:3306/mybatis_plus"; private static final String username = "root"; //数据库用户名 private static final String password = "babaAdmin"; //数据库明码 private static final Connection conn = null; /** * 连贯数据库 * @return */ public static Connection conn() { Connection conn = null; try { Class.forName(JDBCDriver); //加载数据库驱动 try { conn = DriverManager.getConnection(url, username, password); //连贯数据库 } catch (SQLException e) { e.printStackTrace(); } } catch (ClassNotFoundException e) { e.printStackTrace(); } return conn; } /** * 敞开数据库链接 * @return */ public static void close() { if(conn != null) { try { conn.close(); //敞开数据库链接 } catch (SQLException e) { e.printStackTrace(); } } }}
DBUtil
package com.shining.serversdk.db;import com.shining.serversdk.bean.ImeiBean;import java.sql.*;/** * @Author wulongbo * @Date 2021/2/20 10:04 * @Version 1.0 */public class DBUtil { private static Connection conn = null; private static PreparedStatement ps = null; private static ResultSet rs = null; private static final CallableStatement cs = null; /** * Insert办法封装 * * @param imeiBean 传入参数 */ public static void Insert(ImeiBean imeiBean) { conn = DBConn.conn(); //调用 DBconnection 类的 conn() 办法连贯数据库 String sql = "INSERT INTO student01 (sno,sname,dname,ssex,cno,mark,type) VALUES(?,?,?,?,?,?,?)"; //插入sql语句 try { ps = conn.prepareStatement(sql); /** * 调用实体imeiBean类,获取须要插入的各个字段的值 * 留神参数占位的地位 * 通过set办法设置参数的地位 * 通过get办法取参数的值 */ ps.setString(1, imeiBean.getImei()); ps.executeUpdate(); //执行sql语句 System.out.println("插入胜利(* ̄︶ ̄)"); } catch (SQLException e) { e.printStackTrace(); } finally { DBConn.close(); } }}
MessageHandler
package com.shining.serversdk.handler;import cn.hutool.json.JSONObject;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttMessage;/** * 音讯处理器 */public abstract class MessageHandler implements MqttCallback { /** * 异样解决 * * @param throwable */ public void connectionLost(Throwable throwable) { throwable.printStackTrace(); onError(throwable); } /** * @param topic mqttTopic * @param mqttMessage Message * @throws Exception */ public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { JSONObject jsonObject = new JSONObject(); jsonObject.put("topic", topic); jsonObject.put("payload", new String(mqttMessage.getPayload(), "UTF-8")); jsonObject.put("retain", mqttMessage.isRetained()); jsonObject.put("dup", mqttMessage.isDuplicate()); jsonObject.put("messageId", mqttMessage.getId()); messageArrived(jsonObject); } /** * 后续步骤 * * @param iMqttDeliveryToken MessageID治理类 */ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { onComplete(iMqttDeliveryToken); } /** * 音讯达到封装JSON */ public abstract void messageArrived(JSONObject receivedMessage); /** * 出错解决 */ public abstract void onError(Throwable throwable); /** * */ public abstract void onComplete(IMqttDeliveryToken iMqttDeliveryToken); /** * */ public abstract void onConnected();}
PublishHandler
package com.shining.serversdk.handler;public abstract class PublishHandler { public abstract void onSuccess(); public abstract void onException(Exception e);}
SubscribeHandler
package com.shining.serversdk.handler;/** * 订阅音讯处理器 * @author wwhai * @date 2019/7/31 20:21 * @email:751957846@qq.com 瞅啥瞅?代码拿过去我看看有没有BUG。 */public abstract class SubscribeHandler { public abstract void onSuccess(String topic,int qos); public abstract void onError(Exception e);}
IServerMqttClient
package com.shining.serversdk.sdkcore;import cn.hutool.json.JSONObject;import com.shining.serversdk.bean.PublishBean;import com.shining.serversdk.handler.PublishHandler;import com.shining.serversdk.handler.SubscribeHandler;import org.eclipse.paho.client.mqttv3.MqttMessage;/** * @author wwhai * 服务器端的SDK */public interface IServerMqttClient { /** * @param topic * @param qos * @return */ boolean subscribe(String topic, int qos, SubscribeHandler subscribeHandler); /** * @param topic * @return */ boolean unSubscribe(String topic); /** * @param topic * @param message * @return */ boolean publish(String topic, MqttMessage message, PublishHandler publishHandler); /** * @param topic * @param message * @return */ boolean publishJson(String topic, JSONObject message, PublishHandler publishHandler); /** * @param topic * @param bytes * @return */ boolean publishHex(String topic, byte[] bytes, PublishHandler publishHandler); /** * publish */ boolean publish(PublishBean publishBean, PublishHandler publishHandler);}
ServerMqttClient
package com.shining.serversdk.sdkcore;import cn.hutool.json.JSONObject;import com.shining.serversdk.bean.PublishBean;import com.shining.serversdk.handler.MessageHandler;import com.shining.serversdk.handler.PublishHandler;import com.shining.serversdk.handler.SubscribeHandler;import org.eclipse.paho.client.mqttv3.*;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public final class ServerMqttClient implements IServerMqttClient { private boolean isConnected; private String host; private int port; private MqttClient client; private String clientId; private String userName; private String passWord; /** * @param host * @param port * @param clientId * @param username * @param password */ public ServerMqttClient(String host, int port, String clientId, String username, String password) { this.host = host; this.port = port; this.userName = username; this.passWord = password; this.clientId = clientId; } public boolean connect(MessageHandler messageHandler) { try { client = new MqttClient("tcp://" + host + ":" + port, clientId, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(passWord.toCharArray()); // 设置超时工夫 options.setConnectionTimeout(10); // 设置会话心跳工夫 options.setKeepAliveInterval(20); client.setCallback(messageHandler); client.connect(options); isConnected = true; messageHandler.onConnected(); return true; } catch (Exception e) { e.printStackTrace(); isConnected = false; return false; } } /** * 订阅 * * @param topic * @param qos * @param subscribeHandler * @return */ public boolean subscribe(String topic, int qos, SubscribeHandler subscribeHandler) { try { client.subscribe(topic, qos); subscribeHandler.onSuccess(topic, qos); return true; } catch (MqttException e) { e.printStackTrace(); subscribeHandler.onError(e); return false; } } /** * 勾销订阅 * * @param topic * @return */ public boolean unSubscribe(String topic) { try { client.unsubscribe(topic); return true; } catch (MqttException e) { e.printStackTrace(); return false; } } /** * 公布 * * @param topic * @param message * @return */ public boolean publish(String topic, MqttMessage message, PublishHandler publishHandler) { try { client.publish(topic, message); publishHandler.onSuccess(); return true; } catch (MqttException e) { e.printStackTrace(); publishHandler.onException(e); return false; } } /** * 公布JSON * @param topic * @param message * @param publishHandler * @return */ public boolean publishJson(String topic, JSONObject message, PublishHandler publishHandler) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(message.toJSONString(1).getBytes()); return this.publish(topic, mqttMessage, publishHandler); } /** * 公布16进制 * @param topic * @param bytes * @param publishHandler * @return */ public boolean publishHex(String topic, byte[] bytes, PublishHandler publishHandler) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(bytes); return this.publish(topic, mqttMessage, publishHandler); } /** * 公布音讯 * @param publishBean * @param publishHandler * @return */ public boolean publish(PublishBean publishBean, PublishHandler publishHandler) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(publishBean.getQos()); mqttMessage.setRetained(publishBean.isRetained()); mqttMessage.setPayload(publishBean.getPayload().toString().getBytes()); try { client.publish(publishBean.getTopic(), mqttMessage); publishHandler.onSuccess(); return true; } catch (MqttException e) { e.printStackTrace(); publishHandler.onException(e); return false; } } /** * 是否在线 * @return */ public boolean isConnected() { return isConnected; }}
Main启动类
package com.shining;import cn.hutool.json.JSONObject;import com.shining.serversdk.bean.ImeiBean;import com.shining.serversdk.db.DBUtil;import com.shining.serversdk.handler.MessageHandler;import com.shining.serversdk.handler.PublishHandler;import com.shining.serversdk.handler.SubscribeHandler;import com.shining.serversdk.sdkcore.ServerMqttClient;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Main { public static Logger logger = LoggerFactory.getLogger(Main.class); private static ServerMqttClient serverMqttClient; public static void main(String[] args) { serverMqttClient = new ServerMqttClient("39.102.56.91", 1883, "JAVA-EMQ", "wulongbo", "study"); serverMqttClient.connect(new MessageHandler() { public void messageArrived(JSONObject receivedMessage) { logger.info("音讯{}达到", receivedMessage.toString()); com.alibaba.fastjson.JSONObject jsonObject = com.alibaba.fastjson.JSON.parseObject(receivedMessage.toString()); String code = jsonObject.getString("code"); if (code != null && code.equals("3002")) { String imei = jsonObject.getString("imei"); /** * 上面为手动连贯数据库 */ try { ImeiBean imeiBean = new ImeiBean(); imeiBean.setImei(imei); DBUtil.Insert(imeiBean); } catch (Exception e) { e.printStackTrace(); } } } public void onError(Throwable throwable) { } public void onComplete(IMqttDeliveryToken iMqttDeliveryToken) { } public void onConnected() { } }); /** * 判断是否在线 */ if (serverMqttClient.isConnected()) { /** * 订阅 */ serverMqttClient.subscribe("study/java/emq", 2, new SubscribeHandler() { @Override public void onSuccess(String topic, int qos) { logger.info("主題{}公布胜利!", topic); } @Override public void onError(Exception e) { } }); /** * 公布 Message */ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(new byte[]{70, 85, 67, 75}); serverMqttClient.publish("publish/test/topic", new MqttMessage(), new PublishHandler() { @Override public void onSuccess() { logger.info("主題公布Message胜利!"); } @Override public void onException(Exception e) { } }); /** * 公布JSON */ JSONObject jsonObject = new JSONObject(); jsonObject.put("action", "1"); jsonObject.put("msgid", System.currentTimeMillis()); jsonObject.put("type", "1"); jsonObject.put("price", "1"); serverMqttClient.publishJson("test", jsonObject, new PublishHandler() { @Override public void onSuccess() { logger.info("主題公布JSON胜利!"); //入库 } @Override public void onException(Exception e) { // } }); /** * 公布Hex */ serverMqttClient.publishHex("test", new byte[]{97, 98, 99, 100}, new PublishHandler() { @Override public void onSuccess() { } @Override public void onException(Exception e) { } }); /** * 应用封装好的Pub Bean */// PublishBean publishBean = new PublishBean();// publishBean.setTopic("test");// publishBean.setPayload(jsonObject);// publishBean.setDup(true);// publishBean.setRetained(true);// publishBean.setMutable(true);//// serverMqttClient.publish(publishBean, new PublishHandler() {// @Override// public void onSuccess() {//// }//// @Override// public void onException(Exception e) {//// }// }); } else { System.out.println("连贯失败"); } } public static void send() { for (int i = 0; i < 10; i++) { JSONObject jsonObject = new JSONObject(); jsonObject.put("action", "1"); jsonObject.put("msgid", System.currentTimeMillis()); jsonObject.put("type", "1"); jsonObject.put("price", "1"); send2(jsonObject); } } private static void send2(JSONObject jsonObject) { serverMqttClient.publishJson("test", jsonObject, new PublishHandler() { @Override public void onSuccess() { //入库 } @Override public void onException(Exception e) { // send2 } }); }}