通过剖析 SpringCloud Stream 消费者端的工作流程,波及到的次要依赖有:spring-cloud-stream
spring-rabbit
spring-amqp
spring-messaging
amqp-client
1、音讯驱动
1.1 剖析过程
1.1.1 筹备工作
案例中通过 rabbitMQ 作为消息中间件,实现 SpringCloud Stream 音讯驱动的剖析
1.1.2 音讯生产者
1.1.2-1 创立工程引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
1.1.2-2 定义 BINGDING
发送音讯时须要定义一个接口,不同的是接口办法的返回对象是 MessageChannel,上面是 Spring Cloud Stream 内置的接口:
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();}
这就接口申明了一个 binding 命名为“output”。这个 binding 申明了一个音讯输入流,也就是音讯的生产者。
1.1.2-3 配置 APPLICATION.YML
server:
port: 7001 #服务端口
spring:
application:
name: stream_producer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
output:
destination: root-default #指定音讯发送的目的地, 在 rabbitmq 中, 发送到一个 root-default 的 exchange 中
binders: #配置绑定器
defaultRabbit:
type: rabbit
1.1.2-4 测试发送音讯
* 启动类
* 入门案例:
* 1. 引入依赖
* 2. 配置 application.yml 文件
* 3. 发送音讯的话, 定义一个通道接口, 通过接口中内置的 messagechannel
* SpringCloudStream 中内置接口 Source
* 4.@EnableBinding : 绑定对应通道
* 5. 发送音讯的话, 通过 MessageChannel 发送音讯
* 如果须要 MessageChannel --> 通过绑定的内置接口获取
*
* @author
*/
@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplicationDemo implements CommandLineRunner {
@Autowired
private MessageChannel output;
public static void main(String[] args) {SpringApplication.run(ProducerApplicationDemo.class);
}
@Override
public void run(String... args) throws Exception {
// 发送 MQ 音讯
//messagesBuilder: 工具类:创立音讯
output.send(MessageBuilder.withPayload("hello world").build());
}
}
1.1.3 音讯消费者
1.1.3-1 创立工程引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>
1.1.3-2 定义 BINGDING
同发送音讯统一,在 Spring Cloud Stream 中承受音讯,须要定义一个接口,如下是内置的一个接口。
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();}
正文 @Input 对应的办法,须要返回 SubscribableChannel,并且参入一个参数值。
这就接口申明了一个 binding 命名为“input”。
1.1.3-3 配置 APPLICATION.YML
server:
port: 7002 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: wgy
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
input: #内置的获取音讯的通道 , 从 wgy-default 中获取音讯
destination: wgy-default
binders:
defaultRabbit:
type: rabbit
1.1.3-4 测试
/**
* 启动类
* 入门案例:
* 1. 引入依赖
* 2. 配置 application.yml
* 3. 须要配置一个通道的接口
* 内置获取音讯的通道接口 sink
* 4. 绑定通道
* 5. 配置一个监听办法 : 当程序从中间件获取数据之后, 执行的业务逻辑办法
* 须要在监听办法上配置 @StreamListener
*
* @author
*/
@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplicationDemo {public static void main(String[] args) {SpringApplication.run(ConsumerApplicationDemo.class);
}
/**
* 监听 binding 中的音讯
* @param message
*/
@StreamListener(Sink.INPUT)
public void input(String message) {System.out.println("获取到音讯:" + message);
}
}
1.1.4 定义工具类
1.1.4-1 音讯生产者
/**
* 负责向中间件发送数据
*
* @author
*/
@Component
@EnableBinding(Source.class)
public class MessageSender {
@Autowired
private MessageChannel output;
/**
* 发送音讯
*
* @param obj
*/
public void send(Object obj) {output.send(MessageBuilder.withPayload(obj).build());
}
}
1.1.4-2 音讯消费者
/**
* 负责向中间件获取数据
*
* @author
*/
@Component
@EnableBinding(Sink.class)
public class MessageListener {
/**
* 监听 binding 中的音讯
*
* @param message
*/
@StreamListener(Sink.INPUT)
public void input(String message) {System.out.println("获取到音讯:" + message);
}
}
1.1.4-3 测试
/**
* 测试类
*
* @author
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest {
@Autowired
private MessageSender messageSender;
@Test
public void testSend() {messageSender.send("hello 工具类");
}
}
具体如下图所示:
1.2 自定义音讯通道
Spring Cloud Stream 内置了两种接口,别离定义了 binding 为“input”的输出流,和“output”的输入流,而在咱们理论应用中,往往是须要定义各种输入输出流。应用办法也很简略。
/**
* 自定义的音讯通道
*
* @author
*/
public interface MyProcessor {
/**
* 音讯生产者的配置
*/
String MYOUTPUT = "myoutput";
/**
* 音讯消费者的配置
*/
String MYINPUT = "myinput";
@Output("myoutput")
MessageChannel myoutput();
@Input("myinput")
SubscribableChannel myinput();}
- 一个接口中,能够定义无数个输入输出流,能够依据理论业务状况划分。上述的接口,定义了一个订单输出,和订单输入两个 binding。
- 应用时,须要在 @EnableBinding 注解中,增加自定义的接口。
- 应用 @StreamListener 做监听的时候,须要指定 MyProcessor.MYINPUT
1.2.1 音讯生产者
/**
* 负责向中间件发送数据
*
* @author
*/
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender {
@Autowired
@Qualifier(value = "myoutput")
private MessageChannel myoutput;
/**
* 发送音讯
*
* @param obj
*/
public void send(Object obj) {myoutput.send(MessageBuilder.withPayload(obj).build());
}
}
server:
port: 7001 #服务端口
spring:
application:
name: stream_producer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
output:
destination: root-default #指定音讯发送的目的地, 在 rabbitmq 中, 发送到一个 root-default 的 exchange 中
myoutput:
destination: root-custom-output
binders: #配置绑定器
defaultRabbit:
type: rabbit
1.2.2 音讯消费者
/**
* 负责向中间件获取数据
*
* @author
*/
@Component
@EnableBinding(MyProcessor.class)
public class MessageListener {
/**
* 监听 binding 中的音讯
*
* @param message
*/
@StreamListener(MyProcessor.MYINPUT)
public void input(String message) {System.out.println("获取到音讯:" + message);
}
}
server:
port: 7002 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
input: #内置的获取音讯的通道 , 从 root-default 中获取音讯
destination: root-default
myinput:
destination: root-custom-output
binders:
defaultRabbit:
type: rabbit
1.3 音讯分组
通常在生产环境,咱们的每个服务都不会以单节点的形式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个音讯通道的指标主题(Topic)上。默认状况下,当生产者收回一条音讯到绑定通道上,这条音讯会产生多个正本被每个消费者实例接管和解决,然而有些业务场景之下,咱们心愿生产者产生的音讯只被其中一个实例生产,这个时候咱们须要为这些消费者设置生产组来实现这样的性能。
实现的形式非常简单,咱们只须要在服务消费者端设置属性即可,如以下实现:
server:
port: 7002 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
input: #内置的获取音讯的通道 , 从 root-default 中获取音讯
destination: root-default
myinput:
destination: root-custom-output
group: group1 #设置音讯的组名称(同名组中的多个消费者,只会有一个去生产音讯)binders:
defaultRabbit:
type: rabbit
在同一个 group 中的多个消费者只有一个能够获取到音讯并生产
1.4 音讯分区
有一些场景须要满足, 同一个特色的数据被同一个实例生产, 比方同一个 id 的传感器监测数据必须被同一个实例统计计算剖析, 否则可能无奈获取全副的数据。又比方局部异步工作,首次申请启动 task,二次申请勾销 task,此场景就必须保障两次申请至同一实例.
1.4.1 音讯消费者 0
server:
port: 7002 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
instanceCount: 2 #消费者总数
instanceIndex: 0 #以后消费者的索引
bindings:
input: #内置的获取音讯的通道 , 从 root-default 中获取音讯
destination: root-default
myinput:
destination: root-custom-output
group: group1 #设置音讯的组名称(同名组中的多个消费者,只会有一个去生产音讯)consumer:
partitioned: true #开启分区反对
binders:
defaultRabbit:
type: rabbit
从下面的配置中,咱们能够看到减少了这三个参数:
- spring.cloud.stream.bindings.input.consumer.partitioned:通过该参数开启消费者分区性能;
- spring.cloud.stream.instanceCount:该参数指定了以后消费者的总实例数量;
- spring.cloud.stream.instanceIndex:该参数设置以后实例的索引号,从 0 开始,最大值为 spring.cloud.stream.instanceCount 参数 – 1。咱们试验的时候须要启动多个实例,能够通过运行参数来为不同实例设置不同的索引值。
1.4.2 音讯消费者 1
server:
port: 7003 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
instanceCount: 2 #消费者总数
instanceIndex: 1 #以后消费者的索引
bindings:
input: #内置的获取音讯的通道 , 从 root-default 中获取音讯
destination: root-default
myinput:
destination: root-custom-output
group: group1 #设置音讯的组名称(同名组中的多个消费者,只会有一个去生产音讯)consumer:
partitioned: true #开启分区反对
binders:
defaultRabbit:
type: rabbit
1.4.3 音讯生产者
server:
port: 7001 #服务端口
spring:
application:
name: stream_producer #指定服务名
rabbitmq:
addresses: 192.168.142.128
username: root
password: 123456
virtual-host: /test
cloud:
stream:
bindings:
output:
destination: rootdefault #指定音讯发送的目的地, 在 rabbitmq 中, 发送到一个 root-default 的 exchange 中
myoutput:
destination: root-custom-output
producer:
partition-key-expression: payload #分区关键字 对象中的 id, 对象
partition-count: 2 #分区大小
binders: #配置绑定器
defaultRabbit:
type: rabbit
从下面的配置中,咱们能够看到减少了这两个参数:
- spring.cloud.stream.bindings.output.producer.partitionKeyExpression:通过该参数指定了分区键的表达式规定,咱们能够依据理论的输入音讯规定来配置 SpEL 来生成适合的分区键;
- spring.cloud.stream.bindings.output.producer.partitionCount:该参数指定了音讯分区的数量。
到这里音讯分区配置就实现了,咱们能够再次启动这两个利用,同时消费者启动多个,但须要留神的是要为消费者指定不同的实例索引号,这样当同一个音讯被发给生产组时,咱们能够发现只有一个生产实例在接管和解决这些雷同的音讯。