共计 6406 个字符,预计需要花费 17 分钟才能阅读完成。
明天介绍应用 SpringBoot 实现 RabbitMQ 音讯队列的高级用法。
- MQ 装置
- 主动创立
- 音讯重试
- 音讯超时
- 死信队列
- 延时队列
一、RabbitMQ 的装置
家喻户晓,RabbitMQ
的装置绝对简单,须要先装置 Erlang,再按着对应版本的RabbitMQ 的服务端,最初为了方便管理还须要装置 rabbitmq_management 治理端插件,偶然还会呈现一些装置配置问题,故十分复杂。
在开发测试环境下应用 docker
来装置就不便多了,省去了环境和配置的麻烦。
1. 拉取官网 image
docker pull rabbitmq:management
2. 启动 RabbitMQ
docker run -dit --name MyRabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
rabbitmq:management: image:tag
–name: 指定容器名;
-d: 后盾运行容器;
-t: 在新容器内指定一个伪终端或终端;
-i: 容许你对容器内的规范输出 (STDIN) 进行交互;
-p: 指定服务运行的端口(5672:利用拜访端口;15672:控制台 Web 端口号);
-e: 指定环境变量;(RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的明码);
至此 RabbitMQ 就装置启动实现了,能够通过 http://localhost:15672 登陆治理后盾,用户名明码就是下面配置的admin/admin
二、应用 SpringBoot 主动创立队列
1. 引入 amqp 包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. MQ 配置
bootstrap.yml 配置
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
concurrency: 5
direct:
prefetch: 10
concurrency
:每个 listener 在初始化的时候设置的并发消费者的个数prefetch
:每次从一次性从 broker 外面取的待生产的音讯的个数
rabbitmq-spring.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 接管音讯的队列名 -->
<rabbit:queue name="login-user-logined" />
<!-- 申明 exchange 的名称与类型 -->
<rabbit:topic-exchange name="login_barryhome_fun">
<rabbit:bindings>
<!--queue 与 exchange 的绑定和匹配路由 -->
<rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
</rabbit:bindings>
</rabbit:topic-exchange>
</beans>
rabbit:topic-exchange
:申明为 topic 音讯类型pattern="login.user.logined"
:此处是一个表达式,可应用“*”示意一个词,“#”示意一个或多个词
3. 音讯生产端
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public LoginUser SendLoginSucceedMessage(){LoginUser loginUser = getLoginUser("succeed");
// 发送音讯
rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
MessageConstant.LOGIN_ROUTING_KEY, loginUser);
return loginUser;
}
@NoArgsConstructor
@AllArgsConstructor
public class LoginUser implements Serializable {
String userName;
String realName;
String userToken;
Date loginTime;
String status;
}
这里须要留神的是默认状况下音讯的转换器为 SimpleMessageConverter
只能解析 string 和byte,故传递的音讯对象必须是可序列化的,实现 Serializable
接口
SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: fun.barryhome.cloud.dto.LoginUser
4. 音讯生产端
@Component
public class ReceiverMessage {@RabbitListener(queues = "login-user-logined")
public void receiveLoginMessage(LoginUser loginUser) {System.err.println(loginUser);
}
}
@RabbitListener(queues = "login-user-logined")
:用于监听名为login-user-logined 队列中的音讯
5. 主动创立 Queue
@SpringBootApplication
@ImportResource(value = "classpath:rabbitmq-spring.xml")
public class MQApplication {public static void main(String[] args) {SpringApplication.run(MQApplication.class, args);
}
}
在没有导入 xml 且 MQ 服务器上没有列队的状况下,会导致找不到相干 queue 的谬误
channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'login-user-logined' in vhost '/', class-id=50, method-id=10)
而导入之后将 主动创立
exchange 和queue
三、音讯重试
默认状况下如果有音讯生产出错后会始终重试,造成音讯梗塞
如图可察看 unacked 和total始终是 1,但 deliver/get 飙升
音讯梗塞之后也影响到后续音讯的生产,工夫越长越来越多的音讯将无奈及时生产解决。
如果是单条或极少量的音讯有问题可通过多开节点 concurrency
将失常的音讯音讯掉,但如果较多则全副节点都将梗塞。
如果想遇到音讯生产报错重试几次就舍弃,从而不影响后续音讯的生产,如何实现呢?
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: admin
password: admin
listener:
simple:
concurrency: 5
prefetch: 10
retry:
enabled: true # 容许音讯生产失败的重试
max-attempts: 3 # 音讯最多生产次数 3 次
initial-interval: 2000 # 音讯屡次生产的距离 2 秒
以上配置容许音讯生产失败后重试 3 次,每次距离 2 秒,如果还是失败则间接舍弃掉本条音讯。
重试可解决因非音讯体自身解决问题产生的临时性的故障,而将解决失败的音讯间接舍弃掉只是为其它音讯失常解决的权利之计而以,将业务操作降到绝对低的影响。
四、音讯超时
音讯重试
可解决因音讯解决报错引起的问题。如果是音讯解决过慢导致错过时效,除了可在解决逻辑中进行解决外,也能够通过音讯的超时机制来解决,设定超时工夫后将音讯间接舍弃。
批改rabbitmq-spring.xml
<rabbit:queue name="login-user-logined">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
</rabbit:queue-arguments>
</rabbit:queue>
x-message-ttl
:在音讯服务器停留的工夫(ms)
如果配置前已存在 queue 将不能被批改,须要删除原有 queue 后主动创立
创立胜利后会在 Features 中有 TTL 标识
五、死信队列
死信队列就是当业务队列解决失败后,将音讯依据 routingKey 转投到另一队列,这样的状况有:
- 音讯被回绝 (basic.reject or basic.nack) 且带 requeue=false 不从新入队参数或达到的 retry 从新入队的下限次数
- 音讯的 TTL(Time To Live)- 存活工夫曾经过期
- 队列长度限度被超过(队列满,queue 的 ”x-max-length” 参数)
1. 批改rabbitmq-spring.xml
<!-- 接管音讯的队列名 -->
<rabbit:queue name="login-user-logined">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
<!-- 死信的交换机 -->
<entry key="x-dead-letter-exchange" value="login_barryhome_fun"/>
<!-- 死信发送的路由 -->
<entry key="x-dead-letter-routing-key" value="login.user.login.dlq"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="login-user-logined-dlq"/>
<!-- 申明 exchange 的名称与类型 -->
<rabbit:topic-exchange name="login_barryhome_fun">
<rabbit:bindings>
<!--queue 与 exchange 的绑定和匹配路由 -->
<rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
<rabbit:binding queue="login-user-logined-dlq" pattern="login.user.login.dlq"/>
</rabbit:bindings>
</rabbit:topic-exchange>
通过对死信发送的交换机和路由的的设置,可将音讯转向具体的 queue 中。这里交换机能够和原业务队列不是一个。
当 login-user-logined
中的音讯解决失败后将间接转投向 login-user-logined-dlq
队列中。
当程序逻辑修复后可再将音讯再移回业务队列中move messages
2. 装置插件
如图提醒须要先装置插件
3. 挪动音讯
装置胜利后就能够输出业务队列名再转投
六、延时队列
延时队列除了能够做个别的延时解决外,还能够当作单个 job 的定时工作解决,比起个别通过定时器去轮询的形式更优雅。
1. 批改 rabbitmq-spring.xml
<rabbit:topic-exchange name="login_barryhome_fun" delayed="true">
首次配置时,如果报以下谬误,则是服务器不反对此命令,须要装置插件
Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2. 装置插件
1) 下载插件:https://github.com/rabbitmq/r…
2) 上传插件到 docker 容器中 /pluginsdocker ps
查问 rabbitmq 的 CONTAINER ID
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 2c248563a2b0:/plugins
3) 进入 docker 容器外部
docker exec -it 2c248563a2b0 /bin/bash
4) 装置插件
cd /plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
具体装置教程可参考:https://blog.csdn.net/magic_1…
装置胜利后重启程序,察看 mq 治理端的 exchange 可发现
3. 发送延时音讯
@GetMapping("/sendDelay")
public LoginUser SendDelayLoginSucceedMessage() {LoginUser loginUser = getLoginUser("succeed");
MessagePostProcessor messagePostProcessor = message -> {
// 延时 10s
message.getMessageProperties().setHeader("x-delay", 10000);
return message;
};
// 发送音讯
rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
MessageConstant.LOGIN_ROUTING_KEY, loginUser, messagePostProcessor);
return loginUser;
}
须要留神的是音讯的发送是
实时
的,音讯服务器接管到音讯待延时工夫后再投到对应的 queue 中
七、残缺代码
https://gitee.com/hypier/barr…
八、请关注我的公众号