一、前言
假如以后 Flink 利用已实现 EOS(即 Exactly-Once Semantics)语义,当初须要减少 Flink 解决数据长久化到 MySQL,前提条件不能突破 Flink EOS 的生态。官网提供的 flink-connector-jdbc 并没有提供事务和 checkpoint 的相干操作,自定义 sink 须要思考和 CheckPoint 简单的配合。参考 Flink EOS 如何避免内部零碎乱入,可自定义实现 TwoPhaseCommitSinkFunction 类,实现 MySQL 内部零碎组件的完满嵌入。
本次模仿 Flink 生产 Kafka 数据并写入 MySQL,通过自定义 MySQL Sink 验证 Flink 2PC 以及 EOS 的准确性。相应的零碎及组件版本如下,Flink 的部署形式为 Standalone。
组件 / 零碎 | 版本 |
---|---|
centOS | 7.5 |
Flink | 1.12.2 |
MySQL | 5.7 |
Kafka | 2.3.1 |
Zookeeper | 3.4.5 |
Hadoop | 2.6.0 |
二、验证
思路及猜想后果:
kafka 发送:一共发送 20 条数据,每条数据开端数字自增,不便察看成果。每条发送距离为 10 秒,一共耗时 200s。
flink 解决:checkpoint 工夫价格为 60s,重启提早为 10s,解决第 10 条数据的时候模仿产生 bug,解决完第 19 条数据的时候,手动勾销 job,利用 checkpoint 复原 job。
猜想 1 :产生 bug 时,解决过的数据但未做 checkpoint 不能长久化到 MySQL,只有做了 checkpoint 的数据能力长久化到 MySQL。即 checkpioint 的提交和 MySQL 事务的最终提交是保持一致的。
猜想 2 :job 重启会进行事务回滚,从新执行写入事务操作。
猜想 3 :job 失败,利用 checkpoint 复原 job 能保证数据恰好解决一次的准确语义。
验证猜想 1 :
通过截图日志以及两图比照可知,checkPoint 实现后回调了 MySQL 的 commit 操作,且尾数 134 之前的数据全副写入 MySQL(即最初一次 checkpoint 之前的数据全副长久化胜利),阐明 MySQL 的事务是和 checkpoint 保持一致的。
验证猜想 2 :
通过比照 job 重启前后的日志比照,发现 135-137 数据产生了事务回滚,并从新进行的写入操作。
验证猜想 3 :
job 未手动重启之前,能够看到 kafka producer 理论发送的数据和长久化 MySQL 的数据是不统一的,接下来就是验证,利用 Flink 的 checkpoint 复原作业最终能达到准确一次生产的语义。
找到最初一次 checkpoint 的门路,执行以下命令进行 job 复原
bin/flink run -s hdfs://lsl001:8020/checkpoint/flink-checkpoint/ca7caa1b21052bb1dbb02d5533b93df4/chk-5 flink_study-1.0-SNAPSHOT.jar --detached
job 重启胜利,从截图日志可看到,没有长久化胜利的数据,从新执行了写入操作,最终通过 checkpoint 胜利提交事务。MySQL 查问后果如下图:
三、总结
通过上述的测试流程,咱们能够失去如下论断:
- Flink Job 重启或者失败,事务都会回滚,并且都能最终保证数据的准确一次生产。
- Flink CheckPoin 和两阶段提交时亲密绑定的。
- 自定义 MySQL sink 实现 TwoPhaseCommitSinkFunction 类可实现 MySQL 零碎敌对的融入 Flink EOS 生态。