一、前言
假如以后Flink利用已实现EOS(即 Exactly-Once Semantics)语义,当初须要减少Flink解决数据长久化到MySQL,前提条件不能突破Flink EOS的生态。官网提供的flink-connector-jdbc并没有提供事务和checkpoint的相干操作,自定义sink须要思考和CheckPoint简单的配合。参考Flink EOS如何避免内部零碎乱入,可自定义实现TwoPhaseCommitSinkFunction类,实现MySQL内部零碎组件的完满嵌入。
本次模仿Flink生产Kafka数据并写入MySQL,通过自定义MySQL Sink验证Flink 2PC以及EOS的准确性。相应的零碎及组件版本如下,Flink的部署形式为Standalone。
组件/零碎 | 版本 |
---|---|
centOS | 7.5 |
Flink | 1.12.2 |
MySQL | 5.7 |
Kafka | 2.3.1 |
Zookeeper | 3.4.5 |
Hadoop | 2.6.0 |
二、验证
思路及猜想后果:
kafka发送:一共发送20条数据,每条数据开端数字自增,不便察看成果。每条发送距离为10秒,一共耗时200s。
flink解决:checkpoint工夫价格为60s,重启提早为10s,解决第10条数据的时候模仿产生bug,解决完第19条数据的时候,手动勾销job,利用checkpoint复原job。
猜想1:产生bug时,解决过的数据但未做checkpoint不能长久化到MySQL,只有做了checkpoint的数据能力长久化到MySQL。即checkpioint的提交和MySQL事务的最终提交是保持一致的。
猜想2:job重启会进行事务回滚,从新执行写入事务操作。
猜想3:job失败,利用checkpoint复原job能保证数据恰好解决一次的准确语义。
验证猜想1:
通过截图日志以及两图比照可知,checkPoint实现后回调了MySQL的commit操作,且尾数134之前的数据全副写入MySQL(即最初一次checkpoint之前的数据全副长久化胜利),阐明MySQL的事务是和checkpoint保持一致的。
验证猜想2:
通过比照job重启前后的日志比照,发现135-137数据产生了事务回滚,并从新进行的写入操作。
验证猜想3:
job未手动重启之前,能够看到kafka producer理论发送的数据和长久化MySQL的数据是不统一的,接下来就是验证,利用Flink的checkpoint复原作业最终能达到准确一次生产的语义。
找到最初一次checkpoint的门路,执行以下命令进行job复原
bin/flink run -s hdfs://lsl001:8020/checkpoint/flink-checkpoint/ca7caa1b21052bb1dbb02d5533b93df4/chk-5 flink_study-1.0-SNAPSHOT.jar --detached
job重启胜利,从截图日志可看到,没有长久化胜利的数据,从新执行了写入操作,最终通过checkpoint胜利提交事务。MySQL查问后果如下图:
三、总结
通过上述的测试流程,咱们能够失去如下论断:
- Flink Job重启或者失败,事务都会回滚,并且都能最终保证数据的准确一次生产。
- Flink CheckPoin和两阶段提交时亲密绑定的。
- 自定义MySQL sink实现TwoPhaseCommitSinkFunction类可实现MySQL零碎敌对的融入Flink EOS生态。
发表回复