乐趣区

关于springcloud:深入学习springCloudStream消息驱动

1.Stream 音讯驱动是什么

2.Stream 音讯驱动案例阐明

3.Stream 音讯驱动之生产者

4.Stream 音讯驱动之消费者

5.Stream 音讯驱动分组生产和长久化

1.Stream 音讯驱动是什么

1.1)队列和服务耦合带来的问题
在咱们的日常开发当中,会 应用到很多的队列,比方有 rabbitmq,kafka,rocketMQ,kafka 等等。

所以咱们在编写我的项目的时候,我的项目会和队列的 api 进行耦合 当咱们切换队列 ,或者 队列与队列之间传输信息的时候 ,这种 中间件的差异性会给咱们造成极大的困扰 ,如果咱们用了一种队列,前面又有新的业务需要,咱们想往另外一种音讯队列进行迁徙,这个时候无疑就是一种灾难性的, 很多货色都要推倒重做,因为这些队列和咱们的零碎耦合了。

1.2)springCloud stream 是什么?
springCloud stream 是一个让开发者调用下层 api,就能 屏蔽底层队列的区别,构建音讯驱动服务的框架。

所以咱们只须要搞清楚如何与 Spring Cloud Stream 交互就能够方便使用音讯驱动的形式。

目前仅反对 RabbitMQ、Kafka。

简略地来说,spring cloud stream 就是一种屏 蔽底层消息中间件的差别, 升高切换老本,对立音讯的编程模型。

1.3)为什么要应用 spring cloud stream

当咱们提出为什么要应用一个工具的时候,咱们要想不应用这个工具会有什么结果?
咱们从下面的形容能够看出,stream 音讯驱动是一个解耦工具 ,不应用这个工具,应用程序就会和队列进行耦合, 为了解除这种耦合,咱们采纳了 spring cloud stream。

1.4)stream 凭什么能够对立底层差别?

咱们能够先来看一下,传统的 mq 是如何工作的?

传统的 mq 模型分为 生产者,消费者,音讯通道。
生产者把音讯通过音讯通道发送给消费者。

咱们再来看一下 spring Cloud Stream 给咱们提供了一种解耦的形式。

这里引出了一个很重要的概念 Binder,在 没有 Binder这个概念的状况下,咱们的 springBoot 利用要间接与消息中间件进行信息交互,因为 消息中间件构建的理念不同 ,它们的实现细节上会有较大的差别,应用办法也会有很大的差别, 通过定义 Binder 为中间层,完满地实现了应用程序与消息中间件之间的隔离

1.5)Spring Cloud Stream 的几个重要概念

Binder:
INPUT 对应于消费者。
OUTPUT 对应于生产者。

这里咱们可能会感觉到很奇怪,咱们平时不是认为 input 是生产者,output 是消费者吗?
其实咱们只有转一个方向就明确了:
out 是发送信息的那一方,是生产者。
input 是信息的输出方,是消费者。
而咱们平时可能是站在音讯通道的角度来看的,in 是输出,是生产,out 是输入,是生产。
咱们站音讯通道的在里面,思考一下这个问题,就明确了。

Channel:
通道,是队列 Queue 的一种形象,在音讯通信零碎中就是实现存储和转发的媒介,通过 Channel 对队列进行配置。

Source 和 Sink:
Source:音讯发送者
Sink:音讯接受者

1.6)罕用注解

组成 阐明 B
@Input 音讯输出通道,音讯的消费者
@OutPut 音讯输入通道,音讯的生产者
@StreamListener 监听队列,用于消费者的队列的音讯接管
@EnableBinding 指信道 channel 和 exchange 绑定在一起

2.Stream 音讯驱动案例阐明

咱们创立三个我的项目,两个为消费者,一个为生产者。

3.Stream 音讯驱动之生产者

pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>service-stream-provider-8601</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>service-stream-provider-8601</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR1</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

yml:

server:
  port: 8601

eureka:
  instance:
    hostname: service-stream-provider #eureka 服务端的实例名称
    prefer-ip-address: true #拜访门路能够显示 IP 地址
    instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局
  client:
    register-with-eureka: true     #false 示意不向注册核心注册本人。fetch-registry: true     #false 示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务
    service-url:
      #集群指向其它 eureka
      defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/
spring:
  rabbitmq: #队列的根本配置
    host: localhost
    port: 5672
    username: guest
    password: guest
  application:
    name: service-stream-provider
  cloud:
    stream:
      bindings:
        output: #生产者 因为 out 是出,是发送音讯的一方,所以是生产者
          destination: studyExchange # 示意要应用的 Exchange 名称定义
          content-type: application/json # 设置音讯类型,本次为 json,文本则设置“text/plain”binder: rabbit # 设置要绑定的音讯服务的具体设置
feign:
  hystrix:
    enabled: true

测试类:

public interface IMessageProvider {String send() ;
}
import com.example.demo.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * @author sulingfeng
 * @title: MessageProviderImpl
 * @projectName CloudFamily
 * @description: TODO
 * @date 2022/4/13 17:01
 */
@EnableBinding(Source.class) // 能够了解为是一个音讯的发送管道的定义
public class MessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output; // 音讯的发送管道

    @Override
    public String send() {String serial = UUID.randomUUID().toString();
        this.output.send(MessageBuilder.withPayload(serial).build()); // 创立并发送音讯
        System.out.println("***serial:" + serial);
        return serial;
    }
}
@RestController
public class ProviderController {

    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {return messageProvider.send();
    }

}

4.Stream 音讯驱动之消费者

消费者咱们创立两个,不过代码大部分都是一样的。

pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>service-stream-consumer-8702</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>service-stream-consumer-8702</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR1</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

yml:

server:
  port: 8702

eureka:
  instance:
    hostname: service-stream-provider #eureka 服务端的实例名称
    prefer-ip-address: true #拜访门路能够显示 IP 地址
    instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局
  client:
    register-with-eureka: true     #false 示意不向注册核心注册本人。fetch-registry: true     #false 示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务
    service-url:
      #集群指向其它 eureka
      defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  application:
    name: service-stream-provider
  cloud:
    stream:
      bindings:
        input: #消费者  音讯的输出方 
          destination: studyExchange # 示意要应用的 Exchange 名称定义
          content-type: application/json # 设置音讯类型,本次为 json,文本则设置“text/plain”binder: rabbit # 设置要绑定的音讯服务的具体设置
feign:
  hystrix:
    enabled: true

测试类

@Component
@EnableBinding(Sink.class)// 音讯的消费者
public class ReceiveMessageListener {@Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)// 和队列绑定
    public void input(Message message)
    {System.out.println("消费者,-------> 接管到的音讯:" + message.getPayload()+"\t port:"+serverPort);
    }
}

5.Stream 音讯驱动分组生产和长久化

接下来咱们启动消费者和生产者,而后 生产者发送一条消 息,会发现 消费者的确收到了音讯 不过两个消费者都收到了音讯 ,这可能就会造成音讯的 反复生产

这个时候,咱们就能够用 stream 中的音讯 分组来解决
在 stream 中,处于 同一个 group 的多个消费者是竞争关系 就可能保障一条音讯只能被其中一个服务生产
不同组是能够反复生产的
同一组内有竞争关系,只能被其中一个组生产

批改 yml:

server:
  port: 8702

eureka:
  instance:
    hostname: service-stream-provider #eureka 服务端的实例名称
    prefer-ip-address: true #拜访门路能够显示 IP 地址
    instance-id: ${spring.cloud.client.ip-address}:${server.port}-service-stream-provider #拜访门路的名称格局
  client:
    register-with-eureka: true     #false 示意不向注册核心注册本人。fetch-registry: true     #false 示意本人端就是注册核心,我的职责就是保护服务实例,并不需要去检索服务
    service-url:
      #集群指向其它 eureka
      defaultZone: http://127.0.0.1:8001/eureka/,http://127.0.0.1:8002/eureka/
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  application:
    name: service-stream-provider
  cloud:
    stream:
      bindings:
        input: #消费者  音讯的输出方
          destination: studyExchange # 示意要应用的 Exchange 名称定义
          content-type: application/json # 设置音讯类型,本次为 json,文本则设置“text/plain”binder: rabbit # 设置要绑定的音讯服务的具体设置
          group: atguiguA
feign:
  hystrix:
    enabled: true

退出分组之后,咱们就能够防止反复生产了。
生产者:

消费者 1:


消费者 2:

解决了反复的音讯,此时咱们来看看 长久化,其实加上group 属性就加上了长久化,咱们把两个消费者服务都关掉,批改 yml,一个有 group,一个没有 group,而后间断发几条信息,再启动消费者,发现有 group 的服务的消费者仍然能够生产服务离线时候的音讯,然而没有 group 的消费者,就没法生产那些在它这个流动离线的时候生产的音讯了。

退出移动版