1. Overview

Apache Doris (formerly Baidu Palo) is a distributed SQL data warehouse based on massively parallel processing (MPP) technology. It was open-sourced by Baidu in 2017 and entered the Apache Incubator in August 2018.

Apache Doris is a modern MPP analytical database product. It returns query results with sub-second latency, which makes it well suited to real-time data analysis. Its distributed architecture is simple and easy to operate, and it can scale to datasets beyond 10 PB.

Apache Doris can serve a wide range of data analysis needs, such as fixed historical reports, real-time analysis, interactive analysis, and exploratory analysis, making your data analysis work simpler and more efficient.

2. Scenario

This article shows how to combine Doris's Stream Load interface with the Flink compute engine to load data into Doris quickly and in real time.

The environment used is as follows:

MySQL 5.x/8.x (the business database)
Kafka 2.11 (message queue)
Flink 1.10.1 (stream computing engine)
Doris 0.14.7 (the core data warehouse)
Canal (MySQL binlog collection tool)

3. Implementation

We adopt an architecture of offline processing for historical data plus real-time processing for incremental data.

3.1 Offline processing of historical data

For the historical data we use Doris ODBC external tables: map the MySQL tables into Doris, then run

 insert into <doris_table_name>
 select * from <mysql_odbc_external_table>;

3.1.1 Creating the external table

First, this requires Apache Doris 0.13.x or later.
Install the ODBC driver for the source database on every BE node.
Create the external table.
For the details, see my other article; I won't repeat them here:

[How to use Apache Doris ODBC external tables]

https://mp.weixin.qq.com/s/J0suRGPNkxD6oHSRFK6KTA
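For reference, a minimal sketch of the two DDL statements involved; every host, credential, and name below is a placeholder, and the driver string must match the ODBC driver configured on the BE nodes:

 CREATE EXTERNAL RESOURCE `mysql_odbc`
 PROPERTIES (
     "type" = "odbc_catalog",
     "host" = "192.168.0.1",
     "port" = "3306",
     "user" = "mysql_user",
     "password" = "mysql_passwd",
     "odbc_type" = "mysql",
     "driver" = "MySQL ODBC 8.0 Unicode Driver"
 );

 CREATE EXTERNAL TABLE `mysql_odbc_external_table` (
     `id` INT NOT NULL,
     `name` VARCHAR(64) NULL
 ) ENGINE = ODBC
 PROPERTIES (
     "odbc_catalog_resource" = "mysql_odbc",
     "database" = "db1",
     "table" = "tb1"
 );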

3.2 Real-time processing of incremental data

For incremental data, we use Canal to monitor the MySQL binlog, parse it, and push it to a designated Kafka topic. Flink then consumes that topic in real time; you can process the data however you need (transformations, algorithms, and so on), and finally save the detail rows or the intermediate results of the real-time computation into the corresponding Doris tables. Here we use Stream Load; alternatively, you can use the Flink Doris Connector.
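For reference, a Canal flat message on the Kafka topic looks roughly like the following (simplified; the exact field set depends on your Canal version and configuration). Note that the demo job later in this article assumes the messages have already been flattened into a plain JSON array such as [{"name":"doris",...}]; if you feed raw Canal messages, extract the data field first.

 {
   "database": "db1",
   "table": "tb1",
   "type": "INSERT",
   "isDdl": false,
   "ts": 1620000000000,
   "data": [{"name": "doris", "age": "18", "price": "18.2", "sale": "10"}],
   "old": null
 }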

3.2.1 Doris sink implementation

First we implement a custom Flink Doris sink:

import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Custom Flink Doris sink
 */
public class DorisSink extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

    private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    private DorisStreamLoad dorisStreamLoad;

    private String columns;

    private String jsonFormat;

    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    /**
     * Check whether the Stream Load succeeded
     *
     * @param respContent the response returned by Stream Load (JSON format)
     * @return true if the load succeeded
     */
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
        return DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
        if (loadResponse != null && loadResponse.status == 200) {
            RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
            if (!checkStreamLoadStatus(respContent)) {
                log.error("Stream Load failed: {}", loadResponse);
            }
        } else {
            log.error("Stream Load request failed: {}", loadResponse);
        }
    }
}

3.2.2 Stream Load utility class

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.io.IOException;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Calendar;
import java.util.UUID;

/**
 * Doris Stream Load helper
 */
public class DorisStreamLoad implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
    // Load URL pattern; here we connect through the FE
    private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
    // FE host:port
    private String hostPort;
    // database name
    private String db;
    // target table name
    private String tbl;
    // username
    private String user;
    // password
    private String passwd;
    private String loadUrlStr;
    private String authEncoding;

    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
        this.hostPort = hostPort;
        this.db = db;
        this.tbl = tbl;
        this.user = user;
        this.passwd = passwd;
        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
    }

    // Build the HTTP connection for the Stream Load request
    private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        conn.addRequestProperty("max_filter_ratio", "0");
        conn.addRequestProperty("strict_mode", "true");
        conn.addRequestProperty("columns", columns);
        conn.addRequestProperty("format", "json");
        conn.addRequestProperty("jsonpaths", jsonformat);
        conn.addRequestProperty("strip_outer_array", "true");
        conn.setDoOutput(true);
        conn.setDoInput(true);

        return conn;
    }

    public static class LoadResponse {
        public int status;
        public String respMsg;
        public String respContent;

        public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status: ").append(status);
            sb.append(", resp msg: ").append(respMsg);
            sb.append(", resp content: ").append(respContent);
            return sb.toString();
        }
    }

    // Execute the data load
    public LoadResponse loadBatch(String data, String columns, String jsonformat) {
        Calendar calendar = Calendar.getInstance();
        // Load label; must be globally unique
        String label = String.format("flink_import_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replaceAll("-", ""));

        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;
        try {
            // build request and send to fe
            feConn = getConnection(loadUrlStr, label, columns, jsonformat);
            int status = feConn.getResponseCode();
            // the FE sends back HTTP 307 TEMPORARY_REDIRECT with the BE location
            if (status != 307) {
                throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
            }
            String location = feConn.getHeaderField("Location");
            if (location == null) {
                throw new Exception("redirect location is null");
            }
            // build request and send to the new BE location
            beConn = getConnection(location, label, columns, jsonformat);
            // send data to the BE
            BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
            bos.write(data.getBytes(StandardCharsets.UTF_8));
            bos.close();

            // get the response
            status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            InputStream stream = (InputStream) beConn.getContent();
            BufferedReader br = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                response.append(line);
            }
            return new LoadResponse(status, respMsg, response.toString());

        } catch (Exception e) {
            String err = "failed to stream load data with label: " + label;
            log.warn(err, e);
            return new LoadResponse(-1, e.getMessage(), err);
        } finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }
}
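The RespContent class that the sink parses the response into is not included in the original code. A minimal sketch, assuming fastjson's default smart field matching against the Stream Load JSON response (the real response carries more fields, such as TxnId, Label, and Message):

import java.io.Serializable;

/**
 * Minimal mapping of the Stream Load JSON response;
 * only the fields checked by DorisSink are included here.
 */
public class RespContent implements Serializable {

    private String status;          // e.g. "Success", "Publish Timeout", "Fail"
    private long numberTotalRows;   // rows Doris received
    private long numberLoadedRows;  // rows actually loaded

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public long getNumberTotalRows() {
        return numberTotalRows;
    }

    public void setNumberTotalRows(long numberTotalRows) {
        this.numberTotalRows = numberTotalRows;
    }

    public long getNumberLoadedRows() {
        return numberLoadedRows;
    }

    public void setNumberLoadedRows(long numberLoadedRows) {
        this.numberLoadedRows = numberLoadedRows;
    }
}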

3.2.3 Flink Job

This example demonstrates a single table. If Canal is capturing multiple tables, you need to distinguish the records by table name, maintain the mapping between your MySQL tables and the Doris tables, and parse the data accordingly (see the sketch after the job code). The single-table job looks like this:

import org.apache.doris.demo.flink.DorisSink;
import org.apache.doris.demo.flink.DorisStreamLoad;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * This example mainly demonstrates how to use Flink to stream Kafka data,
 * and use the Doris Stream Load method to write the data into the table specified by Doris.
 * <p>
 * The Kafka data format is a JSON array, for example: [{"id":1,"name":"root"}]
 */
public class FlinkKafka2Doris {
    //kafka address
    private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";
    //kafka groupName
    private static final String groupName = "test_flink_doris_group";
    //kafka topicName
    private static final String topicName = "test_flink_doris";
    //doris ip port
    private static final String hostPort = "xxx:8030";
    //doris dbName
    private static final String dbName = "db1";
    //doris tbName
    private static final String tbName = "tb1";
    //doris userName
    private static final String userName = "root";
    //doris password
    private static final String password = "";
    //doris columns
    private static final String columns = "name,age,price,sale";
    //json format
    private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", "10000");

        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        blinkStreamEnv.enableCheckpointing(10000);
        blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
                new SimpleStringSchema(),
                props);

        DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);

        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);

        dataStreamSource.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));

        blinkStreamEnv.execute("flink kafka to doris");
    }
}
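For the multi-table case mentioned above, one possible approach is sketched below; it assumes Canal's flat-message JSON and hypothetical per-table mappings that you would populate from your own configuration:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: route Canal flat messages to a per-table DorisStreamLoad.
 * The three maps are hypothetical and would be filled from your
 * MySQL-table -> Doris-table configuration.
 */
public class TableRoutingSink extends RichSinkFunction<String> {

    private final Map<String, DorisStreamLoad> loaderByTable = new HashMap<>();
    private final Map<String, String> columnsByTable = new HashMap<>();
    private final Map<String, String> jsonPathsByTable = new HashMap<>();

    @Override
    public void invoke(String value, Context context) throws Exception {
        JSONObject msg = JSON.parseObject(value);
        String table = msg.getString("table");                  // source table from the Canal message
        String rows = msg.getJSONArray("data").toJSONString();  // the changed rows as a JSON array
        DorisStreamLoad loader = loaderByTable.get(table);
        if (loader == null) {
            return; // unknown table: skip it (or send it to a dead-letter topic)
        }
        // UPDATE/DELETE handling (msg.getString("type")) is left out of this sketch
        loader.loadBatch(rows, columnsByTable.get(table), jsonPathsByTable.get(table));
    }
}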

Then submit the Flink job to the cluster, and the data will be loaded into Doris in real time.
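For example, assuming the demo class above is packaged into a jar (the jar name here is hypothetical):

flink run -c org.apache.doris.demo.flink.FlinkKafka2Doris flink-doris-demo.jar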

This is in effect micro-batching. You can improve on the following points yourself (a batching sketch follows the list):

Set a maximum number of records per batch, or flush once every N seconds; whether your real-time data volume is small or large, whichever of the two conditions is reached first triggers the load.
The code here connects to the FE. You can fetch the list of BE nodes through the FE REST API and load directly against a BE; the URL is the same, just replace the FE IP and port with a BE's IP and HTTP port.
To avoid the case where the FE or BE node you connect to happens to be down, retry against another FE or BE.
To avoid putting pressure on a single node, round-robin across the BE nodes instead of always connecting to the same one.
Set a maximum number of retries; once exceeded, push the failed records to a Kafka queue for later manual handling.
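For the first point, here is a minimal sketch of a batching variant of the sink. It assumes each incoming record is a single JSON object, and it only checks the time condition when a new record arrives; a production version would use a timer service and tie flushes to Flink checkpoints:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Sketch: buffer rows and flush either when maxRows is reached
 * or when maxIntervalMs has elapsed, whichever comes first.
 */
public class BatchingDorisSink extends RichSinkFunction<String> {

    private final DorisStreamLoad dorisStreamLoad;
    private final String columns;
    private final String jsonFormat;
    private final int maxRows;        // flush when this many rows are buffered
    private final long maxIntervalMs; // ...or when this much time has passed

    private transient List<String> buffer;
    private transient long lastFlushTime;

    public BatchingDorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat,
                             int maxRows, long maxIntervalMs) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
        this.maxRows = maxRows;
        this.maxIntervalMs = maxIntervalMs;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>();
        lastFlushTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        if (buffer.size() >= maxRows
                || System.currentTimeMillis() - lastFlushTime >= maxIntervalMs) {
            flush();
        }
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // strip_outer_array is set, so the whole batch is sent as one JSON array
        String batch = "[" + String.join(",", buffer) + "]";
        dorisStreamLoad.loadBatch(batch, columns, jsonFormat);
        buffer.clear();
        lastFlushTime = System.currentTimeMillis();
    }

    @Override
    public void close() {
        flush(); // flush whatever is left when the job stops
    }
}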

4. Summary

This article only scratches the surface, offering one approach and example for ingesting data with Stream Load. Doris has many other ingestion methods waiting for you to explore.