共计 4333 个字符,预计需要花费 11 分钟才能阅读完成。
Springboot 系列之 kafka 操作
kafka 简介
ApacheKafka®是一个分布式流媒体平台。有三个关键功能:
发布和订阅记录流,类似于消息队列或企业消息传递系统。以容错的持久方式存储记录流。记录发生时处理流。 |
Kafka 通常用于两大类应用:
构建可在系统或应用程序之间可靠获取数据的实时流数据管道 | |
构建转换或响应数据流的实时流应用程序 |
kafka 概念
(1)什么是流处理?
所谓流处理,我的理解是流水线处理。例如,电子厂每个人负责一个功能,来了就处 | |
理,不来就等着。 |
(2)partition 和 replication 和 broker 有关吗?
partition 和 replication 是分区和备份的概念。即使是单机一个 broker 也一样 | |
支持。 |
(3)consumer 如何设置和存储 partition 的 offset 偏移量,有哪几种消费模式,怎么确定消息是否被消费,将偏移量移到前面会立即消费到最后吗?
使用 KafkaConsumer 设置 partition 和 offset。有自动提交和手动 ack 模式提交 | |
偏移量两种消费方式。将偏移量移到前面需要设置成为消费状态会立即被消费(设置 | |
新消费组)。 |
(4)AckMode 模式有哪几种?
RECORD:处理记录后,侦听器返回时提交偏移量 | |
BATCH:在处理 poll()返回的所有记录时提交偏移量 | |
TIME:只要已超过自上次提交以来的 ackTime,就会在处理 poll()返回的所有记录时提交偏移量 | |
COUNT:只要自上次提交以来已收到 ackCount 记录,就会在处理 poll()返回的所有记录时提交偏移量 | |
COUNT_TIME:与 TIME 和 COUNT 类似,但如果任一条件为真,则执行提交 | |
MANUAL:消息监听器负责确认()确认。之后,应用与 BATCH 相同的语义 | |
MANUAL_IMMEDIATE:当侦听器调用 Acknowledgment.acknowledge()方法时,立即提交偏移量 |
Springboot 使用 kafka
(1)注入 NewTopic 自动在 broker 中添加 topic
@Bean | |
public NewTopic topic() {return new NewTopic("topic1", 2, (short) 1); | |
} |
(2)使用 KafkaTemplate 发送消息时,topic 自动创建,自动创建的 partition 是 0,长度为 1
(3)使用 KafkaTemplate 发送消息
@RequestMapping("sendMsgWithTopic") | |
public String sendMsgWithTopic(@RequestParam String topic, @RequestParam int partition, @RequestParam String key, | |
@RequestParam String value) {ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, value); | |
return "success"; | |
} |
(4)异步发送消息
public void sendToKafka(final MyOutputData data) {final ProducerRecord<String, String> record = createRecord(data); | |
ListenableFuture<SendResult<Integer, String>> future = template.send(record); | |
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { | |
@Override | |
public void onSuccess(SendResult<Integer, String> result) {handleSuccess(data); | |
} | |
@Override | |
public void onFailure(Throwable ex) {handleFailure(data, record, ex); | |
} | |
}); | |
} |
(5)同步发送消息
public void sendToKafka(final MyOutputData data) {final ProducerRecord<String, String> record = createRecord(data); | |
try {template.send(record).get(10, TimeUnit.SECONDS); | |
handleSuccess(data); | |
}catch (ExecutionException e) {handleFailure(data, record, e.getCause()); | |
}catch (TimeoutException | InterruptedException e) {handleFailure(data, record, e); | |
} | |
} |
(6)事务
(1)Spring 事务支持一起使用(@Transactional,TransactionTemplate 等)(2)使用 template 执行事务 | |
boolean result = template.executeInTransaction(t -> {t.sendDefault("thing1", "thing2"); | |
t.sendDefault("cat", "hat"); | |
return true; | |
}); |
(7)消费者
(1)简单使用 | |
@KafkaListener(id = "myListener", topics = "myTopic", | |
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}") | |
public void listen(String data) {...}(2)配置多个 topic 和 partition,TopicPartition 中 partitions 和 PartitionOffset 不能同时使用 | |
@KafkaListener(id = "thing2", topicPartitions = | |
{@TopicPartition(topic = "topic1", partitions = { "0", "1"}), | |
@TopicPartition(topic = "topic2", partitions = "0", | |
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) | |
}) | |
public void listen(ConsumerRecord<?, ?> record) {...}(3)使用 ack 手动确认模式 | |
@KafkaListener(id = "cat", topics = "myTopic", | |
containerFactory = "kafkaManualAckListenerContainerFactory") | |
public void listen(String data, Acknowledgment ack) { | |
... | |
ack.acknowledge();} | |
(4) 获取消息的 header 信息 | |
@KafkaListener(id = "qux", topicPattern = "myTopic1") | |
public void listen(@Payload String foo, | |
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, | |
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, | |
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, | |
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts | |
) {...}(5)批处理 | |
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") | |
public void listen(List<String> list, | |
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys, | |
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, | |
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, | |
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {...}(6)使用 @Valid 校验数据 | |
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", | |
containerFactory = "kafkaJsonListenerContainerFactory") | |
public void validatedListener(@Payload @Valid ValidatedClass val) {...} | |
@Bean | |
public KafkaListenerErrorHandler validationErrorHandler() {return (m, e) -> {...}; | |
}(7)topic 根据参数类型映射不同方法 | |
@KafkaListener(id = "multi", topics = "myTopic") | |
static class MultiListenerBean { | |
@KafkaHandler | |
public void listen(String cat) {...} | |
@KafkaHandler | |
public void listen(Integer hat) {...} | |
@KafkaHandler | |
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {...} | |
} |
Springboot 使用 kafka 踩坑
(1)需要修改 server.properties 的 listener 主机地址不然 Java 获取不到消息。
(2)不同服务配置相同 groupId 只有一个监听者可以收到消息
kafka 图形化工具 kafka tool
下载地址 http://www.kafkatool.com/down…
有问题请留言!原文地址:
正文完