1.Stream音讯驱动是什么

2.Stream音讯驱动案例阐明

3.Stream音讯驱动之生产者

4.Stream音讯驱动之消费者

5.Stream音讯驱动分组生产和长久化

1.Stream音讯驱动是什么

1.1)队列和服务耦合带来的问题
在咱们的日常开发当中,会应用到很多的队列,比方有rabbitmq,kafka,rocketMQ,kafka等等。

所以咱们在编写我的项目的时候,我的项目会和队列的api进行耦合当咱们切换队列,或者队列与队列之间传输信息的时候,这种中间件的差异性会给咱们造成极大的困扰,如果咱们用了一种队列,前面又有新的业务需要,咱们想往另外一种音讯队列进行迁徙,这个时候无疑就是一种灾难性的,很多货色都要推倒重做,因为这些队列和咱们的零碎耦合了。

1.2)springCloud stream是什么?
springCloud stream是一个让开发者调用下层api,就能屏蔽底层队列的区别,构建音讯驱动服务的框架。

所以咱们只须要搞清楚如何与Spring Cloud Stream 交互就能够方便使用音讯驱动的形式。

目前仅反对RabbitMQ、Kafka。

简略地来说,spring cloud stream就是一种屏蔽底层消息中间件的差别,升高切换老本,对立音讯的编程模型。

1.3)为什么要应用spring cloud stream

当咱们提出为什么要应用一个工具的时候,咱们要想不应用这个工具会有什么结果?
咱们从下面的形容能够看出,stream音讯驱动是一个解耦工具,不应用这个工具,应用程序就会和队列进行耦合,为了解除这种耦合,咱们采纳了spring cloud stream。

1.4)stream凭什么能够对立底层差别?

咱们能够先来看一下,传统的mq是如何工作的?

传统的mq模型分为生产者,消费者,音讯通道。
生产者把音讯通过音讯通道发送给消费者。

咱们再来看一下spring Cloud Stream给咱们提供了一种解耦的形式。

这里引出了一个很重要的概念Binder,在没有Binder这个概念的状况下,咱们的springBoot利用要间接与消息中间件进行信息交互,因为消息中间件构建的理念不同,它们的实现细节上会有较大的差别,应用办法也会有很大的差别,通过定义Binder为中间层,完满地实现了应用程序与消息中间件之间的隔离

1.5)Spring Cloud Stream的几个重要概念

Binder:
INPUT对应于消费者。
OUTPUT对应于生产者。

这里咱们可能会感觉到很奇怪,咱们平时不是认为input是生产者,output是消费者吗?
其实咱们只有转一个方向就明确了:
out是发送信息的那一方,是生产者。
input是信息的输出方,是消费者。
而咱们平时可能是站在音讯通道的角度来看的,in是输出,是生产,out是输入,是生产。
咱们站音讯通道的在里面,思考一下这个问题,就明确了。

Channel:
通道,是队列Queue的一种形象,在音讯通信零碎中就是实现存储和转发的媒介,通过Channel对队列进行配置。

Source和Sink:
Source:音讯发送者
Sink:音讯接受者

1.6)罕用注解

组成阐明B
@Input音讯输出通道,音讯的消费者
@OutPut音讯输入通道,音讯的生产者
@StreamListener监听队列,用于消费者的队列的音讯接管
@EnableBinding指信道channel和exchange绑定在一起

2.Stream音讯驱动案例阐明

咱们创立三个我的项目,两个为消费者,一个为生产者。

3.Stream音讯驱动之生产者

pom:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.2.2.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>com.example</groupId>    <artifactId>service-stream-provider-8601</artifactId>    <version>0.0.1-SNAPSHOT</version>    <name>service-stream-provider-8601</name>    <description>Demo project for Spring Boot</description>    <properties>        <java.version>1.8</java.version>        <spring-cloud.version>Hoxton.SR1</spring-cloud.version>    </properties>    <dependencies>        <dependency>            <groupId>org.springframework.cloud</groupId>            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-actuator</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.cloud</groupId>            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.cloud</groupId>            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>    </dependencies>    <dependencyManagement>        <dependencies>            <dependency>                <groupId>org.springframework.cloud</groupId>                <artifactId>spring-cloud-dependencies</artifactId>                <version>${spring-cloud.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>        </dependencies>    </dependencyManagement>    <build>        <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>        </plugins>    </build></project>

yml:

server:  port: 8601eureka:  instance:    hostname: service-stream-provider #eureka服务端的实例名称    prefer-ip-address: true #拜访门路能够显示IP地址    instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局  client:    register-with-eureka: true     #false示意不向注册核心注册本人。    fetch-registry: true     #false示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务    service-url:      #集群指向其它eureka      defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/spring:  rabbitmq: #队列的根本配置    host: localhost    port: 5672    username: guest    password: guest  application:    name: service-stream-provider  cloud:    stream:      bindings:        output: #生产者 因为out是出,是发送音讯的一方,所以是生产者          destination: studyExchange # 示意要应用的Exchange名称定义          content-type: application/json # 设置音讯类型,本次为json,文本则设置“text/plain”          binder: rabbit # 设置要绑定的音讯服务的具体设置feign:  hystrix:    enabled: true

测试类:

public interface IMessageProvider {    String send() ;}
import com.example.demo.service.IMessageProvider;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Source;import org.springframework.integration.support.MessageBuilder;import org.springframework.messaging.MessageChannel;import javax.annotation.Resource;import java.util.UUID;/** * @author sulingfeng * @title: MessageProviderImpl * @projectName CloudFamily * @description: TODO * @date 2022/4/13 17:01 */@EnableBinding(Source.class) // 能够了解为是一个音讯的发送管道的定义public class MessageProviderImpl implements IMessageProvider {    @Resource    private MessageChannel output; // 音讯的发送管道    @Override    public String send() {        String serial = UUID.randomUUID().toString();        this.output.send(MessageBuilder.withPayload(serial).build()); // 创立并发送音讯        System.out.println("***serial: " + serial);        return serial;    }}
@RestControllerpublic class ProviderController {    @Resource    private IMessageProvider messageProvider;    @GetMapping(value = "/sendMessage")    public String sendMessage()    {        return messageProvider.send();    }}

4.Stream音讯驱动之消费者

消费者咱们创立两个,不过代码大部分都是一样的。

pom:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.2.2.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>com.example</groupId>    <artifactId>service-stream-consumer-8702</artifactId>    <version>0.0.1-SNAPSHOT</version>    <name>service-stream-consumer-8702</name>    <description>Demo project for Spring Boot</description>    <properties>        <java.version>1.8</java.version>        <spring-cloud.version>Hoxton.SR1</spring-cloud.version>    </properties>    <dependencies>        <dependency>            <groupId>org.springframework.cloud</groupId>            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-actuator</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.cloud</groupId>            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.cloud</groupId>            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>    </dependencies>    <dependencyManagement>        <dependencies>            <dependency>                <groupId>org.springframework.cloud</groupId>                <artifactId>spring-cloud-dependencies</artifactId>                <version>${spring-cloud.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>        </dependencies>    </dependencyManagement>    <build>        <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>        </plugins>    </build></project>

yml:

server:  port: 8702eureka:  instance:    hostname: service-stream-provider #eureka服务端的实例名称    prefer-ip-address: true #拜访门路能够显示IP地址    instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局  client:    register-with-eureka: true     #false示意不向注册核心注册本人。    fetch-registry: true     #false示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务    service-url:      #集群指向其它eureka      defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/spring:  rabbitmq:    host: localhost    port: 5672    username: guest    password: guest  application:    name: service-stream-provider  cloud:    stream:      bindings:        input: #消费者  音讯的输出方           destination: studyExchange # 示意要应用的Exchange名称定义          content-type: application/json # 设置音讯类型,本次为json,文本则设置“text/plain”          binder: rabbit # 设置要绑定的音讯服务的具体设置feign:  hystrix:    enabled: true

测试类

@Component@EnableBinding(Sink.class)//音讯的消费者public class ReceiveMessageListener {    @Value("${server.port}")    private String serverPort;    @StreamListener(Sink.INPUT)//和队列绑定    public void input(Message message)    {        System.out.println("消费者,------->接管到的音讯:" + message.getPayload()+"\t port: "+serverPort);    }}

5.Stream音讯驱动分组生产和长久化

接下来咱们启动消费者和生产者,而后生产者发送一条消息,会发现消费者的确收到了音讯不过两个消费者都收到了音讯,这可能就会造成音讯的反复生产

这个时候,咱们就能够用stream中的音讯分组来解决
在stream中,处于同一个group的多个消费者是竞争关系就可能保障一条音讯只能被其中一个服务生产
不同组是能够反复生产的
同一组内有竞争关系,只能被其中一个组生产

批改yml:

server:  port: 8702eureka:  instance:    hostname: service-stream-provider #eureka服务端的实例名称    prefer-ip-address: true #拜访门路能够显示IP地址    instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局  client:    register-with-eureka: true     #false示意不向注册核心注册本人。    fetch-registry: true     #false示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务    service-url:      #集群指向其它eureka      defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/spring:  rabbitmq:    host: localhost    port: 5672    username: guest    password: guest  application:    name: service-stream-provider  cloud:    stream:      bindings:        input: #消费者  音讯的输出方          destination: studyExchange # 示意要应用的Exchange名称定义          content-type: application/json # 设置音讯类型,本次为json,文本则设置“text/plain”          binder: rabbit # 设置要绑定的音讯服务的具体设置          group: atguiguAfeign:  hystrix:    enabled: true

退出分组之后,咱们就能够防止反复生产了。
生产者:

消费者1:


消费者2:

解决了反复的音讯,此时咱们来看看长久化,其实加上group属性就加上了长久化,咱们把两个消费者服务都关掉,批改yml,一个有group,一个没有group,而后间断发几条信息,再启动消费者,发现有group的服务的消费者仍然能够生产服务离线时候的音讯,然而没有group的消费者,就没法生产那些在它这个流动离线的时候生产的音讯了。