共计 4875 个字符,预计需要花费 13 分钟才能阅读完成。
本文所有代码均在 Github 上:
https://github.com/kevinwang0…
https://github.com/kevinwang0…
kafka-clients
增加依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
消费者 Consumer
代码上总体能够分为三局部:
-
消费者的配置
- 消费者的配置在
org.apache.kafka.clients.consumer.ConsumerConfig
类中都有列举包含每个配置项的文档阐明
- 消费者的配置在
- 创立消费者实例并订阅 topic
- 生产音讯
代码如下:
// 1. 配置
Properties properties = new Properties();
//bootstrap.servers kafka 集群地址 host1:port1,host2:port2 ....
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// key.deserializer 音讯 key 序列化形式
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value.deserializer 音讯体序列化形式
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// group.id 生产组 id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
// enable.auto.commit 设置主动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// auto.offset.reset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 2. 创立消费者实例并订阅 topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String[] topics = new String[]{"demo-topic"};
consumer.subscribe(Arrays.asList(topics));
// 3. 生产音讯
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {System.out.println(record);
}
}
生产者 Producer
生产者这块的代码基本上和消费者的构造一样,不同的是,producer 的发消息的局部:
-
生产者的配置
- 在
org.apache.kafka.clients.producer.ProducerConfig
类中也都有列举
- 在
- 创立生产者实例
-
发送音讯到 topic
- 异步发送音讯
producer.send(new ProducerRecord<>("demo-topic", data))
- 同步发送音讯,应用
Future.get()
阻塞接管 - 异步发送音讯,回调的形式
- 异步发送音讯
整体代码如下
// 1. 配置
Properties properties = new Properties();
// bootstrap.servers kafka 集群地址 host1:port1,host2:port2 ....
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// key.deserializer 音讯 key 序列化形式
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value.deserializer 音讯体序列化形式
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 2. 创立生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// 3. 发送音讯
// 0 异步发送音讯
for (int i = 0; i < 10; i++) {
String data = "async :" + i;
// 发送音讯
producer.send(new ProducerRecord<>("demo-topic", data));
}
// 1 同步发送音讯 调用 get() 阻塞返回后果
for (int i = 0; i < 10; i++) {
String data = "sync :" + i;
try {
// 发送音讯
Future<RecordMetadata> send = producer.send(new ProducerRecord<>("demo-topic", data));
RecordMetadata recordMetadata = send.get();
System.out.println(recordMetadata);
} catch (Exception e) {e.printStackTrace();
}
}
// 2 异步发送音讯 回调 callback()
for (int i = 0; i < 10; i++) {
String data = "callback :" + i;
// 发送音讯
producer.send(new ProducerRecord<>("demo-topic", data), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
// 发送音讯的回调
if (exception != null) {exception.printStackTrace();
} else {System.out.println(metadata);
}
}
});
}
producer.close();
整合 SpringBoot
增加依赖
<parent>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
....
....
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--kafka starter-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- 不便测试用 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
代码
# application.yml
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: test-group
// 启动类
@SpringBootApplication
public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);
}
}
// 消费者
@Component
public class Consumer {@KafkaListener(topics = { "test-topic"})
public void receiveMessage(String message) {System.out.println(message);
}
}
// 生产者
@Component
public class Producer {
@Resource
KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);
}
}
// 测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
@Autowired
private Producer producer;
@Test
public void send() {producer.sendMessage("test-topic", "test-message");
try {Thread.sleep(10000);
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
总结
整合 SpringBoot 之后的代码还是十分简洁的, 不过还是要相熟原生 API,这样能力在理论我的项目中遇到问题时熟能生巧。
正文完