关于springboot:聊聊在springboot项目中如何配置多个kafka消费者

前言

不晓得大家有没有遇到这样的场景,就是一个我的项目中要生产多个kafka音讯,不同的消费者生产指定kafka音讯。遇到这种场景,咱们能够通过kafka的提供的api进行配置即可。但很多时候咱们会应用spring-kafka来简化开发,可是spring-kafka原生的配置项并没提供多个kafka配置,因而本文就来聊聊如何将spring-kafka进行革新,使之能反对多个kafka配置

注释

1、通过 @ConfigurationProperties指定KafkaProperties前缀

    @Primary
    @ConfigurationProperties(prefix = "lybgeek.kafka.one")
    @Bean
    public KafkaProperties oneKafkaProperties(){
        return new KafkaProperties();
    }

如果有多个就配置多个,形如

    @ConfigurationProperties(prefix = "lybgeek.kafka.two")
    @Bean
    public KafkaProperties twoKafkaProperties(){
        return new KafkaProperties();
    }

    @ConfigurationProperties(prefix = "lybgeek.kafka.three")
    @Bean
    public KafkaProperties threeKafkaProperties(){
        return new KafkaProperties();
    }

2、配置消费者工厂,消费者工厂绑定对应的KafkaProperties

  @Bean
    public ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){

        return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());
    }

3、配置消费者监听器工厂,并绑定指定消费者工厂以及消费者配置

  @Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_TWO)
    public KafkaListenerContainerFactory twoKafkaListenerContainerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties, @Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(twoConsumerFactory);
        factory.setConcurrency(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getConcurrency()) ? Runtime.getRuntime().availableProcessors() : twoKafkaProperties.getListener().getConcurrency());
        factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getAckMode()) ? ContainerProperties.AckMode.MANUAL:twoKafkaProperties.getListener().getAckMode());

        return factory;
    }

残缺的配置示例如下

@Configuration
@EnableConfigurationProperties(MultiKafkaComsumeProperties.class)
public class OneKafkaComsumeAutoConfiguration {

    @Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE)
    public KafkaListenerContainerFactory oneKafkaListenerContainerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties, @Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(oneConsumerFactory);
        factory.setConcurrency(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getConcurrency()) ? Runtime.getRuntime().availableProcessors() : oneKafkaProperties.getListener().getConcurrency());
        factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getAckMode()) ? ContainerProperties.AckMode.MANUAL:oneKafkaProperties.getListener().getAckMode());
        return factory;
    }

    @Primary
    @Bean
    public ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){
        return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());
    }


    @Primary
    @ConfigurationProperties(prefix = "lybgeek.kafka.one")
    @Bean
    public KafkaProperties oneKafkaProperties(){
        return new KafkaProperties();
    }

}

那个 @Primary要指定一下,不然启动会因为存在多个KafkaProperties,而导致kafka的主动拆卸不懂要选哪个而报错

@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    private final RecordMessageConverter messageConverter;

    public KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<RecordMessageConverter> messageConverter) {
        this.properties = properties;
        this.messageConverter = messageConverter.getIfUnique();
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        if (this.messageConverter != null) {
            kafkaTemplate.setMessageConverter(this.messageConverter);
        }
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @ConditionalOnMissingBean(ProducerListener.class)
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<>();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
                this.properties.buildProducerProperties());
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
        if (transactionIdPrefix != null) {
            factory.setTransactionIdPrefix(transactionIdPrefix);
        }
        return factory;
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    @ConditionalOnMissingBean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    @ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
    @ConditionalOnMissingBean
    public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
        KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
        Jaas jaasProperties = this.properties.getJaas();
        if (jaasProperties.getControlFlag() != null) {
            jaas.setControlFlag(jaasProperties.getControlFlag());
        }
        if (jaasProperties.getLoginModule() != null) {
            jaas.setLoginModule(jaasProperties.getLoginModule());
        }
        jaas.setOptions(jaasProperties.getOptions());
        return jaas;
    }

    @Bean
    @ConditionalOnMissingBean
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
        kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
        return kafkaAdmin;
    }

}

同我的项目应用多个kafka消费者示例

1、在我的项目的pom引入spring-kafka GAV

 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2、在我的项目的yml中配置如下内容

lybgeek:
    kafka:
        multi:
            comsume-enabled: false
        one:
            producer:
                # kafka生产者服务端地址
                bootstrap-servers: ${KAFKA_PRODUCER_BOOTSTRAP_SERVER:10.1.4.71:32643}
                # 生产者重试的次数
                retries: ${KAFKA_PRODUCER_RETRIES:0}
                # 每次批量发送的数据量
                batch-size: ${KAFKA_PRODUCER_BATCH_SIZE:16384}
                # 每次批量发送音讯的缓冲区大小
                buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMOEY:335554432}
                # 指定音讯key和音讯体的编码方式
                key-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
                value-serializer:  ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
                # acks=1 只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。
                acks: ${KAFKA_PRODUCER_ACK:1}

            consumer:
                bootstrap-servers: ${KAFKA_ONE_CONSUMER_BOOTSTRAP_SERVER:10.1.4.71:32643}
                # 在偏移量有效的状况下,消费者将从起始地位读取分区的记录
                auto-offset-reset: ${KAFKA_ONE_CONSUMER_AUTO_OFFSET_RESET:earliest}
                #  是否主动提交偏移量,默认值是true,为了避免出现反复数据和数据失落,能够把它设置为false,而后手动提交偏移量
                enable-auto-commit: ${KAFKA_ONE_CONSUMER_ENABLE_AUTO_COMMIT:false}
                # 指定音讯key和音讯体的解码形式
                key-deserializer: ${KAFKA_ONE_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
                value-deserializer:  ${KAFKA_ONE_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
            listener:
                # 在侦听器容器中运行的线程数。
                concurrency: ${KAFKA_ONE_CONSUMER_CONCURRENCY:1}
                missing-topics-fatal: false
                ack-mode: ${KAFKA_ONE_CONSUMER_ACK_MODE:manual}
                
    two:
        producer:
            # kafka生产者服务端地址
            bootstrap-servers: ${KAFKA_PRODUCER_BOOTSTRAP_SERVER:192.168.1.3:9202}
            # 生产者重试的次数
            retries: ${KAFKA_PRODUCER_RETRIES:0}
            # 每次批量发送的数据量
            batch-size: ${KAFKA_PRODUCER_BATCH_SIZE:16384}
            # 每次批量发送音讯的缓冲区大小
            buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMOEY:335554432}
            # 指定音讯key和音讯体的编码方式
            key-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
            value-serializer:  ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
            # acks=1 只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。
            acks: ${KAFKA_PRODUCER_ACK:1}

            consumer:
                bootstrap-servers: ${KAFKA_ONE_CONSUMER_BOOTSTRAP_SERVER:192.168.1.3:9202}
                # 在偏移量有效的状况下,消费者将从起始地位读取分区的记录
                auto-offset-reset: ${KAFKA_ONE_CONSUMER_AUTO_OFFSET_RESET:earliest}
                #  是否主动提交偏移量,默认值是true,为了避免出现反复数据和数据失落,能够把它设置为false,而后手动提交偏移量
                enable-auto-commit: ${KAFKA_ONE_CONSUMER_ENABLE_AUTO_COMMIT:false}
                # 指定音讯key和音讯体的解码形式
                key-deserializer: ${KAFKA_ONE_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
                value-deserializer:  ${KAFKA_ONE_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
            listener:
                # 在侦听器容器中运行的线程数。
                concurrency: ${KAFKA_ONE_CONSUMER_CONCURRENCY:1}
                missing-topics-fatal: false
                ack-mode: ${KAFKA_ONE_CONSUMER_ACK_MODE:manual}
        

3、配置生产者

 private KafkaTemplate kafkaTemplate;

    @Override
    public MqResp sendSync(MqReq mqReq) {
        ListenableFuture<SendResult<String, String>> result = this.send(mqReq);
        MqResp mqResp = this.buildMqResp(result);
        return mqResp;
    }

这个KafkaTemplate绑定的就是@Primary配置的kafkaProperties

4、配置消费者监听,并绑定containerFactory

@LybGeekKafkaListener(id = "createUser",containerFactory = MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE,topics = Constant.USER_TOPIC)
public class UserComsumer extends BaseComusmeListener {

    @Autowired
    private UserService userService;

    @Override
    public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {
        User user = JSON.parseObject(kafkaComsumePayLoad.getData(),User.class);
        System.out.println("-----------------------");
        return userService.isExistUserByUsername(user.getUsername());
    }

    @Override
    public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {
        User user = JSON.parseObject(kafkaComsumerPayLoad.getData(),User.class);
        System.out.println(user);
        return userService.save(user);
    }
}

通过指定containerFactory ,来生产指定的kafka音讯

5、测试

  User user = User.builder().username("test")
                .email("test@qq.com")
                .fullname("test")
                .mobile("1350000001")
                .password("1234561")
                .build();
      userService.saveAndPush(user);

发送音讯,察看控制台输入

: messageKey:【null】,topic:【user-sync】存在反复音讯数据-->【{"email":"test@qq.com","fullname":"test","mobile":"1350000000","password":"123456","username":"test"}】

会呈现这样,是因为数据库曾经有这条记录了,刚好验证一下反复生产

总结

本文实现的外围其实就是通过注入多个kafkaProperties来实现多配置 ,不晓得大家有没有发现,就是革新后的配置,配置消费者后,生产者依然也要配置。因为如果不配置,走的就是kafkaProperties默认的配置信息,即为localhost。还有仔细的敌人兴许会发现我示例中的消费者监听应用的注解是@LybGeekKafkaListener,这个和 @KafkaListener实现的性能基本一致。因为本示例和之前的文章聊聊如何实现一个带幂等模板的kafka消费者监听是同份代码,就间接复用了

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理