All code in this article is on GitHub: https://github.com/kevinwang0...
kafka-clients
Add the dependency
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.0</version>
</dependency>
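Consumer
The code breaks down into three parts:
- Configure the consumer. Every configuration key is listed in org.apache.kafka.clients.consumer.ConsumerConfig, together with documentation for each one.
- Create a consumer instance and subscribe to a topic.
- Consume messages.
The code is as follows: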
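// Imports needed by the snippet below
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// 1. Configuration
Properties properties = new Properties();
// bootstrap.servers: Kafka cluster addresses, host1:port1,host2:port2 ...
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// key.deserializer: how the message key is deserialized
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value.deserializer: how the message body is deserialized
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// group.id: consumer group id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
// enable.auto.commit: commit offsets automatically
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// auto.offset.reset: where to start when the group has no committed offset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 2. Create the consumer instance and subscribe to the topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String[] topics = new String[]{"demo-topic"};
consumer.subscribe(Arrays.asList(topics));

// 3. Consume messages
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record);
    }
}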
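The ConsumerConfig constants used above are names for plain string keys, the same ones shown in the comments (bootstrap.servers, group.id, and so on). As a minimal sketch, the configuration step can be factored into a helper that uses those string keys directly, so it compiles without the Kafka library on the classpath. The class name ConsumerProps and the method build are hypothetical, invented only for this illustration.

```java
import java.util.Properties;

// Hypothetical helper (not part of kafka-clients): builds the same consumer
// configuration as above, using the raw string keys behind the ConsumerConfig constants.
class ConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        // Print every key/value pair so the configuration can be inspected.
        Properties props = build("127.0.0.1:9092", "demo-group");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The returned Properties object can be passed to new KafkaConsumer<>(props) in the same way as the one built in the snippet above.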
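Producer
The producer code has essentially the same structure as the consumer code; the difference lies in how messages are sent:
- Configure the producer. The configuration keys are likewise listed in org.apache.kafka.clients.producer.ProducerConfig.
- Create a producer instance.
- Send messages to a topic, in one of three ways:
  - Asynchronous send: producer.send(new ProducerRecord<>("demo-topic", data))
  - Synchronous send: call Future.get() on the returned future to block until the result arrives
  - Asynchronous send with a callback
The complete code is as follows: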
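// Imports needed by the snippet below
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

// 1. Configuration
Properties properties = new Properties();
// bootstrap.servers: Kafka cluster addresses, host1:port1,host2:port2 ...
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// key.serializer: how the message key is serialized
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value.serializer: how the message body is serialized
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 2. Create the producer instance
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// 3. Send messages
// 0: asynchronous send, result ignored
for (int i = 0; i < 10; i++) {
    String data = "async :" + i;
    producer.send(new ProducerRecord<>("demo-topic", data));
}

// 1: synchronous send, get() blocks until the result is returned
for (int i = 0; i < 10; i++) {
    String data = "sync : " + i;
    try {
        Future<RecordMetadata> send = producer.send(new ProducerRecord<>("demo-topic", data));
        RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

// 2: asynchronous send with a callback
for (int i = 0; i < 10; i++) {
    String data = "callback : " + i;
    producer.send(new ProducerRecord<>("demo-topic", data), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // Invoked once the send has completed or failed
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println(metadata);
            }
        }
    });
}

// Flushes any buffered records and releases resources
producer.close();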
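The three send styles differ only in what the caller does with the result of send. The sketch below isolates that difference using JDK classes alone: fakeSend is a hypothetical stand-in for producer.send that completes on a background thread, a plain string plays the role of RecordMetadata, and no Kafka broker or client is involved. With the real client the callback is passed as the second argument to send, as in the code above, rather than attached to the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only illustration of the three send styles; no Kafka classes are used.
class SendStyles {

    // Hypothetical stand-in for producer.send(...): completes asynchronously
    // with a string that plays the role of RecordMetadata.
    static CompletableFuture<String> fakeSend(String topic, String data) {
        return CompletableFuture.supplyAsync(() -> topic + " <- " + data);
    }

    // Style 0: asynchronous, result ignored. A failure would go unnoticed.
    static void fireAndForget(String data) {
        fakeSend("demo-topic", data);
    }

    // Style 1: synchronous. Block until the result (or the failure) is available.
    static String sendSync(String data) {
        try {
            return fakeSend("demo-topic", data).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send failed", e);
        }
    }

    // Style 2: asynchronous with a callback that records success or failure.
    static CompletableFuture<String> sendWithCallback(String data, StringBuffer log) {
        return fakeSend("demo-topic", data).whenComplete((metadata, exception) -> {
            if (exception != null) {
                log.append("failed: ").append(exception);
            } else {
                log.append("acked: ").append(metadata);
            }
        });
    }

    public static void main(String[] args) {
        fireAndForget("async : 0");
        System.out.println(sendSync("sync : 0"));
        StringBuffer log = new StringBuffer();
        // join() waits until the callback has run, so the log is complete here.
        sendWithCallback("callback : 0", log).join();
        System.out.println(log);
    }
}
```

The trade-off is the usual one: ignoring the result is the cheapest option but hides failures, blocking on every record makes failures visible at the cost of throughput, and a callback reports failures without blocking the sending thread.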
Integrating with Spring Boot
Add the dependencies
<parent>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
........
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- kafka starter -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- for convenient testing -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
Code
# application.yml
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test-group
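// Each class below lives in its own file; the imports are listed together for brevity.
import javax.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.junit4.SpringRunner;

// Application entry point
@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

// Consumer
@Component
public class Consumer {
    @KafkaListener(topics = { "test-topic" })
    public void receiveMessage(String message) {
        System.out.println(message);
    }
}

// Producer
@Component
public class Producer {
    @Resource
    KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

// Test
@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {
    @Autowired
    private Producer producer;

    @Test
    public void send() {
        producer.sendMessage("test-topic", "test-message");
        try {
            // Keep the test alive long enough for the listener to receive the message
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}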
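The test above sleeps for ten seconds to give the listener time to receive the message. One common alternative is to have the listener count down a CountDownLatch and let the test wait on it with a timeout, so the test finishes as soon as the message arrives. The sketch below shows only that waiting pattern with JDK classes: LatchListener is a hypothetical class, and its receiveMessage method is called from a plain thread here instead of being driven by @KafkaListener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical listener: in a Spring application receiveMessage would carry
// @KafkaListener; here it is invoked by hand to keep the sketch JDK-only.
class LatchListener {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String lastMessage;

    void receiveMessage(String message) {
        lastMessage = message;
        latch.countDown(); // signal that a message has arrived
    }

    // Waits until a message arrives or the timeout expires; returns true on arrival.
    boolean awaitMessage(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    String lastMessage() {
        return lastMessage;
    }

    public static void main(String[] args) {
        LatchListener listener = new LatchListener();
        // Simulate a message being delivered on another thread.
        new Thread(() -> listener.receiveMessage("test-message")).start();
        boolean received = listener.awaitMessage(10, TimeUnit.SECONDS);
        System.out.println(received + " " + listener.lastMessage());
    }
}
```

In a real test the assertion would be on the value returned by awaitMessage, which turns a missing message into a test failure instead of a silent pass.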
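Summary
The code is very concise once Spring Boot is integrated, but it is still worth getting familiar with the native API, so that problems in real projects can be handled with ease when they come up.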