乐趣区

关于kafka:javaspring-cloud-stream集成kafka和rabbitmq收发消息示例

前言

最近应用 helm3 装置好了 kafka 和 rabbitmq,并且想集成到 spring 中,发现集成不是那么简略的,尽管有官网实例,然而实例下面短少必要的代码所以通过本人摸索一步步实现,分享给大家。

操作

首先,装置好 kafka 装置和 rabbitmq 装置环境,装置好之后,咱们就能够配置 spring 了。

1、首先引入相干依赖包:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

2、配置 application.yml 文件

上面是我的配置,如下所示:

spring:
  cloud:
    stream:
      function:
        definition: testKafkaOut;testKafkaIn;testRabbitOut;testRabbitIn
      bindings:
        testKafkaOut-out-0: 
          binder: kafka-binder
          destination: test
          #设置音讯类型,本次为 json,文本则设置 "text/plain"
          content-type: application/json 
        testKafkaIn-in-0:
          binder: kafka-binder
          destination: test
          content-type: application/json 
          group: log_group
        testRabbitOut-out-0:
          binder: rabbit-binder
          destination: dev
          content-type: application/json
        testRabbitIn-in-0:
          binder: rabbit-binder
          destination: dev
          content-type: application/json
          group: dev-group
      binders:
        kafka-binder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: xxx.xxx.xxx.xxx:xxxx
                      auto-create-topics: true
        rabbit-binder:
          type: rabbit # 音讯组件类型
          environment: # 设置 rabbitmq 的相干的环境配置
            spring:
              rabbitmq:
                host: xxx.xxx.xxx.xxx
                port: xxxx
                username: user
                password: password
                virtual-host: dev

这样就实现了 spring cloud streamkafka以及 rabbitmq 的配置。

3、收发音讯

新建一个 java 类进行收发音讯操作,如下所示:

import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
import java.util.function.Supplier;

@Component
public class MessageProcessor {
    @Bean
    public Supplier<Message<String>> testKafkaOut() {return () -> MessageBuilder.withPayload("Hello from Kafka!").build();}

    @Bean
    public Consumer<Message<String>> testKafkaIn() {return message -> System.out.println("Received from Kafka:" + message.getPayload());
    }

    @Bean
    public Supplier<Message<String>> testRabbitOut() {return () -> MessageBuilder.withPayload("Hello from RabbitMQ!").build();}

    @Bean
    public Consumer<Message<String>> testRabbitIn() {return message -> System.out.println("Received from RabbitMQ:" + message.getPayload());
    }
}

或者应用上面简略的写法:

发消息:

@Autowired
private StreamBridge streamBridge;
...
streamBridge.send("testRabbitOut-out-0", "hello rabbitmq");
streamBridge.send("testKafkaOut-out-0", "hello kafka");
...

收音讯:

@Component
public class ConsumersHandler {
    @Bean
    public Consumer<String> testKafkaIn(){

        return str -> {System.out.println("Success Rescive message from kafka:" + str);
        };
    }

    @Bean
    public Consumer<String> testRabbitIn() {
        return str -> {System.out.println("Success Rescive message from rabbitmq:" + str);
        };
    }
}

两种写法都能够,就看你本人的抉择了,上面是收到音讯的打印后果:

Success Rescive message from rabbitmq: hello rabbitmq
Success Rescive message from kafka: hello kafka

总结

1、配置 application.yml 文件的时候要留神 function.definition 的写法,如下所示:

function:
        definition: testKafkaOut;testKafkaIn;testRabbitOut;testRabbitIn

而我一开始写成了:

testKafkaOut,testKafkaIn,testRabbitOut,testRabbitIn

导致报错:

kafka-binder,rabbit-binder, and no default binder has been set.

别小看这个问题,因为我的大意花了一周才解决,唉!
2、同样是 application.yml 中的配置 default-binder 能够不必设置,因为咱们在每个 bindings 中曾经指定了 binder 了
3、上面的写法,应用程序启动之后报:rabbitmq binder 是找不到,所以应用了localhost:5672,如下所示:

spring:
  cloud:
    stream:
      kafka:
        binder:
          auto-create-topics: true
          brokers: xxx.xxx.xxx.xxx:xxxx
      rabbit:
        binder:
          host: xxx.xxx.xxx.xxx
          port: xxxx
          username: user
          password: password
          virtual-host: dev

这个写法有问题,所以举荐我下面的 binders 写法。
4、我开始借助了 chatgpt3.5,它有时候给的代码都是 spring cloud strem 3.1 之前的,于是我应用了 4.0 之后给出的代码是最新的,大家能够试试 chatgpt4,这里有个举荐的地址

援用

Spring Cloud Stream 函数式编程整合 kafka/rabbit
spring-cloud-stream-samples
Spring Cloud Stream 整合 Kafka
Spring Cloud Stream Rabbit 3.1.3 入门实际

退出移动版