共计 9410 个字符,预计需要花费 24 分钟才能阅读完成。
一、MySql 的 Binlog
1、什么是 Binlog
1)binlog 是二进制日志,并且是事务安全性
2)binlog 记录了所有的 DDL 和 DML(除了数据查问语句)语句,并以事件的模式记录, 还蕴含语句所执行的耗费的工夫
3)一般来说开启二进制日志大略会有 1% 的性能损耗。
2、Binlog 应用场景
1)应用 binlog 复原数据
2)在我的项目中动静监听 mysql 中变动的数据
3、Binlog 开启
1)在 MySQL 的配置文件 (Linux: /etc/my.cnf , Windows:\my.ini) 下, 批改配置在[mysqld] 区块设置 / 增加
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2019
binlog-do-db=gmall2020
binlog-do-db=gmall2021
2)重启 mysql
sudo systemctl restart mysqld
4、配置文件参数解析
配置机器 id
多台机器不能反复
server-id=1
开启 binlog
log-bin=mysql-bin
Binlog 分类设置
MySQL Binlog 的格局,那就是有三种,别离是 STATEMENT,MIXED,ROW。
在配置文件中抉择配置,个别会配置为 row
binlog_format=row
三种分类的区别:
1)statement
语句级,binlog 会记录每次一执行写操作的语句。
绝对 row 模式节俭空间,然而可能产生不一致性,比方
update tt set create_date=now()
如果用 binlog 日志进行复原,因为执行工夫不同可能产生的数据就不同。
长处:节俭空间
毛病:有可能造成数据不统一。
2)row(罕用)
行级,binlog 会记录每次操作后每行记录的变动。
长处:保持数据的相对一致性。因为不论 sql 是什么,援用了什么函数,他只记录执行后的成果。
毛病:占用较大空间。
3)mixed
statement 的升级版,肯定水平上解决了,因为一些状况而造成的 statement 模式不统一问题
在某些状况下譬如:
当函数中蕴含 UUID() 时;
蕴含 AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会依照 ROW 的形式进行解决
长处:节俭空间,同时兼顾了肯定的一致性。
毛病:还有些极个别状况依旧会造成不统一,
另外 statement 和 mixed 对于须要对 binlog 的监控的状况都不不便。
设置数据库
设置要监听的数据库,能够同时写入多个库
binlog-do-db=gmall2021
binlog-do-db=gmall2022
binlog-do-db=gmall2023
二、FlinkCDC
1、什么是 CDC
CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕捉数据库的变动(包含数据或数据表的插入、更新以及删除等),将这些变更按产生的程序残缺记录下来,写入到消息中间件中以供其余服务进行订阅及生产。
2、CDC 的品种
CDC 次要分为基于查问和基于 Binlog 两种形式,咱们次要理解一下这两种之间的区别:
基于查问的 CDC | 基于 Binlog 的 CDC | |
---|---|---|
开源产品 | Sqoop、Kafka JDBC Source | Canal、Maxwell、Debezium |
执行模式 | Batch | Streaming |
是否能够捕捉所有数据变动 | 否 | 是 |
提早性 | 高提早 | 低提早 |
是否减少数据库压力 | 是 | 否 |
3、FlinkCDC
Flink 内置了 Debezium
FlinkCDC1.11 版本正式公布
Canal 不反对读取全量 binlog 数据,而 FlinkCDC 完满避开了这个问题
Flink 社区开发了 flink-cdc-connectors 组件,这是一个能够间接从 MySQL、PostgreSQL 等数据库间接读取全量数据和增量变更数据的 source 组件。目前也已开源,开源地址:https://github.com/ververica/…
3.CDC 案例实操
1)导入依赖
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
2)编写代码
package com.haoziqi;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**************************************************************
* @Author: haoziqi
* @Date: Created in 9:27 2021/3/15
* @Description: TODO 应用 DataStream 连贯 mysql,并监控表中新增的数据 测试通道是否失常:flink 读取 mysql binlog 数据
* 执行的时候须要查看对应的库是否存在
* linux 中:sudo vim /etc/my.cnf
* 2、执行的时候须要运行 hdfs
* 3、启动 mysql,*
**************************************************************/
public class FlinkCDC1 {
private static Properties properties;
public static void main(String[] args) throws Exception {
//TODO 1. 获取流解决执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1.1Checkpoint 相干
/* 读取的是 binlog 中的数据,如果集群挂掉,尽量能实现断点续传性能。如果从最新的读取(丢数据)。如果从最开始读(反复数据)。现实状态:读取 binlog 中的数据读一行,保留一次读取到的(读取到的行)地位信息。而 flink 中读取行地位信息保留在 Checkpoint 中。应用 Checkpoint 能够把 flink 中读取(按行)的地位信息保留在 Checkpoint 中 */
env.enableCheckpointing(5000L);//5s 执行一次 Checkpoint
// 设置 Checkpoint 的模式:精准一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 工作挂掉的时候是否清理 checkpoint。使工作失常退出时不删除 CK 内容,有助于工作复原。默认的是勾销的时候清空 checkpoint 中的数据。RETAIN_ON_CANCELLATION 示意勾销工作的时候,保留最初一次的 checkpoint。便于工作的重启和复原,失常状况下都应用 RETAIN
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置一个重启策略:默认的固定延时重启次数,重启的次数是 Integer 的最大值,重启的距离是 1s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
// 设置一个状态后端 jobManager。如果应用的 yarn 集群模式,jobManager 随着工作的生成而生成,工作挂了 jobManager 就没了。因而须要启动一个状态后端。只有设置 checkpoint,尽量就设置一个状态后端。保留在各个节点都能读取的地位:hdfs 中
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/ck/"));
// 指定用户
System.setProperty("HADOOP_USER_NAME", "atguigu");
//TODO 2. 读取 mysql 变动数据 监控 MySQL 中变动的数据
Properties properties = new Properties(); // 创立一个变量能够增加之后想增加的配置信息
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder() // 应用 builder 创立 MySQLsource 对象,须要指定对象的泛型。.hostname("hadoop102") // 指定监控的哪台服务器(MySQL 装置的地位).port(3306) //MySQL 连贯的端口号
.username("root") // 用户
.password("123456")// 明码
.databaseList("gmall_flink_0923") //list:能够监控多个库
.tableList("gmall_flink_0923.z_user_info") // 如果不写则监控库下的所有表,须要应用【库名. 表名】//debezium 中有很多配置信息。能够创立一个对象来接管
//.debeziumProperties(properties)
.deserializer(new StringDebeziumDeserializationSchema()) // 读的数据是 binlog 文件,反序列化器,解析数据
.startupOptions(StartupOptions.initial()) // 初始化数据:空值读不读数据库中的历史数据。initial(历史 + 连贯之后的)、latest-offset(连贯之后的)。timestamp(依据指定工夫戳作为开始读取的地位).build();
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
//TODO 3. 打印数据
streamSource.print();
// 把下面代码正文掉,报错代码
SingleOutputStreamOperator<String> map = streamSource.map(data -> data);
SingleOutputStreamOperator<String> slotgroup = map.slotSharingGroup("123");
slotgroup.print();
//TODO 4. 启动工作
env.execute();}
}
3) 案例测试:
1)打包成带依赖的 jar 包
2)开启 MySQLbinlog 并重启 Mysql
4) 启动 HDFS 集群 +yarn
start-yarn.sh
start-dfs.sh
5)启动程序(基于 yarn 的 pre-job 模式)
bin/flink run -t yarn-per-job -c com.haoziqi.FlinkCDC1 flink-1.0-SNAPSHOT-jar-with-dependencies.jar
6)在 MySQL 的 gmall-flink.z_user_info 表中增加、批改或者删除数据
7)在控制台查看输入
4)CDC 数据格式转换(必看)
通过下面的数据采集,咱们失去一份 SourceRecord 格局的数据
SourceRecord{sourcePartition={server=mysql_binlog_source},
sourceOffset={ts_sec=1616030398, file=mysql-bin.000009, pos=519, row=1, server_id=1, event=2}
}
ConnectRecord{topic='mysql_binlog_source.gmall_flink_0923.z_user_info', kafkaPartition=null, key=Struct{id=8},
keySchema=Schema{mysql_binlog_source.gmall_flink_0923.z_user_info.Key:STRUCT},
value=Struct{before=Struct{id=8,name=haoziqi},
after=Struct{id=8,name=haoziqi,phone_num=123456},
source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1616030398000,db=gmall_flink_0923,table=z_user_info,server_id=1,file=mysql-bin.000009,pos=675,row=0,thread=2},op=u,ts_ms=1616030399282
},
valueSchema=Schema{mysql_binlog_source.gmall_flink_0923.z_user_info.Envelope:STRUCT},
timestamp=null, headers=ConnectHeaders(headers=)
}
在下面获取到的数据中,咱们只须要获取到更新后的数据,能够应用如下代码对数据进行筛选
更新后的数据在 value 的 Struct 中被标记为 after
package com.haoziqi.app.func;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
* description
* created by A on 2021/3/15
*/
// 实现 DebeziumDeserializationSchema 接口并定义输入数据的类型
public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord so urceRecord, Collector<String> collector) throws Exception {
// 定义 JSON 对象用于寄存反序列化后的数据
JSONObject result = new JSONObject();
// 获取库名和表名
String topic = sourceRecord.topic();
String[] split = topic.split("\\.");
String database = split[1];
String table = split[2];
// 获取操作类型
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
// 获取数据自身
Struct struct = (Struct) sourceRecord.value();
Struct after = struct.getStruct("after");
JSONObject value = new JSONObject();
if (after != null) {Schema schema = after.schema();
for (Field field : schema.fields()) {value.put(field.name(), after.get(field.name()));
}
}
// 将数据放入 JSON 对象
result.put("database", database);
result.put("table", table);
result.put("operation", operation.toString().toLowerCase());
result.put("value", value);
// 将数据传输进来
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);
}
}
写好格局转换类后,在构建 Flink 对象时设置.deserializer 参数即可
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("hadoop102")
.port(3306)
.username("root")
.password("123456")
.databaseList("gmall_flink_0923")
.deserializer(new MyDeserializationSchemaFunction())
.startupOptions(StartupOptions.latest())
.build();
DataStreamSource<String> mySqlDS = env.addSource(sourceFunction);
至此,咱们已胜利的将 CDC 采集到的 SourceRecord 格局 转换为了JSON 字符串