基于Springboot的Kafka消费者动静操作
1. 问题
在基于Springboot开发Kafka相干业务时,遇到如下业务场景:
- 执行局部操作时,如进行服务替换镜像、执行非凡业务解决等,须要先进行Consumer接管Kafka音讯,待处理实现后再开启Consumer接续接管Kafka音讯
- 为并发生产Kafka音讯,可通过配置
spring.kakfa.listener.concurency
来设置Consumer的并发数;但spring.kakfa.listener.concurency
是一个全局配置,当一个服务须要同时监听多个Topic,并且不同的Topic的Consumer须要设置不同的并发数时,这种办法就不实用
2. 解决思路
2.1 源码剖析
在Springboot我的项目中,个别通过办法上的@KafkaListener
注解来注册Consumer,在Springboot服务启动过程中,通过实现了Springboot的扩大点的KafkaListenerAnnotationBeanPostProcessor
类,在postProcessAfterInitialization
办法中辨认含有@KafkaListener
注解的办法,并注册至KafkaListenerEndpointRegistry
中(具体的源码在此不开展形容,有趣味的能够自行翻阅源码或查问材料)。因而,后续的操作也将围绕着Listener容器MessageListenerContainer
和注册表KafkaListenerEndpointRegistry
开展。
2.2 动静启停Consumer
Listener容器MessageListenerContainer
接口扩大了SmartLifecycle
接口,在Lifecycle
接口的start()
办法根底上,扩大了pause()
办法和resume()
办法。通过正文能够晓得,这三个办法别离对应了Listener的启动、暂停和复原。
在KafkaListenerEndpointRegistry
类中,提供了依据ID获取MessageListenerContainer
的办法。
因而,只有通过ID在KafkaListenerEndpointRegistry
中获取了Listener容器MessageListenerContainer
后,即可进行对应的开始、暂停和复原Consumer的操作。
2.3 动静批改参数
要想为不同的Listener配置不同的concurrency
参数,首先得晓得concurrency
参数是在哪里被设置至Listener中的。通过Debug剖析源码可知,在实现了MessageListenerContainer
接口的ConcurrentMessageListenerContainer
类中有一个setConcurrency(int)
办法,能够设置容器的并发数。同时,Listener的注册表KafkaListenerEndpointRegistry
类同样实现了SmartLifecycle
接口,并在start()
办法中理论启动Listener容器,因而想要动静批改参数,必须在容器启动前,即KafkaListenerEndpointRegistry
执行start()
办法前进行解决。
3. 动静启停Consumer
首先,定义一个公共的抽象类AbstractScheduledConsumer
。
public abstract class AbstractScheduledConsumer<T> { @Resource private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; public abstract void onMessage(T data); protected abstract String getId(); protected MessageListenerContainer getListenerContainer() { String containerId = this.getId(); MessageListenerContainer container = this.kafkaListenerEndpointRegistry.getListenerContainer(containerId); Assert.notNull(container, String.format("MessageListenerContainer [%s] 获取失败", containerId)); return container; } /** * 启动 */ public void start() { MessageListenerContainer container = getListenerContainer(); if (!container.isRunning()) { container.start(); } else { container.resume(); } } /** * 暂停 */ public void pause() { getListenerContainer().pause(); } /** * 复原 */ public void resume() { getListenerContainer().resume(); }}
业务解决的Consumer类只须要继承AbstractScheduledConsumer
类即可实现Consumer的动静启停。变量ID即为Listener的ID,须要为每个Consumer定义不同的ID。
@Componentpublic class BusinessConsumer extends AbstractScheduledConsumer<ConsumerRecord<String, byte[]>> { /** * 自定义ID */ private static final String ID = "business-consumer-id"; @Override @KafkaListener(id = ID, topics = "") public void onMessage(ConsumerRecord<String, byte[]> data) { // 业务解决 } @Override protected String getId() { return ID; }}
至此,BusinessConsumer
类已具备动静启停的Kafka Consumer的性能,只须要在Service和Controller减少代码即可通过接口实时启动、暂停和复原Consumer。
4. 动静批改Consumer参数
首先,定义一个配置项,用于配置须要批改的Consumer参数,此处的ID和上文的Listener的ID统一。
public class CustomizedKafkaConfig { /** * 是否启用Consumer拦截器 */ private boolean consumerInterceptorAutoStart = true; /** * 全局Consumer配置 */ private ConsumerInfo globalConsumerInfo; /** * 独立Consumer配置 */ private Map<String, ConsumerInfo> customizedConsumerInfos = new HashMap<>(); // 省略 get/set 办法 /** * 依据ID获取配置 */ public ConsumerInfo getConsumerInfo(String id) { return customizedConsumerInfos.get(id); } public static class ConsumerInfo { private Boolean autoStart; private Integer concurrency; public Boolean getAutoStart() { return autoStart; } public void setAutoStart(Boolean autoStart) { this.autoStart = autoStart; } public Integer getConcurrency() { return concurrency; } public void setConcurrency(Integer concurrency) { this.concurrency = concurrency; } }}
接着定义Consumer拦截器,同样实现SmartLifecycle
接口,通过getPhase()
返回值保障优先于KafkaListenerEndpointRegistry
执行。
@Slf4jpublic class KafkaListenerContainerInterceptor implements SmartLifecycle { private final CustomizedKafkaConfig config; private final KafkaListenerEndpointRegistry registry; private volatile boolean running = false; public KafkaListenerContainerInterceptor(CustomizedKafkaConfig customizedKafkaConfig, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) { this.config = customizedKafkaConfig; this.registry = kafkaListenerEndpointRegistry; } @Override public void start() { Collection<MessageListenerContainer> listenerContainers = registry.getAllListenerContainers(); ConsumerInfo globalConsumerInfo = config.getGlobalConsumerInfo(); // 存在全局设置 if (globalConsumerInfo != null) { log.info("已设置全局ConsumerInfo [autoStartup = {}, concurrency = {}]", globalConsumerInfo.getAutoStart(), globalConsumerInfo.getConcurrency()); listenerContainers.forEach(c -> resetMessageListenerContainer(c, globalConsumerInfo)); } // 自定义消费者设置 for (MessageListenerContainer container : listenerContainers) { String id = container.getListenerId(); ConsumerInfo consumerInfo; // 未自定义消费者设置,跳过拦挡 if ((consumerInfo = config.getConsumerInfo(id)) == null) { continue; } // 拦挡设置 resetMessageListenerContainer(container, consumerInfo); } } @Override public void stop() { this.running = false; } @Override public boolean isRunning() { return running; } @Override public boolean isAutoStartup() { return config.isConsumerInterceptorAutoStart(); } @Override public int getPhase() { return 0; } private void resetMessageListenerContainer(MessageListenerContainer container, ConsumerInfo consumerInfo) { String id = container.getListenerId(); // 设置AutoStartup属性 Optional.ofNullable(consumerInfo.getAutoStart()).ifPresent(v -> { container.setAutoStartup(v); log.info("MessageListenerContainer [{}] [autoStartup] 属性设置为 [{}]", id, v); }); // 设置concurrency属性 if (container instanceof ConcurrentMessageListenerContainer<?,?>) { Optional.ofNullable(consumerInfo.getConcurrency()).ifPresent(v -> { ((ConcurrentMessageListenerContainer<?,?>) container).setConcurrency(v); log.info("MessageListenerContainer [{}] [concurrency] 属性设置为 [{}]", id, v); }); } else { log.warn("MessageListenerContainer [{}] 不是 [ConcurrentMessageListenerContainer],无奈批改 [concurrency] 属性", id); } }}
最初定义Configuration类,用户注册KafkaListenerContainerInterceptor
类。
@Configurationpublic class CustomizedKafkaConfiguration { @Resource private CustomizedKafkaConfig customizedKafkaConfig; @Resource private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @Bean @ConditionalOnMissingBean public KafkaListenerContainerInterceptor kafkaListenerContainerInterceptor() { return new KafkaListenerContainerInterceptor(customizedKafkaConfig, kafkaListenerEndpointRegistry); }}