1.Stream音讯驱动是什么
2.Stream音讯驱动案例阐明
3.Stream音讯驱动之生产者
4.Stream音讯驱动之消费者
5.Stream音讯驱动分组生产和长久化
1.Stream音讯驱动是什么
1.1)队列和服务耦合带来的问题
在咱们的日常开发当中,会应用到很多的队列,比方有rabbitmq,kafka,rocketMQ,kafka等等。
所以咱们在编写我的项目的时候,我的项目会和队列的api进行耦合,当咱们切换队列,或者队列与队列之间传输信息的时候,这种中间件的差异性会给咱们造成极大的困扰,如果咱们用了一种队列,前面又有新的业务需要,咱们想往另外一种音讯队列进行迁徙,这个时候无疑就是一种灾难性的,很多货色都要推倒重做,因为这些队列和咱们的零碎耦合了。
1.2)springCloud stream是什么?
springCloud stream是一个让开发者调用下层api,就能屏蔽底层队列的区别,构建音讯驱动服务的框架。
所以咱们只须要搞清楚如何与Spring Cloud Stream 交互就能够方便使用音讯驱动的形式。
目前仅反对RabbitMQ、Kafka。
简略地来说,spring cloud stream就是一种屏蔽底层消息中间件的差别,升高切换老本,对立音讯的编程模型。
1.3)为什么要应用spring cloud stream
当咱们提出为什么要应用一个工具的时候,咱们要想不应用这个工具会有什么结果?
咱们从下面的形容能够看出,stream音讯驱动是一个解耦工具,不应用这个工具,应用程序就会和队列进行耦合,为了解除这种耦合,咱们采纳了spring cloud stream。
1.4)stream凭什么能够对立底层差别?
咱们能够先来看一下,传统的mq是如何工作的?
传统的mq模型分为生产者,消费者,音讯通道。
生产者把音讯通过音讯通道发送给消费者。
咱们再来看一下spring Cloud Stream给咱们提供了一种解耦的形式。
这里引出了一个很重要的概念Binder,在没有Binder这个概念的状况下,咱们的springBoot利用要间接与消息中间件进行信息交互,因为消息中间件构建的理念不同,它们的实现细节上会有较大的差别,应用办法也会有很大的差别,通过定义Binder为中间层,完满地实现了应用程序与消息中间件之间的隔离。
1.5)Spring Cloud Stream的几个重要概念
Binder:
INPUT对应于消费者。
OUTPUT对应于生产者。
这里咱们可能会感觉到很奇怪,咱们平时不是认为input是生产者,output是消费者吗?
其实咱们只有转一个方向就明确了:
out是发送信息的那一方,是生产者。
input是信息的输出方,是消费者。
而咱们平时可能是站在音讯通道的角度来看的,in是输出,是生产,out是输入,是生产。
咱们站音讯通道的在里面,思考一下这个问题,就明确了。
Channel:
通道,是队列Queue的一种形象,在音讯通信零碎中就是实现存储和转发的媒介,通过Channel对队列进行配置。
Source和Sink:
Source:音讯发送者
Sink:音讯接受者
1.6)罕用注解
组成 | 阐明B |
---|---|
@Input | 音讯输出通道,音讯的消费者 |
@OutPut | 音讯输入通道,音讯的生产者 |
@StreamListener | 监听队列,用于消费者的队列的音讯接管 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
2.Stream音讯驱动案例阐明
咱们创立三个我的项目,两个为消费者,一个为生产者。
3.Stream音讯驱动之生产者
pom:
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>service-stream-provider-8601</artifactId> <version>0.0.1-SNAPSHOT</version> <name>service-stream-provider-8601</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR1</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
yml:
server: port: 8601eureka: instance: hostname: service-stream-provider #eureka服务端的实例名称 prefer-ip-address: true #拜访门路能够显示IP地址 instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局 client: register-with-eureka: true #false示意不向注册核心注册本人。 fetch-registry: true #false示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务 service-url: #集群指向其它eureka defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/spring: rabbitmq: #队列的根本配置 host: localhost port: 5672 username: guest password: guest application: name: service-stream-provider cloud: stream: bindings: output: #生产者 因为out是出,是发送音讯的一方,所以是生产者 destination: studyExchange # 示意要应用的Exchange名称定义 content-type: application/json # 设置音讯类型,本次为json,文本则设置“text/plain” binder: rabbit # 设置要绑定的音讯服务的具体设置feign: hystrix: enabled: true
测试类:
public interface IMessageProvider { String send() ;}
import com.example.demo.service.IMessageProvider;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Source;import org.springframework.integration.support.MessageBuilder;import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;import java.util.UUID;/** * @author sulingfeng * @title: MessageProviderImpl * @projectName CloudFamily * @description: TODO * @date 2022/4/13 17:01 */@EnableBinding(Source.class) // 能够了解为是一个音讯的发送管道的定义public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; // 音讯的发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); this.output.send(MessageBuilder.withPayload(serial).build()); // 创立并发送音讯 System.out.println("***serial: " + serial); return serial; }}
@RestControllerpublic class ProviderController { @Resource private IMessageProvider messageProvider; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProvider.send(); }}
4.Stream音讯驱动之消费者
消费者咱们创立两个,不过代码大部分都是一样的。
pom:
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>service-stream-consumer-8702</artifactId> <version>0.0.1-SNAPSHOT</version> <name>service-stream-consumer-8702</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR1</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
yml:
server: port: 8702eureka: instance: hostname: service-stream-provider #eureka服务端的实例名称 prefer-ip-address: true #拜访门路能够显示IP地址 instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局 client: register-with-eureka: true #false示意不向注册核心注册本人。 fetch-registry: true #false示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务 service-url: #集群指向其它eureka defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/spring: rabbitmq: host: localhost port: 5672 username: guest password: guest application: name: service-stream-provider cloud: stream: bindings: input: #消费者 音讯的输出方 destination: studyExchange # 示意要应用的Exchange名称定义 content-type: application/json # 设置音讯类型,本次为json,文本则设置“text/plain” binder: rabbit # 设置要绑定的音讯服务的具体设置feign: hystrix: enabled: true
测试类
@Component@EnableBinding(Sink.class)//音讯的消费者public class ReceiveMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT)//和队列绑定 public void input(Message message) { System.out.println("消费者,------->接管到的音讯:" + message.getPayload()+"\t port: "+serverPort); }}
5.Stream音讯驱动分组生产和长久化
接下来咱们启动消费者和生产者,而后生产者发送一条消息,会发现消费者的确收到了音讯,不过两个消费者都收到了音讯,这可能就会造成音讯的反复生产
这个时候,咱们就能够用stream中的音讯分组来解决:
在stream中,处于同一个group的多个消费者是竞争关系,就可能保障一条音讯只能被其中一个服务生产。
不同组是能够反复生产的。
同一组内有竞争关系,只能被其中一个组生产。
批改yml:
server: port: 8702eureka: instance: hostname: service-stream-provider #eureka服务端的实例名称 prefer-ip-address: true #拜访门路能够显示IP地址 instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局 client: register-with-eureka: true #false示意不向注册核心注册本人。 fetch-registry: true #false示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务 service-url: #集群指向其它eureka defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/spring: rabbitmq: host: localhost port: 5672 username: guest password: guest application: name: service-stream-provider cloud: stream: bindings: input: #消费者 音讯的输出方 destination: studyExchange # 示意要应用的Exchange名称定义 content-type: application/json # 设置音讯类型,本次为json,文本则设置“text/plain” binder: rabbit # 设置要绑定的音讯服务的具体设置 group: atguiguAfeign: hystrix: enabled: true
退出分组之后,咱们就能够防止反复生产了。
生产者:
消费者1:
消费者2:
解决了反复的音讯,此时咱们来看看长久化,其实加上group属性就加上了长久化,咱们把两个消费者服务都关掉,批改yml,一个有group,一个没有group,而后间断发几条信息,再启动消费者,发现有group的服务的消费者仍然能够生产服务离线时候的音讯,然而没有group的消费者,就没法生产那些在它这个流动离线的时候生产的音讯了。