本文所有代码均在Github上:

https://github.com/kevinwang0...

https://github.com/kevinwang0...


kafka-clients

增加依赖

 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --><dependency>  <groupId>org.apache.kafka</groupId>  <artifactId>kafka-clients</artifactId>  <version>2.5.0</version></dependency>

消费者 Consumer

代码上总体能够分为三局部:

  1. 消费者的配置

    1. 消费者的配置在 org.apache.kafka.clients.consumer.ConsumerConfig 类中都有列举包含每个配置项的文档阐明
  2. 创立消费者实例并订阅topic
  3. 生产音讯

代码如下:

// 1. 配置Properties properties = new Properties();//bootstrap.servers kafka集群地址 host1:port1,host2:port2 ....properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");// key.deserializer 音讯key序列化形式properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// value.deserializer 音讯体序列化形式properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// group.id 生产组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");// enable.auto.commit 设置主动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// auto.offset.resetproperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 2. 创立消费者实例并订阅topicKafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);String[] topics = new String[]{"demo-topic"};consumer.subscribe(Arrays.asList(topics));// 3. 生产音讯while (true) {  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  for (ConsumerRecord<String, String> record : records) {    System.out.println(record);  }}

生产者 Producer

生产者这块的代码基本上和消费者的构造一样,不同的是,producer 的发消息的局部:

  1. 生产者的配置

    1. org.apache.kafka.clients.producer.ProducerConfig 类中也都有列举
  2. 创立生产者实例
  3. 发送音讯到 topic

    1. 异步发送音讯 producer.send(new ProducerRecord<>("demo-topic", data))
    2. 同步发送音讯 ,应用 Future.get() 阻塞接管
    3. 异步发送音讯,回调的形式

整体代码如下

// 1. 配置Properties properties = new Properties();// bootstrap.servers kafka集群地址 host1:port1,host2:port2 ....properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");// key.deserializer 音讯key序列化形式properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// value.deserializer 音讯体序列化形式properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 2. 创立生产者实例KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// 3. 发送音讯// 0 异步发送音讯for (int i = 0; i < 10; i++) {  String data = "async :" + i;  // 发送音讯  producer.send(new ProducerRecord<>("demo-topic", data));}// 1 同步发送音讯 调用get()阻塞返回后果for (int i = 0; i < 10; i++) {  String data = "sync : " + i;  try {    // 发送音讯    Future<RecordMetadata> send = producer.send(new ProducerRecord<>("demo-topic", data));    RecordMetadata recordMetadata = send.get();    System.out.println(recordMetadata);  } catch (Exception e) {    e.printStackTrace();  }}// 2 异步发送音讯 回调callback()for (int i = 0; i < 10; i++) {  String data = "callback : " + i;  // 发送音讯  producer.send(new ProducerRecord<>("demo-topic", data), new Callback() {    @Override    public void onCompletion(RecordMetadata metadata, Exception exception) {      // 发送音讯的回调      if (exception != null) {        exception.printStackTrace();      } else {        System.out.println(metadata);      }    }  });}producer.close();

整合SpringBoot

增加依赖

<parent>  <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-parent</artifactId>  <version>2.3.2.RELEASE</version>  <relativePath/> <!-- lookup parent from repository --></parent>........<dependency>   <groupId>org.springframework.boot</groupId>   <artifactId>spring-boot-starter</artifactId></dependency><!--kafka starter--><dependency>  <groupId>org.springframework.kafka</groupId>  <artifactId>spring-kafka</artifactId></dependency><!--不便测试用--><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-test</artifactId>  <scope>test</scope></dependency><dependency>  <groupId>org.springframework.kafka</groupId>  <artifactId>spring-kafka-test</artifactId>  <scope>test</scope></dependency>

代码

# application.ymlspring:  kafka:    bootstrap-servers: 127.0.0.1:9092    producer:      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer      group-id: test-group
// 启动类@SpringBootApplicationpublic class DemoApplication {    public static void main(String[] args) {        SpringApplication.run(DemoApplication.class, args);    }}// 消费者@Componentpublic class Consumer {    @KafkaListener(topics = { "test-topic" })    public void receiveMessage(String message) {        System.out.println(message);    }}// 生产者@Componentpublic class Producer {    @Resource    KafkaTemplate<String, String> kafkaTemplate;    public void sendMessage(String topic, String message) {        kafkaTemplate.send(topic, message);    }}// 测试@RunWith(SpringRunner.class)@SpringBootTestpublic class DemoApplicationTests {    @Autowired    private Producer producer;    @Test    public void send() {        producer.sendMessage("test-topic", "test-message");        try {            Thread.sleep(10000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

总结

整合SpringBoot之后的代码还是十分简洁的,不过还是要相熟原生API,这样能力在理论我的项目中遇到问题时熟能生巧。