前言
本文针对解决Kafka不同Topic之间存在肯定的数据关联时的程序生产问题。
如存在Topic-insert和Topic-update别离是对数据的插入和更新,当insert和update操作为同一数据时,应保障先insert再update。
1、问题引入
kafka的程序生产始终是一个难以解决的问题,kafka的生产策略是对于同Topic同Partition的音讯可保障程序生产,其余无奈保障。如果一个Topic只有一个Partition,那么这个Topic对应consumer的生产必然是有序的。不同的Topic的任何状况下都无奈保障consumer的生产程序和producer的发送程序统一。
如果不同Topic之间存在数据关联且对生产程序有要求,该如何解决?本文次要解决此问题。
2、解决思路
现有Topic-insert和Topic-update,数据惟一标识为id,对于id=1的数据而言,要保障Topic-insert生产在前,Topic-update生产在后。
两个Topic的生产为不同线程解决,所以为了保障在同一时间内的同一数据标识的音讯仅有一个业务逻辑在解决,须要对业务增加锁操作。
应用synchronized进行加锁的话,会影响无关联的insert和update的数据生产能力,如id=1的insert和id=2的update,在synchronized的状况下,无奈并发解决,这是没有必要的,咱们须要的是对于id=1的insert和id=1的update在同一时间只有一个在解决,所以应用细粒度锁来实现加锁的操作。
细粒度锁实现:https://blog.csdn.net/qq_3824...
PS:如果为分布式系统,细粒度锁须要应用分布式锁的对应实现。
在对insert和update加锁之后,其实还是没有解决生产程序的问题,只是确保了同一时间只有一个业务在解决。 对于生产程序异样的问题,也就是先生产了update再生产insert的状况。
解决形式:生产到update数据,校验库中是否存在以后数据(也就是是否执行insert),如果没有,就将以后update数据存入缓存,key为数据标识id,在insert生产时查看是否存在id对应的update缓存,如果有,就证实以后数据的生产程序异样,需执行update操作,再将缓存数据移除。
3、实现计划
音讯发送:
kafkaTemplate.send("TOPIC_INSERT", "1");kafkaTemplate.send("TOPIC_UPDATE", "1");
监听代码示例:
KafkaListenerDemo.java
@Component@Slf4jpublic class KafkaListenerDemo { // 生产到的数据缓存 private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>(); // 数据存储 private Map<String, String> DATA_MAP = new ConcurrentHashMap<>(); private WeakRefHashLock weakRefHashLock; public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) { this.weakRefHashLock = weakRefHashLock; } @KafkaListener(topics = "TOPIC_INSERT") public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{ // 模仿程序异样,也就是insert后生产,这里线程sleep Thread.sleep(1000); String id = record.value(); log.info("接管到insert :: {}", id); Lock lock = weakRefHashLock.lock(id); lock.lock(); try { log.info("开始解决 {} 的insert", id); // 模仿 insert 业务解决 Thread.sleep(1000); // 从缓存中获取 是否存在有update数据 if (UPDATE_DATA_MAP.containsKey(id)){ // 缓存数据存在,执行update doUpdate(id); } log.info("解决 {} 的insert 完结", id); }finally { lock.unlock(); } acknowledgment.acknowledge(); } @KafkaListener(topics = "TOPIC_UPDATE") public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{ String id = record.value(); log.info("接管到update :: {}", id); Lock lock = weakRefHashLock.lock(id); lock.lock(); try { // 测试应用,不做数据库的校验 if (!DATA_MAP.containsKey(id)){ // 未找到对应数据,证实生产程序异样,将以后数据退出缓存 log.info("生产程序异样,将update数据 {} 退出缓存", id); UPDATE_DATA_MAP.put(id, id); }else { doUpdate(id); } }finally { lock.unlock(); } acknowledgment.acknowledge(); } void doUpdate(String id) throws InterruptedException{ // 模仿 update log.info("开始解决update::{}", id); Thread.sleep(1000); log.info("解决update::{} 完结", id); }}
日志(代码中已模仿必现生产程序异样的场景):
接管到update ::1生产程序异样,将update数据 1 退出缓存接管到insert ::1开始解决 1 的insert开始解决update::1解决update::1 完结解决 1 的insert 完结
察看日志,此计划可失常解决不同Topic再存在数据关联的生产程序问题。
版权申明:本文为CSDN博主「方片龙」的原创文章,遵循CC 4.0 BY-SA版权协定,转载请附上原文出处链接及本申明。
原文链接:https://blog.csdn.net/qq_3824...
近期热文举荐:
1.1,000+ 道 Java面试题及答案整顿(2022最新版)
2.劲爆!Java 协程要来了。。。
3.Spring Boot 2.x 教程,太全了!
4.别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!
5.《Java开发手册(嵩山版)》最新公布,速速下载!
感觉不错,别忘了顺手点赞+转发哦!