共计 3200 个字符,预计需要花费 8 分钟才能阅读完成。
小 T 导读:为了帮忙利用实时获取写入时序数据库(Time Series Database)TDengine 的数据,或者以事件达到程序解决数据,TDengine 提供了相似音讯队列产品的数据订阅、生产接口。这样在很多场景下,采纳 TDengine 的时序数据处理系统就不须要再集成如 Kafka 个别的音讯队列产品,从而简化零碎设计的复杂度,升高运维老本。TDengine 3.0 对数据订阅性能又进行了优化降级,本文将具体介绍其语法规定,不便开发者及企业应用。
与 Kafka 一样,利用 TDengine 时你也须要定义 topic, 但 TDengine 的 topic 是基于一个曾经存在的超级表、子表或一般表的查问条件,即一个 SELECT 语句。你能够应用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包含数据聚合)。与其余音讯队列软件相比,这是 TDengine 数据订阅性能最大的劣势,它提供了更大的灵活性,数据的颗粒度能够由利用随时调整,而数据的过滤与预处理是交给 TDengine 来实现,无效地缩小传输的数据量与利用的复杂度。
消费者订阅 topic 后(一个消费者能够订阅多个 topic),能够实时取得最新的数据。多个消费者能够组成一个消费者组 (consumer group),一个消费者组里的多个消费者共享生产进度,便于多线程、分布式地生产数据,进步生产速度;但不同消费者组中的消费者即便生产同一个 topic,也并不共享生产进度。如果订阅的是超级表,数据可能会散布在多个不同的 vnode 上,也就是多个 shard 上,这样一个生产组里有多个消费者能够进步生产效率。TDengine 的音讯队列提供了音讯的 ACK 机制,在宕机、重启等简单环境下也能确保 at least once 生产。
为了实现上述性能,TDengine 会为 WAL (Write-Ahead-Log) 文件主动创立索引以反对疾速随机拜访,并提供了灵便可配置的文件切换与保留机制,用户能够按需指定 WAL 文件保留的工夫以及大小:
- WAL_RETENTION_PERIOD:为了数据订阅生产,须要 WAL 日志文件额定保留的最大时长策略。WAL 日志清理,不受订阅客户端生产状态影响。单位为 s,默认为 3600,示意在 WAL 保留最近 3600 秒的数据,用户能够依据数据订阅的须要批改这个参数为适当值。
- WAL_RETENTION_SIZE:为了数据订阅生产,须要 WAL 日志文件额定保留的最大累计大小策略。单位为 KB,默认为 0,示意累计大小无下限。
通过以上形式,咱们将 WAL 革新成了一个保留事件达到程序的、可长久化的存储引擎(但因为 TSDB 具备远比 WAL 更高的压缩率,因而不举荐保留太长时间,一般来说倡议不超过几天)。对于以 topic 模式创立的查问,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在生产时,TDengine 依据以后生产进度从 WAL 间接读取数据,并应用对立的查问引擎实现过滤、变换等操作,将数据推送给消费者。
为了不便大家上手实操,下文将对 TDengine 数据订阅相干语法进行具体解读。
写入数据
首先实现建库、建一张超级表和多张子表操作,而后就能够写入数据了,比方:
DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
创立 topic
TDengine 应用 SQL 创立如下所示 topic(topic 创立个数有下限,通过参数 tmqMaxTopicNum 管制,默认 20 个):
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHEREc1 > 1;
TMQ 反对以下多种订阅类型:
列订阅
CREATE TOPIC topic_name as subquery
通过 SELECT 语句订阅(包含 SELECT *,或 SELECT ts, c1 等指定列订阅,能够带条件过滤、标量函数计算,但不反对聚合函数、不反对工夫窗口聚合)。但须要留神的是:
- 该类型 TOPIC 一旦创立则订阅数据的构造确定;
- 被订阅或用于计算的列或标签不可被删除(ALTER table DROP)、批改(ALTER table MODIFY);
- 若产生表构造变更,新增的列不呈现在后果中。
超级表订阅
CREATE TOPIC topic_name AS STABLE stb_name
与 SELECT * from stbName 订阅的区别是:
- 不会限度用户的表构造变更。
- 返回的是非结构化的数据:返回数据的构造会随超级表的表构造变动而变动。
- with meta 参数可选,抉择时将返回创立超级表,子表等语句,次要用于 taosx 做超级表迁徙。
- where_condition 参数可选,抉择时将用来过滤符合条件的子表,订阅这些子表。where 条件里不能有一般列,只能是 tag 或 tbname,where 条件里能够用函数,用来过滤 tag,然而不能是聚合函数,因为子表 tag 值无奈做聚合。也能够是常量表达式,比方 2 > 1(订阅全副子表),或者 false(订阅 0 个子表)。
- 返回数据不蕴含标签。
数据库订阅
CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;
通过该语句可创立一个蕴含数据库所有表数据的订阅,with meta 参数可选,同上。
创立消费者
订阅 topics
一个 consumer 反对同时订阅多个 topic。以 Java 为例:
List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
生产
在 Java 语言下如何对 TMQ 音讯进行生产,代码示意如下:
while(running){ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
for (Meters meter : meters) {processMsg(meter);
}
}
完结生产
生产完结后,该当勾销订阅。
/* 勾销订阅 */
tmq_unsubscribe(tmq);
/* 敞开消费者对象 */
tmq_consumer_close(tmq);
删除 topic
如果不再须要订阅数据,能够删除 topic,须要留神:只有以后未在订阅中的 topic 能力被删除。
`/ 删除 topic /
DROP TOPIC topic_name;`
状态查看
1、topics:查问曾经创立的 topic
SHOW TOPICS;
2、consumers:查问 consumer 的状态及其订阅的 topic
SHOW CONSUMERS;
3、subscriptions:查问 consumer 与 vgroup 之间的分配关系
SHOW SUBSCRIPTIONS;
写在最初
受文章篇幅所限,本文只分享了局部语法的具体实现,须要理解相干设置及更多语言的代码示例,能够进入 TDengine 官网查问数据订阅的相干文档。对于更为简单的利用问题,也欢送大家退出 TDengine 的开发者交换群(增加 小 T vx:tdengine),间接向社区技术支持人员寻求帮忙。