明天介绍应用SpringBoot实现RabbitMQ音讯队列的高级用法。
- MQ装置
- 主动创立
- 音讯重试
- 音讯超时
- 死信队列
- 延时队列
一、RabbitMQ的装置
家喻户晓,RabbitMQ
的装置绝对简单,须要先装置Erlang,再按着对应版本的RabbitMQ的服务端,最初为了方便管理还须要装置rabbitmq_management治理端插件,偶然还会呈现一些装置配置问题,故十分复杂。
在开发测试环境下应用docker
来装置就不便多了,省去了环境和配置的麻烦。
1. 拉取官网image
docker pull rabbitmq:management
2. 启动RabbitMQ
docker run -dit --name MyRabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
rabbitmq:management: image:tag
--name:指定容器名;
-d:后盾运行容器;
-t:在新容器内指定一个伪终端或终端;
-i:容许你对容器内的规范输出 (STDIN) 进行交互;
-p:指定服务运行的端口(5672:利用拜访端口;15672:控制台Web端口号);
-e:指定环境变量;(RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的明码);
至此RabbitMQ就装置启动实现了,能够通过http://localhost:15672 登陆治理后盾,用户名明码就是下面配置的admin/admin
二、应用SpringBoot主动创立队列
1. 引入amqp包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
2. MQ配置
bootstrap.yml 配置
spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: admin password: admin listener: simple: concurrency: 5 direct: prefetch: 10
concurrency
:每个listener在初始化的时候设置的并发消费者的个数prefetch
:每次从一次性从broker外面取的待生产的音讯的个数
rabbitmq-spring.xml配置
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--接管音讯的队列名--> <rabbit:queue name="login-user-logined" /> <!--申明exchange的名称与类型--> <rabbit:topic-exchange name="login_barryhome_fun"> <rabbit:bindings> <!--queue与exchange的绑定和匹配路由--> <rabbit:binding queue="login-user-logined" pattern="login.user.logined"/> </rabbit:bindings> </rabbit:topic-exchange></beans>
rabbit:topic-exchange
:申明为topic音讯类型pattern="login.user.logined"
:此处是一个表达式,可应用“*”示意一个词,“#”示意一个或多个词
3. 音讯生产端
@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/send")public LoginUser SendLoginSucceedMessage(){ LoginUser loginUser = getLoginUser("succeed"); // 发送音讯 rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE, MessageConstant.LOGIN_ROUTING_KEY, loginUser); return loginUser;}@NoArgsConstructor@AllArgsConstructorpublic class LoginUser implements Serializable { String userName; String realName; String userToken; Date loginTime; String status;}
这里须要留神的是默认状况下音讯的转换器为SimpleMessageConverter
只能解析string和byte,故传递的音讯对象必须是可序列化的,实现Serializable
接口
SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: fun.barryhome.cloud.dto.LoginUser
4. 音讯生产端
@Componentpublic class ReceiverMessage { @RabbitListener(queues = "login-user-logined") public void receiveLoginMessage(LoginUser loginUser) { System.err.println(loginUser); }}
@RabbitListener(queues = "login-user-logined")
:用于监听名为login-user-logined 队列中的音讯
5. 主动创立Queue
@SpringBootApplication@ImportResource(value = "classpath:rabbitmq-spring.xml")public class MQApplication { public static void main(String[] args) { SpringApplication.run(MQApplication.class, args); }}
在没有导入xml且MQ服务器上没有列队的状况下,会导致找不到相干queue的谬误
channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'login-user-logined' in vhost '/', class-id=50, method-id=10)
而导入之后将主动创立
exchange和queue
三、音讯重试
默认状况下如果有音讯生产出错后会始终重试,造成音讯梗塞
如图可察看unacked和total始终是1,但deliver/get飙升
音讯梗塞之后也影响到后续音讯的生产,工夫越长越来越多的音讯将无奈及时生产解决。
如果是单条或极少量的音讯有问题可通过多开节点concurrency
将失常的音讯音讯掉,但如果较多则全副节点都将梗塞。
如果想遇到音讯生产报错重试几次就舍弃,从而不影响后续音讯的生产,如何实现呢?
spring: rabbitmq: host: localhost port: 5672 virtual-host: / username: admin password: admin listener: simple: concurrency: 5 prefetch: 10 retry: enabled: true # 容许音讯生产失败的重试 max-attempts: 3 # 音讯最多生产次数3次 initial-interval: 2000 # 音讯屡次生产的距离2秒
以上配置容许音讯生产失败后重试3次,每次距离2秒,如果还是失败则间接舍弃掉本条音讯。
重试可解决因非音讯体自身解决问题产生的临时性的故障,而将解决失败的音讯间接舍弃掉只是为其它音讯失常解决的权利之计而以,将业务操作降到绝对低的影响。
四、音讯超时
音讯重试
可解决因音讯解决报错引起的问题。如果是音讯解决过慢导致错过时效,除了可在解决逻辑中进行解决外,也能够通过音讯的超时机制来解决,设定超时工夫后将音讯间接舍弃。
批改rabbitmq-spring.xml
<rabbit:queue name="login-user-logined"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" /> </rabbit:queue-arguments></rabbit:queue>
x-message-ttl
:在音讯服务器停留的工夫(ms)
如果配置前已存在queue将不能被批改,须要删除原有queue后主动创立
创立胜利后会在Features中有TTL标识
五、死信队列
死信队列就是当业务队列解决失败后,将音讯依据routingKey转投到另一队列,这样的状况有:
- 音讯被回绝 (basic.reject or basic.nack) 且带 requeue=false不从新入队参数或达到的retry从新入队的下限次数
- 音讯的TTL(Time To Live)-存活工夫曾经过期
- 队列长度限度被超过(队列满,queue的"x-max-length"参数)
1. 批改rabbitmq-spring.xml
<!--接管音讯的队列名--><rabbit:queue name="login-user-logined"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/> <!--死信的交换机--> <entry key="x-dead-letter-exchange" value="login_barryhome_fun"/> <!--死信发送的路由--> <entry key="x-dead-letter-routing-key" value="login.user.login.dlq"/> </rabbit:queue-arguments></rabbit:queue><rabbit:queue name="login-user-logined-dlq"/><!--申明exchange的名称与类型--><rabbit:topic-exchange name="login_barryhome_fun"> <rabbit:bindings> <!--queue与exchange的绑定和匹配路由--> <rabbit:binding queue="login-user-logined" pattern="login.user.logined"/> <rabbit:binding queue="login-user-logined-dlq" pattern="login.user.login.dlq"/> </rabbit:bindings></rabbit:topic-exchange>
通过对死信发送的交换机和路由的的设置,可将音讯转向具体的queue中。这里交换机能够和原业务队列不是一个。
当login-user-logined
中的音讯解决失败后将间接转投向login-user-logined-dlq
队列中。
当程序逻辑修复后可再将音讯再移回业务队列中move messages
2. 装置插件
如图提醒须要先装置插件
3. 挪动音讯
装置胜利后就能够输出业务队列名再转投
六、延时队列
延时队列除了能够做个别的延时解决外,还能够当作单个job的定时工作解决,比起个别通过定时器去轮询的形式更优雅。
1. 批改rabbitmq-spring.xml
<rabbit:topic-exchange name="login_barryhome_fun" delayed="true">
首次配置时,如果报以下谬误,则是服务器不反对此命令,须要装置插件
Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2. 装置插件
1) 下载插件:https://github.com/rabbitmq/r...
2) 上传插件到docker容器中/pluginsdocker ps
查问rabbitmq的 CONTAINER ID
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 2c248563a2b0:/plugins
3) 进入docker容器外部
docker exec -it 2c248563a2b0 /bin/bash
4) 装置插件
cd /pluginsrabbitmq-plugins enable rabbitmq_delayed_message_exchange
具体装置教程可参考:https://blog.csdn.net/magic_1...
装置胜利后重启程序,察看mq治理端的exchange可发现
3. 发送延时音讯
@GetMapping("/sendDelay")public LoginUser SendDelayLoginSucceedMessage() { LoginUser loginUser = getLoginUser("succeed"); MessagePostProcessor messagePostProcessor = message -> { // 延时10s message.getMessageProperties().setHeader("x-delay", 10000); return message; }; // 发送音讯 rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE, MessageConstant.LOGIN_ROUTING_KEY, loginUser, messagePostProcessor); return loginUser;}
须要留神的是音讯的发送是实时
的,音讯服务器接管到音讯待延时工夫后再投到对应的queue中
七、残缺代码
https://gitee.com/hypier/barr...