1. 引言

Apache Kafka 是一个分布式的、容错的流解决零碎。在本文中,咱们将介绍Spring对Apache Kafka的反对,以及原生Kafka Java客户端Api 所提供的形象级别。

Spring Kafka 通过 @KafkaListener 注解,带来了一个简略而典型的 Spring 模板编程模型,它还带有一个 KafkaTemplate 和音讯驱动的 POJO 。

2. 装置和设置

要下载和装置Kafka,请参考官网指南。而后还须要在 pom.xml 文件中增加 spring-kafka:

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId>    <version>2.3.7.RELEASE</version></dependency>

新建一个 Spring Boot 示例应用程序,以默认配置启动。

3. 配置 Topics

以前咱们应用命令行工具在 Kafka 中创立 topic,例如:

$ bin/kafka-topics.sh --create \  --zookeeper localhost:2181 \  --replication-factor 1 --partitions 1 \  --topic mytopic

然而随着 AdminClient 在Kafka中的引入,咱们当初能够通过编程来创立 Topic

如下代码,增加 KafkAdmin bean 到 Spring中,它将主动为 NewTopic 类的所有 bean 增加 topic

@Configurationpublic class KafkaTopicConfig {        @Value(value = "${kafka.bootstrapAddress}")    private String bootstrapAddress;     @Bean    public KafkaAdmin kafkaAdmin() {        Map<String, Object> configs = new HashMap<>();        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);        return new KafkaAdmin(configs);    }        @Bean    public NewTopic topic1() {         return new NewTopic("developlee", 1, (short) 1);    }}

4. 音讯生成

要创立音讯,首先须要配置 ProducerFactory ,并设置创立 Kafka Producer 实例的策略,而后应用 KafkaTemplateKafkaTemplate 包装了 Producer 实例,并提供向 Kafka Topic 发送音讯的简便办法。

在整个应用程序上下文中应用单个实例将提供更高的性能。因而举荐应用一个 Producer 实例。该实例是线程平安的,所以 KakfaTemplate 实例也是线程平安的,

4.1. Producer 配置

@Configurationpublic class KafkaProducerConfig {     @Bean    public ProducerFactory<String, String> producerFactory() {        Map<String, Object> configProps = new HashMap<>();        configProps.put(          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,           bootstrapAddress);        configProps.put(          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,           StringSerializer.class);        configProps.put(          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,           StringSerializer.class);        return new DefaultKafkaProducerFactory<>(configProps);    }     @Bean    public KafkaTemplate<String, String> kafkaTemplate() {        return new KafkaTemplate<>(producerFactory());    }}

4.2. 音讯公布

咱们应用 KafkaTemplate 来公布音讯:

@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String msg) {    kafkaTemplate.send(topicName, msg);}

send API 返回 ListenableFuture 对象。如果咱们想阻塞发送线程并取得对于发送音讯的后果,咱们能够调用ListenableFuture 对象的 get API。线程将会期待后果,但它会升高生产者的速度。

Kafka是一个疾速流解决平台。因而,最好异步处理结果,这样后续音讯就无需期待前一条音讯的后果。咱们能够通过回调来实现:

public void sendMessage(String message) {                ListenableFuture<SendResult<String, String>> future =       kafkaTemplate.send(topicName, message);        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {         @Override        public void onSuccess(SendResult<String, String> result) {            System.out.println("Sent message=[" + message +               "] with offset=[" + result.getRecordMetadata().offset() + "]");        }        @Override        public void onFailure(Throwable ex) {            System.out.println("Unable to send message=["               + message + "] due to : " + ex.getMessage());        }    });}

5. 音讯生产

5.1. 消费者配置

对于生产音讯,咱们须要配置一个 ConsumerFactory 和一个 KafkaListenerContainerFactory

一旦这些bean在Spring Bean工厂中可用,就能够应用 @KafkaListener 注解配置基于POJO的消费者。

配置类上须要增加 @EnableKafka 注解,以便可能检测Spring治理的bean上的 @KafkaListener 注解:

@EnableKafka@Configurationpublic class KafkaConsumerConfig {     @Bean    public ConsumerFactory<String, String> consumerFactory() {        Map<String, Object> props = new HashMap<>();        props.put(          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,           bootstrapAddress);        props.put(          ConsumerConfig.GROUP_ID_CONFIG,           groupId);        props.put(          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,           StringDeserializer.class);        props.put(          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,           StringDeserializer.class);        return new DefaultKafkaConsumerFactory<>(props);    }     @Bean    public ConcurrentKafkaListenerContainerFactory<String, String>       kafkaListenerContainerFactory() {           ConcurrentKafkaListenerContainerFactory<String, String> factory =          new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        return factory;    }}

5.2. 音讯生产

@KafkaListener(topics = "topicName", groupId = "foo")public void listenGroupFoo(String message) {    System.out.println("Received Message in group foo: " + message);}

能够为一个 topic 实现多个 listener,每个topic 都有不同的组Id。此外,一个消费者能够监听来自不同 topic 的音讯:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring 还反对应用 listener 中的 @Header 注解检索一个或多个音讯题目:

@KafkaListener(topics = "topicName")public void listenWithHeaders(  @Payload String message,   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {      System.out.println(        "Received Message: " + message"        + "from partition: " + partition);}

5.3. 生产来自特定分区的音讯

留神到,咱们只应用一个分区创立了 topic “developlee”。然而,对于具备多个分区的主题,@KafkaListener 能够显式订阅具备初始偏移量 topic 的特定分区:

@KafkaListener(  topicPartitions = @TopicPartition(topic = "topicName",  partitionOffsets = {    @PartitionOffset(partition = "0", initialOffset = "0"),     @PartitionOffset(partition = "3", initialOffset = "0")}),  containerFactory = "partitionsKafkaListenerContainerFactory")public void listenToPartition(  @Payload String message,   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {      System.out.println(        "Received Message: " + message"        + "from partition: " + partition);}

因为 initialOffset 已被发送到该 listener 中的分区0,因而每次初始化该 listener 时,将从新应用以前从分区0和分区3耗费的所有音讯。如果不须要设置偏移量,咱们能够应用 @TopicPartition 注解的 partitions 属性只设置没有偏移量的分区:

@KafkaListener(topicPartitions   = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. 为Listener增加音讯过滤器

通过增加自定义过滤器,能够将 listener 配置为应用特定类型的音讯。这能够通过将 RecordFilterStrategy 设置为 KafkaListenerContainerFactory 来实现:

@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String>  filterKafkaListenerContainerFactory() {     ConcurrentKafkaListenerContainerFactory<String, String> factory =      new ConcurrentKafkaListenerContainerFactory<>();    factory.setConsumerFactory(consumerFactory());    factory.setRecordFilterStrategy(      record -> record.value().contains("World"));    return factory;}

而后能够将 listener 配置为应用此容器工厂:

@KafkaListener(  topics = "topicName",   containerFactory = "filterKafkaListenerContainerFactory")public void listenWithFilter(String message) {    System.out.println("Received Message in filtered listener: " + message);}

在这个 listener 中,所有与过滤器匹配的音讯都将被抛弃

6. 自定义音讯转换器

到目前为止,咱们只探讨了字符串作为音讯发送和接管的对象。然而,咱们也能够发送和接管定制的Java对象。这须要在 ProducerFactory 中配置适当的序列化器,并在 ConsumerFactory 中配置反序列化器。

让咱们看一个简略的bean,并将以音讯的模式发送它:

public class Greeting {     private String msg;    private String name;     // standard getters, setters and constructor}

6.1. 生产自定义音讯

在本例中,咱们将应用 JsonSerializer。咱们看看 ProducerFactoryKafkaTemplate 的代码:

@Beanpublic ProducerFactory<String, Greeting> greetingProducerFactory() {    // ...    configProps.put(      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,       JsonSerializer.class);    return new DefaultKafkaProducerFactory<>(configProps);} @Beanpublic KafkaTemplate<String, Greeting> greetingKafkaTemplate() {    return new KafkaTemplate<>(greetingProducerFactory());}

新的 KafkaTemplate 可用于发送 Greeting 音讯:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. 生产自定义音讯

同样,咱们批改 ConsumerFactoryKafkaListenerContainerFactory 来正确反序列化 Greeting 音讯:

@Beanpublic ConsumerFactory<String, Greeting> greetingConsumerFactory() {    // ...    return new DefaultKafkaConsumerFactory<>(      props,      new StringDeserializer(),       new JsonDeserializer<>(Greeting.class));} @Beanpublic ConcurrentKafkaListenerContainerFactory<String, Greeting>   greetingKafkaListenerContainerFactory() {     ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =      new ConcurrentKafkaListenerContainerFactory<>();    factory.setConsumerFactory(greetingConsumerFactory());    return factory;}

spring-kafka JSON序列化器和反序列化器应用 Jackson 库,该库是 spring-kafka 我的项目的可选maven依赖项。咱们也把它加到 pom.xml 文件:

<dependency>    <groupId>com.fasterxml.jackson.core</groupId>    <artifactId>jackson-databind</artifactId>    <version>2.9.7</version></dependency>

倡议不要应用 Jackson 的最新版本,而是应用 pom.xml 文件 中 spring-kafka 的版本。
最初,咱们须要编写一个 listener 来 生产 Greeting 音讯:

@KafkaListener(  topics = "topicName",   containerFactory = "greetingKafkaListenerContainerFactory")public void greetingListener(Greeting greeting) {    // process greeting message}

7. 结语

在本文中,咱们介绍了Apache Kafka 和 Spring 集成的基础知识,且简要介绍了用于发送和接管音讯的类。
本文的残缺源代码能够在GitHub上找到. 在执行代码之前,请确保服务器正在运行 Kafka。
如果你感觉文章还不错,记得关注公众号: 锅外的大佬
刘一手的博客