共计 6313 个字符,预计需要花费 16 分钟才能阅读完成。
前言
Spring AMQP 我的项目是利用了 spring 的外围概念到 AMQP 协定音讯解决方案中。咱们提供了一个“template”作为更高级别的形象去发送和接管音讯。我也提供了音讯驱动类的反对。应用依赖注入和申明式编程能够更好的治理 AMQP 源代码。此我的项目中你能够看到和 SpringFramework 中 JMS 一些类似的中央。
RabbitMQ 简介
1. 音讯队列是应用程序和应用程序之间的一种通信办法。
2.RabbitMQ:erlang 语言开发、基于 AMQP 协定。
3. 同类产品:ActiveMQ、ZeroMQ、RabbitMQ、RocketMQ、Kafka。
4. 物理模型
5.Broker 音讯队列服务过程、Exchange 音讯队列交换机,Queue 音讯队列、Producer 音讯生产者、Consumer 音讯消费者。
6. 六种模式:简略模式、工作模式、公布与订阅模式、路由模式、通配符模式、近程调用模式(根本不会用到)。
7. 关键词:{Broker:服务器实体、Exchange:音讯交换机、Queue:音讯队列载体、Binding:绑定、Routing Key:路由关键字、VHost:虚拟主机、Producer:音讯生产者、Consumer:音讯消费者、Channel:音讯通道}
8. 要害概念:由 Exchange、Queue、RoutingKey 三个能力决定一个从 Exchange 到 Queue 的惟一的线路。
RabbitMQ 五大模式实战
此次是基于 SpringBoot 开发的 RabbitMQ 应用程序,利用 SpringBoot 的主动配置和起步依赖会让你更快更不便的构建我的项目。
让咱们实战开始。
- 筹备阶段
- 启动一台 RabbitMQ 服务器
- 此次应用的是 SpringBoot 我的项目
- 利用的 pom 依赖
<parent> <groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-parent</artifactId> | |
<version>2.3.4.RELEASE</version> | |
<relativePath/> <!-- lookup parent from repository --> | |
</parent> | |
<dependencies> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-amqp</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-web</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-configuration-processor</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-test</artifactId> | |
<scope>test</scope> | |
<exclusions> | |
<exclusion> | |
<groupId>org.junit.vintage</groupId> | |
<artifactId>junit-vintage-engine</artifactId> | |
</exclusion> | |
</exclusions> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.amqp</groupId> | |
<artifactId>spring-rabbit-test</artifactId> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-maven-plugin</artifactId> | |
</plugin> | |
</plugins> | |
</build> 123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- 配置 application.yml 文件
spring: | |
rabbitmq: | |
host: 127.0.0.1 | |
port: 5672 | |
username: guest | |
password: guest | |
server: | |
port: 8082 |
- 启动类和目录构造是 SpringBoot 惯例设置,这里不再赘述。
留神:启动类名设置为 RabbitmqProducerApplication
- 简略模式
- 简略模式配置文件
@Configuration | |
public class RabbitSimpleConfig { | |
@Bean | |
public Queue simpleQueue(){return new Queue("simpleQueue"); | |
} | |
} |
- 简略模式生产者局部
@SpringBootTest(classes = RabbitmqProducerApplication.class) | |
public class ProducerTest { | |
@Autowired | |
RabbitTemplate rabbitTemplate; | |
@Test | |
public void simpleProduct(){for (int num = 0; num < 20; num++) {rabbitTemplate.convertAndSend("simpleQueue", "简略模式"+num); | |
} | |
} | |
} |
- 简略模式消费者局部
@Component | |
public class MessageListener {@RabbitListener(queues = "simpleQueue") | |
public void simpleListener(String message){System.out.println("简略模式监听器:"+message); | |
} | |
} |
- 工作模式
- 工作模式配置文件
@Bean | |
public Queue workQueue(){return new Queue("workQueue"); | |
} |
- 工作模式生产者局部
@Test | |
public void workProduct(){for (int num = 0; num < 20; num++) {rabbitTemplate.convertAndSend("workQueue", "工作模式"+num); | |
} | |
} |
- 工作模式消费者局部
@RabbitListener(queues = "workQueue") | |
public void workListener1(String message) {System.out.println("工作模式监听器 1:" + message); | |
} | |
@RabbitListener(queues = "workQueue") | |
public void workListener2(String message) {System.out.println("工作模式监听器 2:" + message); | |
} |
- 公布订阅模式
- 公布订阅模式配置文件
// 配置交换器 | |
@Bean | |
public FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange"); | |
} | |
// 配置队列 | |
@Bean | |
public Queue fanoutQueue1() {return new Queue("fanoutQueue1", true, false, false, null); | |
} | |
@Bean | |
public Queue fanoutQueue2() {return new Queue("fanoutQueue2", true, false, false, null); | |
} | |
// 配置绑定 | |
@Bean | |
public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); | |
} | |
@Bean | |
public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); | |
} |
- 公布订阅模式生产者局部
@Test | |
public void FanoutProduct(){for (int num = 0; num < 10; num++) {rabbitTemplate.convertAndSend("fanoutExchange",""," 公布订阅模式 "+num); | |
} | |
} |
- 公布订阅模式消费者局部
@RabbitListener(queues = "fanoutQueue1") | |
public void fanoutListener1(String message) {System.out.println("公布订阅监听器 1:" + message); | |
} | |
@RabbitListener(queues = "fanoutQueue2") | |
public void fanoutListener2(String message) {System.out.println("公布订阅监听器 2:" + message); | |
} |
- 路由模式
- 路由模式配置文件
// 配置交换机 | |
@Bean | |
public DirectExchange directExchange() {return new DirectExchange("directExchange"); | |
} | |
// 配置队列 | |
@Bean | |
public Queue directQueue1() {return new Queue("directQueue1", true, false, false, null); | |
} | |
@Bean | |
public Queue directQueue2() {return new Queue("directQueue2", true, false, false, null); | |
} | |
// 配置绑定 | |
@Bean | |
public Binding directBinding1(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("one"); | |
} | |
@Bean | |
public Binding directBinding2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("two"); | |
} |
- 路由模式生产者局部
@Test | |
public void directProduct1() {for (int num = 0; num < 5; num++) {rabbitTemplate.convertAndSend("directExchange","one", "发送到路由队列 1 音讯"+num); | |
} | |
} | |
@Test | |
public void directProduct2() {for (int num = 0; num < 5; num++) {rabbitTemplate.convertAndSend("directExchange","two", "发送到路由队列 2 音讯"+num); | |
} | |
} |
- 路由模式消费者局部
@RabbitListener(queues = "directQueue1") | |
public void fanoutListener1(String message) {System.out.println("路由模式监听器 1:" + message); | |
} | |
@RabbitListener(queues = "directQueue2") | |
public void fanoutListener2(String message) {System.out.println("路由模式监听器 2:" + message); | |
} |
- 通配符模式
- 通配符模式配置文件
// 配置队列 | |
@Bean | |
public Queue topicQueue1() {return new Queue("topicQueue1"); | |
} | |
@Bean | |
public Queue topicQueue2() {return new Queue("topicQueue2"); | |
} | |
// 配置交换器 | |
@Bean | |
public TopicExchange topicExchange() {return new TopicExchange("topicExchange"); | |
} | |
// 配置绑定 | |
@Bean | |
public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue1).to(topicExchange).with("topic.*"); | |
} | |
@Bean | |
public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("topic.#"); | |
} |
- 通配符模式生产者局部
/* | |
* 通配符模式测试 | |
* */ | |
@Test | |
public void topicProduct() {rabbitTemplate.convertAndSend("topicExchange","topic.one", "routkey 为 topic.one 的音讯"); | |
rabbitTemplate.convertAndSend("topicExchange","topic.one.two", "routkey 为 topic.one.two 的音讯"); | |
} |
- 通配符模式消费者局部
@RabbitListener(queues = "topicQueue1") | |
public void fanoutListener1(String message) {System.out.println("通配符监听器 1:" + message); | |
} | |
@RabbitListener(queues = "topicQueue2") | |
public void fanoutListener2(String message) {System.out.println("通配符监听器 2:" + message); | |
} |
总结
以上就是 SpringBoot+RabbitMQ 五大模式的简略应用实例,到目前为止 RabbitMQ 也是 Sping AMQP 的惟一实现。下一节将会解说 RabbitMQ 可视化治理界面,可视化治理界面帮忙咱们能够直观地看到 RabbitMQ 服务器的运行状况。如果你感觉文章对你有帮忙,欢送关注,谢谢。