通过剖析SpringCloud Stream 消费者端的工作流程,波及到的次要依赖有:
spring-cloud-stream
spring-rabbit
spring-amqp
spring-messaging
amqp-client

1、音讯驱动

1.1 剖析过程

1.1.1 筹备工作

案例中通过rabbitMQ作为消息中间件,实现SpringCloud Stream音讯驱动的剖析

1.1.2 音讯生产者

1.1.2-1 创立工程引入依赖

<dependencies>   <dependency>       <groupId>org.springframework.cloud</groupId>       <artifactId>spring-cloud-stream</artifactId>   </dependency>   <dependency>       <groupId>org.springframework.cloud</groupId>       <artifactId>spring-cloud-starter-stream-rabbit</artifactId>   </dependency>   <dependency>       <groupId>org.springframework.cloud</groupId>       <artifactId>spring-cloud-stream-binder-rabbit</artifactId>   </dependency></dependencies>

1.1.2-2 定义BINGDING

发送音讯时须要定义一个接口,不同的是接口办法的返回对象是 MessageChannel,上面是 Spring Cloud Stream 内置的接口:

public interface Source {String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output();}

这就接口申明了一个 binding 命名为 “output”。这个binding 申明了一个音讯输入流,也就是音讯的生产者。

1.1.2-3 配置APPLICATION.YML

server: port: 7001 #服务端口spring: application:   name: stream_producer #指定服务名 rabbitmq:   addresses: 192.168.142.128   username: root   password: 123456   virtual-host: /test cloud:   stream:     bindings:       output:         destination: root-default #指定音讯发送的目的地,在rabbitmq中,发送到一个root-default的exchange中     binders:  #配置绑定器       defaultRabbit:         type: rabbit

1.1.2-4 测试发送音讯

* 启动类* 入门案例:*      1.引入依赖*      2.配置application.yml文件*      3.发送音讯的话,定义一个通道接口,通过接口中内置的messagechannel*                      SpringCloudStream中内置接口  Source*      4.@EnableBinding : 绑定对应通道*      5.发送音讯的话,通过MessageChannel发送音讯*                      如果须要MessageChannel --> 通过绑定的内置接口获取** @author*/@SpringBootApplication@EnableBinding(Source.class)public class ProducerApplicationDemo implements CommandLineRunner {   @Autowired   private MessageChannel output;   public static void main(String[] args) {       SpringApplication.run(ProducerApplicationDemo.class);   }   @Override   public void run(String... args) throws Exception {       //发送MQ音讯       //messagesBuilder:工具类:创立音讯       output.send(MessageBuilder.withPayload("hello world").build());   }}

1.1.3 音讯消费者

1.1.3-1 创立工程引入依赖

<dependencies>    <dependency>        <groupId>org.springframework.cloud</groupId>        <artifactId>spring-cloud-stream</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.cloud</groupId>        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>    </dependency>    <dependency>        <groupId>org.springframework.cloud</groupId>        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>    </dependency></dependencies>

1.1.3-2 定义BINGDING

同发送音讯统一,在Spring Cloud Stream中承受音讯,须要定义一个接口,如下是内置的一个接口。

public interface Sink {String INPUT = "input";@Input(Sink.INPUT)SubscribableChannel input();}

正文 @Input 对应的办法,须要返回 SubscribableChannel ,并且参入一个参数值。
这就接口申明了一个 binding 命名为 “input” 。

1.1.3-3 配置APPLICATION.YML

server:  port: 7002 #服务端口spring:  application:    name: rabbitmq-consumer #指定服务名  rabbitmq:    addresses: 192.168.142.128    username: wgy    password: 123456    virtual-host: /test  cloud:    stream:      bindings:        input:  #内置的获取音讯的通道 , 从wgy-default中获取音讯          destination: wgy-default      binders:        defaultRabbit:          type: rabbit

1.1.3-4 测试

/** * 启动类 * 入门案例: *      1.引入依赖 *      2.配置application.yml *      3.须要配置一个通道的接口 *              内置获取音讯的通道接口 sink *      4.绑定通道 *      5.配置一个监听办法 : 当程序从中间件获取数据之后,执行的业务逻辑办法 *              须要在监听办法上配置@StreamListener * * @author  */@SpringBootApplication@EnableBinding(Sink.class)public class ConsumerApplicationDemo {    public static void main(String[] args) {        SpringApplication.run(ConsumerApplicationDemo.class);    }    /**     * 监听binding中的音讯     * @param message     */    @StreamListener(Sink.INPUT)    public void input(String message) {        System.out.println("获取到音讯:" + message);    }}

1.1.4 定义工具类

1.1.4-1 音讯生产者

/** * 负责向中间件发送数据 * * @author */@Component@EnableBinding(Source.class)public class MessageSender {    @Autowired    private MessageChannel output;    /**     * 发送音讯     *     * @param obj     */    public void send(Object obj) {        output.send(MessageBuilder.withPayload(obj).build());    }}

1.1.4-2 音讯消费者

/** * 负责向中间件获取数据 * * @author */@Component@EnableBinding(Sink.class)public class MessageListener {    /**     * 监听binding中的音讯     *     * @param message     */    @StreamListener(Sink.INPUT)    public void input(String message) {        System.out.println("获取到音讯:" + message);    }}

1.1.4-3 测试

/** * 测试类 * * @author */@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestpublic class ProducerTest {    @Autowired    private MessageSender messageSender;    @Test    public void testSend() {        messageSender.send("hello 工具类");    }}

具体如下图所示:

1.2 自定义音讯通道

Spring Cloud Stream 内置了两种接口,别离定义了 binding 为 “input” 的输出流,和 “output” 的输入流,而在咱们理论应用中,往往是须要定义各种输入输出流。应用办法也很简略。

/** * 自定义的音讯通道 * * @author */public interface MyProcessor {    /**     * 音讯生产者的配置     */    String MYOUTPUT = "myoutput";    /**     * 音讯消费者的配置     */    String MYINPUT = "myinput";    @Output("myoutput")    MessageChannel myoutput();    @Input("myinput")    SubscribableChannel myinput();}
  • 一个接口中,能够定义无数个输入输出流,能够依据理论业务状况划分。上述的接口,定义了一个订单输出,和订单输入两个 binding。
  • 应用时,须要在@EnableBinding注解中,增加自定义的接口。
  • 应用@StreamListener做监听的时候,须要指定MyProcessor.MYINPUT

1.2.1 音讯生产者

/** * 负责向中间件发送数据 * * @author */@Component@EnableBinding(MyProcessor.class)public class MessageSender {    @Autowired    @Qualifier(value = "myoutput")    private MessageChannel myoutput;    /**     * 发送音讯     *     * @param obj     */    public void send(Object obj) {        myoutput.send(MessageBuilder.withPayload(obj).build());    }}
server:  port: 7001 #服务端口spring:  application:    name: stream_producer #指定服务名  rabbitmq:    addresses: 192.168.142.128    username: root    password: 123456    virtual-host: /test  cloud:    stream:      bindings:        output:          destination: root-default #指定音讯发送的目的地,在rabbitmq中,发送到一个root-default的exchange中        myoutput:          destination: root-custom-output      binders:  #配置绑定器        defaultRabbit:          type: rabbit

1.2.2 音讯消费者

/** * 负责向中间件获取数据 * * @author */@Component@EnableBinding(MyProcessor.class)public class MessageListener {    /**     * 监听binding中的音讯     *     * @param message     */    @StreamListener(MyProcessor.MYINPUT)    public void input(String message) {        System.out.println("获取到音讯:" + message);    }}
server:  port: 7002 #服务端口spring:  application:    name: rabbitmq-consumer #指定服务名  rabbitmq:    addresses: 192.168.142.128    username: root    password: 123456    virtual-host: /test  cloud:    stream:      bindings:        input:  #内置的获取音讯的通道 , 从root-default中获取音讯          destination: root-default        myinput:          destination: root-custom-output      binders:        defaultRabbit:          type: rabbit

1.3 音讯分组

通常在生产环境,咱们的每个服务都不会以单节点的形式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个音讯通道的指标主题(Topic)上。默认状况下,当生产者收回一条音讯到绑定通道上,这条音讯会产生多个正本被每个消费者实例接管和解决,然而有些业务场景之下,咱们心愿生产者产生的音讯只被其中一个实例生产,这个时候咱们须要为这些消费者设置生产组来实现这样的性能。

实现的形式非常简单,咱们只须要在服务消费者端设置属性即可,如以下实现:

server:  port: 7002 #服务端口spring:  application:    name: rabbitmq-consumer #指定服务名  rabbitmq:    addresses: 192.168.142.128    username: root    password: 123456    virtual-host: /test  cloud:    stream:      bindings:        input:  #内置的获取音讯的通道 , 从root-default中获取音讯          destination: root-default        myinput:          destination: root-custom-output          group: group1 #设置音讯的组名称(同名组中的多个消费者,只会有一个去生产音讯)      binders:        defaultRabbit:          type: rabbit

在同一个group中的多个消费者只有一个能够获取到音讯并生产

1.4 音讯分区

有一些场景须要满足, 同一个特色的数据被同一个实例生产, 比方同一个id的传感器监测数据必须被同一个实例统计计算剖析, 否则可能无奈获取全副的数据。又比方局部异步工作,首次申请启动task,二次申请勾销task,此场景就必须保障两次申请至同一实例.

1.4.1 音讯消费者0

server:  port: 7002 #服务端口spring:  application:    name: rabbitmq-consumer #指定服务名  rabbitmq:    addresses: 192.168.142.128    username: root    password: 123456    virtual-host: /test  cloud:    stream:      instanceCount: 2  #消费者总数      instanceIndex: 0  #以后消费者的索引      bindings:        input:  #内置的获取音讯的通道 , 从root-default中获取音讯          destination: root-default        myinput:          destination: root-custom-output          group: group1 #设置音讯的组名称(同名组中的多个消费者,只会有一个去生产音讯)          consumer:            partitioned: true  #开启分区反对      binders:        defaultRabbit:          type: rabbit

从下面的配置中,咱们能够看到减少了这三个参数:

  • spring.cloud.stream.bindings.input.consumer.partitioned :通过该参数开启消费者分区性能;
  • spring.cloud.stream.instanceCount:该参数指定了以后消费者的总实例数量;
  • spring.cloud.stream.instanceIndex :该参数设置以后实例的索引号,从0开始,最大值为spring.cloud.stream.instanceCount 参数 - 1。咱们试验的时候须要启动多个实例,能够通过运行参数来为不同实例设置不同的索引值。

1.4.2 音讯消费者1

server:  port: 7003 #服务端口spring:  application:    name: rabbitmq-consumer #指定服务名  rabbitmq:    addresses: 192.168.142.128    username: root    password: 123456    virtual-host: /test  cloud:    stream:      instanceCount: 2  #消费者总数      instanceIndex: 1  #以后消费者的索引      bindings:        input:  #内置的获取音讯的通道 , 从root-default中获取音讯          destination: root-default        myinput:          destination: root-custom-output          group: group1 #设置音讯的组名称(同名组中的多个消费者,只会有一个去生产音讯)          consumer:            partitioned: true  #开启分区反对      binders:        defaultRabbit:          type: rabbit

1.4.3 音讯生产者

server:  port: 7001 #服务端口spring:  application:    name: stream_producer #指定服务名  rabbitmq:    addresses: 192.168.142.128    username: root    password: 123456    virtual-host: /test  cloud:    stream:      bindings:        output:          destination: rootdefault #指定音讯发送的目的地,在rabbitmq中,发送到一个root-default的exchange中        myoutput:          destination: root-custom-output          producer:            partition-key-expression: payload  #分区关键字   对象中的id,对象            partition-count: 2  #分区大小      binders:  #配置绑定器        defaultRabbit:          type: rabbit

从下面的配置中,咱们能够看到减少了这两个参数:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression :通过该参数指定了分区键的表达式规定,咱们能够依据理论的输入音讯规定来配置SpEL来生成适合的分区键;
  • spring.cloud.stream.bindings.output.producer.partitionCount :该参数指定了音讯分区的数量。

到这里音讯分区配置就实现了,咱们能够再次启动这两个利用,同时消费者启动多个,但须要留神的是要为消费者指定不同的实例索引号,这样当同一个音讯被发给生产组时,咱们能够发现只有一个生产实例在接管和解决这些雷同的音讯。