1. Overview
Apache Doris (formerly Baidu Palo) is a distributed SQL data warehouse built on massively parallel processing (MPP) technology. It was open-sourced by Baidu in 2017 and entered the Apache Incubator in August 2018.
Apache Doris is a modern MPP analytical database. It returns query results with sub-second latency and effectively supports real-time data analysis. Its distributed architecture is simple and easy to operate, and it can handle datasets larger than 10 PB.
Apache Doris covers a wide range of analytical needs, such as fixed historical reports, real-time analytics, interactive analysis, and exploratory analysis, making data analysis simpler and more efficient.
2. Scenario
This article shows how to combine Doris Stream Load with the Flink compute engine to ingest data into Doris quickly and in real time.
The environment is as follows:
MySQL 5.x/8.x (the business database)
Kafka 2.11 (message queue)
Flink 1.10.1 (stream compute engine)
Doris 0.14.7 (core data warehouse)
Canal (MySQL binlog collection tool)
3. Solution
We adopt an architecture that combines offline processing of historical data with real-time processing of incremental data.
3.1 Offline processing of historical data
For historical data we use Doris ODBC external tables: map the MySQL tables into Doris, then run
insert into <doris_table_name> select * from <mysql_odbc_external_table>
3.1.1 Creating the external table
First, this requires Apache Doris 0.13.x or later.
Install the ODBC driver for the corresponding database on all BE nodes.
Create the external table.
For details, refer to my other article; I will not repeat them here:
[How to use Apache Doris ODBC external tables]
https://mp.weixin.qq.com/s/J0suRGPNkxD6oHSRFK6KTA
3.2 Real-time processing of incremental data
For incremental data, Canal watches the MySQL binlog, parses it, and pushes the changes to a Kafka topic. Flink then consumes that topic in real time; you can process the data as needed (transformations, aggregations, and so on) and finally write the detail rows or the intermediate results of real-time computation into the corresponding Doris tables. Here we use Stream Load; you could also use the Flink Doris Connector.
3.2.1 Implementing the Doris sink
First we implement a Flink Doris sink:
import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Custom Flink Doris sink
 */
public class DorisSink extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

    private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    private DorisStreamLoad dorisStreamLoad;
    private String columns;
    private String jsonFormat;

    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    /**
     * Check whether the Stream Load succeeded
     *
     * @param respContent the response returned by Stream Load (JSON format)
     * @return true if the load succeeded
     */
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
        return DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
        if (loadResponse != null && loadResponse.status == 200) {
            RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
            if (!checkStreamLoadStatus(respContent)) {
                log.error("Stream Load failed: {}", loadResponse);
            }
        } else {
            log.error("Stream Load request failed: {}", loadResponse);
        }
    }
}
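The sink above parses the Stream Load response body into a RespContent object, which the article does not list. Below is a minimal sketch of such a POJO, assuming fastjson is used for the mapping; the field names (Status, Message, NumberTotalRows, NumberLoadedRows) come from the Stream Load JSON response, and only the accessors used by DorisSink are included.

import com.alibaba.fastjson.annotation.JSONField;

import java.io.Serializable;

/**
 * Minimal sketch of the Stream Load JSON response body. The real response
 * contains more fields (TxnId, Label, LoadBytes, ErrorURL, ...).
 */
public class RespContent implements Serializable {

    @JSONField(name = "Status")
    private String status;

    @JSONField(name = "Message")
    private String message;

    @JSONField(name = "NumberTotalRows")
    private long numberTotalRows;

    @JSONField(name = "NumberLoadedRows")
    private long numberLoadedRows;

    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }

    public long getNumberTotalRows() { return numberTotalRows; }
    public void setNumberTotalRows(long numberTotalRows) { this.numberTotalRows = numberTotalRows; }

    public long getNumberLoadedRows() { return numberLoadedRows; }
    public void setNumberLoadedRows(long numberLoadedRows) { this.numberLoadedRows = numberLoadedRows; }

    @Override
    public String toString() {
        return "RespContent{status='" + status + "', message='" + message
                + "', numberTotalRows=" + numberTotalRows
                + ", numberLoadedRows=" + numberLoadedRows + "}";
    }
}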
3.2.2 Stream Load utility class
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Calendar;
import java.util.UUID;

/**
 * Doris Stream Load utility
 */
public class DorisStreamLoad implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);

    // Stream Load URL pattern; here we connect to the FE
    private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
    // FE host and http port
    private String hostPort;
    // database name
    private String db;
    // target table name
    private String tbl;
    // user name
    private String user;
    // password
    private String passwd;
    private String loadUrlStr;
    private String authEncoding;

    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
        this.hostPort = hostPort;
        this.db = db;
        this.tbl = tbl;
        this.user = user;
        this.passwd = passwd;
        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
    }

    // build the http connection
    private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        conn.addRequestProperty("max_filter_ratio", "0");
        conn.addRequestProperty("strict_mode", "true");
        conn.addRequestProperty("columns", columns);
        conn.addRequestProperty("format", "json");
        conn.addRequestProperty("jsonpaths", jsonformat);
        conn.addRequestProperty("strip_outer_array", "true");
        conn.setDoOutput(true);
        conn.setDoInput(true);
        return conn;
    }

    public static class LoadResponse {
        public int status;
        public String respMsg;
        public String respContent;

        public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status:").append(status);
            sb.append(", resp msg:").append(respMsg);
            sb.append(", resp content:").append(respContent);
            return sb.toString();
        }
    }

    // execute the load
    public LoadResponse loadBatch(String data, String columns, String jsonformat) {
        Calendar calendar = Calendar.getInstance();
        // load label, must be globally unique
        String label = String.format("flink_import_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replaceAll("-", ""));
        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;
        try {
            // build request and send to fe
            feConn = getConnection(loadUrlStr, label, columns, jsonformat);
            int status = feConn.getResponseCode();
            // fe sends back http response code TEMPORARY_REDIRECT 307 and the new be location
            if (status != 307) {
                throw new Exception("status is not TEMPORARY_REDIRECT 307, status:" + status);
            }
            String location = feConn.getHeaderField("Location");
            if (location == null) {
                throw new Exception("redirect location is null");
            }
            // build request and send to new be location
            beConn = getConnection(location, label, columns, jsonformat);
            // send data to be
            BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
            bos.write(data.getBytes(StandardCharsets.UTF_8));
            bos.close();
            // get response
            status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            InputStream stream = (InputStream) beConn.getContent();
            BufferedReader br = new BufferedReader(new InputStreamReader(stream));
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                response.append(line);
            }
            return new LoadResponse(status, respMsg, response.toString());
        } catch (Exception e) {
            String err = "failed to execute stream load with label:" + label;
            log.warn(err, e);
            return new LoadResponse(-1, e.getMessage(), err);
        } finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }
}
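Before wiring DorisStreamLoad into a Flink job, it can be handy to sanity-check connectivity with a small standalone test. The sketch below is not from the original article; the FE address, database, table, credentials, and the sample row are placeholders chosen to match the columns and jsonpaths used in the job later on.

import org.apache.doris.demo.flink.DorisStreamLoad;

/**
 * Hypothetical standalone check for DorisStreamLoad (not part of the original code).
 */
public class DorisStreamLoadTest {
    public static void main(String[] args) {
        // Placeholder FE address, database, table and credentials.
        DorisStreamLoad streamLoad = new DorisStreamLoad("xxx:8030", "db1", "tb1", "root", "");
        // A JSON array with a single row; the keys must match the columns parameter.
        String data = "[{\"name\":\"doris\",\"age\":18,\"price\":10,\"sale\":1}]";
        String columns = "name,age,price,sale";
        String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
        DorisStreamLoad.LoadResponse response = streamLoad.loadBatch(data, columns, jsonFormat);
        // An HTTP 200 status plus "Success" in the response body means the rows are visible in Doris.
        System.out.println(response);
    }
}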
3.2.3 Flink Job
The example below handles a single table. If Canal is watching several tables, you need to distinguish the records by table name, maintain a mapping between your MySQL tables and the Doris tables, and parse the data accordingly (a routing sketch follows the job code below).
import org.apache.doris.demo.flink.DorisSink;
import org.apache.doris.demo.flink.DorisStreamLoad;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * This example mainly demonstrates how to use flink to stream Kafka data.
 * And use the doris streamLoad method to write the data into the table specified by doris
 * <p>
 * Kafka data format is a JSON array, for example: [{"id":1,"name":"root"}]
 */
public class FlinkKafka2Doris {

    //kafka address
    private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";
    //kafka groupName
    private static final String groupName = "test_flink_doris_group";
    //kafka topicName
    private static final String topicName = "test_flink_doris";
    //doris ip port
    private static final String hostPort = "xxx:8030";
    //doris dbName
    private static final String dbName = "db1";
    //doris tbName
    private static final String tbName = "tb1";
    //doris userName
    private static final String userName = "root";
    //doris password
    private static final String password = "";
    //doris columns
    private static final String columns = "name,age,price,sale";
    //json format
    private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", "10000");

        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        blinkStreamEnv.enableCheckpointing(10000);
        blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
                new SimpleStringSchema(),
                props);

        DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);

        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);

        dataStreamSource.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));

        blinkStreamEnv.execute("flink kafka to doris");
    }
}
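The job above assumes the topic already carries plain JSON arrays for a single table. If Canal writes its flat JSON messages to Kafka instead (flatMessage = true), a routing step can pick out the table name before the sink, as mentioned in 3.2.3. The sketch below is only an illustration, not part of the original article: the table name "tb1", the single-sink mapping, and the insert/update-only handling are assumptions, and the field names (data, table, type) come from Canal's flat message format.

// Extra imports on top of those in FlinkKafka2Doris:
// import com.alibaba.fastjson.JSON;
// import com.alibaba.fastjson.JSONObject;
// import org.apache.flink.api.common.functions.FlatMapFunction;
// import org.apache.flink.util.Collector;

// Hypothetical replacement for dataStreamSource.addSink(...) when the topic carries Canal
// flat messages such as {"data":[{...}],"database":"db1","table":"tb1","type":"INSERT",...}.
dataStreamSource
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String message, Collector<String> out) {
                JSONObject msg = JSON.parseObject(message);
                String table = msg.getString("table");
                String type = msg.getString("type");
                // Forward only inserts/updates of the table mapped to this sink; deletes and
                // other tables would each need their own handling/sink in a real job.
                if ("tb1".equals(table) && ("INSERT".equals(type) || "UPDATE".equals(type))) {
                    // "data" is an array of row objects; keep the array so strip_outer_array can unwrap it.
                    out.collect(msg.getJSONArray("data").toJSONString());
                }
            }
        })
        .addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));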
Then submit the Flink job to the cluster and it will run; data is ingested into Doris in near real time.
This is effectively a micro-batch process. A few parts you can refine yourself (a micro-batching sketch follows this list):
Cap the number of records per batch, or flush every N seconds, and trigger the load on whichever condition is reached first; this matters whether your real-time volume is small or large.
The code above connects to the FE. You can fetch all BE nodes through the FE REST API and load directly against a BE; the URL is the same except that the FE IP and port are replaced with a BE IP and its http port.
To handle the case where the FE or BE you connect to happens to be down, retry against another FE or BE.
To avoid overloading a single node, round-robin across the BE nodes instead of always connecting to the same one.
Set a maximum retry count; once it is exceeded, push the failed data to a Kafka topic so it can be handled manually later.
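As a sketch of the first suggestion above (flush by record count or by elapsed time, whichever comes first), the buffered sink below shows one possible shape. It is a minimal illustration rather than the article's implementation: the thresholds are placeholders, it assumes each incoming record is a single JSON object, and it skips checkpoint integration and response checking for brevity.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Minimal micro-batching sketch: flush when the buffer reaches maxBatchSize rows
 * OR when maxIntervalMs has elapsed, whichever happens first.
 */
public class DorisBatchSink extends RichSinkFunction<String> {

    private final DorisStreamLoad dorisStreamLoad;
    private final String columns;
    private final String jsonFormat;
    private final int maxBatchSize;    // e.g. 10000 rows
    private final long maxIntervalMs;  // e.g. 10000 ms

    private transient List<String> buffer;
    private transient long lastFlushTime;

    public DorisBatchSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat,
                          int maxBatchSize, long maxIntervalMs) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
        this.maxBatchSize = maxBatchSize;
        this.maxIntervalMs = maxIntervalMs;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        buffer = new ArrayList<>();
        lastFlushTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(value);
        // The time condition is only evaluated when a new record arrives; a real
        // implementation would also flush from a timer or on checkpoints.
        if (buffer.size() >= maxBatchSize
                || System.currentTimeMillis() - lastFlushTime >= maxIntervalMs) {
            flush();
        }
    }

    @Override
    public void close() throws Exception {
        // Flush whatever is left so the tail of the stream is not lost on shutdown.
        flush();
        super.close();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            // Assumes each buffered value is one JSON object; send the batch as a single
            // JSON array so that strip_outer_array=true expands it into multiple rows.
            String batch = "[" + String.join(",", buffer) + "]";
            dorisStreamLoad.loadBatch(batch, columns, jsonFormat);
            buffer.clear();
        }
        lastFlushTime = System.currentTimeMillis();
    }
}

In the job above you would then swap new DorisSink(...) for something like new DorisBatchSink(dorisStreamLoad, columns, jsonFormat, 10000, 10000).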
4. Summary
This article is only meant as a starting point, showing one way to ingest data into Doris with Stream Load along with a working example. Doris offers many other ingestion methods waiting for you to explore.