前言
不晓得大家有没有遇到这样的场景,就是一个我的项目中要生产多个 kafka 音讯,不同的消费者生产指定 kafka 音讯。遇到这种场景,咱们能够通过 kafka 的提供的 api 进行配置即可。但很多时候咱们会应用 spring-kafka 来简化开发,可是 spring-kafka 原生的配置项并没提供多个 kafka 配置,因而本文就来聊聊如何将 spring-kafka 进行革新,使之能反对多个 kafka 配置
注释
1、通过 @ConfigurationProperties 指定 KafkaProperties 前缀
@Primary
@ConfigurationProperties(prefix = "lybgeek.kafka.one")
@Bean
public KafkaProperties oneKafkaProperties(){return new KafkaProperties();
}
如果有多个就配置多个,形如
@ConfigurationProperties(prefix = "lybgeek.kafka.two")
@Bean
public KafkaProperties twoKafkaProperties(){return new KafkaProperties();
}
@ConfigurationProperties(prefix = "lybgeek.kafka.three")
@Bean
public KafkaProperties threeKafkaProperties(){return new KafkaProperties();
}
2、配置消费者工厂,消费者工厂绑定对应的 KafkaProperties
@Bean
public ConsumerFactory twoConsumerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties){return new DefaultKafkaConsumerFactory(twoKafkaProperties.buildConsumerProperties());
}
3、配置消费者监听器工厂,并绑定指定消费者工厂以及消费者配置
@Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_TWO)
public KafkaListenerContainerFactory twoKafkaListenerContainerFactory(@Autowired @Qualifier("twoKafkaProperties") KafkaProperties twoKafkaProperties, @Autowired @Qualifier("twoConsumerFactory") ConsumerFactory twoConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(twoConsumerFactory);
factory.setConcurrency(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getConcurrency()) ? Runtime.getRuntime().availableProcessors() : twoKafkaProperties.getListener().getConcurrency());
factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(twoKafkaProperties.getListener().getAckMode()) ? ContainerProperties.AckMode.MANUAL:twoKafkaProperties.getListener().getAckMode());
return factory;
}
残缺的配置示例如下
@Configuration
@EnableConfigurationProperties(MultiKafkaComsumeProperties.class)
public class OneKafkaComsumeAutoConfiguration {@Bean(MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE)
public KafkaListenerContainerFactory oneKafkaListenerContainerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties, @Autowired @Qualifier("oneConsumerFactory") ConsumerFactory oneConsumerFactory) {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(oneConsumerFactory);
factory.setConcurrency(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getConcurrency()) ? Runtime.getRuntime().availableProcessors() : oneKafkaProperties.getListener().getConcurrency());
factory.getContainerProperties().setAckMode(ObjectUtil.isEmpty(oneKafkaProperties.getListener().getAckMode()) ? ContainerProperties.AckMode.MANUAL:oneKafkaProperties.getListener().getAckMode());
return factory;
}
@Primary
@Bean
public ConsumerFactory oneConsumerFactory(@Autowired @Qualifier("oneKafkaProperties") KafkaProperties oneKafkaProperties){return new DefaultKafkaConsumerFactory(oneKafkaProperties.buildConsumerProperties());
}
@Primary
@ConfigurationProperties(prefix = "lybgeek.kafka.one")
@Bean
public KafkaProperties oneKafkaProperties(){return new KafkaProperties();
}
}
那个 @Primary 要指定一下,不然启动会因为存在多个 KafkaProperties, 而导致 kafka 的主动拆卸不懂要选哪个而报错
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class})
public class KafkaAutoConfiguration {
private final KafkaProperties properties;
private final RecordMessageConverter messageConverter;
public KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<RecordMessageConverter> messageConverter) {
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();}
@Bean
@ConditionalOnMissingBean(KafkaTemplate.class)
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
ProducerListener<Object, Object> kafkaProducerListener) {KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
if (this.messageConverter != null) {kafkaTemplate.setMessageConverter(this.messageConverter);
}
kafkaTemplate.setProducerListener(kafkaProducerListener);
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
@Bean
@ConditionalOnMissingBean(ProducerListener.class)
public ProducerListener<Object, Object> kafkaProducerListener() {return new LoggingProducerListener<>();
}
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
@Bean
@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
@ConditionalOnMissingBean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {return new KafkaTransactionManager<>(producerFactory);
}
@Bean
@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
@ConditionalOnMissingBean
public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
Jaas jaasProperties = this.properties.getJaas();
if (jaasProperties.getControlFlag() != null) {jaas.setControlFlag(jaasProperties.getControlFlag());
}
if (jaasProperties.getLoginModule() != null) {jaas.setLoginModule(jaasProperties.getLoginModule());
}
jaas.setOptions(jaasProperties.getOptions());
return jaas;
}
@Bean
@ConditionalOnMissingBean
public KafkaAdmin kafkaAdmin() {KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
return kafkaAdmin;
}
}
同我的项目应用多个 kafka 消费者示例
1、在我的项目的 pom 引入 spring-kafka GAV
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、在我的项目的 yml 中配置如下内容
lybgeek:
kafka:
multi:
comsume-enabled: false
one:
producer:
# kafka 生产者服务端地址
bootstrap-servers: ${KAFKA_PRODUCER_BOOTSTRAP_SERVER:10.1.4.71:32643}
# 生产者重试的次数
retries: ${KAFKA_PRODUCER_RETRIES:0}
# 每次批量发送的数据量
batch-size: ${KAFKA_PRODUCER_BATCH_SIZE:16384}
# 每次批量发送音讯的缓冲区大小
buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMOEY:335554432}
# 指定音讯 key 和音讯体的编码方式
key-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
value-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
# acks=1 只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。acks: ${KAFKA_PRODUCER_ACK:1}
consumer:
bootstrap-servers: ${KAFKA_ONE_CONSUMER_BOOTSTRAP_SERVER:10.1.4.71:32643}
# 在偏移量有效的状况下,消费者将从起始地位读取分区的记录
auto-offset-reset: ${KAFKA_ONE_CONSUMER_AUTO_OFFSET_RESET:earliest}
# 是否主动提交偏移量,默认值是 true, 为了避免出现反复数据和数据失落,能够把它设置为 false, 而后手动提交偏移量
enable-auto-commit: ${KAFKA_ONE_CONSUMER_ENABLE_AUTO_COMMIT:false}
# 指定音讯 key 和音讯体的解码形式
key-deserializer: ${KAFKA_ONE_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
value-deserializer: ${KAFKA_ONE_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
listener:
# 在侦听器容器中运行的线程数。concurrency: ${KAFKA_ONE_CONSUMER_CONCURRENCY:1}
missing-topics-fatal: false
ack-mode: ${KAFKA_ONE_CONSUMER_ACK_MODE:manual}
two:
producer:
# kafka 生产者服务端地址
bootstrap-servers: ${KAFKA_PRODUCER_BOOTSTRAP_SERVER:192.168.1.3:9202}
# 生产者重试的次数
retries: ${KAFKA_PRODUCER_RETRIES:0}
# 每次批量发送的数据量
batch-size: ${KAFKA_PRODUCER_BATCH_SIZE:16384}
# 每次批量发送音讯的缓冲区大小
buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMOEY:335554432}
# 指定音讯 key 和音讯体的编码方式
key-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
value-serializer: ${KAFKA_PRODUCER_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
# acks=1 只有集群的领袖节点收到音讯,生产者就会收到一个来自服务器胜利响应。acks: ${KAFKA_PRODUCER_ACK:1}
consumer:
bootstrap-servers: ${KAFKA_ONE_CONSUMER_BOOTSTRAP_SERVER:192.168.1.3:9202}
# 在偏移量有效的状况下,消费者将从起始地位读取分区的记录
auto-offset-reset: ${KAFKA_ONE_CONSUMER_AUTO_OFFSET_RESET:earliest}
# 是否主动提交偏移量,默认值是 true, 为了避免出现反复数据和数据失落,能够把它设置为 false, 而后手动提交偏移量
enable-auto-commit: ${KAFKA_ONE_CONSUMER_ENABLE_AUTO_COMMIT:false}
# 指定音讯 key 和音讯体的解码形式
key-deserializer: ${KAFKA_ONE_CONSUMER_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
value-deserializer: ${KAFKA_ONE_CONSUMER_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
listener:
# 在侦听器容器中运行的线程数。concurrency: ${KAFKA_ONE_CONSUMER_CONCURRENCY:1}
missing-topics-fatal: false
ack-mode: ${KAFKA_ONE_CONSUMER_ACK_MODE:manual}
3、配置生产者
private KafkaTemplate kafkaTemplate;
@Override
public MqResp sendSync(MqReq mqReq) {ListenableFuture<SendResult<String, String>> result = this.send(mqReq);
MqResp mqResp = this.buildMqResp(result);
return mqResp;
}
这个 KafkaTemplate 绑定的就是 @Primary 配置的 kafkaProperties
4、配置消费者监听, 并绑定 containerFactory
@LybGeekKafkaListener(id = "createUser",containerFactory = MultiKafkaConstant.KAFKA_LISTENER_CONTAINER_FACTORY_ONE,topics = Constant.USER_TOPIC)
public class UserComsumer extends BaseComusmeListener {
@Autowired
private UserService userService;
@Override
public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {User user = JSON.parseObject(kafkaComsumePayLoad.getData(),User.class);
System.out.println("-----------------------");
return userService.isExistUserByUsername(user.getUsername());
}
@Override
public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {User user = JSON.parseObject(kafkaComsumerPayLoad.getData(),User.class);
System.out.println(user);
return userService.save(user);
}
}
通过指定 containerFactory,来生产指定的 kafka 音讯
5、测试
User user = User.builder().username("test")
.email("test@qq.com")
.fullname("test")
.mobile("1350000001")
.password("1234561")
.build();
userService.saveAndPush(user);
发送音讯,察看控制台输入
: messageKey:【null】,topic:【user-sync】存在反复音讯数据 -->【{"email":"test@qq.com","fullname":"test","mobile":"1350000000","password":"123456","username":"test"}】
会呈现这样,是因为数据库曾经有这条记录了,刚好验证一下反复生产
总结
本文实现的外围其实就是通过注入多个 kafkaProperties 来实现多配置,不晓得大家有没有发现,就是革新后的配置,配置消费者后,生产者依然也要配置。因为如果不配置,走的就是 kafkaProperties 默认的配置信息,即为 localhost。还有仔细的敌人兴许会发现我示例中的消费者监听应用的注解是 @LybGeekKafkaListener,这个和 @KafkaListener 实现的性能基本一致。因为本示例和之前的文章聊聊如何实现一个带幂等模板的 kafka 消费者监听是同份代码,就间接复用了
demo 链接
https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template