数栈是云原生—站式数据中台 PaaS,咱们在 github 和 gitee 上有一个乏味的开源我的项目:FlinkX,FlinkX 是一个基于 Flink 的批流对立的数据同步工具,既能够采集动态的数据,也能够采集实时变动的数据,是全域、异构、批流一体的数据同步引擎。大家喜爱的话请给咱们点个 star!star!star!
github 开源我的项目:https://github.com/DTStack/fl…
gitee 开源我的项目:https://gitee.com/dtstack_dev…
袋鼠云云原生一站式数据中台 PaaS——数栈,笼罩了建设数据中心过程中所须要的各种工具(包含数据开发平台、数据资产平台、数据迷信平台、数据服务引擎等),残缺笼罩离线计算、实时计算利用,帮忙企业极大地缩短数据价值的萃取过程,进步提炼数据价值的能力。
目前,数栈 - 离线开发平台(BatchWorks)中的数据离线同步工作、数栈 - 实时开发平台(StreamWorks)中的数据实时采集工作曾经对立基于 FlinkX 来实现。数据的离线采集和实时采集根本的原理的是一样的,次要的不同之处是源头的流是否有界,所以对立用 Flink 的 Stream API 来实现这两种数据同步场景,实现数据同步的批流对立。
一、性能介绍
1、断点续传
断点续传是指数据同步工作在运行过程中因各种起因导致工作失败,不须要重头同步数据,只须要从上次失败的地位持续同步即可,相似于下载文件时因网络起因失败,不须要从新下载文件,只须要持续下载就行,能够大大节省时间和计算资源。断点续传是数栈 - 离线开发平台(BatchWorks)里数据同步工作的一个性能,须要联合工作的出错重试机制能力实现。当工作运行失败,会在 Engine 里进行重试,重试的时候会接着上次失败时读取的地位持续读取数据,直到工作运行胜利为止。
2、实时采集
实时采集是数栈 - 实时开发平台(StreamWorks)里数据采集工作的一个性能,当数据源里的数据产生了增删改操作,同步工作监听到这些变动,将变动的数据实时同步到指标数据源。除了数据实时变动外,实时采集和离线数据同步的另一个区别是:实时采集工作是不会进行的,工作会始终监听数据源是否有变动。这一点和 Flink 工作是统一的,所以实时采集工作是数栈流计算利用里的一个工作类型,配置过程和离线计算里的同步工作根本一样。
二、Flink 中的 Checkpoint 机制
断点续传和实时采集都依赖于 Flink 的 Checkpoint 机制,所以咱们先来简略理解一下。
Checkpoint 是 Flink 实现容错机制最外围的性能,它可能依据配置周期性地基于 Stream 中各个 Operator 的状态来生成 Snapshot,从而将这些状态数据定期长久化存储下来,当 Flink 程序一旦意外解体时,从新运行程序时能够有选择地从这些 Snapshot 进行复原,从而修改因为故障带来的程序数据状态中断。
Checkpoint 触发时,会向多个分布式的 Stream Source 中插入一个 Barrier 标记,这些 Barrier 会随着 Stream 中的数据记录一起流向上游的各个 Operator。当一个 Operator 接管到一个 Barrier 时,它会暂停解决 Steam 中新接管到的数据记录。因为一个 Operator 可能存在多个输出的 Stream,而每个 Stream 中都会存在对应的 Barrier,该 Operator 要等到所有的输出 Stream 中的 Barrier 都达到。
当所有 Stream 中的 Barrier 都曾经达到该 Operator,这时所有的 Barrier 在工夫上看来是同一个时刻点(示意曾经对齐),在期待所有 Barrier 达到的过程中,Operator 的 Buffer 中可能曾经缓存了一些比 Barrier 早达到 Operator 的数据记录(Outgoing Records),这时该 Operator 会将数据记录(Outgoing Records)发射(Emit)进来,作为上游 Operator 的输出,最初将 Barrier 对应 Snapshot 发射(Emit)进来作为此次 Checkpoint 的后果数据。
三、断点续传
1、前提条件
同步工作要反对断点续传,对数据源有一些强制性的要求:
1)数据源(这里特指关系数据库)中必须蕴含一个升序的字段,比方主键或者日期类型的字段,同步过程中会应用 checkpoint 机制记录这个字段的值,工作复原运行时应用这个字段结构查问条件过滤曾经同步过的数据,如果这个字段的值不是升序的,那么工作复原时过滤的数据就是谬误的,最终导致数据的缺失或反复;
2)数据源必须反对数据过滤,如果不反对的话,工作就无奈从断点处复原运行,会导致数据反复;
3)指标数据源必须反对事务,比方关系数据库,文件类型的数据源也能够通过临时文件的形式反对。
2、工作运行的具体过程
咱们用一个具体的工作具体介绍一下整个过程,工作详情如下:
1)读取数据
读取数据时首先要结构数据分片,结构数据分片就是依据通道索引和 checkpoint 记录的地位结构查问 sql,sql 模板如下:
select * from data_test
where id mod ${channel_num}=${channel_index}
and id > ${offset}
如果是第一次运行,或者上一次工作失败时还没有触发 checkpoint,那么 offset 就不存在,依据 offset 和通道能够确定具体的查问 sql:
offset 存在时
第一个通道:
select * from data_test
where id mod 2=0
and id > ${offset_0};
第二个通道:
select * from data_test
where id mod 2=1
and id > ${offset_1};
offset 不存在时
第一个通道:
select * from data_test
where id mod 2=0;
第二个通道:
select * from data_test
where id mod 2=1;
数据分片结构好之后,每个通道就依据本人的数据分片去读数据了。
2)写数据
写数据前会先做几个操作:
a、检测 /data_test 目录是否存在,如果目录不存在,则创立这个目录,如果目录存在,进行 2 操作;
b、判断是不是以笼罩模式写数据,如果是,则删除 /data_test 目录,而后再创立目录,如果不是,则进行 3 操作;
c、检测 /data_test/.data 目录是否存在,如果存在就先删除,再创立,确保没有其它工作因异样失败遗留的脏数据文件;
数据写入 hdfs 是单条写入的,不反对批量写入。数据会先写入 /data_test/.data/ 目录下,数据文件的命名格局为:
channelIndex.jobId.fileIndex
蕴含通道索引,jobId,文件索引三个局部。
3)checkpoint 触发时
在 FlinkX 中“状态”示意的是标识字段 id 的值,咱们假如 checkpoint 触发时两个通道的读取和写入状况如图中所示:
checkpoint 触发后,两个 reader 学生成 Snapshot 记录读取状态,通道 0 的状态为 id=12,通道 1 的状态为 id=11。Snapshot 生成之后向数据流外面插入 barrier,barrier 随数据流向 Writer。以 Writer_0 为例,Writer_0 接管 Reader_0 和 Reader_1 发来的数据,假如先收到了 Reader_0 的 barrier,这个时候 Writer_0 进行写出数据到 HDFS,将接管到的数据先放到 InputBuffer 外面,始终期待 Reader_1 的 barrier 达到之后再将 Buffer 里的数据全副写出,而后生成 Writer 的 Snapshot,整个 checkpoint 完结后,记录的工作状态为:
Reader_0:id=12
Reader_1:id=11
Writer_0:id= 无奈确定
Writer_1:id= 无奈确定
工作状态会记录到配置的 HDFS 目录 /flinkx/checkpoint/abc123 下。因为每个 Writer 会接管两个 Reader 的数据,以及各个通道的数据读写速率可能不一样,所以导致 writer 接管到的数据程序是不确定的,然而这不影响数据的准确性,因为读取数据时只须要 Reader 记录的状态就能够结构查问 sql,咱们只有确保这些数据真的写到 HDFS 就行了。在 Writer 生成 Snapshot 之前,会做一系列操作保障接管到的数据全副写入 HDFS:
a、close 写入 HDFS 文件的数据流,这时候会在 /data_test/.data 目录下生成两个两个文件:
/data_test/.data/0.abc123.0
/data_test/.data/1.abc123.0
b、将生成的两个数据文件挪动到 /data_test 目录下;
c、更新文件名称模板更新为:channelIndex.abc123.1;
快照生成后工作持续读写数据,如果生成快照的过程中有任何异样,工作会间接失败,这样这次快照就不会生成,工作复原时会从上一个胜利的快照复原。
4)工作失常完结
工作失常完结时也会做和生成快照时同样的操作,close 文件流,挪动长期数据文件等。
5)工作异样终止
工作如果异样完结,假如工作完结时最初一个 checkpoint 记录的状态为:
Reader_0:id=12Reader_1:id=11
那么工作复原的时候就会把各个通道记录的状态赋值给 offset,再次读取数据时结构的 sql 为:
第一个通道:
select * from data_test
where id mod 2=0
and id > 12;
[点击并拖拽以挪动]
第二个通道:
select * from data_test
where id mod 2=1
and id > 11;
[点击并拖拽以挪动]
这样就能够从上一次失败的地位持续读取数据了。
3、反对断点续传的插件
实践上只有反对过滤数据的数据源,和反对事务的数据源都能够反对断点续传的性能,目前 FlinkX 反对的插件如下:
四、实时采集
目前 FlinkX 反对实时采集的插件有 KafKa、binlog 插件,binlog 插件是专门针对 mysql 数据库做实时采集的,如果要反对其它的数据源,只须要把数据打到 Kafka,而后再用 FlinkX 的 Kafka 插件生产数据即可,比方 oracle,只须要应用 oracle 的 ogg 将数据打到 Kafka。这里咱们专门解说一下 mysql 的实时采集插件 binlog。
1、binlog
binlog 是 Mysql sever 层保护的一种二进制日志,与 innodb 引擎中的 redo/undo log 是齐全不同的日志;其次要是用来记录对 mysql 数据更新或潜在产生更新的 SQL 语句,并以 ” 事务 ” 的模式保留在磁盘中。
binlog 的作用次要有:
1)复制:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给 slaves 并回放来达到 master-slave 数据统一的目标;
2)数据恢复:通过 mysqlbinlog 工具复原数据;
3)增量备份。
2、MySQL 主备复制
有了记录数据变动的 binlog 日志还不够,咱们还须要借助 MySQL 的主备复制性能:主备复制是指 一台服务器充当主数据库服务器,另一台或多台服务器充当从数据库服务器,主服务器中的数据主动复制到从服务器之中。
主备复制的过程:
1)MySQL master 将数据变更写入二进制日志 (binary log, 其中记录叫做二进制日志事件 binary log events,能够通过 show binlog events 进行查看);
2)MySQL slave 将 master 的 binary log events 拷贝到它的中继日志 (relay log);
3)MySQL slave 重放 relay log 中事件,将数据变更反映它本人的数据。
3、写入 Hive
binlog 插件能够监听多张表的数据变更状况,解析出的数据中蕴含表名称信息,读取到的数据能够全副写入指标数据库的一张表,也能够依据数据中蕴含的表名信息写入不同的表,目前只有 Hive 插件反对这个性能。Hive 插件目前只有写入插件,性能基于 HDFS 的写入插件实现,也就是说从 binlog 读取,写入 hive 也反对失败复原的性能。
写入 Hive 的过程:
1)从数据中解析出 MySQL 的表名,而后依据表名映射规定转换成对应的 Hive 表名;
2)查看 Hive 表是否存在,如果不存在就创立 Hive 表;
3)查问 Hive 表的相干信息,结构 HdfsOutputFormat;
4)调用 HdfsOutputFormat 将数据写入 HDFS。