共计 4972 个字符,预计需要花费 13 分钟才能阅读完成。
小 T 导读:TDengine 3.0 引入了全新的流式计算引擎,既反对工夫驱动的流式计算,也反对事件驱动的流式计算。本文将对新的流式计算引擎的语法规定进行具体介绍,不便开发者及企业应用。
TDengine 是一款开源、云原生的时序数据库(Time Series Database,TSDB),专为物联网、工业互联网、金融、IT 运维监控等场景设计并优化。近期公布的 TDengine 3.0,全新的流式计算引擎是其一大亮点。
TDengine 3.0 的流式计算引擎提供了实时处理写入的数据流能力,应用 SQL 定义实时流变换,当数据被写入流的源表后,数据会被以定义的形式主动解决,并依据定义的触发模式向目标表推送后果。它提供了代替简单流解决零碎的轻量级解决方案,并可能在高吞吐的数据写入状况下,提供毫秒级的计算结果提早。
流式计算能够蕴含数据过滤,标量函数计算(含 UDF),以及窗口聚合(反对滑动窗口、会话窗口与状态窗口),能够以超级表、子表、一般表为源表,写入到目标超级表。在创立流时,目标超级表将被主动创立,随后新插入的数据会被流定义的形式解决并写入其中,通过 partition by 子句,能够以表名或标签划分 partition,不同的 partition 将写入到目标超级表的不同子表。
TDengine 的流式计算可能反对散布在多个 vnode 中的超级表聚合;还可能解决乱序数据的写入:它提供了 watermark 机制以度量容忍数据乱序的水平,并提供了 ignore expired 配置项以决定乱序数据的解决策略——抛弃或者从新计算。
上面咱们就一起看一下 TDengine 中流式计算相干的 SQL 语法。
流式计算的创立、删除与展现
创立
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options: {TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
WATERMARK time
}
其中 subquery 是 select 一般查问语法的子集:
subquery: SELECT select_list
from_clause
[WHERE condition]
[PARTITION BY tag_list]
[window_clause]
反对会话窗口、状态窗口与滑动窗口,其中,会话窗口与状态窗口搭配超级表时必须与 partition by tbname 一起应用:
window_clause: {SESSION(ts_col, tol_val)
| STATE_WINDOW(col)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
}
在上述语句中,SESSION 是会话窗口,tol_val 是工夫距离的最大范畴。在 tol_val 工夫距离范畴内的数据都属于同一个窗口,如果有间断两条数据的工夫超过 tol_val,则主动开启下一个窗口。窗口的定义与时序数据特色查问中的定义完全相同,详见 TDengine 特色查问。
例如,应用如下语句创立流式计算,同时主动创立名为 avg_vol 的超级表,此流计算以一分钟为工夫窗口、30 秒为前向增量统计这些电表的均匀电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会别离创立子表并写入不同子表。
CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
删除
DROP STREAM [IF NOT EXISTS] stream_name;
仅删除流式计算工作,由流式计算写入的数据不会被删除。
展现
SHOW STREAMS;
若要展现更具体的信息,能够应用:
SELECT * from performance_schema.`perf_streams`;
流式计算的 partition
咱们能够应用 PARTITION BY TBNAME 或 PARTITION BY tag 对一个流进行多分区的计算,每个分区的工夫线与工夫窗口是独立的,会各自聚合,并写入到目标表中的不同子表。如果不带 PARTITION BY 选项,那所有的数据将写入到一张子表。
流式计算创立的超级表有惟一的 tag 列 groupId,每个 partition 会被调配惟一 groupId。与 schemaless 写入统一,咱们通过 MD5 计算子表名,并主动创立它。
流式计算的触发模式
在创立流时,能够通过 TRIGGER 指令指定流式计算的触发模式。
对于非窗口计算,流式计算的触发是实时的;对于窗口计算,目前提供如下 3 种触发模式:
- AT_ONCE:写入立刻触发
- WINDOW_CLOSE:窗口敞开时触发(窗口敞开由事件工夫决定,可配合 watermark 应用)
- MAX_DELAY time:若窗口敞开,则触发计算。若窗口未敞开,且未敞开时长超过 max delay 指定的工夫,则触发计算。
因为窗口敞开是由事件工夫所决定的,如果因事件流中断、或继续提早导致事件工夫无奈更新,可能无奈失去最新的计算结果。因而,流式计算提供了以事件工夫联合解决工夫计算的 MAX_DELAY 触发模式,MAX_DELAY 模式在窗口敞开时会立刻触发计算。此外,当数据写入后,计算触发的工夫超过 max delay 指定的工夫,则立刻触发计算。
流式计算的窗口敞开
流式计算以事件工夫(插入记录中的工夫戳主键)为基准计算窗口敞开,而非以 TDengine 服务器的工夫,这样能够防止客户端与服务器工夫不统一带来的问题,无效解决乱序数据写入等难题。同时,流式计算还提供了 watermark 来定义容忍的乱序水平。
在创立流时,咱们能够在 stream_option 中指定 watermark,它定义了数据乱序的容忍上界。流式计算通过 watermark 来度量对乱序数据的容忍水平,watermark 默认为 0。
T = 最新事件工夫 – watermark
每次写入的数据都会以上述公式更新窗口敞开工夫,并将窗口完结工夫 < T 的所有关上的窗口敞开,若触发模式为 WINDOW_CLOSE 或 MAX_DELAY,则推送窗口聚合后果。
在上图中,纵轴示意不同时刻,对于不同时刻,咱们画出其对应的 TDengine 收到的数据,即为横轴。已知横轴上的数据点示意曾经收到的数据,其中蓝色的点示意事件工夫(即数据中的工夫戳主键)最初的数据,该数据点减去定义的 watermark 工夫,就失去乱序容忍的上界 T。所有完结工夫小于 T 的窗口都将被敞开(图中以灰色方框标记)。
在 T2 时刻,乱序数据(黄色的点)达到 TDengine,因为有 watermark 的存在,这些数据进入的窗口并未被敞开,因而能够被正确处理。在 T3 时刻,最新事件达到,T 向后推移超过了第二个窗口敞开的工夫,该窗口被敞开,乱序数据被正确处理。
但要留神,在 window_close 或 max_delay 模式下,窗口敞开间接影响推送后果。在 at_once 模式下,窗口敞开只与内存占用无关。
流式计算的过期数据解决策略
对于已敞开的窗口,再次落入该窗口中的数据就会被标记为过期数据。TDengine 对于过期数据提供两种解决形式,由 IGNORE EXPIRED 选项指定:
- 从新计算,即 IGNORE EXPIRED 0:默认配置,从 TSDB 中从新查找对应窗口的所有数据并从新计算失去最新后果
- 间接抛弃,即 IGNORE EXPIRED 1:疏忽过期数据
无论在哪种模式下,watermark 都应该被妥善设置,来失去正确后果(间接抛弃模式)或防止频繁触发重算带来的性能开销(从新计算模式)。
示例
企业电表的数据常常都是成千盈百亿条的,想要将这些扩散、凌乱的数据荡涤或转换都须要比拟长的工夫,很难做到高效性和实时性。在如下例子中,通过 TDengine 流计算能够将电表电压大于 220V 的数据荡涤掉,而后以 5 秒为窗口整合并计算出每个窗口中电流的最大值,最初将后果输入到指定的数据表中。
创立 Database 和原始数据表
首先筹备数据,实现建库、建一张超级表和多张子表操作:
DROP DATABASE IF EXISTS power;
CREATE DATABASE power;
USE power;
CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2);
CREATE TABLE d1002 USING meters TAGS ("California.SanFrancisco", 3);
CREATE TABLE d1003 USING meters TAGS ("California.LosAngeles", 2);
CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);
创立流
create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s);
写入数据
insert into d1001 values("2018-10-03 14:38:05.000", 10.30000, 219, 0.31000);
insert into d1001 values("2018-10-03 14:38:15.000", 12.60000, 218, 0.33000);
insert into d1001 values("2018-10-03 14:38:16.800", 12.30000, 221, 0.31000);
insert into d1002 values("2018-10-03 14:38:16.650", 10.30000, 218, 0.25000);
insert into d1003 values("2018-10-03 14:38:05.500", 11.80000, 221, 0.28000);
insert into d1003 values("2018-10-03 14:38:16.600", 13.40000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:05.000", 10.80000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000);
查问以察看后果
taos> select start, end, max_current from current_stream_output_stb;
start | end | max_current |
===========================================================================
2018-10-03 14:38:05.000 | 2018-10-03 14:38:10.000 | 10.30000 |
2018-10-03 14:38:15.000 | 2018-10-03 14:38:20.000 | 12.60000 |
Query OK, 2 rows in database (0.018762s)
写在最初
如果大家可能使用好 TDengine 3.0 提供的流计算引擎,就不须要再部署其余的第三方流解决零碎,这样一来,不仅升高了零碎的复杂度,还大大减少了研发和运维老本。在实际操作中利用 TDengine 流计算引擎时,上述的具体语法会带给你很多帮忙,如果还产生了其余更为简单的利用问题,你也能够进入 TDengine 社区向技术人员寻求帮忙。
想理解更多 TDengine Database 的具体细节,欢送大家在 GitHub 上查看相干源代码。