共计 7410 个字符,预计需要花费 19 分钟才能阅读完成。
1. 引言
Apache Kafka 是一个分布式的、容错的流解决零碎。在本文中,咱们将介绍 Spring 对 Apache Kafka 的反对,以及原生 Kafka Java 客户端 Api 所提供的形象级别。
Spring Kafka 通过 @KafkaListener 注解,带来了一个简略而典型的 Spring 模板编程模型,它还带有一个 KafkaTemplate 和音讯驱动的 POJO。
2. 装置和设置
要下载和装置 Kafka,请参考官网指南。而后还须要在 pom.xml
文件中增加 spring-kafka
:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.7.RELEASE</version>
</dependency>
新建一个 Spring Boot 示例应用程序,以默认配置启动。
3. 配置 Topics
以前咱们应用命令行工具在 Kafka
中创立 topic
,例如:
$ bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic mytopic
然而随着 AdminClient 在 Kafka 中的引入,咱们当初能够通过编程来创立 Topic
。
如下代码,增加 KafkAdmin
bean 到 Spring 中,它将主动为 NewTopic
类的所有 bean
增加 topic
:
@Configuration
public class KafkaTopicConfig {@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {return new NewTopic("developlee", 1, (short) 1);
}
}
4. 音讯生成
要创立音讯,首先须要配置 ProducerFactory,并设置创立 Kafka Producer 实例的策略,而后应用 KafkaTemplate
。KafkaTemplate
包装了 Producer
实例,并提供向 Kafka Topic
发送音讯的简便办法。
在整个应用程序上下文中应用单个实例将提供更高的性能。因而举荐应用一个 Producer
实例。该实例是线程平安的,所以 KakfaTemplate
实例也是线程平安的,
4.1. Producer 配置
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());
}
}
4.2. 音讯公布
咱们应用 KafkaTemplate
来公布音讯:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String msg) {kafkaTemplate.send(topicName, msg);
}
send
API 返回 ListenableFuture
对象。如果咱们想阻塞发送线程并取得对于发送音讯的后果,咱们能够调用ListenableFuture
对象的 get
API。线程将会期待后果,但它会升高生产者的速度。
Kafka 是一个疾速流解决平台。因而,最好异步处理结果,这样后续音讯就无需期待前一条音讯的后果。咱们能够通过回调来实现:
public void sendMessage(String message) {
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Sent message=[" + message +
"] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Unable to send message=["
+ message + "] due to :" + ex.getMessage());
}
});
}
5. 音讯生产
5.1. 消费者配置
对于生产音讯,咱们须要配置一个 ConsumerFactory
和一个 KafkaListenerContainerFactory
。
一旦这些 bean 在 Spring Bean 工厂中可用,就能够应用 @KafkaListener
注解配置基于 POJO 的消费者。
配置类上须要增加 @EnableKafka
注解,以便可能检测 Spring
治理的 bean 上的 @KafkaListener
注解:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
groupId);
props.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
5.2. 音讯生产
@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {System.out.println("Received Message in group foo:" + message);
}
能够为一个 topic 实现多个 listener,每个 topic 都有不同的组 Id。此外,一个消费者能够监听来自不同 topic 的音讯:
@KafkaListener(topics = "topic1, topic2", groupId = "foo")
Spring 还反对应用 listener 中的 @Header 注解检索一个或多个音讯题目:
@KafkaListener(topics = "topicName")
public void listenWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Received Message:" + message"+"from partition: " + partition);
}
5.3. 生产来自特定分区的音讯
留神到,咱们只应用一个分区创立了 topic“developlee”。然而,对于具备多个分区的主题,@KafkaListener 能够显式订阅具备初始偏移量 topic 的特定分区:
@KafkaListener(
topicPartitions = @TopicPartition(topic = "topicName",
partitionOffsets = {@PartitionOffset(partition = "0", initialOffset = "0"),
@PartitionOffset(partition = "3", initialOffset = "0")}),
containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
System.out.println("Received Message:" + message"+"from partition: " + partition);
}
因为 initialOffset 已被发送到该 listener 中的分区 0,因而每次初始化该 listener
时,将从新应用以前从分区 0 和分区 3 耗费的所有音讯。如果不须要设置偏移量,咱们能够应用 @TopicPartition 注解的 partitions 属性只设置没有偏移量的分区:
@KafkaListener(topicPartitions
= @TopicPartition(topic = "topicName", partitions = { "0", "1"}))
5.4. 为 Listener 增加音讯过滤器
通过增加自定义过滤器,能够将 listener
配置为应用特定类型的音讯。这能够通过将 RecordFilterStrategy
设置为 KafkaListenerContainerFactory
来实现:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
filterKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(record -> record.value().contains("World"));
return factory;
}
而后能够将 listener
配置为应用此容器工厂:
@KafkaListener(
topics = "topicName",
containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {System.out.println("Received Message in filtered listener:" + message);
}
在这个 listener
中,所有与过滤器匹配的 音讯都将被抛弃。
6. 自定义音讯转换器
到目前为止,咱们只探讨了字符串作为音讯发送和接管的对象。然而,咱们也能够发送和接管定制的 Java 对象。这须要在 ProducerFactory
中配置适当的序列化器,并在 ConsumerFactory
中配置反序列化器。
让咱们看一个简略的 bean,并将以音讯的模式发送它:
public class Greeting {
private String msg;
private String name;
// standard getters, setters and constructor
}
6.1. 生产自定义音讯
在本例中,咱们将应用 JsonSerializer
。咱们看看 ProducerFactory
和 KafkaTemplate
的代码:
@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
// ...
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {return new KafkaTemplate<>(greetingProducerFactory());
}
新的 KafkaTemplate
可用于发送 Greeting 音讯:
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
6.2. 生产自定义音讯
同样,咱们批改 ConsumerFactory
和 KafkaListenerContainerFactory
来正确反序列化 Greeting 音讯:
@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
// ...
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Greeting.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting>
greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(greetingConsumerFactory());
return factory;
}
spring-kafka
JSON 序列化器和反序列化器应用 Jackson 库,该库是 spring-kafka
我的项目的可选 maven 依赖项。咱们也把它加到 pom.xml 文件:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.7</version>
</dependency>
倡议不要应用 Jackson 的最新版本,而是应用 pom.xml 文件 中 spring-kafka
的版本。
最初,咱们须要编写一个 listener 来 生产 Greeting 音讯:
@KafkaListener(
topics = "topicName",
containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {// process greeting message}
7. 结语
在本文中,咱们介绍了 Apache Kafka 和 Spring 集成的基础知识,且简要介绍了用于发送和接管音讯的类。
本文的残缺源代码能够在 GitHub 上找到. 在执行代码之前,请确保服务器正在运行 Kafka。
如果你感觉文章还不错,记得关注公众号:锅外的大佬
刘一手的博客