前言

最近应用helm3装置好了kafka和rabbitmq,并且想集成到spring中,发现集成不是那么简略的,尽管有官网实例,然而实例下面短少必要的代码所以通过本人摸索一步步实现,分享给大家。

操作

首先,装置好kafka装置和rabbitmq装置环境,装置好之后,咱们就能够配置spring了。

1、首先引入相干依赖包:

<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency><dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency>

2、配置application.yml文件

上面是我的配置,如下所示:

spring:  cloud:    stream:      function:        definition: testKafkaOut;testKafkaIn;testRabbitOut;testRabbitIn      bindings:        testKafkaOut-out-0:           binder: kafka-binder          destination: test          #设置音讯类型,本次为json,文本则设置"text/plain"          content-type: application/json         testKafkaIn-in-0:          binder: kafka-binder          destination: test          content-type: application/json           group: log_group        testRabbitOut-out-0:          binder: rabbit-binder          destination: dev          content-type: application/json        testRabbitIn-in-0:          binder: rabbit-binder          destination: dev          content-type: application/json          group: dev-group      binders:        kafka-binder:          type: kafka          environment:            spring:              cloud:                stream:                  kafka:                    binder:                      brokers: xxx.xxx.xxx.xxx:xxxx                      auto-create-topics: true        rabbit-binder:          type: rabbit # 音讯组件类型          environment: # 设置rabbitmq的相干的环境配置            spring:              rabbitmq:                host: xxx.xxx.xxx.xxx                port: xxxx                username: user                password: password                virtual-host: dev

这样就实现了spring cloud streamkafka以及rabbitmq的配置。

3、收发音讯

新建一个java类进行收发音讯操作,如下所示:

import org.springframework.context.annotation.Bean;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;import java.util.function.Consumer;import java.util.function.Supplier;@Componentpublic class MessageProcessor {    @Bean    public Supplier<Message<String>> testKafkaOut() {        return () -> MessageBuilder.withPayload("Hello from Kafka!").build();    }    @Bean    public Consumer<Message<String>> testKafkaIn() {        return message -> System.out.println("Received from Kafka: " + message.getPayload());    }    @Bean    public Supplier<Message<String>> testRabbitOut() {        return () -> MessageBuilder.withPayload("Hello from RabbitMQ!").build();    }    @Bean    public Consumer<Message<String>> testRabbitIn() {        return message -> System.out.println("Received from RabbitMQ: " + message.getPayload());    }}

或者应用上面简略的写法:

发消息:

@Autowiredprivate StreamBridge streamBridge;...streamBridge.send("testRabbitOut-out-0", "hello rabbitmq");streamBridge.send("testKafkaOut-out-0", "hello kafka");...

收音讯:

@Componentpublic class ConsumersHandler {    @Bean    public Consumer<String> testKafkaIn(){        return str -> {            System.out.println("Success Rescive message from kafka: " + str);        };    }    @Bean    public Consumer<String> testRabbitIn() {        return str -> {            System.out.println("Success Rescive message from rabbitmq: " + str);        };    }}

两种写法都能够,就看你本人的抉择了,上面是收到音讯的打印后果:

Success Rescive message from rabbitmq: hello rabbitmqSuccess Rescive message from kafka: hello kafka

总结

1、配置application.yml文件的时候要留神function.definition的写法,如下所示:

function:        definition: testKafkaOut;testKafkaIn;testRabbitOut;testRabbitIn

而我一开始写成了:

testKafkaOut,testKafkaIn,testRabbitOut,testRabbitIn

导致报错:

kafka-binder,rabbit-binder, and no default binder has been set.

别小看这个问题,因为我的大意花了一周才解决,唉!
2、同样是application.yml中的配置default-binder能够不必设置,因为咱们在每个bindings中曾经指定了binder了
3、上面的写法,应用程序启动之后报:rabbitmq binder是找不到,所以应用了localhost:5672,如下所示:

spring:  cloud:    stream:      kafka:        binder:          auto-create-topics: true          brokers: xxx.xxx.xxx.xxx:xxxx      rabbit:        binder:          host: xxx.xxx.xxx.xxx          port: xxxx          username: user          password: password          virtual-host: dev

这个写法有问题,所以举荐我下面的binders写法。
4、我开始借助了chatgpt3.5,它有时候给的代码都是spring cloud strem 3.1之前的,于是我应用了4.0之后给出的代码是最新的,大家能够试试chatgpt4,这里有个举荐的地址

援用

Spring Cloud Stream 函数式编程整合 kafka/rabbit
spring-cloud-stream-samples
Spring Cloud Stream 整合Kafka
Spring Cloud Stream Rabbit 3.1.3 入门实际