共计 2945 个字符,预计需要花费 8 分钟才能阅读完成。
前言
本文针对解决 Kafka 不同 Topic 之间存在肯定的数据关联时的程序生产问题。
如存在 Topic-insert 和 Topic-update 别离是对数据的插入和更新,当 insert 和 update 操作为同一数据时,应保障先 insert 再 update。
1、问题引入
kafka 的程序生产始终是一个难以解决的问题,kafka 的生产策略是对于同 Topic 同 Partition 的音讯可保障程序生产,其余无奈保障。如果一个 Topic 只有一个 Partition,那么这个 Topic 对应 consumer 的生产必然是有序的。不同的 Topic 的任何状况下都无奈保障 consumer 的生产程序和 producer 的发送程序统一。
如果不同 Topic 之间存在数据关联且对生产程序有要求,该如何解决?本文次要解决此问题。
2、解决思路
现有 Topic-insert 和 Topic-update,数据惟一标识为 id,对于 id= 1 的数据而言,要保障 Topic-insert 生产在前,Topic-update 生产在后。
两个 Topic 的生产为不同线程解决,所以为了保障在同一时间内的同一数据标识的音讯仅有一个业务逻辑在解决,须要对业务增加锁操作。
应用 synchronized 进行加锁的话,会影响无关联的 insert 和 update 的数据生产能力,如 id= 1 的 insert 和 id= 2 的 update,在 synchronized 的状况下,无奈并发解决,这是没有必要的,咱们须要的是对于 id= 1 的 insert 和 id= 1 的 update 在同一时间只有一个在解决,所以应用细粒度锁来实现加锁的操作。
细粒度锁实现:https://blog.csdn.net/qq_3824…
PS:如果为分布式系统,细粒度锁须要应用分布式锁的对应实现。
在对 insert 和 update 加锁之后,其实还是没有解决生产程序的问题,只是确保了同一时间只有一个业务在解决。 对于生产程序异样的问题,也就是先生产了 update 再生产 insert 的状况。
解决形式:生产到 update 数据,校验库中是否存在以后数据(也就是是否执行 insert),如果没有,就将以后 update 数据存入缓存,key 为数据标识 id,在 insert 生产时查看是否存在 id 对应的 update 缓存,如果有,就证实以后数据的生产程序异样,需执行 update 操作,再将缓存数据移除。
3、实现计划
音讯发送:
kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");
监听代码示例:
KafkaListenerDemo.java
@Component
@Slf4j
public class KafkaListenerDemo {
// 生产到的数据缓存
private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
// 数据存储
private Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
private WeakRefHashLock weakRefHashLock;
public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {this.weakRefHashLock = weakRefHashLock;}
@KafkaListener(topics = "TOPIC_INSERT")
public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
// 模仿程序异样,也就是 insert 后生产,这里线程 sleep
Thread.sleep(1000);
String id = record.value();
log.info("接管到 insert::{}", id);
Lock lock = weakRefHashLock.lock(id);
lock.lock();
try {log.info("开始解决 {} 的 insert", id);
// 模仿 insert 业务解决
Thread.sleep(1000);
// 从缓存中获取 是否存在有 update 数据
if (UPDATE_DATA_MAP.containsKey(id)){
// 缓存数据存在,执行 update
doUpdate(id);
}
log.info("解决 {} 的 insert 完结", id);
}finally {lock.unlock();
}
acknowledgment.acknowledge();}
@KafkaListener(topics = "TOPIC_UPDATE")
public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{String id = record.value();
log.info("接管到 update::{}", id);
Lock lock = weakRefHashLock.lock(id);
lock.lock();
try {
// 测试应用,不做数据库的校验
if (!DATA_MAP.containsKey(id)){
// 未找到对应数据,证实生产程序异样,将以后数据退出缓存
log.info("生产程序异样,将 update 数据 {} 退出缓存", id);
UPDATE_DATA_MAP.put(id, id);
}else {doUpdate(id);
}
}finally {lock.unlock();
}
acknowledgment.acknowledge();}
void doUpdate(String id) throws InterruptedException{
// 模仿 update
log.info("开始解决 update::{}", id);
Thread.sleep(1000);
log.info("解决 update::{} 完结", id);
}
}
日志(代码中已模仿必现生产程序异样的场景):
接管到 update::1
生产程序异样,将 update 数据 1 退出缓存
接管到 insert::1
开始解决 1 的 insert
开始解决 update::1
解决 update::1 完结
解决 1 的 insert 完结
察看日志,此计划可失常解决不同 Topic 再存在数据关联的生产程序问题。
版权申明:本文为 CSDN 博主「方片龙」的原创文章,遵循 CC 4.0 BY-SA 版权协定,转载请附上原文出处链接及本申明。
原文链接:https://blog.csdn.net/qq_3824…
近期热文举荐:
1.1,000+ 道 Java 面试题及答案整顿 (2022 最新版)
2. 劲爆!Java 协程要来了。。。
3.Spring Boot 2.x 教程,太全了!
4. 别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!
5.《Java 开发手册(嵩山版)》最新公布,速速下载!
感觉不错,别忘了顺手点赞 + 转发哦!