共计 3293 个字符,预计需要花费 9 分钟才能阅读完成。
钻研一下音讯队列,当初来简略搭建一下。
1. Docker 搭建 RabbitMQ
1.1 查问并下载 RabbitMQ 镜像
docker search rabbitmq
// 抉择能够拜访 web 治理界面的 tag
docker pull rabbitmq:management
1.2 运行 RabbitMQ 镜像
// 设置账号密码都为 admin
docker run -dit --name myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
1.3 浏览器上拜访 服务器 IP:15672
呈现以下页面示意启动胜利
2. 搭建 SpringBoot 我的项目整合 RabbitMQ
2.1 pom.xml
增加 web 和 rabbitmq 的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2.2 aplication.yml
将 rabbitmq 的地址用户名明码等配置上
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
2.3 新建交换机 SenderConfig.java
有以下三种罕用的交换机,咱们这里应用第三种
DirectExchange
直连型交换机,依据音讯携带的路由键,将音讯转发给对应的队列
FanoutExchange
扇形交换机,接管到音讯后会将音讯转发到所有队列
TopicExchange
主题交换机,依据音讯携带的路由键和交换机与队列绑定键的规定,将音讯转发给对应的队列
规定:
*(星号):示意一个字符必须呈现
#(井号):示意任意数量的字符
/**
* 交换机
* @author zhouzhaodong
*/
@Configuration
public class SenderConfig {
/**
* ----- 交换机 -----
* 参数意义:
* name: 名称
* durable: 长久化
* autoDelete: 主动删除
*/
@Bean
public TopicExchange topicExchange() {return new TopicExchange("topicExchange", true, false);
}
/**
* ----- 队列 -----
*/
@Bean
public Queue queueOne() {return new Queue("queueOne", true);
}
@Bean
public Queue queueTwo() {return new Queue("queueTwo", true);
}
@Bean
public Queue queueThree() {return new Queue("queueThree", true);
}
/**
* ----- 绑定 -----
* routingKey 就是路由规定,音讯对应的队列,用来辨别不同的音讯队列
*/
@Bean
public Binding bindingFanoutOne() {return BindingBuilder.bind(queueOne()).to(topicExchange()).with("topic_one");
}
@Bean
public Binding bindingFanoutTwo() {return BindingBuilder.bind(queueTwo()).to(topicExchange()).with("topic_two");
}
@Bean
public Binding bindingFanoutThree() {return BindingBuilder.bind(queueThree()).to(topicExchange()).with("topic_one");
}
}
2.4 发送者 SenderController.java
/**
* 音讯发送者
*
* @author zhouzhaodong
*/
@RestController
public class SenderController {
@Resource
AmqpTemplate amqpTemplate;
Logger logger = LoggerFactory.getLogger(SenderController.class);
@RequestMapping(value = "/send")
public String sendMessage(String message) {logger.info("音讯发送开始工夫:" + new Date());
// 这里 convertAndSend 第一个参数是交换机的名称
// 第二个参数能够是 routingKey
// 最初一个参数就是要发送的音讯
amqpTemplate.convertAndSend("topicExchange", "topic_one", message);
return "发送胜利";
}
}
2.5 消费者 ReceiverController.java
/**
* 消费者
* @author zhouzhaodong
*/
@Component
public class ReceiverController {Logger logger = LoggerFactory.getLogger(ReceiverController.class);
@RabbitHandler
@RabbitListener(queues = "queueOne")
public void processA(String message){logger.info("queueOne 接管音讯工夫为:" + new Date());
logger.info("queueOne 接管音讯为:" + message);
}
@RabbitHandler
@RabbitListener(queues = "queueTwo")
public void processB(String message){logger.info("queueTwo 接管音讯工夫为:" + new Date());
logger.info("queueTwo 接管音讯为:" + message);
}
@RabbitHandler
@RabbitListener(queues = "queueThree")
public void processC(String message){logger.info("queueThree 接管音讯工夫为:" + new Date());
logger.info("queueThree 接管音讯为:" + message);
}
}
3. 启动我的项目进行测试
3.1 调用生产者接口
发现有两个队列收到了音讯,因为这两个队列都配置的 routingKey 雷同,都是 topic_one
3.2 不带 routingKey 进行拜访
发现并没有队列收到音讯
测试完结
集体博客地址:
http://www.zhouzhaodong.xyz/
源代码地址:
https://gitee.com/zhouzhaodon…
正文完