【作者介绍】:大罗,黑格智造架构师,次要从事云原生,大数据系统开发,曾参加国家示范级工业互联网零碎建设等。

做工业互联网或物联网零碎,最根本的需要是展现数据曲线,比方功率曲线,相似于股票的分时图,通常咱们会取每分钟内该设施上报的最初一次功率值为这一分钟的功率,如果某一分钟内,设施没有上报,则取上一分钟的功率值,以此类推。举例如下:

失去的分钟曲线:

通常咱们会把设施上报的数据先写入Apache Kafka。如果是离线计算场景,可能会思考把数据写入Hive,而后应用Spark SQL定时读取Hive,再把计算结果写入HBase;如果是实时计算场景,则会应用Apache Flink生产Kafka数据,把后果写入HBase,这种状况下还须要思考数据乱序和提早投递计算等问题。

而且,基于传统大数据Hadoop的架构,须要搭建ZooKeeper和HDFS,而后才是Hive和HBase,整个体系保护老本很高。此外,HBase基于键值存储时序数据,会节约很多空间在同一键值的数据设计架构下面。

以上所举,是物联网设施属性曲线计算场景的其中一个痛点,另外还须要思考数据增长、数据核查以及数据容灾等特点。

笔者所在的公司,要基于3D打印技术给客户提供整体化解决方案,天然须要对设施的运行状态做继续追踪,须要存储设备的运行数据。这时候咱们找到了开源的物联网大数据平台TDengine(https://github.com/taosdata/TDengine)。

参考TDengine的文档中SQL的写法,在数据齐全的状况下,能够轻松地用一句SQL解决下面的问题:

select last(val) a from super_table_xx where ts >= '2021-06-07 18:10:00' and ts <= '2021-06-07 18:20:00' interval(60s) fill(value, 0);

为什么相似的SQL,TDengine的执行效率能够如此之高呢?

这就在于它的超级表以及子表,针对单个设施的数据,TDengine设计了依照工夫间断存储的个性。而事实上,业务零碎在应用物联网数据的时候,无论是即时查问还是离线剖析,存在读取单个设施的一个间断时间段数据的特点。

假如,咱们要存储设备的温度与湿度,咱们能够设计超级表如下:

create stable if not exists s_device (ts TIMESTAMP,  temperature double,  humidity double) TAGS (device_sn BINARY(1000));

理论应用中,例如针对设施’d1’和’d2’的数据执行插入的SQL如下:

insert into s_device_d1 (ts, temperature, humidity) USING s_device (device_sn) TAGS ('d1') values (1623157875000, 35.34, 80.24);insert into s_device_d2 (ts, temperature, humidity) USING s_device (device_sn) TAGS ('d2') values (1623157891000, 29.63, 79.48);

搜寻设施’d1’某个时间段的数据,其SQL如下:

select * from s_device where device_sn = 'd1' and ts > 1623157871000 and ts < 1623157890000 ;

假如统计过来7天的平均温度曲线,每小时1个点:

select avg(temperature) temperature from s_device where  device_sn = #{deviceSn} and ts >= #{startTime} and ts < #{endTime}  interval(1h)

TDengine还提供了很多聚合函数,相似下面的计算1分钟间断曲线的last和fill,以及其余罕用的sum和max等。

在和应用程序联合的过程中,咱们选用MyBatis这种灵便易上手的ORM框架,例如,针对下面的数据表’s_device’,咱们先定义entity :

import com.baomidou.mybatisplus.annotation.TableField;import com.baomidou.mybatisplus.annotation.TableName;import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;import java.sql.Timestamp;/** * @author: DaLuo * @date: 2021/06/25 * @description: */@Data@AllArgsConstructor@NoArgsConstructor@Builder@TableName(value = "s_device")public class TestSuperDeviceEntity {    private Timestamp ts;    private Float temperature;    private Float humidity;    @TableField(value = "device_sn")    private String device_sn ;}

再定义 mapper:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;import com.hg.device.kafka.tdengine.entity.TestSuperDeviceEntity;import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.ibatis.annotations.Insert;import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Param;import org.apache.ibatis.annotations.Select;import java.sql.Timestamp;import java.util.List;/** * @author: DaLuo * @date: 2021/06/25 * @description: */@Mapperpublic interface TestSuperDeviceMapper extends BaseMapper<TestSuperDeviceEntity> {    /**     * 单个插入     * @param entity     * @return     */    @Insert({            "INSERT INTO 's_device_${entity.deviceSn}' (ts ,temperature, humidity ) ",            "USING s_device (device_sn) TAGS (#{entity.deviceSn}) ",            "VALUES (#{entity.ts}, #{entity.temperature}, #{entity.humidity})"    })    int insertOne(@Param(value = "entity") TestSuperDeviceEntity entity);    /**     * 批量插入     * @param entities     * @return     */    @Insert({            "<script>",            "INSERT INTO ",            "<foreach collection='list' item='item' separator=' '>",            "'s_device_${item.deviceSn}' (ts ,temperature, humidity) USING s_device (device_sn) TAGS (#{item.deviceSn}) ",            "VALUES (#{item.ts}, #{item.temperature}, #{item.humidity})",            "</foreach>",            "</script>"    })    int batchInsert(@Param("list") List<TestSuperDeviceEntity> entities);    /**     *  查问过来一段时间范畴的平均温度,每小时1个数据点     * @param deviceSn     * @param startTime inclusive     * @param endTime   exclusive     * @return     */    @Select("select avg(temperature) temperature from s_device where  device_sn = #{deviceSn} and ts >= #{startTime} and ts < #{endTime}  interval(1h)")    List<TempSevenDaysTemperature> selectSevenDaysTemperature(            @Param(value = "deviceSn") String deviceSn,            @Param(value = "startTime") long startTime,            @Param(value = "endTime") long endTime);    @AllArgsConstructor    @NoArgsConstructor    @Data    @Builder    class TempSevenDaysTemperature {        private Timestamp ts;        private float temperature;    }}

TDengine有一个很奇妙的设计,就是不必事后创立子表,所以咱们能够很不便地利用’tag’标签作为子表名称的一部分,即时插入数据同时创立子表。

留神:思考到跨时区的国际化个性,咱们所有的工夫存储查问交互,都是应用的工夫戳,而非”yyyy-mm-dd hh:MM:ss”格局,因为数据存储波及到应用程序时区,连贯字符串时区,TDengine服务时区,应用”yyyy-mm-dd hh:MM:ss”格局容易导致工夫存储的不准确性,而应用工夫戳,长整型的数据格式则能够完满地防止此类问题。

Java应用TDengine JDBC-driver目前有两种形式:JDBC-JNI和JDBC-RESTful,前者在写入性能上更有劣势。然而须要在利用程序运行的服务器上安装TDengine客户端驱动。

咱们的应用程序用到了Kubernetes集群,程序是运行在Docker外面,为此咱们制作了一个适宜咱们利用程序运行的镜像,例如根底镜像的Dockerfile如下所示:

FROM openjdk:8-jdk-oraclelinux7COPY TDengine-client-2.0.16.0-Linux-x64.tar.gz /RUN tar -xzvf /TDengine-client-2.0.16.0-Linux-x64.tar.gz &&  cd /TDengine-client-2.0.16.0 &&  pwd && ls && ./install_client.sh

build:

docker build -t tdengine-openjdk-8-runtime:2.0.16.0 -f Dockerfile .

援用程序镜像Dockerfile所示:

FROM tdengine-openjdk-8-runtime:2.0.16.0ENV JAVA_OPTS="-Duser.timezone=Asia/Shanghai -Djava.security.egd=file:/dev/./urandom"COPY app.jar /app.jarENTRYPOINT ["java","-jar","/app.jar"]

这样咱们的应用程序就能够调度在任意的K8s节点上了。

另外,咱们的程序波及到工作自动化调度,须要频繁地和设施下位机进行MQTT数据交互,比方,云端发送指令1000-“开始工作A”,下位机回复指令2000-“收到工作A”,把指令了解成设施,把指令序列号以及内容了解成它的属性,天然这种数据也是非常适合存储在TDengine时序数据库中的:

*************************** 1.row ***************************       ts: 2021-06-23 16:10:30.000      msg: {"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"start"}device_sn: deviceA     kind: 1000*************************** 2.row ***************************       ts: 2021-06-23 16:10:31.000      msg: {"task_id":"7b40ed4edc1149f1837179c77d8c3c1f","action":"received"}device_sn: deviceA     kind: 2000

咱们云端在和设施对接的过程中,频繁须要讲究音讯是否发送的问题,所以迫切需要对指令进行保留,从而在应用程序中新辟线程,专门订阅指令集音讯,批量写入到TDengine数据库。

最初,TDengine还有一个超级表log.dn,外面保留了内存、CPU等应用信息,所以咱们能够利用Grafana展现这些数据,为监控提供牢靠的经营数据参照!