InfluxDB和IoTDB介绍与性能对比
CentOS、MacBook Docker离线安装InfluxDB超级简单
maven依赖InfluxDB
<!-- InfluxDB Java client -->
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.14</version>
</dependency>
InfluxDB配置
# InfluxDB connection settings, read by InfluxDBConfig via @Value("${spring.influx.*}").
# NOTE(review): sample credentials only - do not commit real passwords to source control.
spring:
  influx:
    url: http://192.168.1.5:8086
    user: admin
    password: abcd_2021
    database: demo
InfluxDB配置类
package com.beyond.data.config;import org.influxdb.InfluxDB;import org.influxdb.InfluxDBFactory;import org.influxdb.dto.BatchPoints;import org.influxdb.dto.Point;import org.influxdb.dto.Query;import org.influxdb.dto.QueryResult;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Configuration;import org.springframework.stereotype.Component;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.TimeUnit;@Component@Configurationpublic class InfluxDBConfig { @Value("${spring.influx.user}") private String userName; @Value("${spring.influx.password}") private String password; @Value("${spring.influx.url}") private String url; //数据库 @Value("${spring.influx.database}") private String database; //保留策略 private String retentionPolicy; private InfluxDB influxDB; public InfluxDBConfig(){} public InfluxDBConfig(String userName, String password, String url, String database) { this.userName = userName; this.password = password; this.url = url; this.database = database; // autogen默认的数据保留策略 this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? 
"autogen" : retentionPolicy; this.influxDB = influxDbBuild(); } /** * 设置数据保留策略 defalut 策略名 /database 数据库名/ 30d 数据保留时限30天/ 1 正本个数为1/ 结尾DEFAULT * 示意 设为默认的策略 */ public void createRetentionPolicy() { String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT", "defalut", database, "30d", 1); this.query(command); } /** * 连贯时序数据库;取得InfluxDB **/ private InfluxDB influxDbBuild() { if (influxDB == null) { influxDB = InfluxDBFactory.connect(url, userName, password); influxDB.setDatabase(database); } return influxDB; } /** * 插入 * @param measurement 表 * @param tags 标签 * @param fields 字段 */ public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) { influxDbBuild(); Point.Builder builder = Point.measurement(measurement); builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS); builder.tag(tags); builder.fields(fields); influxDB.write(database, "", builder.build()); } /** * @desc 插入,带工夫time * @date 2021/3/27 *@param measurement *@param time *@param tags *@param fields * @return void */ public void insert(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) { influxDbBuild(); Point.Builder builder = Point.measurement(measurement); builder.time(time, TimeUnit.MILLISECONDS); builder.tag(tags); builder.fields(fields); influxDB.write(database, "", builder.build()); } /** * @desc influxDB开启UDP性能,默认端口:8089,默认数据库:udp,没提供代码传数据库性能接口 * @date 2021/3/13 *@param measurement *@param time *@param tags *@param fields * @return void */ public void insertUDP(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) { influxDbBuild(); Point.Builder builder = Point.measurement(measurement); builder.time(time, TimeUnit.MILLISECONDS); builder.tag(tags); builder.fields(fields); int udpPort = 8089; influxDB.write(udpPort, builder.build()); } /** * 查问 * @param command 查问语句 * @return */ public QueryResult query(String command) { influxDbBuild(); return 
influxDB.query(new Query(command, database)); } /** * @desc 查问后果解决 * @date 2021/5/12 *@param queryResult */ public List<Map<String, Object>> queryResultProcess(QueryResult queryResult) { List<Map<String, Object>> mapList = new ArrayList<>(); List<QueryResult.Result> resultList = queryResult.getResults(); //把查问出的后果集转换成对应的实体对象,聚合成list for(QueryResult.Result query : resultList){ List<QueryResult.Series> seriesList = query.getSeries(); if(seriesList != null && seriesList.size() != 0) { for(QueryResult.Series series : seriesList){ List<String> columns = series.getColumns(); String[] keys = columns.toArray(new String[columns.size()]); List<List<Object>> values = series.getValues(); if(values != null && values.size() != 0) { for(List<Object> value : values){ Map<String, Object> map = new HashMap(keys.length); for (int i = 0; i < keys.length; i++) { map.put(keys[i], value.get(i)); } mapList.add(map); } } } } } return mapList; } /** * @desc InfluxDB 查问 count总条数 * @date 2021/4/8 */ public long countResultProcess(QueryResult queryResult) { long count = 0; List<Map<String, Object>> list = queryResultProcess(queryResult); if(list != null && list.size() != 0) { Map<String, Object> map = list.get(0); double num = (Double)map.get("count"); count = new Double(num).longValue(); } return count; } /** * 查问 * @param dbName 创立数据库 * @return */ public void createDB(String dbName) { influxDbBuild(); influxDB.createDatabase(dbName); } /** * 批量写入测点 * * @param batchPoints */ public void batchInsert(BatchPoints batchPoints) { influxDbBuild(); influxDB.write(batchPoints); } /** * 批量写入数据 * * @param database * 数据库 * @param retentionPolicy * 保留策略 * @param consistency * 一致性 * @param records * 要保留的数据(调用BatchPoints.lineProtocol()可失去一条record) */ public void batchInsert(final String database, final String retentionPolicy, final InfluxDB.ConsistencyLevel consistency, final List<String> records) { influxDbBuild(); influxDB.write(database, retentionPolicy, consistency, records); } /** * @desc 批量写入数据 * 
@date 2021/3/19 *@param consistency *@param records */ public void batchInsert(final InfluxDB.ConsistencyLevel consistency, final List<String> records) { influxDbBuild(); influxDB.write(database, "", consistency, records); } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getDatabase() { return database; } public void setDatabase(String database) { this.database = database; } public String getRetentionPolicy() { return retentionPolicy; } public void setRetentionPolicy(String retentionPolicy) { this.retentionPolicy = retentionPolicy; } public InfluxDB getInfluxDB() { return influxDB; } public void setInfluxDB(InfluxDB influxDB) { this.influxDB = influxDB; }}
调用
@Autowired
private InfluxDBConfig influxDBConfig;

// ......

influxDBConfig.insert(measurement, tags, fields);