乐趣区

关于springboot:知识整理基于Springboot的Kafka消费者动态操作

基于 Springboot 的 Kafka 消费者动静操作

1. 问题

​ 在基于 Springboot 开发 Kafka 相干业务时,遇到如下业务场景:

  • 执行局部操作时,如进行服务替换镜像、执行非凡业务解决等,须要先进行 Consumer 接管 Kafka 音讯,待处理实现后再开启 Consumer 接续接管 Kafka 音讯
  • 为并发生产 Kafka 音讯,可通过配置 spring.kakfa.listener.concurency 来设置 Consumer 的并发数;但 spring.kakfa.listener.concurency 是一个全局配置,当一个服务须要同时监听多个 Topic,并且不同的 Topic 的 Consumer 须要设置不同的并发数时,这种办法就不实用

2. 解决思路

2.1 源码剖析

​ 在 Springboot 我的项目中,个别通过办法上的 @KafkaListener 注解来注册 Consumer,在 Springboot 服务启动过程中,通过实现了 Springboot 的扩大点的 KafkaListenerAnnotationBeanPostProcessor 类,在 postProcessAfterInitialization 办法中辨认含有 @KafkaListener 注解的办法,并注册至 KafkaListenerEndpointRegistry 中(具体的源码在此不开展形容,有趣味的能够自行翻阅源码或查问材料)。因而,后续的操作也将围绕着 Listener 容器 MessageListenerContainer 和注册表 KafkaListenerEndpointRegistry 开展。

2.2 动静启停 Consumer

​ Listener 容器 MessageListenerContainer 接口扩大了 SmartLifecycle 接口,在 Lifecycle 接口的 start() 办法根底上,扩大了 pause() 办法和 resume() 办法。通过正文能够晓得,这三个办法别离对应了 Listener 的启动、暂停和复原。

​ 在 KafkaListenerEndpointRegistry 类中,提供了依据 ID 获取 MessageListenerContainer 的办法。

​ 因而,只有通过 ID 在 KafkaListenerEndpointRegistry 中获取了 Listener 容器 MessageListenerContainer 后,即可进行对应的开始、暂停和复原 Consumer 的操作。

2.3 动静批改参数

​ 要想为不同的 Listener 配置不同的 concurrency 参数,首先得晓得 concurrency 参数是在哪里被设置至 Listener 中的。通过 Debug 剖析源码可知,在实现了 MessageListenerContainer 接口的 ConcurrentMessageListenerContainer 类中有一个 setConcurrency(int) 办法,能够设置容器的并发数。同时,Listener 的注册表 KafkaListenerEndpointRegistry 类同样实现了 SmartLifecycle 接口,并在 start() 办法中理论启动 Listener 容器,因而想要动静批改参数,必须在容器启动前,即 KafkaListenerEndpointRegistry 执行 start() 办法前进行解决。

3. 动静启停 Consumer

​ 首先,定义一个公共的抽象类AbstractScheduledConsumer

public abstract class AbstractScheduledConsumer<T> {

    @Resource
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public abstract void onMessage(T data);

    protected abstract String getId();

    protected MessageListenerContainer getListenerContainer() {String containerId = this.getId();
        MessageListenerContainer container = this.kafkaListenerEndpointRegistry.getListenerContainer(containerId);
        Assert.notNull(container, String.format("MessageListenerContainer [%s] 获取失败", containerId));
        return container;
    }

    /**
     * 启动
     */
    public void start() {MessageListenerContainer container = getListenerContainer();
        if (!container.isRunning()) {container.start();
        } else {container.resume();
        }
    }

    /**
     * 暂停
     */
    public void pause() {getListenerContainer().pause();}

    /**
     * 复原
     */
    public void resume() {getListenerContainer().resume();}
}

​ 业务解决的 Consumer 类只须要继承 AbstractScheduledConsumer 类即可实现 Consumer 的动静启停。变量 ID 即为 Listener 的 ID,须要为每个 Consumer 定义不同的 ID。

@Component
public class BusinessConsumer extends AbstractScheduledConsumer<ConsumerRecord<String, byte[]>> {

    /**
     * 自定义 ID
     */
    private static final String ID = "business-consumer-id";

    @Override
    @KafkaListener(id = ID, topics = "")
    public void onMessage(ConsumerRecord<String, byte[]> data) {// 业务解决}

    @Override
    protected String getId() {return ID;}
}

​ 至此,BusinessConsumer类已具备动静启停的 Kafka Consumer 的性能,只须要在 Service 和 Controller 减少代码即可通过接口实时启动、暂停和复原 Consumer。

4. 动静批改 Consumer 参数

​ 首先,定义一个配置项,用于配置须要批改的 Consumer 参数,此处的 ID 和上文的 Listener 的 ID 统一。

public class CustomizedKafkaConfig {

    /**
     * 是否启用 Consumer 拦截器
     */
    private boolean consumerInterceptorAutoStart = true;

    /**
     * 全局 Consumer 配置
     */
    private ConsumerInfo globalConsumerInfo;

    /**
     * 独立 Consumer 配置
     */
    private Map<String, ConsumerInfo> customizedConsumerInfos = new HashMap<>();

    // 省略 get/set 办法

    /**
     * 依据 ID 获取配置
     */
    public ConsumerInfo getConsumerInfo(String id) {return customizedConsumerInfos.get(id);
    }

    public static class ConsumerInfo {

        private Boolean autoStart;

        private Integer concurrency;

        public Boolean getAutoStart() {return autoStart;}

        public void setAutoStart(Boolean autoStart) {this.autoStart = autoStart;}

        public Integer getConcurrency() {return concurrency;}

        public void setConcurrency(Integer concurrency) {this.concurrency = concurrency;}
    }
}

​ 接着定义 Consumer 拦截器,同样实现 SmartLifecycle 接口,通过 getPhase() 返回值保障优先于 KafkaListenerEndpointRegistry 执行。

@Slf4j
public class KafkaListenerContainerInterceptor implements SmartLifecycle {

    private final CustomizedKafkaConfig config;

    private final KafkaListenerEndpointRegistry registry;

    private volatile boolean running = false;

    public KafkaListenerContainerInterceptor(CustomizedKafkaConfig customizedKafkaConfig, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
        this.config = customizedKafkaConfig;
        this.registry = kafkaListenerEndpointRegistry;
    }

    @Override
    public void start() {Collection<MessageListenerContainer> listenerContainers = registry.getAllListenerContainers();

        ConsumerInfo globalConsumerInfo = config.getGlobalConsumerInfo();

        // 存在全局设置
        if (globalConsumerInfo != null) {log.info("已设置全局 ConsumerInfo [autoStartup = {}, concurrency = {}]", globalConsumerInfo.getAutoStart(), globalConsumerInfo.getConcurrency());

            listenerContainers.forEach(c -> resetMessageListenerContainer(c, globalConsumerInfo));
        }

        // 自定义消费者设置
        for (MessageListenerContainer container : listenerContainers) {String id = container.getListenerId();

            ConsumerInfo consumerInfo;
            // 未自定义消费者设置,跳过拦挡
            if ((consumerInfo = config.getConsumerInfo(id)) == null) {continue;}

            // 拦挡设置
            resetMessageListenerContainer(container, consumerInfo);
        }
    }

    @Override
    public void stop() {this.running = false;}

    @Override
    public boolean isRunning() {return running;}

    @Override
    public boolean isAutoStartup() {return config.isConsumerInterceptorAutoStart();
    }

    @Override
    public int getPhase() {return 0;}

    private void resetMessageListenerContainer(MessageListenerContainer container, ConsumerInfo consumerInfo) {String id = container.getListenerId();

        // 设置 AutoStartup 属性
        Optional.ofNullable(consumerInfo.getAutoStart()).ifPresent(v -> {container.setAutoStartup(v);
            log.info("MessageListenerContainer [{}] [autoStartup] 属性设置为 [{}]", id, v);
        });

        // 设置 concurrency 属性
        if (container instanceof ConcurrentMessageListenerContainer<?,?>) {Optional.ofNullable(consumerInfo.getConcurrency()).ifPresent(v -> {((ConcurrentMessageListenerContainer<?,?>) container).setConcurrency(v);
                log.info("MessageListenerContainer [{}] [concurrency] 属性设置为 [{}]", id, v);
            });
        } else {log.warn("MessageListenerContainer [{}] 不是 [ConcurrentMessageListenerContainer],无奈批改 [concurrency] 属性", id);
        }
    }
}

​ 最初定义 Configuration 类,用户注册 KafkaListenerContainerInterceptor 类。

@Configuration
public class CustomizedKafkaConfiguration {

    @Resource
    private CustomizedKafkaConfig customizedKafkaConfig;

    @Resource
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Bean
    @ConditionalOnMissingBean
    public KafkaListenerContainerInterceptor kafkaListenerContainerInterceptor() {return new KafkaListenerContainerInterceptor(customizedKafkaConfig, kafkaListenerEndpointRegistry);
    }
}
退出移动版