乐趣区

关于flink:基于Canal与Flink实现数据实时增量同步二

本文次要从 Binlog 实时采集和离线解决 Binlog 还原业务数据两个方面,来介绍如何实现 DB 数据精确、高效地进入 Hive 数仓。

背景

在数据仓库建模中,未经任何加工解决的原始业务层数据,咱们称之为 ODS(Operational Data Store) 数据。在互联网企业中,常见的 ODS 数据有业务日志数据(Log)和业务 DB 数据(DB)两类。对于业务 DB 数据来说,从 MySQL 等关系型数据库的业务数据进行采集,而后导入到 Hive 中,是进行数据仓库生产的重要环节。如何精确、高效地把 MySQL 数据同步到 Hive 中?个别罕用的解决方案是批量取数并 Load:直连 MySQL 去 Select 表中的数据,而后存到本地文件作为两头存储,最初把文件 Load 到 Hive 表中。这种计划的长处是实现简略,然而随着业务的倒退,毛病也逐步裸露进去:

  • 性能瓶颈:随着业务规模的增长,Select From MySQL -> Save to Localfile -> Load to Hive 这种数据流破费的工夫越来越长,无奈满足上游数仓生产的工夫要求。
  • 间接从 MySQL 中 Select 大量数据,对 MySQL 的影响十分大,容易造成慢查问,影响业务线上的失常服务。
  • 因为 Hive 自身的语法不反对更新、删除等 SQL 原语 (高版本 Hive 反对,然而须要分桶 +ORC 存储格局),对于 MySQL 中产生 Update/Delete 的数据无奈很好地进行反对。

为了彻底解决这些问题,咱们逐渐转向 CDC (Change Data Capture) + Merge 的技术计划,即实时 Binlog 采集 + 离线解决 Binlog 还原业务数据这样一套解决方案。Binlog 是 MySQL 的二进制日志,记录了 MySQL 中产生的所有数据变更,MySQL 集群本身的主从同步就是基于 Binlog 做的。

实现思路

首先,采纳 Flink 负责把 Kafka 上的 Binlog 数据拉取到 HDFS 上。

而后,对每张 ODS 表,首先须要一次性制作快照(Snapshot),把 MySQL 里的存量数据读取到 Hive 上,这一过程底层采纳直连 MySQL 去 Select 数据的形式,能够应用 Sqoop 进行一次性全量导入。

最初,对每张 ODS 表,每天基于存量数据和当天增量产生的 Binlog 做 Merge,从而还原出业务数据。

Binlog 是流式产生的,通过对 Binlog 的实时采集,把局部数据处理需要由每天一次的批处理摊派到实时流上。无论从性能上还是对 MySQL 的拜访压力上,都会有显著地改善。Binlog 自身记录了数据变更的类型(Insert/Update/Delete),通过一些语义方面的解决,齐全可能做到精准的数据还原。

实现计划

Flink 解决 Kafka 的 binlog 日志

应用 kafka source,对读取的数据进行 JSON 解析,将解析的字段拼接成字符串,合乎 Hive 的 schema 格局,具体代码如下:

package com.etl.kafka2hdfs;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Map;
import java.util.Properties;

/**
 *  @Created with IntelliJ IDEA.
 *  @author : jmx
 *  @Date: 2020/3/27
 *  @Time: 12:52
 *  
 */
public class HdfsSink {public static void main(String[] args) throws Exception {
        String fieldDelimiter = ",";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // checkpoint
        env.enableCheckpointing(10_000);
        //env.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
        env.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
        // only required for Kafka 0.8
        props.setProperty("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181");
        props.setProperty("group.id", "test123");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("qfbap_ods.code_city", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest();
        DataStream<String> stream = env.addSource(consumer);

        // transform
        SingleOutputStreamOperator<String> cityDS = stream
                .filter(new FilterFunction<String>() {
                    // 过滤掉 DDL 操作
                    @Override
                    public boolean filter(String jsonVal) throws Exception {JSONObject record = JSON.parseObject(jsonVal, Feature.OrderedField);
                        return record.getString("isDdl").equals("false");
                    }
                })
                .map(new MapFunction<String, String>() {

                    @Override
                    public String map(String value) throws Exception {StringBuilder fieldsBuilder = new StringBuilder();
                        // 解析 JSON 数据
                        JSONObject record = JSON.parseObject(value, Feature.OrderedField);
                        // 获取最新的字段值
                        JSONArray data = record.getJSONArray("data");
                        // 遍历,字段值的 JSON 数组,只有一个元素
                        for (int i = 0; i < data.size(); i++) {
                            // 获取到 JSON 数组的第 i 个元素
                            JSONObject obj = data.getJSONObject(i);
                            if (obj != null) {fieldsBuilder.append(record.getLong("id")); // 序号 id
                                fieldsBuilder.append(fieldDelimiter); // 字段分隔符
                                fieldsBuilder.append(record.getLong("es")); // 业务工夫戳
                                fieldsBuilder.append(fieldDelimiter);
                                fieldsBuilder.append(record.getLong("ts")); // 日志工夫戳
                                fieldsBuilder.append(fieldDelimiter);
                                fieldsBuilder.append(record.getString("type")); // 操作类型
                                for (Map.Entry<String, Object> entry : obj.entrySet()) {fieldsBuilder.append(fieldDelimiter);
                                    fieldsBuilder.append(entry.getValue()); // 表字段数据
                                }

                            }
                        }
                        return fieldsBuilder.toString();}

                });

        //cityDS.print();
        //stream.print();

        // sink
        // 以下条件满足其中之一就会滚动生成新的文件
        RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create()
                .withRolloverInterval(60L * 1000L) // 滚动写入新文件的工夫,默认 60s。依据具体情况调节
                .withMaxPartSize(1024 * 1024 * 128L) // 设置每个文件的最大大小 , 默认是 128M,这里设置为 128M
                .withInactivityInterval(60L * 1000L) // 默认 60 秒, 未写入数据处于不沉闷状态超时会滚动新文件
                .build();
        
        StreamingFileSink<String> sink = StreamingFileSink
                //.forRowFormat(new Path("file:///E://binlog_db/city"), new SimpleStringEncoder<String>())
                .forRowFormat(new Path("hdfs://kms-1:8020/binlog_db/code_city_delta"), new SimpleStringEncoder<String>())
                .withBucketAssigner(new EventTimeBucketAssigner())
                .withRollingPolicy(rollingPolicy)
                .withBucketCheckInterval(1000)  // 桶查看距离,这里设置 1S
                .build();

        cityDS.addSink(sink);
        env.execute();}
}

对于 Flink Sink 到 HDFS,StreamingFileSink 代替了先前的 BucketingSink,用来将上游数据存储到 HDFS 的不同目录中。它的外围逻辑是分桶,默认的分桶形式是 DateTimeBucketAssigner,即依照解决工夫分桶。解决工夫指的是音讯达到 Flink 程序的工夫,这点并不合乎咱们的需要。因而,咱们须要本人编写代码将事件工夫从音讯体中解析进去,按规定生成分桶的名称,具体代码如下:

package com.etl.kafka2hdfs;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 *  @Created with IntelliJ IDEA.
 *  @author : jmx
 *  @Date: 2020/3/27
 *  @Time: 12:49
 *  
 */

public class EventTimeBucketAssigner implements BucketAssigner<String, String> {

    @Override
    public String getBucketId(String element, Context context) {
        String partitionValue;
        try {partitionValue = getPartitionValue(element);
        } catch (Exception e) {partitionValue = "00000000";}
        return "dt=" + partitionValue;// 分区目录名称
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {return SimpleVersionedStringSerializer.INSTANCE;}
    private String getPartitionValue(String element) throws Exception {

        // 取出最初拼接字符串的 es 字段值,该值为业务工夫
        long eventTime = Long.parseLong(element.split(",")[1]);
        Date eventDate = new Date(eventTime);
        return new SimpleDateFormat("yyyyMMdd").format(eventDate);
    }
}

离线还原 MySQL 数据

通过上述步骤,即可将 Binlog 日志记录写入到 HDFS 的对应的分区中,接下来就须要依据增量的数据和存量的数据还原最新的数据。Hive 表保留在 HDFS 上,该文件系统不反对批改,因而咱们须要一些额定工作来写入数据变更。罕用的形式包含:JOIN、Hive 事务、或改用 HBase、kudu。

如昨日的存量数据 code_city, 今日增量的数据为 code_city_delta,能够通过 FULL OUTER JOIN,将存量和增量数据合并成一张最新的数据表,并作为今天的存量数据:

INSERT OVERWRITE TABLE code_city
SELECT 
        COALESCE(t2.id, t1.id) AS id,
        COALESCE (t2.city, t1.city) AS city,
        COALESCE (t2.province, t1.province) AS province,
        COALESCE (t2.event_time, t1.event_time) AS event_time 
FROM
        code_city t1
        FULL OUTER JOIN (
SELECT
        id,
        city,
        province,
        event_time 
FROM
        (-- 取最初一条状态数据
SELECT
        id,
        city,
        province,
        dml_type,
        event_time,
        row_number () over ( PARTITION BY id ORDER BY event_time DESC) AS rank 
FROM
        code_city_delta 
WHERE
        dt = '20200324' -- 分区数据
        ) temp 
WHERE
        rank = 1 
        ) t2 ON t1.id = t2.id;

小结

本文次要从 Binlog 流式采集和基于 Binlog 的 ODS 数据还原两方面,介绍了通过 Flink 实现实时的 ETL,此外还能够将 binlog 日志写入 kudu、HBase 等反对事务操作的 NoSQL 中,这样就能够省去数据表还原的步骤。本文是《基于 Canal 与 Flink 实现数据实时增量同步》的第二篇,对于 canal 解析 Binlog 日志写入 kafka 的实现步骤,参见《基于 Canal 与 Flink 实现数据实时增量同步一》。

refrence:

[1]https://tech.meituan.com/2018…

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版