关于kafka:在需要-ClientId-鉴权的-Kafka-集群中高效使用-Producer-和-Consumer-的方法

197次阅读

共计 8808 个字符,预计需要花费 23 分钟才能阅读完成。

背景

在应用 Kafka 音讯队列时,申请 Kafka 资源波及 Topic 名称、分区数、ClientId 等参数。ClientIdKafka 中次要用于标识和治理客户端应用程序,以及为监控、日志记录和资源管理提供反对。通过为每个客户端调配惟一的 ClientId,你能够更好地跟踪和治理 Kafka 集群中的各个客户端连贯。然而在某些公司,ClientId 是用于鉴权和限流的,因而在应用 Kafka 时须要确保 ClientId 与申请 Topic 时的 ClientId 保持一致。这与个别的 Kafka 应用形式有所不同。在应用 spring-kafka 包和原生 kafka-clients 包的多线程环境下,固定的 ClientId 并不能满足需要,须要重写 spring-kafka 包的 ProducerFactoryConsumerFactory,或者申请多个 ClientId。本文将探讨在应用 Kafka 时遇到的对于 ClientId 的问题以及倡议的解决办法。

过程

大多数 Java 应用程序都集成了 Spring 框架,因而咱们能够基于 Spring 定义生产者和消费者。

生产者

@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, Object> producerFactory() {Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, "your-client-id");
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());
    }
}

在须要发送音讯的中央,咱们注入 KafkaTemplate

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

void send(String message) {ProducerRecord<String, Object> record = new ProducerRecord<>("your-topic-name", message);
  kafkaTemplate.send(record);
}

然而,当服务启动并尝试发送音讯时,发送不胜利,呈现以下谬误:

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [your-topic-name]

错误信息提醒鉴权失败。通过查看上下文:

INFO [kafka-producer-network-thread | your-client-id-1]

咱们发现 ClientIdyour-client-id 变成了 your-client-id-1,很显著这是因为第三方包做了手脚。通过查看源码:

发现在创立 Producer 时,会调用 getProducerConfigs 办法,该办法外部会对 ClientId 增加自增的后缀。为了解决这个问题,咱们须要重写 DefaultKafkaProducergetProducerConfigs 办法:

public class FixedClientIdKafkaProducerFactory extends DefaultKafkaProducerFactory {
    private final Map<String, Object> configs;
    
    public FixedClientIdKafkaProducerFactory(Map<String, Object> configs) {super(configs);
        this.configs = configs;
    }

    @Override
    protected Map<String, Object> getProducerConfigs() {final Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
        checkBootstrap(newProducerConfigs);
        return newProducerConfigs;
    }
}

而后在 KafkaProducerConfig 中应用 FixedClientIdKafkaProducerFactory 替换 DefaultKafkaProducerFactory

消费者

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group-id");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
                  
@Component
public class MyKafkaMessageListener {@KafkaListener(topics = "your-topic-name", concurrency = "4", clientIdPrefix = "your-client-id")
    public void listen(ConsumerRecord<String, Object> record, Acknowledgment acknowledgment) {
        // your business logic 
        //……
      
        acknowledgment.acknowledge();}
}

同样,咱们遇到了鉴权失败的问题。错误信息如下:

ERROR[org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1] o.s.k.l.KafkaMessageListenerContainer.error(149): Authentication/Authorization Exception and no authExceptionRetryInterval set
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [your-topic-name]

通过跟踪源码:

咱们发现在容器 ConcurrentMessageListenerContainerdoStart 办法中,会依据并行度 concurrency 创立多个 KafkaMessageListenerContainer 子容器,而后调用 configureChildContainer 办法配置子容器,并依据 concurrencyalwaysClientIdSuffix 参数对 ClientId 增加后缀。如果并行度设置为 1,那么只须要在 KafkaConsumerConfig 中增加如下代码来设置 alwaysClientIdSuffix

factory.setContainerCustomizer(container -> container.setAlwaysClientIdSuffix(false));

这样就能确保 ClientId 不会被扭转,从而实现鉴权操作。然而为了进步生产能力,咱们总归须要设置并行度。因而,依据重写生产者的教训,咱们重写 DefaultKafkaConsumerFactorycreateKafkaConsumer 办法:

public class FixedClientIdKafkaConsumerFactory<K, V> extends DefaultKafkaConsumerFactory<K, V> {public FixedClientIdKafkaConsumerFactory(Map configs) {super(configs);
    }
    @Override
    protected Consumer<K, V> createKafkaConsumer(String groupId, String clientIdPrefixArg, String clientIdSuffixArg, Properties properties) {return super.createKafkaConsumer(groupId, clientIdPrefixArg, null, properties);
    }
}

而后在 KafkaConsumerConfig 中应用 FixedClientIdKafkaConsumerFactory 替换 DefaultKafkaConsumerFactory。然而,这次却呈现了如下谬误:

WARN [main] o.a.k.c.u.AppInfoParser.registerAppInfo(68): Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=your-client-id

在自定义 ConsumerFactory 以确保在多线程环境下共用雷同的 ClientId 时,咱们必须思考到启用 JMX 监控时,MBean 的唯一性问题。JMX MBean 是线程级别的,因而如果呈现抵触,可能会影响监控性能。

只管上述日志只是一个 WARN 级别的记录,不会间接影响生产性能,但如果咱们心愿通过 JMX 实现精准监控,那么必须要解决这个问题。另外,长期看到一系列的 WARN 日志也会令人不安,因而咱们决定持续采纳自增后缀的 ClientId 策略。

然而,在申请多个 ClientId 时,须要衡量数量。咱们须要思考 Kafka 集群的鉴权能力,同时也要防止后续扩容时频繁申请 ClientId 的问题。

Kafka 的高级 API 确保在同一个消费者组(consumer group)下,每个分区(partition)只能由一个 Consumer 线程生产(1Consumer 线程能够生产多个 partition)。

如上,当一个消费者组蕴含 5 个消费者并且有 4 个分区时,无论这 5 个消费者是以单实例形式部署还是分布式形式部署,都可能呈现某个消费者未被调配到分区的状况,这样会造成线程空跑,占据着资源。为了解决这个问题,咱们只需确保总消费者数量小于或等于分区数量。

通常状况下,咱们的利用中 Kafka Consumer 仅占据一部分流量。因而,咱们是否能够约定单个实例的 Kafka Consumer 的最大并发数为某个固定值呢?比方 6 这样一来,在申请 Topic 时,咱们能够一并申请 6ClientId,其命名格局为 -0-5。例如,如果咱们申请了一个名为 my-topic-testTopic,那么除了默认生成一个 ClientIdmy-topic-test-GeMp 之外,咱们只需再申请 my-topic-test-GeMp-0my-topic-test-GeMp-1my-topic-test-GeMp-2my-topic-test-GeMp-3my-topic-test-GeMp-4my-topic-test-GeMp-5 即可。

在后续须要晋升生产能力时,咱们能够扩大分区数量。依据分区数量进行横向程度扩容,以保障一个消费者组内的总消费者数量等于分区数量。这样,就不须要再放心 ClientId 的鉴权和 JMX 注册问题了。此外,这也确保了在将来须要晋升生产能力并进行分区扩容时,无需再次申请 ClientId

这种做法能够无效地优化 Kafka Consumer 的治理和扩大,以满足咱们的需要。

论断

综上所述,咱们总结了在应用 ClientId 进行 Kafka 集群环境下的身份验证时,Kafka 生产者和消费者的一种高效应用形式。

申请 ClientId

Kafka 平台上申请 Topic 时,请依据主动生成的 ClientId 作为前缀,而后应用 “-0” 至 “-5” 作为后缀,额定申请 6ClientId(数量只是倡议,可自行依据利用状况设置)。

Maven 配置

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.8</version> <!-- 倡议不配置,默认应用父 pom 中的版本 -->
</dependency>

生产者

@Configuration
public class KafkaProducerConfig {
    @Bean
    public ProducerFactory<String, Object> producerFactory() {Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProps.put(ProducerConfig.CLIENT_ID_CONFIG, "your-client-id");
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());
    }
}

@Service
public class BusinessServiceImpl implements BusinessService {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    void send(String msg) {ProducerRecord<String, Object> record = new ProducerRecord<>("your-topic-name", message);
        kafkaTemplate.send(record);
    }
}

因为 KafkaProducer 是线程平安的,如果要应用多线程的生产者,倡议应用单例生产者,而后应用线程池来包装。

消费者

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group-id");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

@Component
public class MyKafkaMessageListener {@KafkaListener(topics = "your-topic-name", concurrency = "6", clientIdPrefix = "your-client-id")
    public void listen(ConsumerRecord<String, Object> record, Acknowledgment acknowledgment) {
        // 在这儿增加业务逻辑
        //……
      
        acknowledgment.acknowledge();}
}

因为 KafkaConsumer 是非线程平安的,如果要应用多线程的消费者,倡议应用 Spring@KafkaListener 注解,并通过配置 concurrency 来实现多线程生产。在单机瓶颈呈现时,能够通过横向扩容来进行性能晋升。

这里,咱们额定提一下,concurrency 要依据理论状况进行设置,单实例的状况下设置的值不要超过分区数量。多实例的状况下,如果实例数量大于等于分区数,就没有必要再去设置 concurrency 了(concurrency 默认是 1)。

请留神上述约定中,ClientId 的数量应依据集体需要进行设定,但强烈建议一次性申请足够数量的 ClientId,以防止后续须要更改。这样,如果须要扩大,只须要横向扩大机器并减少主题分区即可。

正文完
 0