共计 8577 个字符,预计需要花费 22 分钟才能阅读完成。
写在最后面
间隔上一次发文章曾经很久了,其实这段时间始终也没有停笔,只不过在忙着找工作还有学校结课的事件,从新弄了一下博客,前面也会陆陆续续会把文章最近更新进去~
- 这篇文章有点长,就分了两篇
- PS:那个 Github 上 Java 常识问答的文章也没有停笔,最近也会陆续更新
6. 进阶补充
6.1 过期工夫设置(TTL)
过期工夫(TTL)就是对 音讯或者队列 设置一个时效,只有在工夫范畴内才能够被被消费者接管获取,超过过期工夫后音讯将主动被删除。
注:咱们次要讲音讯过期,在音讯过期的第一种形式中,顺便也就会提到队列过期的设置形式
- 通过队列属性设置,队列中所有音讯都有雷同的过期工夫
- 对音讯进行独自设置,每条音讯 TTL 能够不同
两种办法同时被应用时,以两者过期工夫 TTL 较小的那个数值为准。音讯在队列的生存工夫一旦超过设置的 TTL 值,就称为 Dead Message 被投递到死信队列,消费者将无奈再收到该音讯(死信队列是咱们下一点要讲的)
6.1.1 利用于全副音讯的过期工夫
- 配置类
@Configuration
public class RabbitMqConfiguration {
public static final String TOPIC_EXCHANGE = "topic_order_exchange";
public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
public static final String TOPIC_ROUTINGKEY_1 = "test.*";
@Bean
public TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Queue topicQueue1() {
// 创立参数 Map 容器
Map<String, Object> args = new HashMap<>();
// 设置音讯过期工夫 留神此处是数值 5000 不是字符串
args.put("x-message-ttl", 5000);
// 设置队列过期工夫
args.put("x-expires", 8000);
// 在最初传入额定参数 即这些过期信息
return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
}
@Bean
public Binding bindingTopic1() {return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with(TOPIC_ROUTINGKEY_1);
}
}
- 创立参数 Map 容器:类型是在 Queue 参数中所要求的,要依照要求来。
- 设置音讯过期工夫:这里设置的音讯过期工夫,会利用到所有音讯中。
- 设置队列过期工夫
- 传入额定参数:将上述配置好的过期工夫设置,通过 Queue 传入即可。
- 生产者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired
@Test
public void testTopicSendMessage() {rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
}
}
不要配置消费者,而后就能够在 Web 管理器中看到成果了
6.1.2 利用于独自音讯的过期工夫
- 配置中放弃最后的样子就行了,就不须要配置过期工夫了
- 生产者中配置音讯独自的过期工夫
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired
@Test
public void testTopicSendMessage2() {MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){public Message postProcessMessage(Message message){
// 留神此处是 字符串“5000”message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order",
"This is a message 002 !",messagePostProcessor);
}
}
6.2 死信队列
死信官网原文为 Dead letter,它是 RabbitMQ 中的一种音讯机制,当你在生产音讯时,如果队列以及队列里的音讯呈现以下状况,阐明以后音讯就成为了“死信”,如果配置了死信队列,这些数据就会传送到其中,如果没有配置就会间接抛弃。
- 音讯被回绝
- 音讯过期
- 队列达到最大长度
不过死信队列并不是什么很非凡的存在,咱们只须要配置一个交换机,在生产的那个队列中配置,呈现死信就从新发送到方才配置的交换机中去,进而被路由到与交换机绑定的队列中去,这个队列也就是死信队列,所以从创立上看,它和一般的队列没什么区别。
6.2.1 利用场景
比方在一些比拟重要的业务队列中,未被正确生产的音讯,往往咱们并不想抛弃,因为抛弃后如果想复原这些数据,往往须要运维人员从日志获取到原音讯,而后从新投递音讯,而配置了死信队列,相当于给了未正确生产音讯一个暂存的地位,日后须要复原的时候,只须要编写对应的代码就能够了。
6.2.2 实现形式
- 定义一个解决死信的交换机和队列
@Configuration
public class DeadRabbitMqConfiguration{
@Bean
public DirectExchange deadDirect(){return new DirectExchange("dead_direct_exchange");}
@Bean
public Queue deadQueue(){return new Queue("dead_direct_queue");}
@Bean
public Binding deadBinds(){return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
}
}
- 在失常的生产队列中指定死信队列
@Configuration
public class RabbitMqConfiguration {
public static final String TOPIC_EXCHANGE = "topic_order_exchange";
public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
public static final String TOPIC_ROUTINGKEY_1 = "test.*";
@Bean
public TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Queue topicQueue1() {
// 设置过期工夫
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);
// 设置死信队列交换器
args.put("x-dead-letter-exchange","dead_direct_exchange");
// 设置替换路由的路由 key fanout 模式不须要配置此条
args.put("x-dead-letter-routing-key","dead");
return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
}
@Bean
public Binding bindingTopic1() {return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with(TOPIC_ROUTINGKEY_1);
}
}
6.3 内存及磁盘监控
6.3.1 内存告警及管制
为了避免防止服务器因内存不够而解体,所以 RabbitMQ 设定了一个阈值,当内存使用量超过阈值的时候,RabbitMQ 会临时阻塞所有客户端的连贯,并且进行持续承受新音讯。
有两种形式能够批改这个阈值
-
通过命令(二选一即可)
- 命令的形式会在 Broker 重启后生效
# 通过百分比设置的命令 <fraction> 处代表百分比小数例如 0.6
rabbitmqctl set_vm_memory_high_watermark <fraction>
# 通过绝对值设置的命令 <value> 处代表设置的一个固定值例如 700MB
rabbitmqctl set_vm_memory_high_watermark absolute <value>
-
通过批改配置文件 rabbitmq.conf
- 配置文件每次启动都会加载,属于永恒无效
# 百分比设置 默认值为 0.4 举荐 0.4-0.7 之间
vm_memory_high_watermark.relative = 0.5
# 固定值设置
vm_memory_high_watermark.absolute = 2GB
6.3.2 内存换页
在客户端连贯和生产者被阻塞之前,它会尝试将队列中的音讯换页到磁盘中,这种思维在操作系统中其实十分常见,以最大水平的满足音讯的失常解决。
当内存换页产生后,无论长久化还是非长久化的音讯,都会被转移到磁盘,而因为长久化的音讯原本就在磁盘中有一个长久化的正本,所以会优先移除长久化的音讯。
默认状况下,当内存达到阈值的 50 % 的时候,就会进行换页解决。
能够通过设置 vm_memory_high_watermark_paging_ratio 批改
# 值小于 1, 如果大于 1 就没有意义了
vm_memory_high_watermark_paging_ratio = 0.6
6.3.3 磁盘预警
如果无止境的换页,也很有可能会导致耗尽磁盘空间导致服务器解体,所以 RabbitMQ 又提供了一个磁盘预警的阈值,当低于这个值的时候就会进行报警,默认是 50MB,能够通过命令的形式批改
# 固定值
rabbitmqctl set_disk_free_limit <disk_limit>
# 百分数
rabbitmqctl set_disk_free_limit memory_limit <fraction>
6.4 音讯的牢靠传递
生产者向 RabbitMQ 中发送音讯的时候,可能会因为网络等种种原因导致发送失败,所以 RabbitMQ 提供了一系列保障音讯牢靠传递的机制,能够大抵分为生产者和消费者两局部的解决
6.4.1 生产者中的机制
生产者作为音讯的发送者,须要保障本人的音讯发送胜利,RabbitMQ 提供了两种形式来保障这一点。-
- confirm 确认模式
- return 退回模式
6.4.1.1 confirm 确认模式
生产者发送音讯后,会异步期待接管一个 ack 应答,收到返回的 ack 确认音讯后,依据 ack 是 true 还是 false,调用 confirmCallback 接口进行解决
- 配置类
spring:
rabbitmq:
# 发送确认
publisher-confirm-type: correlated
- 实现 ConfirmCallback 接口的 confirm 办法
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
/**
* @param correlationData 相干配置信息
* @param ack exchange 交换机 是否胜利收到了音讯。true 胜利,false 代表失败
* @param cause 失败起因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {
// 接管胜利
System.out.println("音讯胜利发送到交换机");
} else {
// 接管失败
System.out.println("音讯发送到交换机失败,失败起因:" + cause);
// TODO 能够解决失败的音讯,例如再次发送等等
}
}
}
- 申明队列和交换机
@Configuration
public class RabbitMqConfig {@Bean()
public Queue confirmTestQueue() {return new Queue("confirm_test_queue", true, false, false);
}
@Bean()
public FanoutExchange confirmTestExchange() {return new FanoutExchange("confirm_test_exchange");
}
@Bean
public Binding confirmTestFanoutExchangeAndQueue() {return BindingBuilder.bind(confirmTestQueue()).to(confirmTestExchange());
}
}
- 生产者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired
/**
* 注入 ConfirmCallbackService
*/
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Test
public void testConfirm() {
// 设置确认回调类
rabbitTemplate.setConfirmCallback(confirmCallbackService);
// 发送音讯
rabbitTemplate.convertAndSend("confirm_test_exchange", "","ConfirmCallback !");
}
}
6.4.1.2 return 退回模式
当 Exchange 发送到 Queue 失败时,会调用一个 returnsCallback,咱们能够通过实现这个接口,而后来解决这种失败的状况。
- 在配置文件中开启发送回调
spring:
rabbitmq:
# 发送回调
publisher-returns: true
- 实现 ReturnsCallback 的 returnedMessage 办法
// public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 曾经属于过期办法了
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {System.out.println(returned);
}
}
- 申明队列和交换机(Direct 模式)
@Configuration
public class RabbitMqConfig {@Bean()
public Queue returnsTestQueue() {return new Queue("return_test_queue", true, false, false);
}
@Bean()
public DirectExchange returnsTestExchange() {return new DirectExchange("returns_test_exchange");
}
@Bean
public Binding returnsTestDirectExchangeAndQueue() {return BindingBuilder.bind(returnsTestQueue()).to(returnsTestExchange()).with("info");
}
}
- 生产者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired
/**
* 注入 ConfirmCallbackService
*/
@Autowired
private ConfirmCallbackService confirmCallbackService;
/**
* 注入 ReturnCallbackService
*/
@Autowired
private ReturnCallbackService returnCallbackService;
@Test
public void testReturn() {
// 确保音讯发送失败后能够从新返回到队列中
rabbitTemplate.setMandatory(true);
// 音讯投递到队列失败回调解决
rabbitTemplate.setReturnsCallback(returnCallbackService);
// 音讯投递确认模式
rabbitTemplate.setConfirmCallback(confirmCallbackService);
// 发送音讯
rabbitTemplate.convertAndSend("returns_test_exchange", "info", "ReturnsCallback !");
}
}
- 批改不同的路由 key,即可测试出后果。
6.4.2 消费者中的机制
6.4.2.1 ack 确认机制
ack 示意收到音讯的确认,默认是主动确认,然而它有三种类型
acknowledge-mode 选项介绍
- auto:主动确认,为默认选项
- manual:手动确认(按能力调配就须要设置为手动确认)
- none:不确认,发送后主动抛弃
其中主动确认是指,当音讯一旦被消费者接管到,则主动确认收到,并把这个音讯从队列中删除。
然而在理论业务解决中,正确的接管到的音讯可能会因为业务上的问题,导致音讯没有正确的被解决,然而如果设置了 手动确认形式,则须要在业务解决胜利后,调用 channel.basicAck(),手动签收,如果出现异常,则调用 channel.basicNack()办法,让其主动从新发送音讯。
- 配置文件
spring:
rabbitmq:
listener:
simple:
# 手动确认
acknowledge-mode: manual
- 消费者
@Component
@RabbitListener(queues = "confirm_test_queue")
public class TestConsumer {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {System.out.println("音讯内容:" + new String(message.getBody()));
System.out.println("业务出错的地位:");
int i = 66 / 0;
// 手动签收 deliveryTag 标识代表队列能够删除了
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 回绝签收
channel.basicNack(deliveryTag, true, true);
}
}
}
6.5 集群 & 6.6 分布式事务(待更新)
因为这两个点篇幅也不短,切实不愿草草简略写上了事,放到前面独自的文章编写,公布哇。
对于集群的搭建临时可参考:https://blog.csdn.net/belongh…