概述

该计划首先感激社区Spark Doris Connector的作者

从Doris角度看,将其数据引入Flink,能够应用Flink一系列丰盛的生态产品,拓宽了产品的想象力,也使得Doris和其余数据源的联结查问成为可能

从咱们业务架构登程和业务需要,咱们抉择了Flink作为咱们架构的一部分,用于数据的ETL及实时计算框架,社区目前反对Spark doris connector,因而咱们参照Spark doris connector 设计开发了Flink doris Connector。

技术选型

一开始咱们选型的时候,也是和Spark Doris Connector 一样,开始思考的是JDBC的形式,然而这种形式就像Spark doris connector那篇文章中说的,有长处,然而毛病更显著。起初咱们浏览及测试了Spark的代码,决定站在伟人的肩上来实现(备注:间接拷贝代码批改)。

以下内容来自Spark Doris Connector博客的,间接拷贝了

于是咱们开发了针对Doris的新的Datasource,Spark-Doris-Connector。这种计划下,Doris能够裸露Doris数据分布给Spark。Spark的Driver拜访Doris的FE获取Doris表的Schema和底层数据分布。之后,根据此数据分布,正当调配数据查问工作给Executors。最初,Spark的Executors别离拜访不同的BE进行查问。大大晋升了查问的效率

应用办法

在Doris的代码库的 extension/flink-doris-connector/ 目录下编译生成doris-flink-1.0.0-SNAPSHOT.jar,将这个jar包退出flink的ClassPath中,即可应用Flink-on-Doris性能了

SQL形式

CREATE TABLE flink_doris_source (    name STRING,    age INT,    price DECIMAL(5,2),    sale DOUBLE    )     WITH (      'connector' = 'doris',      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',      'username' = '$YOUR_DORIS_USERNAME',      'password' = '$YOUR_DORIS_PASSWORD');CREATE TABLE flink_doris_sink (    name STRING,    age INT,    price DECIMAL(5,2),    sale DOUBLE    )     WITH (      'connector' = 'doris',      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',      'username' = '$YOUR_DORIS_USERNAME',      'password' = '$YOUR_DORIS_PASSWORD');INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_sourceDataStream形式DorisOptions.Builder options = DorisOptions.builder()                .setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")                .setUsername("$YOUR_DORIS_USERNAME")                .setPassword("$YOUR_DORIS_PASSWORD")                .setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME");env.addSource(new DorisSourceFunction<>(options.build(),new SimpleListDeserializationSchema())).print();

实用场景

1.应用Flink对Doris中的数据和其余数据源进行联结剖析

很多业务部门会将本人的数据放在不同的存储系统上,比方一些在线剖析、报表的数据放在Doris中,一些结构化检索数据放在Elasticsearch中、一些须要事物的数据放在MySQL中,等等。业务往往须要跨多个存储源进行剖析,通过Flink Doris Connector买通Flink和Doris后,业务能够间接应用Flink,将Doris中的数据与多个内部数据源做联结查问计算。

2.实时数据接入

Flink Doris Connector之前:针对业务不规则数据,常常须要针对音讯做标准解决,空值过滤等写入新的topic,而后再启动Routine load写入Doris。

Flink Doris Connector之后:flink读取kafka,间接写入doris。

技术实现

架构图

Doris对外提供更多能力

Doris FE
对外开放了获取外部表的元数据信息、单表查问布局和局部统计信息的接口。

所有的Rest API接口都须要进行HttpBasic认证,用户名和明码是登录数据库的用户名和明码,须要留神权限的正确调配。

// 获取表schema元信息GET api/{database}/{table}/_schema// 获取对单表的查问布局模版POST api/{database}/{table}/_query_plan{"sql": "select k1, k2 from {database}.{table}"}// 获取表大小GET api/{database}/{table}/_countDoris BE通过Thrift协定,间接对外提供数据的过滤、扫描和裁剪能力。service TDorisExternalService {    // 初始化查问执行器TScanOpenResult open_scanner(1: TScanOpenParams params);// 流式batch获取数据,Apache Arrow数据格式    TScanBatchResult get_next(1: TScanNextBatchParams params);// 完结扫描    TScanCloseResult close_scanner(1: TScanCloseParams params);}Thrift相干构造体定义可参考:https://github.com/apache/incubator-doris/blob/master/gensrc/thrift/DorisExternalService.thrift

实现DataStream
继承 org.apache.flink.streaming.api.functions.source.RichSourceFunction ,自定义DorisSourceFunction,初始化时,获取相干表的执行打算,获取对应的分区。

重写run办法,循环从分区中读取数据。

public void run(SourceContext sourceContext){       //循环读取各分区        for(PartitionDefinition partitions : dorisPartitions){            scalaValueReader = new ScalaValueReader(partitions, settings);            while (scalaValueReader.hasNext()){                Object next = scalaValueReader.next();                sourceContext.collect(next);            }        }}

实现Flink SQL on Doris
参考了Flink自定义Source&Sink 和 Flink-jdbc-connector,实现了上面的成果,能够实现用Flink SQL间接操作Doris的表,包含读和写。

实现细节

1.实现DynamicTableSourceFactory , DynamicTableSinkFactory 注册 doris connector

2.自定义DynamicTableSource和DynamicTableSink 生成逻辑打算

3.DorisRowDataInputFormat和DorisDynamicOutputFormat获取到逻辑打算后开始执行。

实现中最次要的是基于RichInputFormat和RichOutputFormat 定制的DorisRowDataInputFormat和DorisDynamicOutputFormat。

在DorisRowDataInputFormat中,将获取到的dorisPartitions 在createInputSplits中 切分成多个分片,用于并行计算。

public DorisTableInputSplit[] createInputSplits(int minNumSplits) {        List<DorisTableInputSplit> dorisSplits = new ArrayList<>();        int splitNum = 0;        for (PartitionDefinition partition : dorisPartitions) {            dorisSplits.add(new DorisTableInputSplit(splitNum++,partition));        }        return dorisSplits.toArray(new DorisTableInputSplit[0]);} public RowData nextRecord(RowData reuse)  {        if (!hasNext) {            //曾经读完数据,返回null            return null;        }        List next = (List)scalaValueReader.next();        GenericRowData genericRowData = new GenericRowData(next.size());        for(int i =0;i<next.size();i++){            genericRowData.setField(i, next.get(i));        }        //判断是否还有数据        hasNext = scalaValueReader.hasNext();        return genericRowData;}在DorisRowDataOutputFormat中,通过streamload的形式向doris中写数据。streamload程序参考org.apache.doris.plugin.audit.DorisStreamLoaderpublic  void writeRecord(RowData row) throws IOException {       //streamload 默认分隔符 \t        StringJoiner value = new StringJoiner("\t");        GenericRowData rowData = (GenericRowData) row;        for(int i = 0; i < row.getArity(); ++i) {            value.add(rowData.getField(i).toString());        }        //streamload 写数据        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value.toString());        System.out.println(loadResponse);}