关于java:Java如何实现消费数据隔离

42次阅读

共计 2535 个字符,预计需要花费 7 分钟才能阅读完成。

我是 3y,一年 CRUD 教训用十年的 markdown 程序员👨🏻‍💻长年被誉为优质八股文选手

明天持续更新 austin 我的项目,如果还没看过该系列的同学能够点开我的历史文章回顾下,在看的过程中不要遗记了 点赞 哟!倡议不要漏了或者跳着看,不然这篇就看不懂了,之前写过的知识点和业务我就不再赘述啦。

明天要实现的是 handler 模块的 生产数据隔离。在聊这个之前,先看下之前的实现是怎么样的。

austin-api接管到了申请之后,将申请发往 Kafka,topicName 为austin。而在austin-handler 起了一个 groupName 名为 austinGroup 监听 austin 这个 topic 的数据,进而实现音讯发送。

从零碎架构来说,austin 我的项目是能够发送多种类型音讯的:短信、微信小程序、邮件等等等

那如果是单个 topic 单个 group 的话,有没有想过一个问题:如果某个发送渠道接口存在异样,超时了,此时会怎么样

没错,音讯都会堵住,因为它们生产同一个 topic,用的是同一个消费者。

01、数据隔离

要破局?很简略。多 topic 多 group 就行啦

下面这种能解决所有问题吗?并不 。即使是同一个渠道,但不同类型的音讯发送个性是不一样的。比方我要发 push 营销 音讯,有可能在某个时刻就要推送 4000W 的人群。

那这 4000W 人在短时间内齐全发送进来,不太事实。这很可能意味着会影响到 告诉类 的 push 音讯

还要破局?很简略。毕竟咱们在设计 音讯模板 的时候就曾经思考到这点了。音讯模板有 msgType 字段来标识以后的模板属于哪种类型,那咱们能够依据不同的音讯类型再划分对应的 group。

从实践上来说,咱们能够为 每种渠道的每种音讯类型独自辨别一个 topic 和 group。因为 topic 间的数据是隔离的,不同的 group 间生产也是隔离的,那咱们生产时必定是数据隔离的。

不过,我目前的做法是:单 topic 多 group。生产是隔离的,但生产的 topic 是共享的。我认为这样代码会更加清晰和易懂些,前期如果存在瓶颈了咱们能够持续改。

02、生产端设计

从下面曾经定了通过单 topic 多 group 来实现数据隔离。比方,我目前定义了 6 个渠道(im/push/ 邮件 / 短信 / 小程序 / 微信服务号) 和 3 种音讯类型 (告诉 / 营销 / 验证码),那相当于起了18 个消费者。

从 kafka 获取失去音讯当前,我 暂定 布局是走几个步骤:音讯抛弃 -> 去重 -> 真正发送

从实质上看 去重 发送音讯 都是 网络 IO 密集型 。于是,为了 进步吞吐量 ,我这边决定生产 Kafka 后存入缓存, 做一层缓冲区

做一层缓冲区可进步吞吐量,但同样会带来别的问题。如:当利用重启时,缓冲区的数据还没生产完,那是不是就会失落?

这个咱们能够前面再看看怎么把带来的问题给搞掂(继续关注,我的项目优化前面多着呢)。当初还是认为缓冲区的利大于弊,所以回到缓冲区上。

缓冲区给我的第一反馈是实现 生产者消费者模式

要实现这种模式,我初想了下挺简略的:生产 Kafka 的音讯作为生产者,而后把数据扔进阻塞队列上,开多个线程去生产阻塞队列的数据就完事了。

起初又想了下,间接线程池不就完事了吗?线程池不就是生产者和消费者的实现吗。

于是乎,架构就变成了下图:

03、代码设计

在生产端首先看 Receiver 的代码,该类看起来看简略,就只有一个 @KafkaListener 注解润饰办法,从 Kafka 生产进去随后交给 pending 做解决

我用的是 @KafkaListener 注解从 Kafka 拉取音讯,而没有用低级的Kafka api,原因无他:在项目前期无需做到完满,等有瓶颈的时候再想方法就好了。虽说如此,但我写的时候还是给我带来了不少的麻烦。

第一个问题 @KafkaListener 是一个注解,从源码正文看它的传值只可能用 Spring EL 表达式和读取某个配置。但要晓得的是,我的目标是想有 多个 group 生产同一个 topic。而我不可能说给每个 group 都定义一个生产的办法吧?(写这种破代码,我都睡不着觉

翻了一个早晨技术博客我都没找到计划,甚至还发了个朋友圈吐槽下有没有人遇到过。第二天我认真翻了下 Spring 的官网文档,终于给我找到了计划。

还是官网文档切实

有了解决办法了当前,那事件就好办了。既然我是每种音讯渠道的每种音讯类型都要隔离,那我把这给枚举进去就完事啦!

我的 Receiver 是多例的,那么只有我遍历这个 List 就好了(初始化消费者在 ReceiverStart 类上)。

解决了用 @KafkaListener 注解动静传入 groupId 进而创立多个消费者了之后。

我又遇到了第二个问题 :Spring 有@Aysnc 注解来 优雅 实现线程池的办法调用。我之前是没用过 @Aysnc 注解的,但我看了下原理和应用姿态。我感觉这样挺优雅的(优雅永不过期 )。然而用@Aysnc 是必定要 本人创立线程池 ,并且我要给每个消费者都创立本人 独有 的线程池。而我不可能说给每个 group 都定义一个创立线程池的办法吧?(写这种破代码,我都睡不着觉

这次翻了官网和各种技术博客,都没能解决掉我的问题:在 Spring 环境下 @Async注解上动静传入线程池实例,以及创立线程池实例时可反对依据条件传参。

最初只能放弃掉 @Aysnc 注解了,以编程的形式去实现:

上面是 TaskPendingHolder 的实现(无非就是给每个消费者创立对应的线程池):

而 Task 实现目前就比较简单啦,间接调用对应的 Handler 进而下发音讯就好:

04、总结

代码看似简略,业务看似容易了解,然而要晓得的是即使是 很多小公司 的生产我的项目都没有这种设计。一把梭可真的是太常见了(性能又不是不能实现,代码又不是不能跑,最次要的:人也不是不能跑)

这篇文章次要讲述了一个思路:在生产 MQ 的时候,多 group 是能够实现数据隔离的,想要进步生产的吞吐量,能够再做一层缓冲区(前提是生产是 IO 密集型 的)

关注我的微信公众号【Java3y】除了技术我还会聊点日常,有些话只能轻轻说~ 【对线面试官 + 从零编写 Java 我的项目】继续高强度更新中!求 star!!原创不易!!求三连!!

源码 Gitee 链接:gitee.com/austin

源码 GitHub 链接:github.com/austin

正文完
 0