前言
最近应用helm3装置好了kafka和rabbitmq,并且想集成到spring中,发现集成不是那么简略的,尽管有官网实例,然而实例下面短少必要的代码所以通过本人摸索一步步实现,分享给大家。
操作
首先,装置好kafka装置和rabbitmq装置环境,装置好之后,咱们就能够配置spring了。
1、首先引入相干依赖包:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>
2、配置application.yml文件
上面是我的配置,如下所示:
spring: cloud: stream: function: definition: testKafkaOut;testKafkaIn;testRabbitOut;testRabbitIn bindings: testKafkaOut-out-0: binder: kafka-binder destination: test #设置音讯类型,本次为json,文本则设置"text/plain" content-type: application/json testKafkaIn-in-0: binder: kafka-binder destination: test content-type: application/json group: log_group testRabbitOut-out-0: binder: rabbit-binder destination: dev content-type: application/json testRabbitIn-in-0: binder: rabbit-binder destination: dev content-type: application/json group: dev-group binders: kafka-binder: type: kafka environment: spring: cloud: stream: kafka: binder: brokers: xxx.xxx.xxx.xxx:xxxx auto-create-topics: true rabbit-binder: type: rabbit # 音讯组件类型 environment: # 设置rabbitmq的相干的环境配置 spring: rabbitmq: host: xxx.xxx.xxx.xxx port: xxxx username: user password: password virtual-host: dev
这样就实现了spring cloud stream
和kafka
以及rabbitmq
的配置。
3、收发音讯
新建一个java类进行收发音讯操作,如下所示:
import org.springframework.context.annotation.Bean;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;import java.util.function.Consumer;import java.util.function.Supplier;@Componentpublic class MessageProcessor { @Bean public Supplier<Message<String>> testKafkaOut() { return () -> MessageBuilder.withPayload("Hello from Kafka!").build(); } @Bean public Consumer<Message<String>> testKafkaIn() { return message -> System.out.println("Received from Kafka: " + message.getPayload()); } @Bean public Supplier<Message<String>> testRabbitOut() { return () -> MessageBuilder.withPayload("Hello from RabbitMQ!").build(); } @Bean public Consumer<Message<String>> testRabbitIn() { return message -> System.out.println("Received from RabbitMQ: " + message.getPayload()); }}
或者应用上面简略的写法:
发消息:
@Autowiredprivate StreamBridge streamBridge;...streamBridge.send("testRabbitOut-out-0", "hello rabbitmq");streamBridge.send("testKafkaOut-out-0", "hello kafka");...
收音讯:
@Componentpublic class ConsumersHandler { @Bean public Consumer<String> testKafkaIn(){ return str -> { System.out.println("Success Rescive message from kafka: " + str); }; } @Bean public Consumer<String> testRabbitIn() { return str -> { System.out.println("Success Rescive message from rabbitmq: " + str); }; }}
两种写法都能够,就看你本人的抉择了,上面是收到音讯的打印后果:
Success Rescive message from rabbitmq: hello rabbitmqSuccess Rescive message from kafka: hello kafka
总结
1、配置application.yml文件的时候要留神function.definition
的写法,如下所示:
function: definition: testKafkaOut;testKafkaIn;testRabbitOut;testRabbitIn
而我一开始写成了:
testKafkaOut,testKafkaIn,testRabbitOut,testRabbitIn
导致报错:
kafka-binder,rabbit-binder, and no default binder has been set.
别小看这个问题,因为我的大意花了一周才解决,唉!
2、同样是application.yml中的配置default-binder
能够不必设置,因为咱们在每个bindings中曾经指定了binder了
3、上面的写法,应用程序启动之后报:rabbitmq binder是找不到
,所以应用了localhost:5672
,如下所示:
spring: cloud: stream: kafka: binder: auto-create-topics: true brokers: xxx.xxx.xxx.xxx:xxxx rabbit: binder: host: xxx.xxx.xxx.xxx port: xxxx username: user password: password virtual-host: dev
这个写法有问题,所以举荐我下面的binders
写法。
4、我开始借助了chatgpt3.5,它有时候给的代码都是spring cloud strem 3.1
之前的,于是我应用了4.0之后给出的代码是最新的,大家能够试试chatgpt4,这里有个举荐的地址
援用
Spring Cloud Stream 函数式编程整合 kafka/rabbit
spring-cloud-stream-samples
Spring Cloud Stream 整合Kafka
Spring Cloud Stream Rabbit 3.1.3 入门实际