背景
目前,公司方面 RPC 调用如 Dubbo、Feign 曾经能反对基于灰度的调用,然而 MQ 还没有反对灰度的能力,因而导致在测试和生产环境业务验证、音讯隔离方面体验比拟差,因而咱们基于 RabbitMQ 和 Kafka 实现了音讯灰度的能力。
灰度场景
大部分场景下 MQ 的灰度并不会像 RPC 那样那么严格,然而咱们须要确认生产场景,即当灰度消费者不存在的状况下,音讯是否应该由失常消费者去生产。
1. 灰度音讯只由灰度节点生产
事实的状况是可能大家都想要这种严格意义上的音讯灰度隔离策略,由此才证实是真正的音讯灰度计划,然而这个计划须要思考一些具体场景问题。
比方,有时候作为灰度节点的发送方,它的性能改变点并不是在 MQ 这块,然而它发送的音讯却是灰度音讯,而音讯的生产方可能也未产生过性能变动,也不会有与之对应的灰度生产标识,这种状况下如果咱们将灰度的音讯进行抛弃的话,那么会造成最终的数据不残缺。
2. 灰度音讯能够由失常节点生产
因而,咱们再思考第二种计划,如果当灰度生产节点不存在时,音讯会由失常节点生产,当存在灰度节点时,则由灰度节点生产,失常节点生产灰度音讯只为了当灰度节点不存在时的兜底。
那么,这种场景依然可能存在问题,比方当生产节点的生产逻辑产生扭转时,由失常节点生产就可能造成业务上的谬误。对于此问题咱们能够默认认为如果生产方产生逻辑扭转,则灰度节点大概率肯定是存在的,如果一些异常情况导致的异样或者宕机的场景,依然能通过人工或者告警判断进去,总之,这个问题认为不算是问题。
灰度计划
咱们别离从 MQ 的本身个性和一些通用的解决形式登程,别离探讨 RabbitMQ 和 Kafka 的灰度实现形式。
惯例计划:影子 Queue/Topic
这个是当初实现 MQ 灰度最为常见的计划,为每一个 Queue/Topic 都建设一个与之对应的灰度 Queue/Topic。
生产者层将要发送的音讯进行 Queue/Topic/RoutingKey 的动静批改,让他发送到灰度或失常的 Queue/Topic 中。
而消费者层面只须要在利用启动时依据本身的灰度标记动静的切换到灰度 Queue/Topic 进行监听即可。
然而对于咱们目前的零碎现状而言,这个计划存在三个问题:
首先,因为咱们目前零碎测试环境的灰度标签是能够定制的,可能每一个性能上线都会有一个对应的灰度标识,这样带来的问题就是 Queue/Topic 的数量会随着灰度标识的减少而倍数性的减少。
而不论哪种 MQ,过多的 Queue/Topic 都会对 MQ 自身造成肯定解决能力降落。
另外,咱们的灰度标签是能够依据启动的实例随便批改的,也就意味着对应的整套 Queue/Topic 也得跟着灰度的标识随便的创立。这样一来,人工手动跟着创立显然就不太事实,而生产环境中咱们的 Queue/Topic 创立是须要走流程申请的,这又和咱们的现状违反。
再者,即使咱们可能依据生产者的灰度标识动静的创立 Queue/Topic 的话,那么至多也须要思考在灰度生产者实例失常下线时将它创立的 Queue/Topic 进行销毁,如果异样的下线还须要人工的接入定期的进行 Queue/Topic 的清理工作。
最初,如果是针对 Kafka 或 RocketMQ,这种计划履行起来还比较简单,如果是对于 RabbitMQ,这里又多了一层 Exchange 和 Queue 的绑定关系,不同的生产模式也须要去做各自的适配。
所以,为了在 RabbitMQ 和 Kafka 之间的一致性,咱们决定不采纳该计划来实现。
RabbitMQ
对于 RabbitMq,咱们应用从新入队这个个性来实现灰度队列。
通过 从新入队 的这个个性,咱们能够在生产者发送音讯时将灰度的标识标记到音讯头,发送时一并收回。
当消费者生产音讯时,依据消费者本身标记决定要不要对音讯进行生产,如果消费者自身不满足灰度生产规定,则把这条灰度音讯进行 Requeue 解决。
这条音讯通过轮询,最终会流转到灰度标识的消费者进行生产。
实现思路
- 生产者在发送音讯之前获取到以后实例的灰度标记,对音讯 Header 增加灰度标记
- 对消费者增加监听器,灰度节点生产依据灰度标记判断对灰度音讯的生产,失常节点依据开关决定是否生产或者进行 Requeue
生产流程
生产者在启动时,咱们通过主动拆卸,注册 RabbitTemplate 时 setBeforePublishPostProcessors
增加前置处理器,在发送音讯前对音讯的 Header 增加灰度标记。
生产流程
首先,在生产时通过监听 SimpleMessageListenerContainer
重写 executeListener
办法进行音讯解决。
- 当灰度开关未关上,执行失常生产逻辑。
- 当灰度机器间接匹配到灰度音讯时,那么间接生产即可。
- 通过监听 Eureka 本地缓存刷新的事件不停地刷新灰度实例的缓存,当失常节点生产灰度音讯时,如果灰度实例不存在就能够间接生产。
- 如果存在灰度实例且失常节点生产到灰度音讯,思考两种可能,第一是失常的轮询到失常节点,第二是灰度节点
prefetch_count
达到阈值,阻塞队列已满,灰度音讯在失常节点之间不停地轮询。为了解决第二个场景,增加了一层布隆过滤器,当再次匹配到同样的音讯时,以后节点将休眠一段短暂的工夫。 - 上述场景都未匹配到,那么执行 Requeue 操作。
Kafka
在 Kafka 的生产理念中有一层消费者组的概念,每个消费者都有一个对应的生产组。
当音讯公布到主题后,只会被投递给订阅它的每个生产组中的一个消费者,两个生产组之间互不影响。
借助这个生产个性,能够将同一个生产组中的灰度消费者独自拎进去,做成一个非凡的生产组,这样每个生产组都会接管到同样的音讯。
在失常的生产组中,遇到带有灰度标识的音讯,咱们只做空生产,不理论执行业务逻辑,在灰度生产组中的消费者,只解决匹配到灰度规定的音讯,其它的音讯做空生产。
实现思路
- 生产者生产灰度音讯的时候在音讯 Header 外面增加灰度标记
- 灰度消费者和失常消费者设置不同的 GroupId
- 灰度消费者和失常消费者在拿到音讯后判断有没有灰度标记,判断配置核心是否开启了音讯灰度,如果开启了则进行灰度节点的生产,如果没开启则不生产
生产流程
生产者在启动的时候会去动静拆卸所有的拦截器,拆卸的形式为在 BeanPostProcessor 的后置处理器中获取到 KafkaTemplate 对象,把咱们的拦截器的类的全限定名 set 进去 config 即可,这里能够反对不论用户本人创立的 Factory 对象还是 KafkaTemplate 对象都能进行拦截器的拆卸。
生产流程
生产的时候也是一样,如果以后节点是灰度节点,那么就批改以后 group.id
为灰度,最初通过拦截器执行生产逻辑。