共计 6249 个字符,预计需要花费 16 分钟才能阅读完成。
小 T 导读:想用 Flink 对接 TDengine?保姆级教程来了。
0、前言
TDengine 是由涛思数据开发并开源的一款高性能、分布式、反对 SQL 的时序数据库(Time-Series Database)。
除了外围的时序数据库性能外,TDengine 还提供缓存、数据订阅、流式计算等大数据平台所须要的系列性能。然而很多小伙伴出于架构的思考,还是须要将数据导出到 Apache Flink、Apache Spark 等平台进行计算剖析。
为了帮忙大家对接,咱们特地推出了保姆级课程,包学包会。
1、技术实现
Apache Flink 提供了 SourceFunction 和 SinkFunction,用来提供 Flink 和内部数据源的连贯,其中 SouceFunction 为从数据源读取数据,SinkFunction 为将数据写入数据源。与此同时,Flink 提供了 RichSourceFunction 和 RichSinkFunction 这两个类(继承自 AbstractRichFunction),提供了额定的初始化 (open(Configuration)) 和销毁办法(close())。通过重写这两个办法,能够防止每次读写数据时都从新建设连贯。
2、代码实现
残缺源码:https://github.com/liuyq-617/…
代码逻辑:
1) 自定义类 SourceFromTDengine
用处:数据源连贯,数据读取
package com.taosdata.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import com.taosdata.model.Sensor;
import java.sql.*;
import java.util.Properties;
public class SourceFromTDengine extends RichSourceFunction<Sensor> {
Statement statement;
private Connection connection;
private String property;
public SourceFromTDengine(){super();
}
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
String driver = "com.taosdata.jdbc.rs.RestfulDriver";
String host = "u05";
String username = "root";
String password = "taosdata";
String prop = System.getProperty("java.library.path");
Logger LOG = LoggerFactory.getLogger(SourceFromTDengine.class);
LOG.info("java.library.path:{}", prop);
System.out.println(prop);
Class.forName(driver);
Properties properties = new Properties();
connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"
, properties);
statement = connection.createStatement();}
@Override
public void close() throws Exception {super.close();
if (connection != null) {connection.close();
}
if (statement != null) {statement.close();
}
}
@Override
public void run(SourceContext<Sensor> sourceContext) throws Exception {
try {
String sql = "select * from tt.meters";
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {Sensor sensor = new Sensor( resultSet.getLong(1),
resultSet.getInt("vol"),
resultSet.getFloat("current"),
resultSet.getString("location").trim());
sourceContext.collect(sensor);
}
} catch (Exception e) {e.printStackTrace();
}
}
@Override
public void cancel() {}
}
2) 自定义类 SinkToTDengine
用处:数据源连贯,数据写入
SinkToTDengine
package com.taosdata.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.taosdata.model.Sensor;
import java.sql.*;
import java.util.Properties;
public class SinkToTDengine extends RichSinkFunction<Sensor> {
Statement statement;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
String driver = "com.taosdata.jdbc.rs.RestfulDriver";
String host = "TAOS-FQDN";
String username = "root";
String password = "taosdata";
String prop = System.getProperty("java.library.path");
System.out.println(prop);
Class.forName(driver);
Properties properties = new Properties();
connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"
, properties);
statement = connection.createStatement();}
@Override
public void close() throws Exception {super.close();
if (connection != null) {connection.close();
}
if (statement != null) {statement.close();
}
}
@Override
public void invoke(Sensor sensor, Context context) throws Exception {
try {String sql = String.format("insert into sinktest.%s using sinktest.meters tags('%s') values(%d,%d,%f)",
sensor.getLocation(),
sensor.getLocation(),
sensor.getTs(),
sensor.getVal(),
sensor.getCurrent());
statement.executeUpdate(sql);
} catch (Exception e) {e.printStackTrace();
}
}
}
3) 自定义类 Sensor
用处:定义数据结构,用来承受数据
package com.taosdata.model;
public class Sensor {
public long ts;
public int val;
public float current;
public String location;
public Sensor() {}
public Sensor(long ts, int val, float current, String location) {
this.ts = ts;
this.val = val;
this.current = current;
this.location = location;
}
public long getTs() {return ts;}
public void setTs(long ts) {this.ts = ts;}
public int getVal() {return val;}
public void setVal(int val) {this.val = val;}
public float getCurrent() {return current;}
public void setCurrent(float current) {this.current = current;}
public String getLocation() {return location;}
public void setLocation(String location) {this.location = location;}
@Override
public String toString() {
return "Sensor{" +
"ts=" + ts +
", val=" + val +
", current=" + current +
", location='" + location + '\'' +
'}';
}
}
4) 主程序类 ReadFromTDengine
用处:调用 Flink 进行读取和写入数据
package com.taosdata;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.taosdata.model.Sensor;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
public class ReadFromTDengine {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Sensor> SensorList = env.addSource(new com.taosdata.flink.SourceFromTDengine() );
SensorList.print();
SensorList.addSink(new com.taosdata.flink.SinkToTDengine() );
env.execute();}
}
3、简略测试 RESTful 接口
1) 环境筹备:
a) Flink 装置 & 启动:
- wget https://dlcdn.apache.org/flin…
- tar zxf flink-1.14.3-bin-scala_2.12.tgz -C /usr/local
- /usr/local/flink-1.14.3/bin/start-cluster.sh
b) TDengine Database 环境筹备:
创立原始数据:
- create database tt;
- create table
meters
(ts
TIMESTAMP,vol
INT,current
FLOAT) TAGS (location
BINARY(20)); - insert into beijing using meters tags(‘beijing’) values(now,220,30.2);
创立指标数据库表:
- create database sinktest;
- create table
meters
(ts
TIMESTAMP,vol
INT,current
FLOAT) TAGS (location
BINARY(20));
2) 打包编译:
源码地位:https://github.com/liuyq-617/…
mvn clean package
3) 程序启动:
flink run target/test-flink-1.0-SNAPSHOT-dist.jar
读取数据
- vi log/flink-root-taskexecutor-0-xxxxx.out
- 查看到数据打印:Sensor{ts=1645166073101, val=220, current=5.7, location=’beijing’}
写入数据
- show sinktest.tables;
- 曾经创立了 beijing 子表
- select * from sinktest.beijing;
- 能够查问到刚插入的数据
4、应用 JNI 形式
触类旁通的小伙伴此时曾经猜到,只有把 JDBC URL 批改一下就能够了。
然而 Flink 每次分派作业时都在应用一个新的 ClassLoader,而咱们在计算节点上就会失去“Native library already loaded in another classloader”谬误。
为了防止此问题,能够将 JDBC 的 jar 包放到 Flink 的 lib 目录下,不去调用 dist 包就能够了。
- cp taos-jdbcdriver-2.0.37-dist.jar /usr/local/flink-1.14.3/lib
- flink run target/test-flink-1.0-SNAPSHOT.jar
5、小结
通过在我的项目中引入 SourceFromTDengine 和 SinkToTDengine 两个类,即可实现在 Flink 中对 TDengine 的读写操作。前面咱们会有文章介绍 Spark 和 TDengine 的对接。
注:文中应用的是 JDBC 的 RESTful 接口,这样就不必在 Flink 的节点装置 TDengine,JNI 形式须要在 Flink 节点装置 TDengine Database 的客户端。
想理解更多 TDengine Database 的具体细节,欢送大家在 GitHub 上查看相干源代码。