一、背景

此处简略记录一下 SpringBootKafka 的整合。

二、实现步骤

1、引入jar包

<dependency>   <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency>

2、编写生产者和消费者的配置

3、生产者配置

spring.application.name=kafka-springboot# 配置 kafka 服务器的地址,多个以逗号隔开spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094# 生产者配置spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.acks=1spring.kafka.producer.retries=0spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432

4、消费者配置

# 消费者配置# 敞开主动提交 ackspring.kafka.consumer.enable-auto-commit=falsespring.kafka.consumer.auto-commit-interval=100spring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.max-poll-records=500spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 配置监听手动提交 ack ,生产一条数据完后,立刻提交spring.kafka.listener.ack-mode=manual_immediate# 经测试也是批量提交的ack , 当生产完 spring.kafka.consumer.max-poll-records 这么多的数据时候,提交#spring.kafka.listener.ack-mode=manualspring.kafka.listener.poll-timeout=500S

5、消费者手动提交 ack

1、spring.kafka.consumer.enable-auto-commit 批改成 false
2、spring.kafka.listener.ack-mode 批改成
            |- manual: 示意手动提交,然而测试下来发现是批量提交
            |- manual_immediate: 示意手动提交,当调用 Acknowledgment#acknowledge之后立马提交。

3、编写生产者代码

@Componentpublic class KafkaProducer implements CommandLineRunner {    @Autowired    private KafkaTemplate<String, String> kafkaTemplate;    @Override    public void run(String... args) {        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() ->                {                    kafkaTemplate.send(KafkaConstant.TOPIC, String.valueOf(System.currentTimeMillis()))                            .addCallback(new SuccessCallback<SendResult<String, String>>() {                                @Override                                public void onSuccess(SendResult<String, String> result) {                                    if (null != result.getRecordMetadata()) {                                        System.out.println("生产发送胜利 offset:" + result.getRecordMetadata().offset());                                        return;                                    }                                    System.out.println("音讯发送胜利");                                }                            }, new FailureCallback() {                                @Override                                public void onFailure(Throwable throwable) {                                    System.out.println("生产发送失败:" + throwable.getMessage());                                }                            });                },                0, 1, TimeUnit.SECONDS);    }}

1、生产的发送应用KafkaTemplate
2、依据发送的后果晓得,音讯发送胜利还是失败。

4、编写消费者代码

@Componentpublic class KafkaConsumer {    @KafkaListener(topics = KafkaConstant.TOPIC, groupId = "kafka-springboot-001")    public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {        System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接管到kafka音讯,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value());        TimeUnit.SECONDS.sleep(1);        ack.acknowledge();    }}

KafkaListener:
      topic: 示意须要监听的队列名称
      groupId: 示意消费者组的id

三、运行后果

四、参考文档

1、https://docs.spring.io/spring-boot/docs/2.4.2/reference/htmlsingle/#boot-features-kafka

五、代码门路

https://gitee.com/huan1993/rabbitmq/tree/master/kafka-springboot/src/main/java/com/huan/study/kafka