关于springboot:spring-boot使用IoTDB的两种方式

77次阅读

共计 8983 个字符,预计需要花费 23 分钟才能阅读完成。

InfluxDB 和 IoTDB 介绍与性能对比

Linux、MacBook、Docker 安装 IoTDB 及使用

方式一: session 方式访问 IoTDB (推荐使用, 自带连接池)

maven 依赖 iotdb-session

<dependency>
       <groupId>org.apache.iotdb</groupId>
       <artifactId>iotdb-session</artifactId>
       <version>0.11.2</version>
</dependency>
springboot IoTDB 配置信息 session 方式
spring:
  iotdb:
    username: root
    password: root
    ip: 192.168.0.5
    port: 6667
    maxSize: 100
    
IotDB-session 配置类

package com.beyond.data.config;

import org.apache.iotdb.session.pool.SessionPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Spring bean that writes records to IoTDB through one shared {@link SessionPool}.
 *
 * <p>Connection settings come from the {@code spring.iotdb.*} properties; each one has a
 * default, so the bean also starts without any configuration. The pool is created on first
 * use and stored in a static field.
 */
@Component
@Configuration
public class IotDBSessionConfig {

    private static final Logger log = LoggerFactory.getLogger(IotDBSessionConfig.class);

    @Value("${spring.iotdb.username:root}")
    private String username;

    @Value("${spring.iotdb.password:root}")
    private String password;

    @Value("${spring.iotdb.ip:127.0.0.1}")
    private String ip;

    @Value("${spring.iotdb.port:6667}")
    private int port;

    @Value("${spring.iotdb.maxSize:10}")
    private int maxSize;

    /** Shared pool; only read or written while holding the class lock. */
    private static SessionPool sessionPool;

    /**
     * Returns the shared pool, creating it on the first call.
     *
     * <p>The check-and-create runs under the class lock so that concurrent first callers
     * cannot each build (and then leak) their own pool.
     *
     * @return the shared session pool, never {@code null}
     */
    private SessionPool getSessionPool() {
        synchronized (IotDBSessionConfig.class) {
            if (sessionPool == null) {
                sessionPool = new SessionPool(ip, port, username, password, maxSize);
            }
            return sessionPool;
        }
    }

    /**
     * Inserts one record for a device. Failures are logged and not rethrown, so callers
     * are never interrupted by a write error.
     *
     * @param deviceId     device path the record belongs to
     * @param time         record timestamp
     * @param measurements measurement names, positionally matched with {@code values}
     * @param values       measurement values as strings
     */
    public void insertRecord(String deviceId, long time, List<String> measurements, List<String> values) {
        SessionPool pool = getSessionPool();
        try {
            log.info("iotdb 数据入库:device_id:[{}], measurements:[{}], values:[{}]", deviceId, measurements, values);
            pool.insertRecord(deviceId, time, measurements, values);
        } catch (Exception e) {
            // The trailing throwable has no placeholder, so SLF4J logs it with its stack trace.
            log.error("IotDBSession insertRecord 失败: deviceId={}, time={}, measurements={}, values={}, error={}",
                    deviceId, time, measurements, values, e.getMessage(), e);
        }
    }

}

调用 session 方式

// Field-injected IoTDB session bean used by the insert example below.
@Autowired
private IotDBSessionConfig iotDBSessionConfig;

......

// Build the device path. StringBuilder is enough here: the value never leaves this thread.
// NOTE(review): this yields "root.testdeviceid" - confirm whether a "." between
// "test" and "deviceid" was intended.
StringBuilder tableName = new StringBuilder();
tableName.append("root").append(".").append("test").append("deviceid");

long currentTime = System.currentTimeMillis();

// Measurement names and their values are matched by position.
List<String> iotMeasurements = new ArrayList<>();
iotMeasurements.add("aaa");
iotMeasurements.add("bbb");

List<String> iotValues = new ArrayList<>();
iotValues.add("123");
iotValues.add("abide");

iotDBSessionConfig.insertRecord(tableName.toString(), currentTime, iotMeasurements, iotValues);

方式二: jdbc 方式访问 IoTDB (自己实现连接池)

maven 依赖 iotdb-jdbc

<dependency>
     <groupId>org.apache.iotdb</groupId>
     <artifactId>iotdb-jdbc</artifactId>
     <version>0.11.2</version>
</dependency>

 <!-- alibaba 的 druid 数据库连接池 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
</dependency>
spring boot IotDB 配置信息 jdbc
spring:
  iotdb:
    username: root
    password: root
    driver-name: org.apache.iotdb.jdbc.IoTDBDriver
    url: jdbc:iotdb://192.168.0.5:6667/
    initial-size: 5
    min-idle: 10
    max-active: 50
    max-wait: 60000
    remove-abandoned: true
    remove-abandoned-timeout: 30
    time-between-eviction-runs-millis: 60000
    min-evictable-idle-time-millis: 300000
    test-while-idle: false
    test-on-borrow: false
    test-on-return: false
    
IotDB-jdbc 配置类
package com.beyond.data.config;

import com.alibaba.druid.pool.DruidDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Spring bean that runs SQL against IoTDB over JDBC, using one shared Druid connection pool.
 *
 * <p>Pool settings come from the {@code spring.iotdb.*} properties. The pool is created on
 * first use and stored in a static field. Both public methods log failures instead of
 * throwing {@link SQLException}.
 */
@Component
@Configuration
public class IotDBConfig {

    private static final Logger log = LoggerFactory.getLogger(IotDBConfig.class);

    @Value("${spring.iotdb.username}")
    private String username;

    @Value("${spring.iotdb.password}")
    private String password;

    @Value("${spring.iotdb.driver-name}")
    private String driverName;

    @Value("${spring.iotdb.url}")
    private String url;

    @Value("${spring.iotdb.initial-size:20}")
    private int initialSize;

    @Value("${spring.iotdb.min-idle:10}")
    private int minIdle;

    @Value("${spring.iotdb.max-active:500}")
    private int maxActive;

    @Value("${spring.iotdb.max-wait:60000}")
    private int maxWait;

    @Value("${spring.iotdb.remove-abandoned:true}")
    private boolean removeAbandoned;

    @Value("${spring.iotdb.remove-abandoned-timeout:30}")
    private int removeAbandonedTimeout;

    @Value("${spring.iotdb.time-between-eviction-runs-millis:60000}")
    private int timeBetweenEvictionRunsMillis;

    @Value("${spring.iotdb.min-evictable-idle-time-millis:300000}")
    private int minEvictableIdleTimeMillis;

    @Value("${spring.iotdb.test-while-idle:false}")
    private boolean testWhileIdle;

    @Value("${spring.iotdb.test-on-borrow:false}")
    private boolean testOnBorrow;

    @Value("${spring.iotdb.test-on-return:false}")
    private boolean testOnReturn;

    /** Shared Druid pool; only read or written while holding the class lock. */
    private static DruidDataSource iotDbDataSource;

    /**
     * Borrows a connection from the shared Druid pool, creating the pool on the first call.
     *
     * <p>The pool is fully configured on a local variable before it is stored in the static
     * field, and the check-and-create runs under the class lock, so no caller can observe a
     * half-configured pool or create a second one.
     *
     * @return a pooled connection, or {@code null} if none could be obtained (already logged)
     */
    private Connection getConnection() {
        DruidDataSource dataSource;
        synchronized (IotDBConfig.class) {
            if (iotDbDataSource == null) {
                DruidDataSource created = new DruidDataSource();
                // Connection parameters.
                created.setUrl(url);
                created.setDriverClassName(driverName);
                created.setUsername(username);
                created.setPassword(password);
                // Initial, minimum-idle and maximum pool sizes.
                created.setInitialSize(initialSize);
                created.setMinIdle(minIdle);
                created.setMaxActive(maxActive);
                // Maximum time to wait for a free connection.
                created.setMaxWait(maxWait);
                // Connection-leak detection.
                created.setRemoveAbandoned(removeAbandoned);
                created.setRemoveAbandonedTimeout(removeAbandonedTimeout);
                // How often idle connections are checked, and how long one may stay idle (ms).
                created.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
                created.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
                // Validation switches that guard against stale connections.
                created.setTestWhileIdle(testWhileIdle);
                created.setTestOnBorrow(testOnBorrow);
                created.setTestOnReturn(testOnReturn);
                iotDbDataSource = created;
            }
            dataSource = iotDbDataSource;
        }

        try {
            return dataSource.getConnection();
        } catch (SQLException e) {
            // The trailing throwable has no placeholder, so SLF4J logs it with its stack trace.
            log.error("iotDB getConnection 失败: error={}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * Executes a statement that returns no rows (for example an insert) and logs how long it took.
     * Does nothing beyond logging when no connection is available or the statement fails.
     *
     * @param sql complete SQL text; it is executed as-is, so it must not contain untrusted input
     */
    public void insert(String sql) {
        log.info("iotDB insert sql={}", sql);
        Connection connection = getConnection();
        if (connection == null) {
            return;
        }
        Statement statement = null;
        try {
            statement = connection.createStatement();
            long startedAt = System.currentTimeMillis();
            statement.execute(sql);
            log.info("执行 IotDb 的 sql----[{}], 工夫:[{}]ms", sql, System.currentTimeMillis() - startedAt);
        } catch (SQLException e) {
            log.error("iotDB insert 失败: error={}", e.getMessage(), e);
        } finally {
            close(statement, connection);
        }
    }

    /**
     * Executes a query and returns every row as a map from column name to string value.
     *
     * @param sql complete SQL text; it is executed as-is, so it must not contain untrusted input
     * @return one map per row (an empty list when the query matched nothing), or {@code null}
     *         when no connection was available or the query failed
     */
    public List<Map<String, Object>> query(String sql) {
        Connection connection = getConnection();
        if (connection == null) {
            return null;
        }
        Statement statement = null;
        ResultSet resultSet = null;
        List<Map<String, Object>> resultList = null;
        try {
            statement = connection.createStatement();
            long startedAt = System.currentTimeMillis();
            resultSet = statement.executeQuery(sql);
            log.info("查问 IotDb 的 sql----[{}], 工夫:[{}]ms", sql, System.currentTimeMillis() - startedAt);
            resultList = outputResult(resultSet);
        } catch (SQLException e) {
            log.error("iotDB query 失败: error={}", e.getMessage(), e);
        } finally {
            closeQuietly(resultSet, "iotDB resultSet 敞开异样: error={}");
            close(statement, connection);
        }
        return resultList;
    }

    /**
     * Copies all rows of a result set into a list of maps.
     *
     * <p>Keys are the column labels shortened by {@link #normalizeColumnName(String)}. The
     * {@code Time} column is parsed as epoch milliseconds and stored formatted as
     * {@code yyyy-MM-dd HH:mm:ss.SSS}; it is left out when it is null or not positive.
     * All other columns are stored as returned by {@link ResultSet#getString(int)}.
     *
     * @param resultSet result set to read; {@code null} yields an empty list
     * @return one map per row, never {@code null}
     * @throws SQLException if reading the metadata or a row fails
     */
    private List<Map<String, Object>> outputResult(ResultSet resultSet) throws SQLException {
        List<Map<String, Object>> resultList = new ArrayList<>();
        if (resultSet == null) {
            return resultList;
        }

        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();

        // Column names do not change between rows, so resolve them once up front.
        String[] columnNames = new String[columnCount + 1];
        for (int i = 1; i <= columnCount; i++) {
            columnNames[i] = normalizeColumnName(metaData.getColumnLabel(i));
        }

        // SimpleDateFormat is not thread-safe; a per-call instance avoids sharing it.
        SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        while (resultSet.next()) {
            Map<String, Object> row = new HashMap<>();
            for (int i = 1; i <= columnCount; i++) {
                String columnName = columnNames[i];
                String rawValue = resultSet.getString(i);
                if (columnName.equals("Time")) {
                    // Skip a null timestamp instead of letting Long.parseLong throw on it.
                    if (rawValue != null) {
                        long timeStamp = Long.parseLong(rawValue);
                        if (timeStamp > 0) {
                            row.put("Time", timeFormat.format(new Date(timeStamp)));
                        }
                    }
                } else {
                    row.put(columnName, rawValue);
                }
            }
            resultList.add(row);
        }
        return resultList;
    }

    /**
     * Shortens a column label to its last path segment and drops a trailing function
     * parenthesis, e.g. {@code root.test.dev.aaa} becomes {@code aaa} and
     * {@code max_value(root.test.dev.aaa)} becomes {@code aaa}.
     *
     * @param label column label reported by the result set metadata
     * @return the shortened name
     */
    private static String normalizeColumnName(String label) {
        String name = label;
        int lastDot = name.lastIndexOf('.');
        if (lastDot >= 0) {
            name = name.substring(lastDot + 1);
        }
        int lastParen = name.lastIndexOf(')');
        if (lastParen >= 0) {
            name = name.substring(0, lastParen);
        }
        return name;
    }

    /**
     * Closes the statement and then the connection. Each is closed independently, so a
     * failure closing the statement can no longer prevent the connection from being
     * returned to the pool.
     *
     * @param statement  statement to close, may be {@code null}
     * @param connection connection to close, may be {@code null}
     */
    private void close(Statement statement, Connection connection) {
        closeQuietly(statement, "iotDB close 失败: error={}");
        closeQuietly(connection, "iotDB close 失败: error={}");
    }

    /**
     * Closes one resource and logs any failure instead of propagating it.
     *
     * @param resource       resource to close, may be {@code null}
     * @param failureMessage log format with one placeholder for the exception message
     */
    private void closeQuietly(AutoCloseable resource, String failureMessage) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            log.error(failureMessage, e.getMessage(), e);
        }
    }

}

调用 jdbc 方式

// Field-injected IoTDB JDBC bean used by the insert and query examples below.
@Autowired
private IotDBConfig iotDBConfig;

......
// Device path used by both the insert and the query.
// NOTE(review): this yields "root.testdeviceid" - confirm whether a "." between
// "test" and "deviceid" was intended.
StringBuilder tableName = new StringBuilder();
tableName.append("root").append(".").append("test").append("deviceid");

long currentTime = System.currentTimeMillis();

// Measurement names and their values are matched by position.
List<String> iotMeasurements = new ArrayList<>();
iotMeasurements.add("aaa");
iotMeasurements.add("bbb");

List<String> iotValues = new ArrayList<>();
iotValues.add("123");
// Text values must be quoted inside the SQL statement.
iotValues.add("'abde'");

// Produces: insert into <path>(timestamp,aaa,bbb) values(<ms>,123,'abde')
// The keywords carry their own spaces; without them the statement is not valid SQL.
StringBuilder sql = new StringBuilder();
sql.append("insert into ").append(tableName);
sql.append("(timestamp,");
sql.append(String.join(",", iotMeasurements)).append(")");
sql.append(" values(").append(currentTime).append(",");
sql.append(String.join(",", iotValues)).append(")");
iotDBConfig.insert(sql.toString());

// Query the newest row whose bbb equals 'abde'.
// Produces: select aaa from <path> where bbb='abde' order by time desc limit 1
StringBuilder querySql = new StringBuilder();
querySql.append("select ").append("aaa");
querySql.append(" from ").append(tableName);
querySql.append(" where ").append("bbb").append("='");
querySql.append("abde").append("'");
querySql.append(" order by time desc limit 1");
log.info("sql----{}",  querySql);
List<Map<String, Object>> resultList = iotDBConfig.query(querySql.toString());

正文完
 0