乐趣区

关于tdengine:如何使用-TDengine-Sink-Connector

小 T 导读:TDengine Kafka Connector 在 TDengine 的官网文档上放进去曾经有一段时间了,咱们也收到了一些开发者的反馈。文档中的教程应用 Confluent 平台(集成了 Kafka)演示了如何应用 Source Connector 和 Sink Connector,然而很多开发者在生产环境中并没有应用 Confluent,所以为不便大家,本文将应用独立部署的 Kafka 来演示。

本文蕴含以下内容:

  1. 如何应用 TDengine Sink Connector,把数据从 Kafka 同步到 TDengine。
  2. TDengine Sink Connector 的实现原理。
  3. 一个简略的测试脚本,帮忙你在本人的环境中疾速测试。通过更改生成测试数据的程序和配置参数,你能够模仿本人的应用场景。
  4. 测试同步同一个 topic,应用不同分区数和不同 Sink 工作数对性能的影响。

背景常识

如果你对文章结尾呈现的术语并不生疏,那么能够跳过这一部分。

· 什么是 Kafka?

Kafka 的外围是一个通用的、分布式的、可反复生产的音讯队列。

与之相比,作为一款时序数据库(Time-Series Database),TDengine 也可看作针对结构化的时序数据的音讯队列。

· 什么是 Kafka Connect? 为什么应用 Kafka Connect?

Kafka Connect 是 Kafka 的一个组件,简化了 Kafka 与其它数据源的集成。用户通过 Kafka Connect 读写 Kafka;通过 Kafka Connect 插件(也称 Kafka Connector)来读写各种数据源。

为不便集成,Kafka 曾经提供了生产者和消费者 API 以及客户端库,那为什么还须要 Kafka Connect 呢?因为一个好的 Kafka 客户端程序,不是单单生产或生产数据,还须要思考容错、重启、日志、弹性伸缩、序列化以及反序列化等。当开发者本人实现了这所有,就相当于开发了一个和 Kafka Connect 相似的货色。

与 Kafka 集成是 Kafka Connect 曾经解决的问题,用户不须要反复造轮子,只有多数边缘场景才须要定制化的集成计划。

TDengine Sink Connector 的实现原理

TDengine Sink Connector 用于将 Kafka 中指定 topic 的数据(批量或实时)同步到 TDengine 的 database 中。

启动 Sink Connector 须要一个 properties 配置文件。具体配置见官网文档的配置参考。

Sink Connector 外部的实现非常简单,整体工作流程分为以下几个步骤:

  1. Connect 框架依据配置启动 N 个消费者线程。
  2. N 个消费者同时订阅数据,并用配置文件中指定的 key.converter 和 value.converter 做反序列化。
  3. Connect 框架把反序列化后的数据传递给 N 个 SinkTask 的实例。
  4. SinkTask 应用 TDengine 提供的 schemaless 写入接口来写入数据。

上述 4 个步骤,只有最初一步写数据是 Sink Connector 须要关怀的,其它都是 Connect 框架主动实现的。

上面重点探讨几个问题。

· 反对的数据格式

因为应用了 schemaless 写入接口,因而 TDengine Sink Connector 只反对三种格局的数据:InfluxDB 行协定格局、OpenTSDB Telnet 协定格局 和 OpenTSDB JSON 协定格局。应用配置项 db.schemaless 来指定写入时应用的数据格式。例如:
db.schemaless=line
如果 Kafka 中的数据曾经是这三种格局之一,那么配置文件中的 value.converer,只需指定为 Connnect 内置的 org.apache.kafka.connect.storage.StringConverter。

value.converter=org.apache.kafka.connect.storage.StringConverter

如果 Kafka 中已有的数据不是上述三种之一,则须要实现本人的 Converter 类,将其转换为三种格局之一,这个链接兴许能帮到你。

· 如何指定 Consumer 的参数?

既然 Connect 框架曾经帮咱们做了 Consumer 要做的事,那么咱们怎么来管制 Consumer 的行为呢?比方如何管制 Consumer 订阅的主题?如何管制 Consumer 每次 poll 的音讯数和工夫距离?

对于订阅哪些主题,能够用配置项 topics 来指定。

如果想笼罩 Consumer 的其它默认配置,能够间接在 Sink Connector 的配置文件中编写,然而要加前缀“consumer.override.”,比方想把每次 poll 的最大音讯数改为 3000,能够这样配置:
consumer.override.max.poll.records=3000

· 如何管制写入线程数?

对于 Kafka Connect Sink,task 实质上就是消费者线程,接管从 topic 的分区读出来的数据。用配置参数 tasks.max 来管制最大工作数,一个工作一个线程。理论启动的工作数还与 topic 的分区数无关。如果你有 10 个分区,并且 tasks.max 设置为 5,那么每个 task 会收到 2 个分区的数据,并跟踪 2 个分区的 offsets。如果你配置的 tasks.max 比 partition 数大,Connect 会启动的 task 数与 topic 的 partition 数雷同。如果你订阅了 5 个 topic,每个 topic 都是 1 个分区,并且设置 tasks.max = 5, 那么理论会启动多少个工作呢?答案是 1 个,工作数与 topic 数量没有关系。

TDengine Sink Connector 应用示例

这一部分咱们在一台 Linux 服务器上搭建测试环境,并运行简略的示例程序。示例中将 Kafka 部署到了集体的 home 目录。操作时请留神把门路中的用户名(bding)替换为本人的用户名。

· 环境筹备

  1. Java 1.8
  2. Maven
  3. 装置并启动了 TDengine 相干服务过程:taosd 和 taosAdapter。

第一步:装置 Kafka

wget https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz

编辑 .bash_profile,退出:

export KAFKA_HOME=/home/bding/kafka_2.13-3.2.0
export PATH=$PATH:$KAFKA_HOME/bin

source .bash_profile

第二步:配置 Kafka

配置 Kafka Connect 加载插件的门路。

cd kafka_2.13-3.2.0/config/
vi connect-standalone.properties

追加
plugin.path=/home/bding/connectors
批改 Connector 插件的日志级别。这一步十分重要,咱们将通过插件的日志统计同步数据破费的工夫。
vi connect-log4j.properties
追加

log4j.logger.com.taosdata.kafka.connect.sink=DEBUG

第三步:编译并装置插件

git clone git@github.com:taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package
unzip -d ~/connectors target/components/packages/taosdata-kafka-connect-tdengine-*.zip

第四步:启动 ZooKeeper Server 和 Kafka Server

zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

第五步:创立 topic

kafka-topics.sh --create --topic meters --partitions 1 --bootstrap-server localhost:9092

第六步:生成测试数据

将下列脚本保留为 gen-data.py:

#!/usr/bin/python3
import random
import sys
topic = sys.argv[1]
count = int(sys.argv[2])
start_ts = 1648432611249000000
location = ["SanFrancisco", "LosAngeles", "SanDiego"]
for i in range(count):
    ts = start_ts + i
    row = f"{topic},location={location[i % 3]},groupid=2 current={random.random() * 10},voltage={random.randint(100, 300)},phase={random.random()} {ts}"
    print(row)

而后执行:

python3 gen-data.py meters 10000  | kafka-console-producer.sh --broker-list localhost:9092 --topic meters

生成 10000 条 InfluxDB 行协定格局的数据到 topic meters。每条数据又蕴含 2 个标签字段和 3 个数据字段。

第七步:启动 Kafka Connect

将下列配置保留为 sink-test.properties。

name=TDengineSinkConnector
connector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnector
tasks.max=1
topics=meters
connection.url=jdbc:TAOS://127.0.0.1:6030
connection.user=root
connection.password=taosdata
connection.database=power
db.schemaless=line
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

而后执行:

connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties

第八步:查看 TDengine 中的数据

应用 TDengine CLI 查问 power 数据库 meters 表,查看是否正好蕴含 10000 条数据。

[bding@vm95 test]$ taos
 
Welcome to the TDengine shell from Linux, Client Version:2.6.0.4
Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.
taos> select count(*) from power.meters;
       count(*)        |
========================
                 10000 |

TDengine Sink Connector 性能测试

· 测试流程

这一部分,咱们将下面示例步骤中的第四步到第七步封装成可反复运行的 shell 脚本,并做以下批改:

  1. 将 topic 的分区数作为脚本的第 1 个参数,同时配置 tasks.max,使其等于分区数。这样咱们能够管制每次测试应用的写入线程数。
  2. 将生成测试数据的条数作为脚本的第 2 个参数,用来管制每次测试同步的数据量。
  3. 启动测试前清空所有数据,测试完结后进行 Connect、Kafka 和 ZooKeeper。

每次测试都先写数据到 Kafka,而后再启动 Connect 同步数据到 TDengine,这样做能够把同步数据的压力全副集中到 Sink 插件这边。咱们统计 Sink Connector 从接管到第一批数据到接管到最初一批数据之间的工夫,作为同步数据的总耗时。

残缺脚本如下:

#!/bin/bash
if [$# -lt 2];then
        echo  "Usage: ./run-test.sh <num_of_partitions>  <total_records>"
        exit 0
fi
echo "---------------------------TEST STARTED---------------------------------------"
echo clean data and logs
taos -s "DROP DATABASE IF EXISTS power"
rm -rf /tmp/kafka-logs /tmp/zookeeper
rm -f $KAFKA_HOME/logs/connect.log
np=$1     # number of partitions
total=$2  # number of records
echo number of partitions is $np, number of recordes is $total.
echo start zookeeper
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
echo start kafka
sleep 3
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
sleep 5
echo create topic
kafka-topics.sh --create --topic meters --partitions $np --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic meters --bootstrap-server localhost:9092
echo generate test data
python3 gen-data.py meters $total  | kafka-console-producer.sh --broker-list localhost:9092 --topic meters
 
echo alter connector configuration setting tasks.max=$np
sed -i  "s/tasks.max=.*/tasks.max=${np}/"  sink-test.properties
echo start kafka connect
connect-standalone.sh -daemon $KAFKA_HOME/config/connect-standalone.properties sink-test.properties
 
echo -e "\e[1;31m open another console to monitor connect.log. press enter when no more data received.\e[0m"
read
 
echo stop connect
jps | grep ConnectStandalone | awk '{print $1}' | xargs kill
echo stop kafka server
kafka-server-stop.sh
echo stop zookeeper
zookeeper-server-stop.sh
# extract timestamps of receiving the first batch of data and the last batch of data
grep "records" $KAFKA_HOME/logs/connect.log  | grep meters- > tmp.log
start_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | head -1`
stop_time=`cat tmp.log | grep -Eo "[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}" | tail -1`
 
echo "--------------------------TEST FINISHED------------------------------------"
echo "| records | partitions | start time | stop time |"
echo "|---------|------------|------------|-----------|"
echo "| $total | $np | $start_time | $stop_time |"

如果要测试应用 1 个分区,共 100 万条数据的性能,能够这样执行:
./run-test.sh 1 1000000
执行过程的截图如下:

留神两头有一个交互过程。因为脚本无奈确定数据是否同步完,须要用户监控 connect.log 来确定是否曾经生产完了所有数据,例如:

[bding@vm95 ~]$ cd kafka_2.13-3.2.0/logs/
[bding@vm95 logs]$ tail -f connect.log
[2022-06-21 17:39:00,176] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314496). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
[2022-06-21 17:39:00,180] DEBUG [TDengineSinkConnector|task-0] Received 500 records. First record kafka coordinates:(meters-0-314996). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)

当日志不再滚动,就阐明曾经生产完了

· 测试后果

写入速度与数据量和线程数的关系表

上表第 1 列为总数据量,第 1 行为消费者线程数,也是写入线程数。两头为均匀每秒写入记录数。

写入速度与数据量和线程数的关系图

后果剖析

从上图能够看出,雷同数据量,线程越多写入速度越快。当应用单线程写入时,每秒能写入大略 10 万以上。当应用 5 个线程写入时,每秒写入大略 35 万左右。当应用 10 个线程时,每秒能写入 55 万左右。

写入速度比拟安稳,与总数据量关系不大。

同时也发现线程减少越多,线程减少带来的速度晋升越少。线程数从 1 变到 10,速度只从 10 万变到 50 万。可能的起因是数据在各个分区散布不平均。有的 task 执行工夫长,有的 task 执行工夫短,数据量越大,数据歪斜越大。比方 1000 万数据,10 个分区的时候,各分区的数据量:

[bding@vm95 kafka-logs]$ du -h ./ -d 1
125M    ./meters-8
149M    ./meters-7
119M    ./meters-9
138M    ./meters-4
110M    ./meters-3
158M    ./meters-6
131M    ./meters-5
105M    ./meters-0
113M    ./meters-2
99M     ./meters-1

另一个影响多线程写入速度的是数据的乱序水平。本测试场景中,多条工夫线的数据随机调配到了不同分区,当单线程写入时(即 1 个分区时),数据是严格有序的,写入速度最快。线程越多乱序水平越大。

所以在理论利用场景中,倡议将同一个子表的数据,放在 Kafka 同一个分区中。

附录

· 测试程序

本文中用到的所有代码和原始测试后果数据都已上传到 GitHub 仓库。

· 测试环境


想理解更多 TDengine Database 的具体细节,欢送大家在 GitHub 上查看相干源代码。

退出移动版