前情提要:rabbitmq 治理界面查看姿态
一、疾速搭建 / 根本信息发送和生产
1、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、application.yml
spring:
rabbitmq:
host: ipXXX
port: 5672
username: 账户 XXX
password: 明码 XXX
virtual-host: /wen # 交换器名称
以 direct 模式为例
1、配置文件
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author : JCccc
* @CreateTime : 2019/9/3
* @Description :
**/
@Configuration
public class RabbitConfig {
// 队列 起名:TestDirectQueue
@Bean
public Queue emailQueue() {
// durable: 是否长久化, 默认是 false, 长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效
// exclusive: 默认也是 false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于 durable
// autoDelete: 是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。// return new Queue("TestDirectQueue",true,true,false);
// 个别设置一下队列的长久化就好, 其余两个就是默认 false
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue smsQueue() {
// durable: 是否长久化, 默认是 false, 长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效
// exclusive: 默认也是 false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于 durable
// autoDelete: 是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。// return new Queue("TestDirectQueue",true,true,false);
// 个别设置一下队列的长久化就好, 其余两个就是默认 false
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue weixinQueue() {
// durable: 是否长久化, 默认是 false, 长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效
// exclusive: 默认也是 false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于 durable
// autoDelete: 是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。// return new Queue("TestDirectQueue",true,true,false);
// 个别设置一下队列的长久化就好, 其余两个就是默认 false
return new Queue("weixin.fanout.queue", true);
}
@Bean
public Queue TTLQueue() {Map<String, Object> map = new HashMap<>(16);
map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则 30 秒后过期
return new Queue("TTL_QUEUE", true, false, false, map);
}
@Bean
public DirectExchange TTLExchange() {return new DirectExchange("TTL_EXCHANGE", true, false);
}
//Direct 交换机 起名:TestDirectExchange
@Bean
public DirectExchange fanoutOrderExchange() {// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("fanout_exchange", true, false);
}
// 绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@Bean
public Binding bindingDirect() {return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
}
@Bean
public Binding bindingDirect1() {return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");
}
@Bean
public Binding bindingDirect2() {return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");
}
@Bean
public Binding bindingDirect3() {return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
}
}
2、生产者
package com.pit.barberShop.common.MQ.Rabbit.fanout;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author:wenye
* @date:Created in 2021/6/15 21:41
* @description:播送模式
* @version: $
*/
@RestController
@RequestMapping("/rabbitmq")
public class ProducerFanout {
@Autowired
private RabbitTemplate rabbitTemplate;
// 1: 定义交换机
private String exchangeName = "fanout_exchange";
// 2: 路由 key
private String routeKey = "";
@RequestMapping("/fanout")
public void markerFanout() {
String message ="shua";
// 发送音讯
rabbitTemplate.convertAndSend(exchangeName, routeKey, message);
}
@RequestMapping("/ttl")
public String testTTL() {MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("20000"); // 设置过期工夫,单位:毫秒
byte[] msgBytes = "测试音讯主动过期".getBytes();
Message message = new Message(msgBytes, messageProperties);
rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message);
return "ok";
}
}
3、消费者
package com.pit.barberShop.common.MQ.Rabbit.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* @author:wenye
* @date:Created in 2021/6/15 22:07
* @description:fanout 消费者
* @version: $
*/
@Component
public class ConsumerFanout {
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。value = @Queue(value = "sms.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_exchange",
// 这里是确定的 rabbitmq 模式是:fanout 是以播送模式、公布订阅模式
type = ExchangeTypes.DIRECT)
))
public void messagerevice(String message){
// 此处省略发邮件的逻辑
System.out.println("sms-two111------------->" + message);
}
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "fanout_exchange",
// 这里是确定的 rabbitmq 模式是:fanout 是以播送模式、公布订阅模式
type = ExchangeTypes.DIRECT)
))
public void messageWXrevice(String message){
// 此处省略发邮件的逻辑
System.out.println("weixin----two---------->" + message);
}
}
二、过期工夫
1、生产者发送音讯时设置过期工夫
public String testTTL() {MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("20000"); // 设置过期工夫,单位:毫秒
byte[] msgBytes = "测试音讯主动过期".getBytes();
Message message = new Message(msgBytes, messageProperties);
rabbitTemplate.convertAndSend("TTL_EXCHANGE", "", message);
return "ok";
}
2、队列中的所有音讯设置过期工夫
配置中增加
@Bean
public Queue TTLQueue() {Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则 30 秒后过期
return new Queue("TTL_QUEUE", true, false, false, map);
}
@Bean
public Queue TTLQueue() {Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则 30 秒后过期
return new Queue("TTL_QUEUE", true, false, false, map);
}
@Bean
public DirectExchange TTLExchange() {return new DirectExchange("TTL_EXCHANGE", true, false);
}
@Bean
public Binding bindingDirect() {return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
}
三、音讯确认机制配置
参考:https://blog.csdn.net/qq33098…
默认是自动应答
spring:
rabbitmq:
# 开启发送确认
publisher-confirms: true
# 开启发送失败退回
publisher-returns: true
目前回调存在 ConfirmCallback 和 ReturnCallback 两者。他们的区别在于
如果音讯没有到 exchange, 则 ConfirmCallback 回调,ack=false,
如果音讯达到 exchange, 则 ConfirmCallback 回调,ack=true
exchange 到 queue 胜利, 则不回调 ReturnCallback
rabbitMQ 音讯生产者发送音讯的流程
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
/**
* correlationData:对象外部只有一个 id 属性,用来示意以后音讯的唯一性。* ack:音讯投递到 broker 的状态,true 示意胜利。* cause:示意投递失败的起因。**/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause){if (!ack) {log.error("音讯发送异样!");
} else {log.info("发送者爸爸曾经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {// 重写 returnedMessage() 办法,办法有五个参数 message(音讯体)、replyCode(响应 code)、replyText(响应内容)、exchange(交换机)、routingKey 路由(队列)@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
配置文件
1、避免反复签发 ack 须要在配置类中重写
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 此处也设置为手动 ack
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
2、从新创立设置交换器和队列属性
@Bean
public Queue chongfuQueue() {
// durable: 是否长久化, 默认是 false, 长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效
// exclusive: 默认也是 false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于 durable
// autoDelete: 是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。// return new Queue("TestDirectQueue",true,true,false);
// 个别设置一下队列的长久化就好, 其余两个就是默认 false
return new Queue("chongfu.fanout.queue", true);
}
//Direct 交换机 起名:TestDirectExchange
@Bean
public DirectExchange chongfuExchange() {// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("chongfu_exchange", true, false);
}
@Bean
public Binding bindingDirect4() {return BindingBuilder.bind(chongfuQueue()).to(chongfuExchange()).with("");
}
生产者
public void markerchongfu() {
/**
* 确保音讯发送失败后能够从新返回到队列中
* 留神:yml 须要配置 publisher-returns: true
*/
rabbitTemplate.setMandatory(true);
/**
* 消费者确认收到音讯后,手动 ack 回执回调解决
*/
rabbitTemplate.setConfirmCallback(confirmCallbackService);
/**
* 音讯投递到队列失败回调解决
*/
rabbitTemplate.setReturnCallback(returnCallbackService);
/**
* 发送音讯
*/
String s = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend("chongfu_exchange", routeKey, "帅哥",
message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
new CorrelationData(s));
}
消费者
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。value = @Queue(value = "chongfu.fanout.queue",autoDelete = "false"),
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "chongfu_exchange",
// 这里是确定的 rabbitmq 模式是:fanout 是以播送模式、公布订阅模式
type = ExchangeTypes.DIRECT)
))
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {log.info("小富收到音讯:{}", msg);
// log.info("序号:{}", message.getMessageProperties().getDeliveryTag());
// System.out.println(msg);
//TODO 具体业务
// 收到音讯 basicAck()
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("音讯已反复解决失败, 回绝再次接管...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 回绝音讯
} else {log.error("音讯行将再次返回队列解决...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
生产音讯有三种回执办法
1、basicAck
basicAck:示意胜利确认,应用此回执办法后,音讯会被 rabbitmq broker 删除。
void basicAck(long deliveryTag, boolean multiple)
- deliveryTag:示意音讯投递序号,每次生产音讯或者音讯从新投递后,deliveryTag 都会减少。手动音讯确认模式下,咱们能够对指定 deliveryTag 的音讯进行 ack、nack、reject 等操作。
- multiple:是否批量确认,值为 true 则会一次性 ack 所有小于以后音讯 deliveryTag 的音讯。
举个栗子:假如我先发送三条音讯 deliveryTag 别离是 5、6、7,可它们都没有被确认,当我发第四条音讯此时 deliveryTag 为 8,multiple 设置为 true,会将 5、6、7、8 的音讯全副进行确认。
2、basicNack
basicNack:示意失败确认,个别在生产音讯业务异样时用到此办法,能够将音讯从新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
- deliveryTag:示意音讯投递序号。
- multiple:是否批量确认。
- requeue:值为 true 音讯将从新入队列。
3、basicReject
basicReject:回绝音讯,与 basicNack 区别在于不能进行批量操作,其余用法很类似。
void basicReject(long deliveryTag, boolean requeue)
- deliveryTag:示意音讯投递序号。
- requeue:值为 true 音讯将从新入队列。
四、死信队列
死信队列其实和一般的队列没啥大的区别,都须要创立本人的 Queue、Exchange, 而后通过 RoutingKey 绑定到 Exchange 下来,只不过死信队列的 RoutingKey 和 Exchange 要作为参数,绑定到失常的队列下来,一种利用场景是失常队列外面的音讯被 basicNack 或者 reject 时,音讯就会被路由到失常队列绑定的死信队列中,还有一种还有罕用的场景就是开启了主动签收,而后消费者生产音讯时出现异常,超过了重试次数,那么这条音讯也会进入死信队列,如果配置了话,
例子
// 模仿异样用的交换器,topic 交换器会通配符匹配,当然字符串截然不同也会匹配
@Bean
TopicExchange emailExchange() {return new TopicExchange("demoTopicExchange");
}
// 死信队列
@Bean
public Queue deadLetterQueue() {return new Queue("demo.dead.letter");
}
// 死信交换器
@Bean
TopicExchange deadLetterExchange() {return new TopicExchange("demoDeadLetterTopicExchange");
}
// 绑定死信队列
@Bean
Binding bindingDeadLetterQueue() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("demo.dead.letter");
}
生产者
@RequestMapping("/sixin")
public void sendEmailMessage() {CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("demoTopicExchange","demo.email","11",correlationData);
log.info("--- 发送 email 音讯 ---{}---messageId---{}","111",correlationData.getId());
}
消费者
/**
* 邮件消费者
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(bindings =@QueueBinding(
// email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。value = @Queue(value = "demo.email",autoDelete = "false",
arguments = {@Argument(name = "x-dead-letter-exchange", value = "demoDeadLetterTopicExchange"),
@Argument(name = "x-dead-letter-routing-key",value = "demo.dead.letter"),
@Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long")
}),
key = "demo.email",
// order.fanout 交换机的名字 必须和生产者保持一致
exchange = @Exchange(value = "demoTopicExchange",
// 这里是确定的 rabbitmq 模式是:fanout 是以播送模式、公布订阅模式
type = ExchangeTypes.TOPIC)
))
public void handleEmailMessage(Message message, Channel channel,String msg) throws IOException {
try {log.info("--- 承受到音讯 ---{}",msg);
// 被动异样
int m=1/0;
// 手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
catch (Exception e) {
// 异样,ture 从新入队, 或者 false, 进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
/**
* 死信消费者,主动签收开启状态下,超过重试次数,或者手动签收,reject 或者 Nack
* @param message
*/
@RabbitListener(queues = "demo.dead.letter")
public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {
// 能够思考数据库记录,每次进来查数量,达到肯定的数量,进行预警,人工染指解决
log.info("接管到死信音讯:---{}--- 音讯 ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
// 回复 ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
同样也可应用 java 类配置
@Bean
public Queue emailQueue() {Map<String, Object> arguments = new HashMap<>(2);
// 绑定死信交换机
arguments.put("x-dead-letter-exchange", "demoDeadLetterTopicExchange");
// 绑定死信的路由 key
arguments.put("x-dead-letter-routing-key", "demo.dead.letter");
arguments.put("x-message-ttl", 3000);
return new Queue(emailQueue,true,false,false,arguments);
}
@Bean
TopicExchange emailExchange() {return new TopicExchange(topicExchange);
}
@Bean
Binding bindingEmailQueue() {return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
}
五、长久化机制和内存磁盘监控
1、长久化
RabbitMQ 的长久化队列分为:
1:队列长久化
2:音讯长久化
3:交换机长久化
不论是长久化的音讯还是非长久化的音讯都能够写入到磁盘中,只不过非长久的是等内存不足的状况下才会被写入到磁盘中。
2、内存磁盘监控
六、分布式事务
七、配置详解
rabbitmq:
addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定 client 连贯到的 server 的地址,多个以逗号分隔(优先取 addresses,而后再取 host)
# port:
## 集群配置 addresses 之间用逗号隔开
# addresses: ip:port,ip:port
password: admin
username: 123456
virtual-host: / # 连贯到 rabbitMQ 的 vhost
requested-heartbeat: #指定心跳超时,单位秒,0 为不指定;默认 60s
publisher-confirms: #是否启用 公布确认
publisher-reurns: # 是否启用公布返回
connection-timeout: #连贯超时,单位毫秒,0 示意无穷大,不超时
cache:
channel.size: # 缓存中放弃的 channel 数量
channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个 channel 的超时工夫,单位毫秒;如果为 0,则总是创立一个新 channel
connection.size: # 缓存的连接数,只有是 CONNECTION 模式时失效
connection.mode: # 连贯工厂缓存模式:CHANNEL 和 CONNECTION
listener:
simple.auto-startup: # 是否启动时主动启动容器
simple.acknowledge-mode: # 示意音讯确认形式,其有三种配置形式,别离是 none、manual 和 auto;默认 auto
simple.concurrency: # 最小的消费者数量
simple.max-concurrency: # 最大的消费者数量
simple.prefetch: # 指定一个申请能解决多少个音讯,如果有事务的话,必须大于等于 transaction 数量.
simple.transaction-size: # 指定一个事务处理的音讯数量,最好是小于等于 prefetch 的数量.
simple.default-requeue-rejected: # 决定被回绝的音讯是否从新入队;默认是 true(与参数 acknowledge-mode 有关系)simple.idle-event-interval: # 多少长时间公布闲暇容器工夫,单位毫秒
simple.retry.enabled: # 监听重试是否可用
simple.retry.max-attempts: # 最大重试次数
simple.retry.initial-interval: # 第一次和第二次尝试公布或传递音讯之间的距离
simple.retry.multiplier: # 利用于上一重试距离的乘数
simple.retry.max-interval: # 最大重试工夫距离
simple.retry.stateless: # 重试是有状态 or 无状态
template:
mandatory: # 启用强制信息;默认 false
receive-timeout: # receive() 操作的超时工夫
reply-timeout: # sendAndReceive() 操作的超时工夫
retry.enabled: # 发送重试是否可用
retry.max-attempts: # 最大重试次数
retry.initial-interval: # 第一次和第二次尝试公布或传递音讯之间的距离
retry.multiplier: # 利用于上一重试距离的乘数
retry.max-interval: #最大重试工夫距离