1.Stream音讯驱动是什么
2.Stream音讯驱动案例阐明
3.Stream音讯驱动之生产者
4.Stream音讯驱动之消费者
5.Stream音讯驱动分组生产和长久化
1.Stream音讯驱动是什么
1.1)队列和服务耦合带来的问题
在咱们的日常开发当中,会应用到很多的队列,比方有rabbitmq,kafka,rocketMQ,kafka等等。
所以咱们在编写我的项目的时候,我的项目会和队列的api进行耦合,当咱们切换队列,或者队列与队列之间传输信息的时候,这种中间件的差异性会给咱们造成极大的困扰,如果咱们用了一种队列,前面又有新的业务需要,咱们想往另外一种音讯队列进行迁徙,这个时候无疑就是一种灾难性的,很多货色都要推倒重做,因为这些队列和咱们的零碎耦合了。
1.2)springCloud stream是什么?
springCloud stream是一个让开发者调用下层api,就能屏蔽底层队列的区别,构建音讯驱动服务的框架。
所以咱们只须要搞清楚如何与Spring Cloud Stream 交互就能够方便使用音讯驱动的形式。
目前仅反对RabbitMQ、Kafka。
简略地来说,spring cloud stream就是一种屏蔽底层消息中间件的差别,升高切换老本,对立音讯的编程模型。
1.3)为什么要应用spring cloud stream
当咱们提出为什么要应用一个工具的时候,咱们要想不应用这个工具会有什么结果?
咱们从下面的形容能够看出,stream音讯驱动是一个解耦工具,不应用这个工具,应用程序就会和队列进行耦合,为了解除这种耦合,咱们采纳了spring cloud stream。
1.4)stream凭什么能够对立底层差别?
咱们能够先来看一下,传统的mq是如何工作的?
传统的mq模型分为生产者,消费者,音讯通道。
生产者把音讯通过音讯通道发送给消费者。
咱们再来看一下spring Cloud Stream给咱们提供了一种解耦的形式。
这里引出了一个很重要的概念Binder,在没有Binder这个概念的状况下,咱们的springBoot利用要间接与消息中间件进行信息交互,因为消息中间件构建的理念不同,它们的实现细节上会有较大的差别,应用办法也会有很大的差别,通过定义Binder为中间层,完满地实现了应用程序与消息中间件之间的隔离。
1.5)Spring Cloud Stream的几个重要概念
Binder:
INPUT对应于消费者。
OUTPUT对应于生产者。
这里咱们可能会感觉到很奇怪,咱们平时不是认为input是生产者,output是消费者吗?
其实咱们只有转一个方向就明确了:
out是发送信息的那一方,是生产者。
input是信息的输出方,是消费者。
而咱们平时可能是站在音讯通道的角度来看的,in是输出,是生产,out是输入,是生产。
咱们站音讯通道的在里面,思考一下这个问题,就明确了。
Channel:
通道,是队列Queue的一种形象,在音讯通信零碎中就是实现存储和转发的媒介,通过Channel对队列进行配置。
Source和Sink:
Source:音讯发送者
Sink:音讯接受者
1.6)罕用注解
组成 | 阐明B |
---|---|
@Input | 音讯输出通道,音讯的消费者 |
@OutPut | 音讯输入通道,音讯的生产者 |
@StreamListener | 监听队列,用于消费者的队列的音讯接管 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
2.Stream音讯驱动案例阐明
咱们创立三个我的项目,两个为消费者,一个为生产者。
3.Stream音讯驱动之生产者
pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>service-stream-provider-8601</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>service-stream-provider-8601</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
yml:
server:
port: 8601
eureka:
instance:
hostname: service-stream-provider #eureka服务端的实例名称
prefer-ip-address: true #拜访门路能够显示IP地址
instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局
client:
register-with-eureka: true #false示意不向注册核心注册本人。
fetch-registry: true #false示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务
service-url:
#集群指向其它eureka
defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/
spring:
rabbitmq: #队列的根本配置
host: localhost
port: 5672
username: guest
password: guest
application:
name: service-stream-provider
cloud:
stream:
bindings:
output: #生产者 因为out是出,是发送音讯的一方,所以是生产者
destination: studyExchange # 示意要应用的Exchange名称定义
content-type: application/json # 设置音讯类型,本次为json,文本则设置“text/plain”
binder: rabbit # 设置要绑定的音讯服务的具体设置
feign:
hystrix:
enabled: true
测试类:
public interface IMessageProvider {
String send() ;
}
import com.example.demo.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @author sulingfeng
* @title: MessageProviderImpl
* @projectName CloudFamily
* @description: TODO
* @date 2022/4/13 17:01
*/
@EnableBinding(Source.class) // 能够了解为是一个音讯的发送管道的定义
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; // 音讯的发送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
this.output.send(MessageBuilder.withPayload(serial).build()); // 创立并发送音讯
System.out.println("***serial: " + serial);
return serial;
}
}
@RestController
public class ProviderController {
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage()
{
return messageProvider.send();
}
}
4.Stream音讯驱动之消费者
消费者咱们创立两个,不过代码大部分都是一样的。
pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>service-stream-consumer-8702</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>service-stream-consumer-8702</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
yml:
server:
port: 8702
eureka:
instance:
hostname: service-stream-provider #eureka服务端的实例名称
prefer-ip-address: true #拜访门路能够显示IP地址
instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局
client:
register-with-eureka: true #false示意不向注册核心注册本人。
fetch-registry: true #false示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务
service-url:
#集群指向其它eureka
defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
application:
name: service-stream-provider
cloud:
stream:
bindings:
input: #消费者 音讯的输出方
destination: studyExchange # 示意要应用的Exchange名称定义
content-type: application/json # 设置音讯类型,本次为json,文本则设置“text/plain”
binder: rabbit # 设置要绑定的音讯服务的具体设置
feign:
hystrix:
enabled: true
测试类
@Component
@EnableBinding(Sink.class)//音讯的消费者
public class ReceiveMessageListener {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)//和队列绑定
public void input(Message message)
{
System.out.println("消费者,------->接管到的音讯:" + message.getPayload()+"\t port: "+serverPort);
}
}
5.Stream音讯驱动分组生产和长久化
接下来咱们启动消费者和生产者,而后生产者发送一条消息,会发现消费者的确收到了音讯,不过两个消费者都收到了音讯,这可能就会造成音讯的反复生产
这个时候,咱们就能够用stream中的音讯分组来解决:
在stream中,处于同一个group的多个消费者是竞争关系,就可能保障一条音讯只能被其中一个服务生产。
不同组是能够反复生产的。
同一组内有竞争关系,只能被其中一个组生产。
批改yml:
server:
port: 8702
eureka:
instance:
hostname: service-stream-provider #eureka服务端的实例名称
prefer-ip-address: true #拜访门路能够显示IP地址
instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局
client:
register-with-eureka: true #false示意不向注册核心注册本人。
fetch-registry: true #false示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务
service-url:
#集群指向其它eureka
defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
application:
name: service-stream-provider
cloud:
stream:
bindings:
input: #消费者 音讯的输出方
destination: studyExchange # 示意要应用的Exchange名称定义
content-type: application/json # 设置音讯类型,本次为json,文本则设置“text/plain”
binder: rabbit # 设置要绑定的音讯服务的具体设置
group: atguiguA
feign:
hystrix:
enabled: true
退出分组之后,咱们就能够防止反复生产了。
生产者:
消费者1:
消费者2:
解决了反复的音讯,此时咱们来看看长久化,其实加上group属性就加上了长久化,咱们把两个消费者服务都关掉,批改yml,一个有group,一个没有group,而后间断发几条信息,再启动消费者,发现有group的服务的消费者仍然能够生产服务离线时候的音讯,然而没有group的消费者,就没法生产那些在它这个流动离线的时候生产的音讯了。
发表回复