一、MySql的Binlog

1、什么是Binlog

1)binlog是二进制日志,并且是事务安全性

2)binlog记录了所有的DDL和DML(除了数据查问语句)语句,并以事件的模式记录,还蕴含语句所执行的耗费的工夫

3)一般来说开启二进制日志大略会有1%的性能损耗。

2、Binlog应用场景

1)应用binlog复原数据

2)在我的项目中动静监听mysql中变动的数据

3、Binlog开启

1)在MySQL的配置文件(Linux: /etc/my.cnf , Windows:\my.ini)下,批改配置在[mysqld] 区块设置/增加

server-id=1log-bin=mysql-binbinlog_format=rowbinlog-do-db=gmall2019binlog-do-db=gmall2020binlog-do-db=gmall2021

2)重启mysql

sudo systemctl restart mysqld

4、配置文件参数解析

配置机器id

多台机器不能反复

server-id=1

开启binlog

log-bin=mysql-bin

Binlog分类设置

MySQL Binlog的格局,那就是有三种,别离是STATEMENT,MIXED,ROW。

在配置文件中抉择配置,个别会配置为row

binlog_format=row

三种分类的区别:

1)statement

语句级,binlog会记录每次一执行写操作的语句。

绝对row模式节俭空间,然而可能产生不一致性,比方

update tt set create_date=now()

如果用binlog日志进行复原,因为执行工夫不同可能产生的数据就不同。

长处:节俭空间

毛病:有可能造成数据不统一。

2)row(罕用)

行级,binlog会记录每次操作后每行记录的变动。

长处:保持数据的相对一致性。因为不论sql是什么,援用了什么函数,他只记录执行后的成果。

毛病:占用较大空间。

3)mixed

statement的升级版,肯定水平上解决了,因为一些状况而造成的statement模式不统一问题

    在某些状况下譬如:

当函数中蕴含 UUID() 时;

    蕴含 AUTO_INCREMENT 字段的表被更新时;    执行 INSERT DELAYED 语句时;    用 UDF 时;    会依照 ROW的形式进行解决

长处:节俭空间,同时兼顾了肯定的一致性。

毛病:还有些极个别状况依旧会造成不统一,

另外statement和mixed对于须要对binlog的监控的状况都不不便。

设置数据库

设置要监听的数据库,能够同时写入多个库

binlog-do-db=gmall2021binlog-do-db=gmall2022binlog-do-db=gmall2023

二、FlinkCDC

1、什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕捉数据库的变动(包含数据或数据表的插入、更新以及删除等),将这些变更按产生的程序残缺记录下来,写入到消息中间件中以供其余服务进行订阅及生产。

2、CDC的品种

CDC次要分为基于查问和基于Binlog两种形式,咱们次要理解一下这两种之间的区别:

基于查问的CDC基于Binlog的CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否能够捕捉所有数据变动
提早性高提早低提早
是否减少数据库压力

3、FlinkCDC

Flink内置了Debezium

FlinkCDC1.11版本正式公布

Canal不反对读取全量binlog数据,而FlinkCDC完满避开了这个问题

Flink社区开发了 flink-cdc-connectors 组件,这是一个能够间接从 MySQL、PostgreSQL 等数据库间接读取全量数据和增量变更数据的 source 组件。目前也已开源,开源地址:https://github.com/ververica/...

3.CDC案例实操

1)导入依赖

<dependencies>    <dependency>        <groupId>org.apache.flink</groupId>        <artifactId>flink-java</artifactId>        <version>1.12.0</version>    </dependency>    <dependency>        <groupId>org.apache.flink</groupId>        <artifactId>flink-streaming-java_2.12</artifactId>        <version>1.12.0</version>    </dependency>    <dependency>        <groupId>org.apache.flink</groupId>        <artifactId>flink-clients_2.12</artifactId>        <version>1.12.0</version>    </dependency>    <dependency>        <groupId>org.apache.hadoop</groupId>        <artifactId>hadoop-client</artifactId>        <version>3.1.3</version>    </dependency>    <dependency>        <groupId>mysql</groupId>        <artifactId>mysql-connector-java</artifactId>        <version>5.1.49</version>    </dependency>    <dependency>        <groupId>com.alibaba.ververica</groupId>        <artifactId>flink-connector-mysql-cdc</artifactId>        <version>1.2.0</version>    </dependency><dependency>        <groupId>com.alibaba</groupId>        <artifactId>fastjson</artifactId>        <version>1.2.75</version>    </dependency></dependencies><build>    <plugins>        <plugin>            <groupId>org.apache.maven.plugins</groupId>            <artifactId>maven-assembly-plugin</artifactId>            <version>3.0.0</version>            <configuration>                <descriptorRefs>                    <descriptorRef>jar-with-dependencies</descriptorRef>                </descriptorRefs>            </configuration>            <executions>                <execution>                    <id>make-assembly</id>                    <phase>package</phase>                    <goals>                        <goal>single</goal>                    </goals>                </execution>            </executions>        </plugin>    </plugins></build>

2)编写代码

package com.haoziqi;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.runtime.state.filesystem.FsStateBackend;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Properties;/************************************************************** * @Author: haoziqi * @Date: Created in 9:27 2021/3/15 * @Description: TODO 应用DataStream连贯mysql,并监控表中新增的数据  测试通道是否失常:flink读取mysql  binlog数据 * 执行的时候须要查看对应的库是否存在 *   linux中:sudo vim /etc/my.cnf *   2、执行的时候须要运行hdfs *   3、启动mysql, * **************************************************************/public class FlinkCDC1 {    private static Properties properties;    public static void main(String[] args) throws Exception {        //TODO 1.获取流解决执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        //1.1Checkpoint相干/*读取的是binlog中的数据,如果集群挂掉,尽量能实现断点续传性能。如果从最新的读取(丢数据)。如果从最开始读(反复数据)。现实状态:读取binlog中的数据读一行,保留一次读取到的(读取到的行)地位信息。而flink中读取行地位信息保留在Checkpoint中。应用Checkpoint能够把flink中读取(按行)的地位信息保留在Checkpoint中*/        env.enableCheckpointing(5000L);//5s执行一次Checkpoint        //设置Checkpoint的模式:精准一次        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);        //工作挂掉的时候是否清理checkpoint。使工作失常退出时不删除CK内容,有助于工作复原。默认的是勾销的时候清空checkpoint中的数据。RETAIN_ON_CANCELLATION示意勾销工作的时候,保留最初一次的checkpoint。便于工作的重启和复原,失常状况下都应用RETAIN        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);        //设置一个重启策略:默认的固定延时重启次数,重启的次数是Integer的最大值,重启的距离是1s        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));        //设置一个状态后端 jobManager。如果应用的yarn集群模式,jobManager随着工作的生成而生成,工作挂了jobManager就没了。因而须要启动一个状态后端。只有设置checkpoint,尽量就设置一个状态后端。保留在各个节点都能读取的地位:hdfs中        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/ck/"));        //指定用户        System.setProperty("HADOOP_USER_NAME", "atguigu");        //TODO 2.读取mysql变动数据 监控MySQL中变动的数据        Properties properties = new Properties(); //创立一个变量能够增加之后想增加的配置信息        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder() //应用builder创立MySQLsource对象,须要指定对象的泛型。                .hostname("hadoop102") //指定监控的哪台服务器(MySQL装置的地位)                .port(3306) //MySQL连贯的端口号                .username("root") //用户                .password("123456")//明码                .databaseList("gmall_flink_0923") //list:能够监控多个库                .tableList("gmall_flink_0923.z_user_info") //如果不写则监控库下的所有表,须要应用【库名.表名】                //debezium中有很多配置信息。能够创立一个对象来接管                //.debeziumProperties(properties)                .deserializer(new StringDebeziumDeserializationSchema()) //读的数据是binlog文件,反序列化器,解析数据                .startupOptions(StartupOptions.initial()) //初始化数据:空值读不读数据库中的历史数据。initial(历史+连贯之后的)、latest-offset(连贯之后的)。timestamp(依据指定工夫戳作为开始读取的地位)                .build();        DataStreamSource<String> streamSource = env.addSource(sourceFunction);        //TODO 3.打印数据        streamSource.print();        //把下面代码正文掉,报错代码        SingleOutputStreamOperator<String> map = streamSource.map(data -> data);        SingleOutputStreamOperator<String> slotgroup = map.slotSharingGroup("123");        slotgroup.print();        //TODO 4.启动工作        env.execute();    }}

3) 案例测试:

1)打包成带依赖的jar包

2)开启MySQLbinlog并重启Mysql

4) 启动HDFS集群+yarn

start-yarn.shstart-dfs.sh

5)启动程序(基于yarn的pre-job模式)

bin/flink run -t yarn-per-job -c com.haoziqi.FlinkCDC1 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

6)在MySQL的gmall-flink.z_user_info表中增加、批改或者删除数据

7)在控制台查看输入

4)CDC数据格式转换(必看)

通过下面的数据采集,咱们失去一份SourceRecord格局的数据

SourceRecord{    sourcePartition={server=mysql_binlog_source},    sourceOffset={ts_sec=1616030398, file=mysql-bin.000009, pos=519, row=1, server_id=1, event=2}    }ConnectRecord{    topic='mysql_binlog_source.gmall_flink_0923.z_user_info', kafkaPartition=null, key=Struct{id=8},     keySchema=Schema{    mysql_binlog_source.gmall_flink_0923.z_user_info.Key:STRUCT    }, value=Struct{        before=Struct{id=8,name=haoziqi},        after=Struct{id=8,name=haoziqi,phone_num=123456},        source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1616030398000,db=gmall_flink_0923,table=z_user_info,server_id=1,file=mysql-bin.000009,pos=675,row=0,thread=2        },op=u,ts_ms=1616030399282        },    valueSchema=Schema{        mysql_binlog_source.gmall_flink_0923.z_user_info.Envelope:STRUCT                },     timestamp=null, headers=ConnectHeaders(headers=)}

在下面获取到的数据中,咱们只须要获取到更新后的数据,能够应用如下代码对数据进行筛选

更新后的数据在value的Struct中被标记为after

package com.haoziqi.app.func;import com.alibaba.fastjson.JSONObject;import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;import io.debezium.data.Envelope;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;/** * description * created by A on 2021/3/15 *///实现DebeziumDeserializationSchema接口并定义输入数据的类型public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {    @Override    public void deserialize(SourceRecord so urceRecord, Collector<String> collector) throws Exception {        //定义JSON对象用于寄存反序列化后的数据        JSONObject result = new JSONObject();        //获取库名和表名        String topic = sourceRecord.topic();        String[] split = topic.split("\\.");        String database = split[1];        String table = split[2];        //获取操作类型        Envelope.Operation operation = Envelope.operationFor(sourceRecord);        //获取数据自身        Struct struct = (Struct) sourceRecord.value();        Struct after = struct.getStruct("after");        JSONObject value = new JSONObject();        if (after != null) {            Schema schema = after.schema();            for (Field field : schema.fields()) {                value.put(field.name(), after.get(field.name()));            }        }        //将数据放入JSON对象        result.put("database", database);        result.put("table", table);        result.put("operation", operation.toString().toLowerCase());        result.put("value", value);        //将数据传输进来        collector.collect(result.toJSONString());    }    @Override    public TypeInformation<String> getProducedType() {        return TypeInformation.of(String.class);    }}

写好格局转换类后,在构建Flink对象时设置.deserializer参数即可

  DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()                .hostname("hadoop102")                .port(3306)                .username("root")                .password("123456")                .databaseList("gmall_flink_0923")                .deserializer(new MyDeserializationSchemaFunction())                .startupOptions(StartupOptions.latest())                .build();        DataStreamSource<String> mySqlDS = env.addSource(sourceFunction);

至此,咱们已胜利的将CDC采集到的SourceRecord格局转换为了JSON字符串