关于物联网:手把手教你如何使用-Timestream-实现物联网时序数据存储和分析

46次阅读

共计 15194 个字符,预计需要花费 38 分钟才能阅读完成。

Amazon Timestream 是一种疾速、可扩大的无服务器工夫序列数据库服务,实用于物联网和经营应用程序,应用该服务每天能够轻松存储和剖析数万亿个事件,速度进步了 1000 倍,而老本仅为关系数据库的十分之一。通过将近期数据保留在内存中,并依据用户定义的策略将历史数据移至老本优化的存储层,Amazon Timestream 为客户节俭了治理工夫序列数据生命周期的工夫和老本。Amazon Timestream 专门构建的查问引擎可用于拜访和剖析近期数据和历史数据,而无需在查问中显式指定数据是保留在内存中还是老本优化层中。Amazon Timestream 内置了工夫序列剖析函数,能够实现近乎实时地辨认数据的趋势和模式。Amazon Timestream 是无服务器服务,可主动缩放以调整容量和性能,因而无需治理底层基础设施,能够专一于构建应用程序。

本文介绍通过 Timestream、Kinesis Stream 托管服务和 Grafana 和 Flink Connector 开源软件实现物联网(以 PM 2.5 场景为示例)时序数据实时采集、存储和剖析,其中蕴含部署架构、环境部署、数据采集、数据存储和剖析,心愿当您有类似物联网时序数据存储和剖析需要的时候,能从中取得启发,助力业务倒退。

架构

Amazon Timestream 可能应用内置的剖析函数(如平滑、近似和插值)疾速剖析物联网应用程序生成的工夫序列数据。例如,智能家居设施制造商能够应用 Amazon Timestream 从设施传感器收集静止或温度数据,进行插值以辨认没有静止的工夫范畴,并揭示消费者采取措施(例如缩小热量)以节约能源。

本文物联网(以 PM 2.5 场景为示例),实现 PM2.5 数据实时采集、时序数据存储和实时剖析,其中架构次要分成三大部分:

  • 实时时序数据采集:通过 Python 数据采集程序联合 Kinesis Stream 和 Kinesis Data Analytics for Apache Flink connector 模仿实现从 PM 2.5 监控设施, 将数据实时采集数据到 Timestream。
  • 时序数据存储:通过 Amazon Timestream 时序数据库实现时序数据存储,设定内存和磁性存储(老本优化层)存储时长,能够实现近期数据保留在内存中,并依据用户定义的策略将历史数据移至老本优化的存储层。
  • 实时时序数据分析:通过 Grafana(装置 Timesteam For Grafana 插件)实时拜访 Timestream 数据,通过 Grafana 丰盛的剖析图表模式,联合 Amazon Timestream 内置的工夫序列剖析函数,能够实现近乎实时地辨认物联网数据的趋势和模式。

具体的架构图如下:

部署环境

1.1 创立 Cloudformation

请应用本人帐号 (region 请抉择 us-east-1)

下载 Github 上 Cloudformation Yaml 文件:

git clone https://github.com/bingbingliu18/Timestream-pm25

Timestream-pm25 目录中蕴含上面 Cloudformation 所用文件 timestream-short-new.yaml

其它都抉择缺省,点击 Create Stack button.

Cloud Formation 创立胜利

1.2 连贯到新建的 Ec2 堡垒机:

批改证书文件权限

chmod 0600 [path to downloaded .pem file]

ssh -i [path to downloaded .pem file] ec2-user@[bastionEndpoint]

执行 aws configure:

aws configure

default region name, 输出:“us-east-1”,其它抉择缺省设置。

1.3 连贯到 EC2 堡垒机 装置相应软件

设置时区

TZ='Asia/Shanghai'; export TZ

Install python3

sudo yum install -y python3

Install python3 pip

sudo yum install -y python3-pip

pip3 install boto3

sudo pip3 install boto3

pip3 install numpy

sudo pip3 install numpy

install git

sudo yum install -y git

1.4 下载 Github Timesteram Sample 程序库

git clone https://github.com/awslabs/amazon-timestream-tools amazon-timestream-tools

1.5 装置 Grafana Server

连贯到 EC2 堡垒机:

sudo vi /etc/yum.repos.d/grafana.repo

For OSS releases:(拷贝以下内容到 grafana.repo)

[grafana]
name=grafana
baseurl=https://packages.grafana.com/oss/rpm
repo_gpgcheck=1
enabled=1
gpgcheck=1
gpgkey=https://packages.grafana.com/gpg.key
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt

装置 grafana server:

sudo yum install -y grafana

启动 grafana server:

sudo service grafana-server start
sudo service grafana-server status

配置 grafana server 在操作系统启动时 主动启动:

sudo /sbin/chkconfig --add grafana-server

1.6 装置 timestream Plugin

sudo grafana-cli plugins install grafana-timestream-datasource

重启 grafana

sudo service grafana-server restart

1.7 配置 Grafana 要拜访 Timesteam 服务所用的 IAM Role

获取 IAM Role Name

抉择 IAM 服务,抉择要批改的 role, role name:

timestream-iot-grafanaEC2rolelabview-us-east-1

批改 role trust relationship:

将 Policy document 全副选中,替换成以下内容:

{
  "Version": "2012-10-17",
  "Statement": [
    {"Sid":"","Effect":"Allow","Principal": {"Service":"ec2.amazonaws.com"},"Action":"sts:AssumeRole"
    },
    {"Sid":"","Effect":"Allow","Principal": {"AWS":"[ 请替换成 CloudFormation output 中的 role arn]"},"Action":"sts:AssumeRole"
    } 
  ]
}

批改后的 trust relationship:

1.8 登录到 Grafana server

第一次登录到 Grafana Server:

  1. 关上浏览器 拜访 http://[Grafana server public ip]:3000
  2. 缺省的 Grafana Server 监听端口是:3000 .

如何获取 Ec2 Public IP 地址,如下图所示,拜访 Cloudformation output:

  1. 在登陆界面, 输出 username: admin; password:admin.(输出用户名和明码都是 admin)
  2. 点击 Log In. 登陆胜利后,会收到提醒批改明码

1.9 Grafana server 中减少 Timestream 数据源

减少 Timestream 数据源

1.10 Grafana server 中配置 Timestream 数据源

拷贝配置所须要 role ARN 信息(从 cloudformation output tab)Default Region: us-east-1

IoT 数据存储

2.1 创立 Timestream 数据库 iot

2.2 创立 Timestream 表 pm25

IoT 数据导入

3.1 装置 Flink connector to Timestream

装置 java8

sudo yum install -y java-1.8.0-openjdk*

java -version

装置 debug info, otherwise jmap will throw exception

sudo yum --enablerepo='*-debug*' install -y java-1.8.0-openjdk-debuginfo

Install maven

sudo wget https://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo 
sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo 
sudo yum install -y apache-maven 
mvn --version 

change java version from 1.7 to 1.8

sudo update-alternatives --config java

sudo update-alternatives --config javac

装置 Apache Flink

最新的 Apache Flink 版本反对 Kinesis Data Analytics 是 1.8.2.

  1. Create flink folder

cd

mkdir flink

cd flink

  1. 下载 Apache Flink version 1.8.2 源代码:

wget https://archive.apache.org/dist/flink/flink-1.8.2/flink-1.8.2-src.tgz

  1. 解压 Apache Flink 源代码:

tar -xvf flink-1.8.2-src.tgz

  1. 进入到 Apache Flink 源代码目录:

cd flink-1.8.2

  1. Compile and install Apache Flink (这个编译工夫比拟长 须要大抵 20 分钟):

mvn clean install -Pinclude-kinesis -DskipTests

3.2 创立 Kinesis Data Stream Timestreampm25Stream

aws kinesis create-stream --stream-name Timestreampm25Stream --shard-count 1

3.3 运行 Flink Connector 建设 Kinesis 连贯到 Timestream:

cd
cd amazon-timestream-tools/integrations/flink_connector
mvn clean compile

数据采集过程中 请继续运行以下命令:

mvn exec:java -Dexec.mainClass="com.amazonaws.services.kinesisanalytics.StreamingJob" -Dexec.args="--InputStreamName 
Timestreampm25Stream --Region us-east-1 --TimestreamDbName iot --TimestreamTableName pm25"

3.4 筹备 PM2.5 演示数据:

连贯到 EC2 堡垒机

下载 5 演示数据生成程序:

cd

mkdir pm25

cd pm25

  1. 下载 Github 上数据采集 Python 程序:

git clone https://github.com/bingbingliu18/Timestream-pm25

cd Timestream-pm25

  1. 运行 5 演示数据生成程序 (python 程序 2 个参数 –region default: us-east-1; –stream default: Timestreampm25Stream)

数据采集过程中 请继续运行以下命令:

python3 pm25_new_kinisis_test.py

### IoT 数据分析 **

4.1 登陆到 Grafana Server 创立仪表板和 Panel**

创立 Dashboard 查问时 请设定时区为本地浏览器时区:

创立新的 Panel:

抉择要拜访的数据源,将要查问剖析所执行的 SQL 语句粘贴到新的 Panel 中:

4.2 创立工夫数据分析仪表版 Dashboard PM2.5 Analysis 1(Save as PM2.5 Analysis 1)

4.2.1 查问北京各个监控站点 PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'fengtai_xiaotun' THEN avg_pm25 ELSE NULL END AS fengtai_xiaotou,
CASE WHEN location = 'fengtai_yungang' THEN avg_pm25 ELSE NULL END AS fengtai_yungang,
CASE WHEN location = 'daxing' THEN avg_pm25 ELSE NULL END AS daxing,
CASE WHEN location = 'wanshou' THEN avg_pm25 ELSE NULL END AS wanshou,
CASE WHEN location = 'gucheng' THEN avg_pm25 ELSE NULL END AS gucheng,
CASE WHEN location = 'tiantan' THEN avg_pm25 ELSE NULL END AS tiantan,
CASE WHEN location = 'yanshan' THEN avg_pm25 ELSE NULL END AS yanshan,
CASE WHEN location = 'miyun' THEN avg_pm25 ELSE NULL END AS miyun,
CASE WHEN location = 'changping' THEN avg_pm25 ELSE NULL END AS changping,
CASE WHEN location = 'aoti' THEN avg_pm25 ELSE NULL END AS aoti,
CASE WHEN location = 'mengtougou' THEN avg_pm25 ELSE NULL END AS mentougou,
CASE WHEN location = 'huairou' THEN avg_pm25 ELSE NULL END AS huairou,
CASE WHEN location = 'haidian' THEN avg_pm25 ELSE NULL END AS haidian,
CASE WHEN location = 'nongzhan' THEN avg_pm25 ELSE NULL END AS nongzhan,
CASE WHEN location = 'tongzhou' THEN avg_pm25 ELSE NULL END AS tongzhou,
CASE WHEN location = 'dingling' THEN avg_pm25 ELSE NULL END AS dingling,
CASE WHEN location = 'yanqing' THEN avg_pm25 ELSE NULL END AS yanqing,
CASE WHEN location = 'guanyuan' THEN avg_pm25 ELSE NULL END AS guanyuan,
CASE WHEN location = 'dongsi' THEN avg_pm25 ELSE NULL END AS dongsi,
CASE WHEN location = 'shunyi' THEN avg_pm25 ELSE NULL END AS shunyi
FROM 
(SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Beijing'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)

抉择图形显示 select Gauge

Save Panel as Beijing PM2.5 analysis

Edit Panel Title:Beijing PM2.5 analysis

Save Dashboard PM2.5 analysis 1:

4.2.2 查问上海一天内各个监控站点 PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'songjiang' THEN avg_pm25 ELSE NULL END AS songjiang,
CASE WHEN location = 'fengxian' THEN avg_pm25 ELSE NULL END AS fengxian, 
CASE WHEN location = 'no 15 factory' THEN avg_pm25 ELSE NULL END AS No15_factory, 
CASE WHEN location = 'xujing' THEN avg_pm25 ELSE NULL END AS xujing,
 CASE WHEN location = 'pujiang' THEN avg_pm25 ELSE NULL END AS pujiang, 
 CASE WHEN location = 'putuo' THEN avg_pm25 ELSE NULL END AS putuo, 
 CASE WHEN location = 'shangshida' THEN avg_pm25 ELSE NULL END AS shangshida,
CASE WHEN location = 'jingan' THEN avg_pm25 ELSE NULL END AS jingan, 
CASE WHEN location = 'xianxia' THEN avg_pm25 ELSE NULL END AS xianxia, 
CASE WHEN location = 'hongkou' THEN avg_pm25 ELSE NULL END AS hongkou, 
CASE WHEN location = 'jiading' THEN avg_pm25 ELSE NULL END AS jiading, 
CASE WHEN location = 'zhangjiang' THEN avg_pm25 ELSE NULL END AS zhangjiang, 
CASE WHEN location = 'miaohang' THEN avg_pm25 ELSE NULL END AS miaohang, 
CASE WHEN location = 'yangpu' THEN avg_pm25 ELSE NULL END AS yangpu, 
CASE WHEN location = 'huinan' THEN avg_pm25 ELSE NULL END AS huinan, 
CASE WHEN location = 'chongming' THEN avg_pm25 ELSE NULL END AS chongming
From(SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Shanghai'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)

Save Panel as Shanghai PM2.5 analysis

Edit Panel Title:Shanghai PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.3 查问广州各个监控站点 PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'panyu' THEN avg_pm25 ELSE NULL END AS panyu,
CASE WHEN location = 'commercial school' THEN avg_pm25 ELSE NULL END AS commercial_school, 
CASE WHEN location = 'No 5 middle school' THEN avg_pm25 ELSE NULL END AS No_5_middle_school,
CASE WHEN location = 'guangzhou monitor station' THEN avg_pm25 ELSE NULL END AS Guangzhou_monitor_station, 
CASE WHEN location = 'nansha street' THEN avg_pm25 ELSE NULL END AS Nansha_street, 
CASE WHEN location = 'No 86 middle school' THEN avg_pm25 ELSE NULL END AS No_86_middle_school, 
CASE WHEN location = 'luhu' THEN avg_pm25 ELSE NULL END AS luhu, 
CASE WHEN location = 'nansha' THEN avg_pm25 ELSE NULL END AS nansha, 
CASE WHEN location = 'tiyu west' THEN avg_pm25 ELSE NULL END AS tiyu_west, 
CASE WHEN location = 'jiulong town' THEN avg_pm25 ELSE NULL END AS jiulong_town, 
CASE WHEN location = 'huangpu' THEN avg_pm25 ELSE NULL END AS Huangpu, 
CASE WHEN location = 'baiyun' THEN avg_pm25 ELSE NULL END AS Baiyun, 
CASE WHEN location = 'maofeng mountain' THEN avg_pm25 ELSE NULL END AS Maofeng_mountain, 
CASE WHEN location = 'chong hua' THEN avg_pm25 ELSE NULL END AS Chonghua, 
CASE WHEN location = 'huadu' THEN avg_pm25 ELSE NULL END AS huadu
from(SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Guangzhou'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)

Save Panel as Guangzhou PM2.5 analysis

Edit Panel Title:Guangzhou PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.4 查问深圳各个监控站点 PM2.5 平均值

New Panel

SELECT CASE WHEN location = 'huaqiao city' THEN avg_pm25 ELSE NULL END AS Huaqiao_city,
 CASE WHEN location = 'xixiang' THEN avg_pm25 ELSE NULL END AS xixiang,
CASE WHEN location = 'guanlan' THEN avg_pm25 ELSE NULL END AS guanlan,
CASE WHEN location = 'longgang' THEN avg_pm25 ELSE NULL END AS Longgang,
CASE WHEN location = 'honghu' THEN avg_pm25 ELSE NULL END AS Honghu,
CASE WHEN location = 'pingshan' THEN avg_pm25 ELSE NULL END AS Pingshan,
CASE WHEN location = 'henggang' THEN avg_pm25 ELSE NULL END AS Henggang,
CASE WHEN location = 'minzhi' THEN avg_pm25 ELSE NULL END AS Minzhi,
CASE WHEN location = 'lianhua' THEN avg_pm25 ELSE NULL END AS Lianhua,
CASE WHEN location = 'yantian' THEN avg_pm25 ELSE NULL END AS Yantian,
CASE WHEN location = 'nanou' THEN avg_pm25 ELSE NULL END AS Nanou,
CASE WHEN location = 'meisha' THEN avg_pm25 ELSE NULL END AS Meisha
From(SELECT location, round(avg(measure_value::bigint),0) as avg_pm25
FROM "iot"."pm25" 
where measure_name='pm2.5' 
and city='Shenzhen'
and time >= ago(30s)
group by location,bin(time,30s)
order by avg_pm25 desc)

Save Panel as Shenzhen PM2.5 analysis

Edit Panel Title:Shenzhen PM2.5 analysis

Save Dashboard PM2.5 analysis 1

4.2.5 深圳华侨城工夫序列剖析 (最近 5 分钟内 PM2.5 剖析)

New Panel

select location, CREATE_TIME_SERIES(time, measure_value::bigint) as PM25 FROM iot.pm25
where measure_name='pm2.5' 
and location='huaqiao city'
and time >= ago(5m)
GROUP BY location

抉择图形显示 select Lines; Select Points:

Save Panel as Shen Zhen Huaqiao City PM2.5 analysis

Edit Panel Title:深圳华侨城最近 5 分钟 PM2.5 剖析

Save Dashboard PM2.5 analysis 1

4.2.6 找出过来 2 小时内深圳华侨城以 30 秒为距离的均匀 PM2.5 值(应用线性插值填充缺失的值)

New Panel

WITH binned_timeseries AS (SELECT location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND location='huaqiao city'
        AND time > ago(2h)
    GROUP BY location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT location,
        INTERPOLATE_LINEAR(CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY location
)
SELECT time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25)

抉择图形显示 select Lines:

Save Panel as Shen Zhen Huaqiao City PM2.5 analysis 1

Edit Panel Title:过来 2 小时深圳华侨城均匀 PM2.5 值(应用线性插值填充缺失值)

Save Dashboard PM2.5 analysis 1

4.2.7 过来 5 分钟内所有城市 PM2.5 平均值排名(线性插值)

New Panel

SELECT CASE WHEN city = 'Shanghai' THEN inter_avg_PM25 ELSE NULL END AS Shanghai,
CASE WHEN city = 'Beijing' THEN inter_avg_PM25 ELSE NULL END AS Beijing,
CASE WHEN city = 'Guangzhou' THEN inter_avg_PM25 ELSE NULL END AS Guangzhou,
CASE WHEN city = 'Shenzhen' THEN inter_avg_PM25 ELSE NULL END AS Shenzhen,
CASE WHEN city = 'Hangzhou' THEN inter_avg_PM25 ELSE NULL END AS Hangzhou,
CASE WHEN city = 'Nanjing' THEN inter_avg_PM25 ELSE NULL END AS Nanjing,
CASE WHEN city = 'Chengdu' THEN inter_avg_PM25 ELSE NULL END AS Chengdu,
CASE WHEN city = 'Chongqing' THEN inter_avg_PM25 ELSE NULL END AS Chongqing,
CASE WHEN city = 'Tianjin' THEN inter_avg_PM25 ELSE NULL END AS Tianjin,
CASE WHEN city = 'Shenyang' THEN inter_avg_PM25 ELSE NULL END AS Shenyang,
CASE WHEN city = 'Sanya' THEN inter_avg_PM25 ELSE NULL END AS Sanya,
CASE WHEN city = 'Lasa' THEN inter_avg_PM25 ELSE NULL END AS Lasa
from(
WITH binned_timeseries AS (SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), all_location_interpolated as (SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,avg(interpolated_avg_PM25) AS inter_avg_PM25
from all_location_interpolated
group by city
order by avg(interpolated_avg_PM25) desc)

抉择 Panel 图形类型:

Save Panel as all city analysis 1

Edit Panel Title:过来 5 分钟所有城市 PM2.5 平均值

Save Dashboard PM2.5 analysis 1

4.2.8 过来 5 分钟内 PM2.5 最高的十个采集点(线性插值)

New Panel

WITH binned_timeseries AS (SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) 
                AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), interpolated_cross_join as (SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc desc
limit 10

抉择 Table

Save Panel as all city analysis 2

Edit Panel Title:过来 5 分钟内 PM2.5 最高的十个采集点(线性插值)

Save Dashboard PM2.5 analysis 1

4.2.9 过来 5 分钟内 PM2.5 最低的十个采集点(线性插值)

New Panel

WITH binned_timeseries AS (SELECT city,location, BIN(time, 30s) AS binned_timestamp, ROUND(AVG(measure_value::bigint), 2) AS avg_PM25
    FROM "iot".pm25
    WHERE measure_name = 'pm2.5'
        AND time > ago(5m)
    GROUP BY city,location, BIN(time, 30s)
), interpolated_timeseries AS (
    SELECT city,location,
        INTERPOLATE_LINEAR(CREATE_TIME_SERIES(binned_timestamp, avg_PM25),
                SEQUENCE(min(binned_timestamp), max(binned_timestamp), 30s)) 
                AS interpolated_avg_PM25
    FROM binned_timeseries
    GROUP BY city,location
), interpolated_cross_join as (SELECT city,location,time, ROUND(value, 2) AS interpolated_avg_PM25
FROM interpolated_timeseries
CROSS JOIN UNNEST(interpolated_avg_PM25))
select city,location, avg(interpolated_avg_PM25) as avg_PM25_loc
from interpolated_cross_join
group by city,location
order by avg_PM25_loc asc
limit 10

抉择 Table

Save Panel as all city analysis 3

Edit Panel Title:过来 5 分钟内 PM2.5 最低的十个采集点(线性插值)

Save Dashboard PM2.5 analysis 1

设置仪表板 每 5 秒钟刷新一次:

本 blog 着重介绍通过 Timestream、Kinesis Stream 托管服务和 Grafana 实现物联网(以 PM 2.5 场景为示例)时序数据实时采集、存储和剖析,其中蕴含部署架构、环境部署、数据采集、数据存储和剖析,心愿当您有类似物联网时序数据存储和剖析需要的时候,有所启发,实现海量物联网时序数据高效治理、开掘物联网数据中蕴含的法则、模式和价值,助力业务倒退。

附录

《Amazon Timestream 开发人员指南》

《AWS Timestream 开发程序示例》

《AWS Timestream 与 Grafana 集成示例》

本篇作者

刘冰冰

AWS 数据库解决方案架构师,负责基于 AWS 的数据库解决方案的征询与架构设计,同时致力于大数据方面的钻研和推广。在退出 AWS 之前曾在 Oracle 工作多年,在数据库云布局、设计运维调优、DR 解决方案、大数据和数仓以及企业应用等方面有丰盛的教训。

正文完
 0