概述
该计划首先感激社区 Spark Doris Connector 的作者
从 Doris 角度看,将其数据引入 Flink,能够应用 Flink 一系列丰盛的生态产品,拓宽了产品的想象力,也使得 Doris 和其余数据源的联结查问成为可能
从咱们业务架构登程和业务需要,咱们抉择了 Flink 作为咱们架构的一部分,用于数据的 ETL 及实时计算框架,社区目前反对 Spark doris connector,因而咱们参照 Spark doris connector 设计开发了 Flink doris Connector。
技术选型
一开始咱们选型的时候,也是和 Spark Doris Connector 一样,开始思考的是 JDBC 的形式,然而这种形式就像 Spark doris connector 那篇文章中说的,有长处,然而毛病更显著。起初咱们浏览及测试了 Spark 的代码,决定站在伟人的肩上来实现(备注:间接拷贝代码批改)。
以下内容来自 Spark Doris Connector 博客的,间接拷贝了
于是咱们开发了针对 Doris 的新的 Datasource,Spark-Doris-Connector。这种计划下,Doris 能够裸露 Doris 数据分布给 Spark。Spark 的 Driver 拜访 Doris 的 FE 获取 Doris 表的 Schema 和底层数据分布。之后,根据此数据分布,正当调配数据查问工作给 Executors。最初,Spark 的 Executors 别离拜访不同的 BE 进行查问。大大晋升了查问的效率
应用办法
在 Doris 的代码库的 extension/flink-doris-connector/ 目录下编译生成 doris-flink-1.0.0-SNAPSHOT.jar,将这个 jar 包退出 flink 的 ClassPath 中,即可应用 Flink-on-Doris 性能了
SQL 形式
CREATE TABLE flink_doris_source (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
)
WITH (
'connector' = 'doris',
'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
'username' = '$YOUR_DORIS_USERNAME',
'password' = '$YOUR_DORIS_PASSWORD'
);
CREATE TABLE flink_doris_sink (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
)
WITH (
'connector' = 'doris',
'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
'username' = '$YOUR_DORIS_USERNAME',
'password' = '$YOUR_DORIS_PASSWORD'
);
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
DataStream 形式
DorisOptions.Builder options = DorisOptions.builder()
.setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.setUsername("$YOUR_DORIS_USERNAME")
.setPassword("$YOUR_DORIS_PASSWORD")
.setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME");
env.addSource(new DorisSourceFunction<>(options.build(),new SimpleListDeserializationSchema())).print();
实用场景
1. 应用 Flink 对 Doris 中的数据和其余数据源进行联结剖析
很多业务部门会将本人的数据放在不同的存储系统上,比方一些在线剖析、报表的数据放在 Doris 中,一些结构化检索数据放在 Elasticsearch 中、一些须要事物的数据放在 MySQL 中,等等。业务往往须要跨多个存储源进行剖析,通过 Flink Doris Connector 买通 Flink 和 Doris 后,业务能够间接应用 Flink,将 Doris 中的数据与多个内部数据源做联结查问计算。
2. 实时数据接入
Flink Doris Connector 之前:针对业务不规则数据,常常须要针对音讯做标准解决,空值过滤等写入新的 topic,而后再启动 Routine load 写入 Doris。
Flink Doris Connector 之后:flink 读取 kafka,间接写入 doris。
技术实现
架构图
Doris 对外提供更多能力
Doris FE
对外开放了获取外部表的元数据信息、单表查问布局和局部统计信息的接口。
所有的 Rest API 接口都须要进行 HttpBasic 认证,用户名和明码是登录数据库的用户名和明码,须要留神权限的正确调配。
// 获取表 schema 元信息
GET api/{database}/{table}/_schema
// 获取对单表的查问布局模版
POST api/{database}/{table}/_query_plan
{"sql": "select k1, k2 from {database}.{table}"
}
// 获取表大小
GET api/{database}/{table}/_count
Doris BE
通过 Thrift 协定,间接对外提供数据的过滤、扫描和裁剪能力。service TDorisExternalService {
// 初始化查问执行器
TScanOpenResult open_scanner(1: TScanOpenParams params);
// 流式 batch 获取数据,Apache Arrow 数据格式
TScanBatchResult get_next(1: TScanNextBatchParams params);
// 完结扫描
TScanCloseResult close_scanner(1: TScanCloseParams params);
}
Thrift 相干构造体定义可参考:https://github.com/apache/incubator-doris/blob/master/gensrc/thrift/DorisExternalService.thrift
实现 DataStream
继承 org.apache.flink.streaming.api.functions.source.RichSourceFunction,自定义 DorisSourceFunction,初始化时,获取相干表的执行打算,获取对应的分区。
重写 run 办法,循环从分区中读取数据。
public void run(SourceContext sourceContext){
// 循环读取各分区
for(PartitionDefinition partitions : dorisPartitions){scalaValueReader = new ScalaValueReader(partitions, settings);
while (scalaValueReader.hasNext()){Object next = scalaValueReader.next();
sourceContext.collect(next);
}
}
}
实现 Flink SQL on Doris
参考了 Flink 自定义 Source&Sink 和 Flink-jdbc-connector,实现了上面的成果,能够实现用 Flink SQL 间接操作 Doris 的表,包含读和写。
实现细节
1. 实现 DynamicTableSourceFactory , DynamicTableSinkFactory 注册 doris connector
2. 自定义 DynamicTableSource 和 DynamicTableSink 生成逻辑打算
3.DorisRowDataInputFormat 和 DorisDynamicOutputFormat 获取到逻辑打算后开始执行。
实现中最次要的是基于 RichInputFormat 和 RichOutputFormat 定制的 DorisRowDataInputFormat 和 DorisDynamicOutputFormat。
在 DorisRowDataInputFormat 中,将获取到的 dorisPartitions 在 createInputSplits 中 切分成多个分片,用于并行计算。
public DorisTableInputSplit[] createInputSplits(int minNumSplits) {List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
int splitNum = 0;
for (PartitionDefinition partition : dorisPartitions) {dorisSplits.add(new DorisTableInputSplit(splitNum++,partition));
}
return dorisSplits.toArray(new DorisTableInputSplit[0]);
}
public RowData nextRecord(RowData reuse) {if (!hasNext) {
// 曾经读完数据,返回 null
return null;
}
List next = (List)scalaValueReader.next();
GenericRowData genericRowData = new GenericRowData(next.size());
for(int i =0;i<next.size();i++){genericRowData.setField(i, next.get(i));
}
// 判断是否还有数据
hasNext = scalaValueReader.hasNext();
return genericRowData;
}
在 DorisRowDataOutputFormat 中,通过 streamload 的形式向 doris 中写数据。streamload 程序参考 org.apache.doris.plugin.audit.DorisStreamLoader
public void writeRecord(RowData row) throws IOException {
//streamload 默认分隔符 \t
StringJoiner value = new StringJoiner("\t");
GenericRowData rowData = (GenericRowData) row;
for(int i = 0; i < row.getArity(); ++i) {value.add(rowData.getField(i).toString());
}
//streamload 写数据
DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value.toString());
System.out.println(loadResponse);
}