前言

最近业务开发部门给咱们部门提了一个需要,因为他们开发环境和测试环境共用一套kafka,他们心愿咱们部门能帮他们实现主动给kafka的topic加上环境前缀,比方开发环境,则topic为dev_topic,测试环境,则topic为test_topic,他们kafka客户端是应用spring-kafka。一开始接到这个需要的时候,我心里是回绝的,为啥开发环境和测试环境不别离部署一套kafka,还要那么麻烦。但老大都许可接这个需要了,作为小罗罗也只能接了

实现思路

1、生产者端

能够通过生产者拦截器,来给topic加前缀

2、实现步骤

a、编写一个生产者拦截器
@Slf4jpublic class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> {    /**     * 运行在用户主线程中,在音讯被序列化之前调用     * @param record     * @return     */    @Override    public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) {        log.info("原始topic:{}",record.topic());        return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(),                record.partition(),record.timestamp(),record.key(), record.value());    }    /**     * 在音讯被应答之前或者音讯发送失败时调用,通常在producer回调逻辑触发之前,运行在produer的io线程中     * @param metadata     * @param exception     */    @Override    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {      log.info("理论topic:{}",metadata.topic());    }    /**     *  清理工作     */    @Override    public void close() {    }    /**     * 初始化工作     * @param configs     */    @Override    public void configure(Map<String, ?> configs) {    }
b、配置拦截器
kafka:    producer:      # 生产者拦截器配置      properties:        interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor
c、测试

2、消费者端

这个就略微有点难搞了,因为业务开发部门他们是间接用@KafkaListener的注解,形如下

 @KafkaListener(id = "msgId",topics = {Constant.TOPIC})

像这种也没啥好的方法,就只能通过源码了,通过源码能够发现在如下中央

KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

会把@KafkaListener的值赋值给消费者,如果对spring有理解的敌人,可能会晓得postProcessAfterInitialization是spring后置处理器的办法,次要用来bean初始化后的一些操作,既然咱们晓得@KafkaListener会在bean初始化后再进行赋值,那咱们就能够在bean初始化前,批改掉@KafkaListener的值。具体实现如下

@Componentpublic class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor {    @SneakyThrows    @Override    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {        List<String> packageNames = AutoConfigurationPackages.get(beanFactory);        for (String packageName : packageNames) {            Reflections reflections = new Reflections(new ConfigurationBuilder()                    .forPackages(packageName) // 指定门路URL                    .addScanners(new SubTypesScanner()) // 增加子类扫描工具                    .addScanners(new FieldAnnotationsScanner()) // 增加 属性注解扫描工具                    .addScanners(new MethodAnnotationsScanner() ) // 增加 办法注解扫描工具                    .addScanners(new MethodParameterScanner() ) // 增加办法参数扫描工具            );            Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);            if(!CollectionUtils.isEmpty(methodSet)){                for (Method method : methodSet) {                    KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);                    changeTopics(kafkaListener);                }            }        }    }    private void changeTopics(KafkaListener kafkaListener) throws Exception{        InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);        Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");        memberValuesField.setAccessible(true);        Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler);        String[] topics = (String[])memberValues.get("topics");        System.out.println("批改前topics:" + Lists.newArrayList(topics));        for (int i = 0; i < topics.length; i++) {            topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i];        }        memberValues.put("topics", topics);        System.out.println("批改后topics:" + Lists.newArrayList(kafkaListener.topics()));    }}
测试


总结

尽管实现了动静批改topic,但我还是感觉topic不要轻易扭转,有条件的话,kafka还是得基于物理环境隔离,其次真的客观条件不容许,要动静变更topic,则需做好topic动静变更宣导以及相干wiki的编写,不然很容易掉坑

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-mq-idempotent-consume