基于Springboot的Kafka消费者动静操作

1. 问题

在基于Springboot开发Kafka相干业务时,遇到如下业务场景:

  • 执行局部操作时,如进行服务替换镜像、执行非凡业务解决等,须要先进行Consumer接管Kafka音讯,待处理实现后再开启Consumer接续接管Kafka音讯
  • 为并发生产Kafka音讯,可通过配置spring.kakfa.listener.concurency来设置Consumer的并发数;但spring.kakfa.listener.concurency是一个全局配置,当一个服务须要同时监听多个Topic,并且不同的Topic的Consumer须要设置不同的并发数时,这种办法就不实用

2. 解决思路

2.1 源码剖析

在Springboot我的项目中,个别通过办法上的@KafkaListener注解来注册Consumer,在Springboot服务启动过程中,通过实现了Springboot的扩大点的KafkaListenerAnnotationBeanPostProcessor类,在postProcessAfterInitialization办法中辨认含有@KafkaListener注解的办法,并注册至KafkaListenerEndpointRegistry中(具体的源码在此不开展形容,有趣味的能够自行翻阅源码或查问材料)。因而,后续的操作也将围绕着Listener容器MessageListenerContainer和注册表KafkaListenerEndpointRegistry开展。

2.2 动静启停Consumer

Listener容器MessageListenerContainer接口扩大了SmartLifecycle接口,在Lifecycle接口的start()办法根底上,扩大了pause()办法和resume()办法。通过正文能够晓得,这三个办法别离对应了Listener的启动、暂停和复原。

KafkaListenerEndpointRegistry类中,提供了依据ID获取MessageListenerContainer的办法。

因而,只有通过ID在KafkaListenerEndpointRegistry中获取了Listener容器MessageListenerContainer后,即可进行对应的开始、暂停和复原Consumer的操作。

2.3 动静批改参数

要想为不同的Listener配置不同的concurrency参数,首先得晓得concurrency参数是在哪里被设置至Listener中的。通过Debug剖析源码可知,在实现了MessageListenerContainer接口的ConcurrentMessageListenerContainer类中有一个setConcurrency(int)办法,能够设置容器的并发数。同时,Listener的注册表KafkaListenerEndpointRegistry类同样实现了SmartLifecycle接口,并在start()办法中理论启动Listener容器,因而想要动静批改参数,必须在容器启动前,即KafkaListenerEndpointRegistry执行start()办法前进行解决。

3. 动静启停Consumer

首先,定义一个公共的抽象类AbstractScheduledConsumer

public abstract class AbstractScheduledConsumer<T> {    @Resource    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;    public abstract void onMessage(T data);    protected abstract String getId();    protected MessageListenerContainer getListenerContainer() {        String containerId = this.getId();        MessageListenerContainer container = this.kafkaListenerEndpointRegistry.getListenerContainer(containerId);        Assert.notNull(container, String.format("MessageListenerContainer [%s] 获取失败", containerId));        return container;    }    /**     * 启动     */    public void start() {        MessageListenerContainer container = getListenerContainer();        if (!container.isRunning()) {            container.start();        } else {            container.resume();        }    }    /**     * 暂停     */    public void pause() {        getListenerContainer().pause();    }    /**     * 复原     */    public void resume() {        getListenerContainer().resume();    }}

业务解决的Consumer类只须要继承AbstractScheduledConsumer类即可实现Consumer的动静启停。变量ID即为Listener的ID,须要为每个Consumer定义不同的ID。

@Componentpublic class BusinessConsumer extends AbstractScheduledConsumer<ConsumerRecord<String, byte[]>> {    /**     * 自定义ID     */    private static final String ID = "business-consumer-id";    @Override    @KafkaListener(id = ID, topics = "")    public void onMessage(ConsumerRecord<String, byte[]> data) {        // 业务解决    }    @Override    protected String getId() {        return ID;    }}

至此,BusinessConsumer类已具备动静启停的Kafka Consumer的性能,只须要在Service和Controller减少代码即可通过接口实时启动、暂停和复原Consumer。

4. 动静批改Consumer参数

首先,定义一个配置项,用于配置须要批改的Consumer参数,此处的ID和上文的Listener的ID统一。

public class CustomizedKafkaConfig {    /**     * 是否启用Consumer拦截器     */    private boolean consumerInterceptorAutoStart = true;    /**     * 全局Consumer配置     */    private ConsumerInfo globalConsumerInfo;    /**     * 独立Consumer配置     */    private Map<String, ConsumerInfo> customizedConsumerInfos = new HashMap<>();    // 省略 get/set 办法    /**     * 依据ID获取配置     */    public ConsumerInfo getConsumerInfo(String id) {        return customizedConsumerInfos.get(id);    }    public static class ConsumerInfo {        private Boolean autoStart;        private Integer concurrency;        public Boolean getAutoStart() {            return autoStart;        }        public void setAutoStart(Boolean autoStart) {            this.autoStart = autoStart;        }        public Integer getConcurrency() {            return concurrency;        }        public void setConcurrency(Integer concurrency) {            this.concurrency = concurrency;        }    }}

接着定义Consumer拦截器,同样实现SmartLifecycle接口,通过getPhase()返回值保障优先于KafkaListenerEndpointRegistry执行。

@Slf4jpublic class KafkaListenerContainerInterceptor implements SmartLifecycle {    private final CustomizedKafkaConfig config;    private final KafkaListenerEndpointRegistry registry;    private volatile boolean running = false;    public KafkaListenerContainerInterceptor(CustomizedKafkaConfig customizedKafkaConfig, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {        this.config = customizedKafkaConfig;        this.registry = kafkaListenerEndpointRegistry;    }    @Override    public void start() {        Collection<MessageListenerContainer> listenerContainers = registry.getAllListenerContainers();        ConsumerInfo globalConsumerInfo = config.getGlobalConsumerInfo();        // 存在全局设置        if (globalConsumerInfo != null) {            log.info("已设置全局ConsumerInfo [autoStartup = {}, concurrency = {}]", globalConsumerInfo.getAutoStart(), globalConsumerInfo.getConcurrency());            listenerContainers.forEach(c -> resetMessageListenerContainer(c, globalConsumerInfo));        }        // 自定义消费者设置        for (MessageListenerContainer container : listenerContainers) {            String id = container.getListenerId();            ConsumerInfo consumerInfo;            // 未自定义消费者设置,跳过拦挡            if ((consumerInfo = config.getConsumerInfo(id)) == null) {                continue;            }            // 拦挡设置            resetMessageListenerContainer(container, consumerInfo);        }    }    @Override    public void stop() {        this.running = false;    }    @Override    public boolean isRunning() {        return running;    }    @Override    public boolean isAutoStartup() {        return config.isConsumerInterceptorAutoStart();    }    @Override    public int getPhase() {        return 0;    }    private void resetMessageListenerContainer(MessageListenerContainer container, ConsumerInfo consumerInfo) {        String id = container.getListenerId();        // 设置AutoStartup属性        Optional.ofNullable(consumerInfo.getAutoStart()).ifPresent(v -> {            container.setAutoStartup(v);            log.info("MessageListenerContainer [{}] [autoStartup] 属性设置为 [{}]", id, v);        });        // 设置concurrency属性        if (container instanceof ConcurrentMessageListenerContainer<?,?>) {            Optional.ofNullable(consumerInfo.getConcurrency()).ifPresent(v -> {                ((ConcurrentMessageListenerContainer<?,?>) container).setConcurrency(v);                log.info("MessageListenerContainer [{}] [concurrency] 属性设置为 [{}]", id, v);            });        } else {            log.warn("MessageListenerContainer [{}] 不是 [ConcurrentMessageListenerContainer],无奈批改 [concurrency] 属性", id);        }    }}

最初定义Configuration类,用户注册KafkaListenerContainerInterceptor类。

@Configurationpublic class CustomizedKafkaConfiguration {    @Resource    private CustomizedKafkaConfig customizedKafkaConfig;    @Resource    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;    @Bean    @ConditionalOnMissingBean    public KafkaListenerContainerInterceptor kafkaListenerContainerInterceptor() {        return new KafkaListenerContainerInterceptor(customizedKafkaConfig, kafkaListenerEndpointRegistry);    }}