pom 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the shinelinker-server-sdk module: declares the MQTT client,
     JSON, logging and database libraries the module depends on. -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.shining</groupId>
<artifactId>shinelinker-server-sdk</artifactId>
<version>final</version>
<dependencies> <!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<!-- Eclipse Paho MQTT v3 client -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.1</version>
</dependency>
<!-- Hutool utility library (all modules) -->
<dependency> <groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.5.9</version>
</dependency>
<!-- SLF4J binding for log4j 1.2 -->
<dependency> <groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<!-- fastjson -->
<!-- NOTE(review): check the published security advisories for this fastjson version before
     using it on untrusted input - TODO confirm and upgrade if affected. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter -->
<!-- NOTE(review): this starter (1.1.23) is declared next to a standalone druid artifact with a
     different version (1.2.1) below - confirm that both are really required. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.23</version>
</dependency>
<!-- Alibaba Druid database connection pool -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.1</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency> </dependencies></project>
ImeiBean
package com.shining.serversdk.bean;
/**
 * Plain bean holding a single IMEI string.
 *
 * <p>Created 2021/2/20 10:05.
 *
 * @author wulongbo
 * @version 1.0
 */
public class ImeiBean {

    /** The IMEI value; stays {@code null} until {@link #setImei(String)} is called. */
    private String imei;

    /**
     * Stores the IMEI. The value is kept as given; no validation is performed.
     *
     * @param imei the IMEI to store, may be {@code null}
     */
    public void setImei(String imei) {
        this.imei = imei;
    }

    /**
     * Returns the stored IMEI.
     *
     * @return the value last passed to {@link #setImei(String)}, or {@code null} if never set
     */
    public String getImei() {
        return this.imei;
    }
}
PublishBean
package com.shining.serversdk.bean;
import cn.hutool.json.JSONObject;
/**
 * Value holder describing one message to publish: target topic, JSON payload and the
 * qos / retained / dup / mutable settings that go with it.
 *
 * <p>Defaults: {@code qos = 2}, {@code mutable = true}, {@code retained = false},
 * {@code dup = false}; {@code topic} and {@code payload} start as {@code null}.
 */
public class PublishBean {

    /** Topic the message is published to. */
    private String topic;

    /** JSON body of the message. */
    private JSONObject payload;

    /** Quality-of-service level; defaults to 2. */
    private int qos = 2;

    /** Retained flag; defaults to {@code false}. */
    private boolean retained;

    /** Duplicate flag; defaults to {@code false}. */
    private boolean dup;

    /** Mutable flag; defaults to {@code true}. */
    private boolean mutable = true;

    /** @return the topic, or {@code null} if none was set */
    public String getTopic() {
        return this.topic;
    }

    /** @param topic the topic to publish to */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the JSON payload, or {@code null} if none was set */
    public JSONObject getPayload() {
        return this.payload;
    }

    /** @param payload the JSON payload to publish */
    public void setPayload(JSONObject payload) {
        this.payload = payload;
    }

    /** @return the quality-of-service level */
    public int getQos() {
        return this.qos;
    }

    /** @param qos the quality-of-service level (stored as given, not range-checked) */
    public void setQos(int qos) {
        this.qos = qos;
    }

    /** @return the retained flag */
    public boolean isRetained() {
        return this.retained;
    }

    /** @param retained the retained flag */
    public void setRetained(boolean retained) {
        this.retained = retained;
    }

    /** @return the duplicate flag */
    public boolean isDup() {
        return this.dup;
    }

    /** @param dup the duplicate flag */
    public void setDup(boolean dup) {
        this.dup = dup;
    }

    /** @return the mutable flag */
    public boolean isMutable() {
        return this.mutable;
    }

    /** @param mutable the mutable flag */
    public void setMutable(boolean mutable) {
        this.mutable = mutable;
    }
}
DBConn
package com.shining.serversdk.db;
import java.sql.*;
/**
 * Minimal JDBC helper that opens a MySQL connection and releases it again.
 *
 * <p>The connection most recently opened by {@link #conn()} is remembered so that
 * {@link #close()} can actually release it. Callers are expected to pair every successful
 * {@code conn()} with one {@code close()}; only the latest connection is tracked.
 *
 * <p>NOTE(review): the JDBC URL and credentials are hard-coded in this class; they should be
 * moved to external configuration.
 *
 * <p>Created 2021/2/20 10:03.
 *
 * @author wulongbo
 * @version 1.0
 */
public class DBConn {
    private static final String JDBCDriver = "com.mysql.cj.jdbc.Driver"; // MySQL driver class
    private static final String url = "jdbc:mysql://39.102.56.91:3306/mybatis_plus";
    private static final String username = "root"; // database user name
    private static final String password = "babaAdmin"; // database password

    /**
     * Connection most recently opened by {@link #conn()}, or {@code null} when none is tracked.
     * Must not be {@code final}: a constant {@code null} here turns {@link #close()} into a no-op
     * and leaks every opened connection.
     */
    private static Connection conn = null;

    /**
     * Loads the driver and opens a new database connection.
     *
     * <p>Failures are printed to standard error and reported to the caller as {@code null}.
     *
     * @return the newly opened connection, or {@code null} if the driver class is missing or the
     *         connection attempt failed
     */
    public static synchronized Connection conn() {
        Connection opened = null;
        try {
            Class.forName(JDBCDriver); // load the database driver
            opened = DriverManager.getConnection(url, username, password);
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        if (opened != null) {
            // Remember it so close() has something to release.
            conn = opened;
        }
        return opened;
    }

    /**
     * Closes the connection most recently opened by {@link #conn()}, if any.
     *
     * <p>Safe to call when nothing is open and safe to call repeatedly. A failure while closing
     * is printed to standard error.
     */
    public static synchronized void close() {
        Connection toClose = conn;
        conn = null; // forget it first so a failing close() is not retried forever
        if (toClose == null) {
            return;
        }
        try {
            toClose.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
DBUtil
package com.shining.serversdk.db;
import com.shining.serversdk.bean.ImeiBean;
import java.sql.*;
/**
 * Database write helper built on {@link DBConn}.
 *
 * <p>Created 2021/2/20 10:04.
 *
 * @author wulongbo
 * @version 1.0
 */
public class DBUtil {

    /**
     * Insert statement executed by {@link #Insert(ImeiBean)}.
     * NOTE(review): the table and column names do not obviously match an IMEI record, and only
     * one value is available from {@link ImeiBean} - confirm the intended target table/columns.
     */
    private static final String INSERT_SQL =
            "INSERT INTO student01 (sno,sname,dname,ssex,cno,mark,type) VALUES(?,?,?,?,?,?,?)";

    /** Number of {@code ?} placeholders in {@link #INSERT_SQL}. */
    private static final int INSERT_PARAM_COUNT = 7;

    /**
     * Inserts one row whose first column is the bean's IMEI.
     *
     * <p>The statement has seven placeholders but the bean supplies a single value, so
     * placeholders 2..7 are bound to SQL {@code NULL}; leaving them unbound makes the driver
     * reject the statement. SQL failures are printed to standard error and not rethrown. The
     * connection and statement are always released before the method returns.
     *
     * @param imeiBean bean providing the IMEI for the first placeholder; must not be {@code null}
     */
    public static void Insert(ImeiBean imeiBean) {
        Connection connection = DBConn.conn();
        if (connection == null) {
            // DBConn.conn() already reported the cause; there is nothing to write to.
            System.err.println("Insert skipped: no database connection available");
            return;
        }
        try (Connection db = connection;
             PreparedStatement statement = db.prepareStatement(INSERT_SQL)) {
            statement.setString(1, imeiBean.getImei());
            for (int index = 2; index <= INSERT_PARAM_COUNT; index++) {
                statement.setNull(index, Types.VARCHAR);
            }
            statement.executeUpdate();
            System.out.println("插入胜利(*~︶~)");
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // Lets DBConn drop its reference; closing an already closed connection is a no-op.
            DBConn.close();
        }
    }
}
MessageHandler
package com.shining.serversdk.handler;
import cn.hutool.json.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 音讯处理器
*/
public abstract class MessageHandler implements MqttCallback {
/**
* 异样解决
*
* @param throwable
*/
public void connectionLost(Throwable throwable) {throwable.printStackTrace();
onError(throwable);
}
/**
* @param topic mqttTopic
* @param mqttMessage Message
* @throws Exception
*/ public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", topic);
jsonObject.put("payload", new String(mqttMessage.getPayload(), "UTF-8"));
jsonObject.put("retain", mqttMessage.isRetained());
jsonObject.put("dup", mqttMessage.isDuplicate());
jsonObject.put("messageId", mqttMessage.getId());
messageArrived(jsonObject);
}
/**
* 后续步骤
*
* @param iMqttDeliveryToken MessageID 治理类
*/
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {onComplete(iMqttDeliveryToken);
}
/**
* 音讯达到封装 JSON
*/ public abstract void messageArrived(JSONObject receivedMessage);
/**
* 出错解决
*/
public abstract void onError(Throwable throwable);
/**
* */ public abstract void onComplete(IMqttDeliveryToken iMqttDeliveryToken);
/**
* */ public abstract void onConnected();}
PublishHandler
package com.shining.serversdk.handler;
/**
 * Callback pair through which the outcome of a publish attempt is reported.
 */
public abstract class PublishHandler {

    /**
     * Called to report that the publish attempt failed.
     *
     * @param e the failure that was raised
     */
    public abstract void onException(Exception e);

    /** Called to report that the publish attempt succeeded. */
    public abstract void onSuccess();
}
SubscribeHandler
package com.shining.serversdk.handler;
/**
 * Callback pair through which the outcome of a subscribe attempt is reported.
 *
 * <p>Created 2019/7/31 20:21. Contact: 751957846@qq.com
 *
 * @author wwhai
 */
public abstract class SubscribeHandler {

    /**
     * Called to report that the subscribe attempt failed.
     *
     * @param e the failure that was raised
     */
    public abstract void onError(Exception e);

    /**
     * Called to report that the subscription was established.
     *
     * @param topic the topic that was subscribed
     * @param qos   the quality-of-service level that was requested
     */
    public abstract void onSuccess(String topic, int qos);
}
IServerMqttClient
package com.shining.serversdk.sdkcore;
import cn.hutool.json.JSONObject;
import com.shining.serversdk.bean.PublishBean;
import com.shining.serversdk.handler.PublishHandler;
import com.shining.serversdk.handler.SubscribeHandler;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
 * Contract of the server-side MQTT SDK: subscribing, unsubscribing and publishing in several
 * payload formats. Every operation reports its outcome as a {@code boolean}; the publish and
 * subscribe operations additionally take a handler for callbacks.
 *
 * @author wwhai
 */
public interface IServerMqttClient {

    /**
     * Subscribes to a topic.
     *
     * @param topic            topic to subscribe to
     * @param qos              requested quality-of-service level
     * @param subscribeHandler callback for the outcome
     * @return the outcome flag of the operation
     */
    boolean subscribe(String topic, int qos, SubscribeHandler subscribeHandler);

    /**
     * Cancels the subscription to a topic.
     *
     * @param topic topic to unsubscribe from
     * @return the outcome flag of the operation
     */
    boolean unSubscribe(String topic);

    /**
     * Publishes a prepared MQTT message.
     *
     * @param topic          topic to publish to
     * @param message        message to send
     * @param publishHandler callback for the outcome
     * @return the outcome flag of the operation
     */
    boolean publish(String topic, MqttMessage message, PublishHandler publishHandler);

    /**
     * Publishes a JSON object.
     *
     * @param topic          topic to publish to
     * @param message        JSON content to send
     * @param publishHandler callback for the outcome
     * @return the outcome flag of the operation
     */
    boolean publishJson(String topic, JSONObject message, PublishHandler publishHandler);

    /**
     * Publishes raw bytes.
     *
     * @param topic          topic to publish to
     * @param bytes          payload bytes to send
     * @param publishHandler callback for the outcome
     * @return the outcome flag of the operation
     */
    boolean publishHex(String topic, byte[] bytes, PublishHandler publishHandler);

    /**
     * Publishes the message described by a {@link PublishBean}.
     *
     * @param publishBean    description of the message (topic, payload, settings)
     * @param publishHandler callback for the outcome
     * @return the outcome flag of the operation
     */
    boolean publish(PublishBean publishBean, PublishHandler publishHandler);
}
ServerMqttClient
package com.shining.serversdk.sdkcore;
import cn.hutool.json.JSONObject;
import com.shining.serversdk.bean.PublishBean;
import com.shining.serversdk.handler.MessageHandler;
import com.shining.serversdk.handler.PublishHandler;
import com.shining.serversdk.handler.SubscribeHandler;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public final class ServerMqttClient implements IServerMqttClient {
private boolean isConnected;
private String host;
private int port;
private MqttClient client;
private String clientId;
private String userName;
private String passWord;
/**
* @param host
* @param port
* @param clientId
* @param username
* @param password
*/
public ServerMqttClient(String host, int port, String clientId, String username, String password) {
this.host = host;
this.port = port;
this.userName = username;
this.passWord = password;
this.clientId = clientId;
}
public boolean connect(MessageHandler messageHandler) {
try {client = new MqttClient("tcp://" + host + ":" + port, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
// 设置超时工夫
options.setConnectionTimeout(10);
// 设置会话心跳工夫
options.setKeepAliveInterval(20);
client.setCallback(messageHandler);
client.connect(options);
isConnected = true;
messageHandler.onConnected();
return true;
} catch (Exception e) {e.printStackTrace();
isConnected = false;
return false; }
}
/**
* 订阅
*
* @param topic
* @param qos
* @param subscribeHandler
* @return
*/
public boolean subscribe(String topic, int qos, SubscribeHandler subscribeHandler) {
try {client.subscribe(topic, qos);
subscribeHandler.onSuccess(topic, qos);
return true; } catch (MqttException e) {e.printStackTrace();
subscribeHandler.onError(e);
return false; }
}
/**
* 勾销订阅
*
* @param topic
* @return
*/
public boolean unSubscribe(String topic) {
try {client.unsubscribe(topic);
return true; } catch (MqttException e) {e.printStackTrace();
return false; }
}
/**
* 公布
*
* @param topic
* @param message
* @return
*/
public boolean publish(String topic, MqttMessage message, PublishHandler publishHandler) {
try {client.publish(topic, message);
publishHandler.onSuccess();
return true; } catch (MqttException e) {e.printStackTrace();
publishHandler.onException(e);
return false; }
}
/**
* 公布 JSON
* @param topic
* @param message
* @param publishHandler
* @return
*/
public boolean publishJson(String topic, JSONObject message, PublishHandler publishHandler) {MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.toJSONString(1).getBytes());
return this.publish(topic, mqttMessage, publishHandler);
}
/**
* 公布 16 进制
* @param topic
* @param bytes
* @param publishHandler
* @return
*/
public boolean publishHex(String topic, byte[] bytes, PublishHandler publishHandler) {MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(bytes);
return this.publish(topic, mqttMessage, publishHandler);
}
/**
* 公布音讯
* @param publishBean
* @param publishHandler
* @return
*/
public boolean publish(PublishBean publishBean, PublishHandler publishHandler) {MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(publishBean.getQos());
mqttMessage.setRetained(publishBean.isRetained());
mqttMessage.setPayload(publishBean.getPayload().toString().getBytes());
try {client.publish(publishBean.getTopic(), mqttMessage);
publishHandler.onSuccess();
return true; } catch (MqttException e) {e.printStackTrace();
publishHandler.onException(e);
return false; }
}
/**
* 是否在线
* @return
*/
public boolean isConnected() {return isConnected;}
}
Main 启动类
package com.shining;
import cn.hutool.json.JSONObject;
import com.shining.serversdk.bean.ImeiBean;
import com.shining.serversdk.db.DBUtil;
import com.shining.serversdk.handler.MessageHandler;
import com.shining.serversdk.handler.PublishHandler;
import com.shining.serversdk.handler.SubscribeHandler;
import com.shining.serversdk.sdkcore.ServerMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Main {public static Logger logger = LoggerFactory.getLogger(Main.class);
private static ServerMqttClient serverMqttClient;
public static void main(String[] args) {
serverMqttClient = new ServerMqttClient("39.102.56.91",
1883,
"JAVA-EMQ",
"wulongbo",
"study");
serverMqttClient.connect(new MessageHandler() {public void messageArrived(JSONObject receivedMessage) {logger.info("音讯 {} 达到", receivedMessage.toString());
com.alibaba.fastjson.JSONObject jsonObject = com.alibaba.fastjson.JSON.parseObject(receivedMessage.toString());
String code = jsonObject.getString("code");
if (code != null && code.equals("3002")) {String imei = jsonObject.getString("imei");
/**
* 上面为手动连贯数据库
*/
try {ImeiBean imeiBean = new ImeiBean();
imeiBean.setImei(imei);
DBUtil.Insert(imeiBean);
} catch (Exception e) {e.printStackTrace();
}
}
}
public void onError(Throwable throwable) { }
public void onComplete(IMqttDeliveryToken iMqttDeliveryToken) { }
public void onConnected() {}
});
/**
* 判断是否在线
*/
if (serverMqttClient.isConnected()) {
/**
* 订阅
*/
serverMqttClient.subscribe("study/java/emq", 2, new SubscribeHandler() {
@Override
public void onSuccess(String topic, int qos) {logger.info("主題 {} 公布胜利!", topic);
}
@Override
public void onError(Exception e) {}});
/**
* 公布 Message
*/ MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(new byte[]{70, 85, 67, 75});
serverMqttClient.publish("publish/test/topic", new MqttMessage(), new PublishHandler() {
@Override
public void onSuccess() {logger.info("主題公布 Message 胜利!");
}
@Override
public void onException(Exception e) {}});
/**
* 公布 JSON
*/
JSONObject jsonObject = new JSONObject();
jsonObject.put("action", "1");
jsonObject.put("msgid", System.currentTimeMillis());
jsonObject.put("type", "1");
jsonObject.put("price", "1");
serverMqttClient.publishJson("test", jsonObject, new PublishHandler() {
@Override
public void onSuccess() {logger.info("主題公布 JSON 胜利!");
// 入库
}
@Override
public void onException(Exception e) {//}
});
/**
* 公布 Hex
*/ serverMqttClient.publishHex("test", new byte[]{97, 98, 99, 100}, new PublishHandler() {
@Override
public void onSuccess() {}
@Override
public void onException(Exception e) {}});
/**
* 应用封装好的 Pub Bean
*/// PublishBean publishBean = new PublishBean();
// publishBean.setTopic("test");
// publishBean.setPayload(jsonObject);
// publishBean.setDup(true);
// publishBean.setRetained(true);
// publishBean.setMutable(true);
//
// serverMqttClient.publish(publishBean, new PublishHandler() {
// @Override
// public void onSuccess() {
//
// }
//
// @Override
// public void onException(Exception e) {
//
// }
// });
} else {System.out.println("连贯失败");
}
}
public static void send() {for (int i = 0; i < 10; i++) {JSONObject jsonObject = new JSONObject();
jsonObject.put("action", "1");
jsonObject.put("msgid", System.currentTimeMillis());
jsonObject.put("type", "1");
jsonObject.put("price", "1");
send2(jsonObject);
}
}
private static void send2(JSONObject jsonObject) {serverMqttClient.publishJson("test", jsonObject, new PublishHandler() {
@Override
public void onSuccess() {// 入库}
@Override
public void onException(Exception e) {// send2}
});
}
}