一、背景
此处简略记录一下 SpringBoot
和 Kafka
的整合。
二、实现步骤
1、引入 jar 包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2、编写生产者和消费者的配置
3、生产者配置
spring.application.name=kafka-springboot
# 配置 kafka 服务器的地址,多个以逗号隔开
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
# 生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
4、消费者配置
# 消费者配置
# 敞开主动提交 ack
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 配置监听手动提交 ack , 生产一条数据完后,立刻提交
spring.kafka.listener.ack-mode=manual_immediate
# 经测试也是批量提交的 ack , 当生产完 spring.kafka.consumer.max-poll-records 这么多的数据时候,提交
#spring.kafka.listener.ack-mode=manual
spring.kafka.listener.poll-timeout=500S
5、消费者手动提交 ack
1、spring.kafka.consumer.enable-auto-commit
批改成 false
2、spring.kafka.listener.ack-mode
批改成
|- manual
: 示意手动提交,然而测试下来发现是批量提交
|- manual_immediate
: 示意手动提交,当调用 Acknowledgment#acknowledge
之后立马提交。
3、编写生产者代码
@Component
public class KafkaProducer implements CommandLineRunner {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Override
public void run(String... args) {Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() ->
{kafkaTemplate.send(KafkaConstant.TOPIC, String.valueOf(System.currentTimeMillis()))
.addCallback(new SuccessCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {if (null != result.getRecordMetadata()) {System.out.println("生产发送胜利 offset:" + result.getRecordMetadata().offset());
return;
}
System.out.println("音讯发送胜利");
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable throwable) {System.out.println("生产发送失败:" + throwable.getMessage());
}
});
},
0, 1, TimeUnit.SECONDS);
}
}
1、生产的发送应用 KafkaTemplate
。
2、依据发送的后果晓得,音讯发送胜利还是失败。
4、编写消费者代码
@Component
public class KafkaConsumer {@KafkaListener(topics = KafkaConstant.TOPIC, groupId = "kafka-springboot-001")
public void consumer(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) + "接管到 kafka 音讯,partition:" + record.partition() + ",offset:" + record.offset() + "value:" + record.value());
TimeUnit.SECONDS.sleep(1);
ack.acknowledge();}
}
KafkaListener
:
topic
: 示意须要监听的队列名称
groupId
: 示意消费者组的 id
三、运行后果
四、参考文档
1、https://docs.spring.io/spring-boot/docs/2.4.2/reference/htmlsingle/#boot-features-kafka
五、代码门路
https://gitee.com/huan1993/rabbitmq/tree/master/kafka-springboot/src/main/java/com/huan/study/kafka