本文次要从 Binlog 实时采集和离线解决 Binlog 还原业务数据两个方面,来介绍如何实现 DB 数据精确、高效地进入 Hive 数仓。
背景
在数据仓库建模中,未经任何加工解决的原始业务层数据,咱们称之为 ODS(Operational Data Store) 数据。在互联网企业中,常见的 ODS 数据有业务日志数据(Log)和业务 DB 数据(DB)两类。对于业务 DB 数据来说,从 MySQL 等关系型数据库的业务数据进行采集,而后导入到 Hive 中,是进行数据仓库生产的重要环节。如何精确、高效地把 MySQL 数据同步到 Hive 中?个别罕用的解决方案是批量取数并 Load:直连 MySQL 去 Select 表中的数据,而后存到本地文件作为两头存储,最初把文件 Load 到 Hive 表中。这种计划的长处是实现简略,然而随着业务的倒退,毛病也逐步裸露进去:
- 性能瓶颈:随着业务规模的增长,Select From MySQL -> Save to Localfile -> Load to Hive 这种数据流破费的工夫越来越长,无奈满足上游数仓生产的工夫要求。
- 间接从 MySQL 中 Select 大量数据,对 MySQL 的影响十分大,容易造成慢查问,影响业务线上的失常服务。
- 因为 Hive 自身的语法不反对更新、删除等 SQL 原语 (高版本 Hive 反对,然而须要分桶 +ORC 存储格局),对于 MySQL 中产生 Update/Delete 的数据无奈很好地进行反对。
为了彻底解决这些问题,咱们逐渐转向 CDC (Change Data Capture) + Merge 的技术计划,即实时 Binlog 采集 + 离线解决 Binlog 还原业务数据这样一套解决方案。Binlog 是 MySQL 的二进制日志,记录了 MySQL 中产生的所有数据变更,MySQL 集群本身的主从同步就是基于 Binlog 做的。
实现思路
首先,采纳 Flink 负责把 Kafka 上的 Binlog 数据拉取到 HDFS 上。
而后,对每张 ODS 表,首先须要一次性制作快照(Snapshot),把 MySQL 里的存量数据读取到 Hive 上,这一过程底层采纳直连 MySQL 去 Select 数据的形式,能够应用 Sqoop 进行一次性全量导入。
最初,对每张 ODS 表,每天基于存量数据和当天增量产生的 Binlog 做 Merge,从而还原出业务数据。
Binlog 是流式产生的,通过对 Binlog 的实时采集,把局部数据处理需要由每天一次的批处理摊派到实时流上。无论从性能上还是对 MySQL 的拜访压力上,都会有显著地改善。Binlog 自身记录了数据变更的类型(Insert/Update/Delete),通过一些语义方面的解决,齐全可能做到精准的数据还原。
实现计划
Flink 解决 Kafka 的 binlog 日志
应用 kafka source,对读取的数据进行 JSON 解析,将解析的字段拼接成字符串,合乎 Hive 的 schema 格局,具体代码如下:
package com.etl.kafka2hdfs;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Map;
import java.util.Properties;
/**
* @Created with IntelliJ IDEA.
* @author : jmx
* @Date: 2020/3/27
* @Time: 12:52
*
*/
public class HdfsSink {public static void main(String[] args) throws Exception {
String fieldDelimiter = ",";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// checkpoint
env.enableCheckpointing(10_000);
//env.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
env.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// source
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
// only required for Kafka 0.8
props.setProperty("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181");
props.setProperty("group.id", "test123");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("qfbap_ods.code_city", new SimpleStringSchema(), props);
consumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(consumer);
// transform
SingleOutputStreamOperator<String> cityDS = stream
.filter(new FilterFunction<String>() {
// 过滤掉 DDL 操作
@Override
public boolean filter(String jsonVal) throws Exception {JSONObject record = JSON.parseObject(jsonVal, Feature.OrderedField);
return record.getString("isDdl").equals("false");
}
})
.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {StringBuilder fieldsBuilder = new StringBuilder();
// 解析 JSON 数据
JSONObject record = JSON.parseObject(value, Feature.OrderedField);
// 获取最新的字段值
JSONArray data = record.getJSONArray("data");
// 遍历,字段值的 JSON 数组,只有一个元素
for (int i = 0; i < data.size(); i++) {
// 获取到 JSON 数组的第 i 个元素
JSONObject obj = data.getJSONObject(i);
if (obj != null) {fieldsBuilder.append(record.getLong("id")); // 序号 id
fieldsBuilder.append(fieldDelimiter); // 字段分隔符
fieldsBuilder.append(record.getLong("es")); // 业务工夫戳
fieldsBuilder.append(fieldDelimiter);
fieldsBuilder.append(record.getLong("ts")); // 日志工夫戳
fieldsBuilder.append(fieldDelimiter);
fieldsBuilder.append(record.getString("type")); // 操作类型
for (Map.Entry<String, Object> entry : obj.entrySet()) {fieldsBuilder.append(fieldDelimiter);
fieldsBuilder.append(entry.getValue()); // 表字段数据
}
}
}
return fieldsBuilder.toString();}
});
//cityDS.print();
//stream.print();
// sink
// 以下条件满足其中之一就会滚动生成新的文件
RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create()
.withRolloverInterval(60L * 1000L) // 滚动写入新文件的工夫,默认 60s。依据具体情况调节
.withMaxPartSize(1024 * 1024 * 128L) // 设置每个文件的最大大小 , 默认是 128M,这里设置为 128M
.withInactivityInterval(60L * 1000L) // 默认 60 秒, 未写入数据处于不沉闷状态超时会滚动新文件
.build();
StreamingFileSink<String> sink = StreamingFileSink
//.forRowFormat(new Path("file:///E://binlog_db/city"), new SimpleStringEncoder<String>())
.forRowFormat(new Path("hdfs://kms-1:8020/binlog_db/code_city_delta"), new SimpleStringEncoder<String>())
.withBucketAssigner(new EventTimeBucketAssigner())
.withRollingPolicy(rollingPolicy)
.withBucketCheckInterval(1000) // 桶查看距离,这里设置 1S
.build();
cityDS.addSink(sink);
env.execute();}
}
对于 Flink Sink 到 HDFS,StreamingFileSink
代替了先前的 BucketingSink
,用来将上游数据存储到 HDFS 的不同目录中。它的外围逻辑是分桶,默认的分桶形式是 DateTimeBucketAssigner
,即依照解决工夫分桶。解决工夫指的是音讯达到 Flink 程序的工夫,这点并不合乎咱们的需要。因而,咱们须要本人编写代码将事件工夫从音讯体中解析进去,按规定生成分桶的名称,具体代码如下:
package com.etl.kafka2hdfs;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Created with IntelliJ IDEA.
* @author : jmx
* @Date: 2020/3/27
* @Time: 12:49
*
*/
public class EventTimeBucketAssigner implements BucketAssigner<String, String> {
@Override
public String getBucketId(String element, Context context) {
String partitionValue;
try {partitionValue = getPartitionValue(element);
} catch (Exception e) {partitionValue = "00000000";}
return "dt=" + partitionValue;// 分区目录名称
}
@Override
public SimpleVersionedSerializer<String> getSerializer() {return SimpleVersionedStringSerializer.INSTANCE;}
private String getPartitionValue(String element) throws Exception {
// 取出最初拼接字符串的 es 字段值,该值为业务工夫
long eventTime = Long.parseLong(element.split(",")[1]);
Date eventDate = new Date(eventTime);
return new SimpleDateFormat("yyyyMMdd").format(eventDate);
}
}
离线还原 MySQL 数据
通过上述步骤,即可将 Binlog 日志记录写入到 HDFS 的对应的分区中,接下来就须要依据增量的数据和存量的数据还原最新的数据。Hive 表保留在 HDFS 上,该文件系统不反对批改,因而咱们须要一些额定工作来写入数据变更。罕用的形式包含:JOIN、Hive 事务、或改用 HBase、kudu。
如昨日的存量数据 code_city, 今日增量的数据为 code_city_delta,能够通过 FULL OUTER JOIN
,将存量和增量数据合并成一张最新的数据表,并作为今天的存量数据:
INSERT OVERWRITE TABLE code_city
SELECT
COALESCE(t2.id, t1.id) AS id,
COALESCE (t2.city, t1.city) AS city,
COALESCE (t2.province, t1.province) AS province,
COALESCE (t2.event_time, t1.event_time) AS event_time
FROM
code_city t1
FULL OUTER JOIN (
SELECT
id,
city,
province,
event_time
FROM
(-- 取最初一条状态数据
SELECT
id,
city,
province,
dml_type,
event_time,
row_number () over ( PARTITION BY id ORDER BY event_time DESC) AS rank
FROM
code_city_delta
WHERE
dt = '20200324' -- 分区数据
) temp
WHERE
rank = 1
) t2 ON t1.id = t2.id;
小结
本文次要从 Binlog 流式采集和基于 Binlog 的 ODS 数据还原两方面,介绍了通过 Flink 实现实时的 ETL,此外还能够将 binlog 日志写入 kudu、HBase 等反对事务操作的 NoSQL 中,这样就能够省去数据表还原的步骤。本文是《基于 Canal 与 Flink 实现数据实时增量同步》的第二篇,对于 canal 解析 Binlog 日志写入 kafka 的实现步骤,参见《基于 Canal 与 Flink 实现数据实时增量同步一》。
refrence:
[1]https://tech.meituan.com/2018…
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包