前言
不晓得大家有没有遇到这样的场景,就是一个我的项目中要生产多个kafka音讯,不同的消费者生产指定kafka音讯。遇到这种场景,咱们能够通过kafka的提供的api进行配置即可。但很多时候咱们会应用spring-kafka来简化开发,可是spring-kafka原生的配置项并没提供多个kafka配置,因而本文就来聊聊如何将spring-kafka进行革新,使之能反对多个kafka配置
注释
1、通过 @ConfigurationProperties指定KafkaProperties前缀
@Primary @ConfigurationProperties(prefix = "lybgeek.kafka.one") @Bean public KafkaProperties oneKafkaProperties(){ return new KafkaProperties(); }
如果有多个就配置多个,形如
@ConfigurationProperties(prefix = "lybgeek.kafka.two") @Bean public KafkaProperties twoKafkaProperties(){ return new KafkaProperties(); } @ConfigurationProperties(prefix = "lybgeek.kafka.three") @Bean public KafkaProperties threeKafkaProperties(){ return new KafkaProperties(); }
2、配置消费者工厂,消费者工厂绑定对应的KafkaProperties
@Bean public ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){ return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties()); }
3、配置消费者监听器工厂,并绑定指定消费者工厂以及消费者配置
@Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_TWO) public KafkaListenerContainerFactory twoKafkaListenerContainerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties, @Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(twoConsumerFactory); factory.setConcurrency(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getConcurrency()) ? Runtime.getRuntime().availableProcessors() : twoKafkaProperties.getListener().getConcurrency()); factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getAckMode()) ? ContainerProperties.AckMode.MANUAL:twoKafkaProperties.getListener().getAckMode()); return factory; }
残缺的配置示例如下
@Configuration@EnableConfigurationProperties(MultiKafkaComsumeProperties.class)public class OneKafkaComsumeAutoConfiguration { @Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE) public KafkaListenerContainerFactory oneKafkaListenerContainerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties, @Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(oneConsumerFactory); factory.setConcurrency(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getConcurrency()) ? Runtime.getRuntime().availableProcessors() : oneKafkaProperties.getListener().getConcurrency()); factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getAckMode()) ? ContainerProperties.AckMode.MANUAL:oneKafkaProperties.getListener().getAckMode()); return factory; } @Primary @Bean public ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){ return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties()); } @Primary @ConfigurationProperties(prefix = "lybgeek.kafka.one") @Bean public KafkaProperties oneKafkaProperties(){ return new KafkaProperties(); }}
那个 @Primary要指定一下,不然启动会因为存在多个KafkaProperties,而导致kafka的主动拆卸不懂要选哪个而报错
@Configuration@ConditionalOnClass(KafkaTemplate.class)@EnableConfigurationProperties(KafkaProperties.class)@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })public class KafkaAutoConfiguration { private final KafkaProperties properties; private final RecordMessageConverter messageConverter; public KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<RecordMessageConverter> messageConverter) { this.properties = properties; this.messageConverter = messageConverter.getIfUnique(); } @Bean @ConditionalOnMissingBean(KafkaTemplate.class) public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) { KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory); if (this.messageConverter != null) { kafkaTemplate.setMessageConverter(this.messageConverter); } kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); return kafkaTemplate; } @Bean @ConditionalOnMissingBean(ProducerListener.class) public ProducerListener<Object, Object> kafkaProducerListener() { return new LoggingProducerListener<>(); } @Bean @ConditionalOnMissingBean(ConsumerFactory.class) public ConsumerFactory<?, ?> kafkaConsumerFactory() { return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties()); } @Bean @ConditionalOnMissingBean(ProducerFactory.class) public ProducerFactory<?, ?> kafkaProducerFactory() { DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>( this.properties.buildProducerProperties()); String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix(); if (transactionIdPrefix != null) { factory.setTransactionIdPrefix(transactionIdPrefix); } return factory; } @Bean @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix") @ConditionalOnMissingBean public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) { return new KafkaTransactionManager<>(producerFactory); } @Bean @ConditionalOnProperty(name = "spring.kafka.jaas.enabled") @ConditionalOnMissingBean public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); Jaas jaasProperties = this.properties.getJaas(); if (jaasProperties.getControlFlag() != null) { jaas.setControlFlag(jaasProperties.getControlFlag()); } if (jaasProperties.getLoginModule() != null) { jaas.setLoginModule(jaasProperties.getLoginModule()); } jaas.setOptions(jaasProperties.getOptions()); return jaas; } @Bean @ConditionalOnMissingBean public KafkaAdmin kafkaAdmin() { KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties()); kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast()); return kafkaAdmin; }}
同我的项目应用多个kafka消费者示例
1、在我的项目的pom引入spring-kafka GAV
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
2、在我的项目的yml中配置如下内容
lybgeek: kafka: multi: comsume-enabled: false one: producer: # kafka生产者服务端地址 bootstrap-servers: ${KAFKA_PRODUCER_BOOTSTRAP_SERVER:10.1.4.71:32643} # 生产者重试的次数 retries: ${KAFKA_PRODUCER_RETRIES:0} # 每次批量发送的数据量 batch-size: ${KAFKA_PRODUCER_BATCH_SIZE:16384} # 每次批量发送音讯的缓冲区大小 buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMOEY:335554432} # 指定音讯key和音讯体的编码方式 key-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer} value-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer} # acks=1 只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。 acks: ${KAFKA_PRODUCER_ACK:1} consumer: bootstrap-servers: ${KAFKA_ONE_CONSUMER_BOOTSTRAP_SERVER:10.1.4.71:32643} # 在偏移量有效的状况下,消费者将从起始地位读取分区的记录 auto-offset-reset: ${KAFKA_ONE_CONSUMER_AUTO_OFFSET_RESET:earliest} # 是否主动提交偏移量,默认值是true,为了避免出现反复数据和数据失落,能够把它设置为false,而后手动提交偏移量 enable-auto-commit: ${KAFKA_ONE_CONSUMER_ENABLE_AUTO_COMMIT:false} # 指定音讯key和音讯体的解码形式 key-deserializer: ${KAFKA_ONE_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer} value-deserializer: ${KAFKA_ONE_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer} listener: # 在侦听器容器中运行的线程数。 concurrency: ${KAFKA_ONE_CONSUMER_CONCURRENCY:1} missing-topics-fatal: false ack-mode: ${KAFKA_ONE_CONSUMER_ACK_MODE:manual} two: producer: # kafka生产者服务端地址 bootstrap-servers: ${KAFKA_PRODUCER_BOOTSTRAP_SERVER:192.168.1.3:9202} # 生产者重试的次数 retries: ${KAFKA_PRODUCER_RETRIES:0} # 每次批量发送的数据量 batch-size: ${KAFKA_PRODUCER_BATCH_SIZE:16384} # 每次批量发送音讯的缓冲区大小 buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMOEY:335554432} # 指定音讯key和音讯体的编码方式 key-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer} value-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer} # acks=1 只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。 acks: ${KAFKA_PRODUCER_ACK:1} consumer: bootstrap-servers: ${KAFKA_ONE_CONSUMER_BOOTSTRAP_SERVER:192.168.1.3:9202} # 在偏移量有效的状况下,消费者将从起始地位读取分区的记录 auto-offset-reset: ${KAFKA_ONE_CONSUMER_AUTO_OFFSET_RESET:earliest} # 是否主动提交偏移量,默认值是true,为了避免出现反复数据和数据失落,能够把它设置为false,而后手动提交偏移量 enable-auto-commit: ${KAFKA_ONE_CONSUMER_ENABLE_AUTO_COMMIT:false} # 指定音讯key和音讯体的解码形式 key-deserializer: ${KAFKA_ONE_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer} value-deserializer: ${KAFKA_ONE_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer} listener: # 在侦听器容器中运行的线程数。 concurrency: ${KAFKA_ONE_CONSUMER_CONCURRENCY:1} missing-topics-fatal: false ack-mode: ${KAFKA_ONE_CONSUMER_ACK_MODE:manual}
3、配置生产者
private KafkaTemplate kafkaTemplate; @Override public MqResp sendSync(MqReq mqReq) { ListenableFuture<SendResult<String, String>> result = this.send(mqReq); MqResp mqResp = this.buildMqResp(result); return mqResp; }
这个KafkaTemplate绑定的就是@Primary配置的kafkaProperties
4、配置消费者监听,并绑定containerFactory
@LybGeekKafkaListener(id = "createUser",containerFactory = MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE,topics = Constant.USER_TOPIC)public class UserComsumer extends BaseComusmeListener { @Autowired private UserService userService; @Override public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) { User user = JSON.parseObject(kafkaComsumePayLoad.getData(),User.class); System.out.println("-----------------------"); return userService.isExistUserByUsername(user.getUsername()); } @Override public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) { User user = JSON.parseObject(kafkaComsumerPayLoad.getData(),User.class); System.out.println(user); return userService.save(user); }}
通过指定containerFactory ,来生产指定的kafka音讯
5、测试
User user = User.builder().username("test") .email("test@qq.com") .fullname("test") .mobile("1350000001") .password("1234561") .build(); userService.saveAndPush(user);
发送音讯,察看控制台输入
: messageKey:【null】,topic:【user-sync】存在反复音讯数据-->【{"email":"test@qq.com","fullname":"test","mobile":"1350000000","password":"123456","username":"test"}】
会呈现这样,是因为数据库曾经有这条记录了,刚好验证一下反复生产
总结
本文实现的外围其实就是通过注入多个kafkaProperties来实现多配置 ,不晓得大家有没有发现,就是革新后的配置,配置消费者后,生产者依然也要配置。因为如果不配置,走的就是kafkaProperties默认的配置信息,即为localhost。还有仔细的敌人兴许会发现我示例中的消费者监听应用的注解是@LybGeekKafkaListener,这个和 @KafkaListener实现的性能基本一致。因为本示例和之前的文章聊聊如何实现一个带幂等模板的kafka消费者监听是同份代码,就间接复用了
demo链接
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template