InfluxDB和IotDB介绍与性能比照

Centos MacBook Docker离线装置InfluxDB超级简略

maven依赖InfluxDB

<dependency>  <groupId>org.influxdb</groupId>  <artifactId>influxdb-java</artifactId>  <version>2.14</version></dependency>

InfluxDB配置

spring:  influx:    url: http://192.168.1.5:8086    user: admin    password: abcd_2021    database: demo

InfluxDB配置类

package com.beyond.data.config;import org.influxdb.InfluxDB;import org.influxdb.InfluxDBFactory;import org.influxdb.dto.BatchPoints;import org.influxdb.dto.Point;import org.influxdb.dto.Query;import org.influxdb.dto.QueryResult;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Configuration;import org.springframework.stereotype.Component;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.TimeUnit;@Component@Configurationpublic class InfluxDBConfig {    @Value("${spring.influx.user}")    private String userName;    @Value("${spring.influx.password}")    private String password;    @Value("${spring.influx.url}")    private String url;    //数据库    @Value("${spring.influx.database}")    private String database;    //保留策略    private String retentionPolicy;    private InfluxDB influxDB;    public InfluxDBConfig(){}    public InfluxDBConfig(String userName, String password, String url, String database) {        this.userName = userName;        this.password = password;        this.url = url;        this.database = database;        // autogen默认的数据保留策略        this.retentionPolicy = retentionPolicy == null || "".equals(retentionPolicy) ? "autogen" : retentionPolicy;        this.influxDB = influxDbBuild();    }    /**     * 设置数据保留策略 defalut 策略名 /database 数据库名/ 30d 数据保留时限30天/ 1 正本个数为1/ 结尾DEFAULT     * 示意 设为默认的策略     */    public void createRetentionPolicy() {        String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT",                "defalut", database, "30d", 1);        this.query(command);    }    /**     * 连贯时序数据库;取得InfluxDB     **/    private InfluxDB influxDbBuild() {        if (influxDB == null) {            influxDB = InfluxDBFactory.connect(url, userName, password);            influxDB.setDatabase(database);        }        return influxDB;    }    /**     * 插入     * @param measurement 表     * @param tags        标签     * @param fields      字段     */    public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields) {        influxDbBuild();        Point.Builder builder = Point.measurement(measurement);        builder.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS);        builder.tag(tags);        builder.fields(fields);        influxDB.write(database, "", builder.build());    }    /**     * @desc 插入,带工夫time     * @date 2021/3/27      *@param measurement     *@param time     *@param tags     *@param fields     * @return void     */    public void insert(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {        influxDbBuild();        Point.Builder builder = Point.measurement(measurement);        builder.time(time, TimeUnit.MILLISECONDS);        builder.tag(tags);        builder.fields(fields);        influxDB.write(database, "", builder.build());    }    /**     * @desc influxDB开启UDP性能,默认端口:8089,默认数据库:udp,没提供代码传数据库性能接口     * @date 2021/3/13      *@param measurement     *@param time     *@param tags     *@param fields     * @return void     */    public void insertUDP(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {        influxDbBuild();        Point.Builder builder = Point.measurement(measurement);        builder.time(time, TimeUnit.MILLISECONDS);        builder.tag(tags);        builder.fields(fields);        int udpPort = 8089;        influxDB.write(udpPort,  builder.build());    }    /**     * 查问     * @param command 查问语句     * @return     */    public QueryResult query(String command) {        influxDbBuild();        return influxDB.query(new Query(command, database));    }    /**     * @desc 查问后果解决     * @date 2021/5/12      *@param queryResult     */    public List<Map<String, Object>> queryResultProcess(QueryResult queryResult) {        List<Map<String, Object>> mapList = new ArrayList<>();        List<QueryResult.Result> resultList =  queryResult.getResults();        //把查问出的后果集转换成对应的实体对象,聚合成list        for(QueryResult.Result query : resultList){            List<QueryResult.Series> seriesList = query.getSeries();            if(seriesList != null && seriesList.size() != 0) {                for(QueryResult.Series series : seriesList){                    List<String> columns = series.getColumns();                    String[] keys =  columns.toArray(new String[columns.size()]);                    List<List<Object>> values = series.getValues();                    if(values != null && values.size() != 0) {                        for(List<Object> value : values){                            Map<String, Object> map = new HashMap(keys.length);                            for (int i = 0; i < keys.length; i++) {                                map.put(keys[i], value.get(i));                            }                            mapList.add(map);                        }                    }                }            }        }        return mapList;    }    /**     * @desc InfluxDB 查问 count总条数     * @date 2021/4/8     */    public long countResultProcess(QueryResult queryResult) {        long count = 0;        List<Map<String, Object>> list = queryResultProcess(queryResult);        if(list != null && list.size() != 0) {            Map<String, Object> map = list.get(0);            double num = (Double)map.get("count");            count = new Double(num).longValue();        }        return count;    }    /**     * 查问     * @param dbName 创立数据库     * @return     */    public void createDB(String dbName) {        influxDbBuild();        influxDB.createDatabase(dbName);    }    /**     * 批量写入测点     *     * @param batchPoints     */    public void batchInsert(BatchPoints batchPoints) {        influxDbBuild();        influxDB.write(batchPoints);    }    /**     * 批量写入数据     *     * @param database     *            数据库     * @param retentionPolicy     *            保留策略     * @param consistency     *            一致性     * @param records     *            要保留的数据(调用BatchPoints.lineProtocol()可失去一条record)     */    public void batchInsert(final String database, final String retentionPolicy,                            final InfluxDB.ConsistencyLevel consistency, final List<String> records) {        influxDbBuild();        influxDB.write(database, retentionPolicy, consistency, records);    }    /**     * @desc 批量写入数据     * @date 2021/3/19      *@param consistency     *@param records     */    public void batchInsert(final InfluxDB.ConsistencyLevel consistency, final List<String> records) {        influxDbBuild();        influxDB.write(database, "", consistency, records);    }    public String getUserName() {        return userName;    }    public void setUserName(String userName) {        this.userName = userName;    }    public String getPassword() {        return password;    }    public void setPassword(String password) {        this.password = password;    }    public String getUrl() {        return url;    }    public void setUrl(String url) {        this.url = url;    }    public String getDatabase() {        return database;    }    public void setDatabase(String database) {        this.database = database;    }    public String getRetentionPolicy() {        return retentionPolicy;    }    public void setRetentionPolicy(String retentionPolicy) {        this.retentionPolicy = retentionPolicy;    }    public InfluxDB getInfluxDB() {        return influxDB;    }    public void setInfluxDB(InfluxDB influxDB) {        this.influxDB = influxDB;    }}

调用

@Autowiredprivate InfluxDBConfig influxDBConfig;......influxDBConfig.insert(measurement, tags, fields);