关于java:RabbitMQ的死信队列

1、死信队列

1.1、DeadLetter: 是RabbitMQ中的一种音讯机制

呈现死信音讯的可能状况如下

  1. 音讯被否定确认,channel.basicNack 或 channel.basicReject, requeue=false
  2. 音讯在队列的存活工夫超过设置的TTL工夫
  3. 音讯队列的音讯总数曾经超过最大的队列长度
1.2、 Dead Letter Pattern 死信模式

当消费者不能解决接管到的音讯时候, 将这个音讯从新公布到另一个队列中,这个过程中的Exchange称为死信交换机(DLX)、Queue就是死信队列

1.3、死信队列解决流程
  1. 死信音讯实际上就是来设置队列的属性(配置死信队列
  2. 队列中若有死信音讯,RabbitMQ会主动将这个音讯从新公布到设置的Exchange上,进而路由到另外一个队列。
  3. 监听这个队列中的音讯做解决(解决死信队列
1.4、配置死信队列
  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列
1.5、死信交换机的申明周期
  1. 业务音讯被投入业务队列
  2. 消费者生产业务队列的音讯,因为处理过程中产生异样,于是进行了nck或者reject操作
  3. 被nck或reject的音讯由RabbitMQ投递到死信交换机中
  4. 死信交换机将音讯投入相应的死信队列
  5. 死信队列的消费者生产死信音讯

2、实例代码(boot-rabbit)

2.1、pom.xml

<dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit-test</artifactId>
 <scope>test</scope>
 </dependency>
</dependencies>

2.2、application.properties

spring.application.name=springboot-rabbitmq
server.port=8080
#默认地址就是127.0.0.1:5672,如果是服务器的rabbitmq就改下
spring.rabbitmq.host=192.168.174.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.type=simple
#设置为false,会抛弃音讯或者从新发步到死信队列
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#虚拟主机目录
spring.rabbitmq.virtual-host=/

2.3、RabbitMQConfig

@Configuration
public class RabbitMQconfig {
 /**
 * //业务Exchange
 */ public static final String BUSINESS_EXCHANGE_NAME="dead.letter.demo.simple.business.exchange";
 /**
 *    //两个业务队列
 */
 public static final String BUSINESS_QUEUEA_NAME="dead.letter.demo.simple.business.queuea";
 public static final String BUSINESS_QUEUEB_NAME="dead.letter.demo.simple.business.queueb";
 /**
 * 死信Exchange
 */ public static final String DEAD_LETTER_EXCHANGE="dead.letter.demo.simple.deadletter.exchange";
 /**
 * 路由key
 */ public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY="dead.letter.demo.simple.deadletter.queuea";
 public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY="dead.letter.demo.simple.deadletter.queueb";
 /**
 * 两个死信队列
 */
 public static final String DEAD_LETTER_QUEUEA_NAME="dead.letter.demo.simple.deadletter.queuea";
 public static final String DEAD_LETTER_QUEUEB_NAME="dead.letter.demo.simple.deadletter.queueb";
 /**
 * 申明业务Exchange
 */ @Bean("businessExchange")
 public FanoutExchange businessExchange(){
 //播送模式
 return  new FanoutExchange((BUSINESS_EXCHANGE_NAME));
 }
 /**
 * 申明死信Exchange
 */ @Bean("deadLetterExchange")
 public DirectExchange deadLetterExchange(){
 //点对点模式模式
 return  new DirectExchange((DEAD_LETTER_EXCHANGE));
 }
 /**
 * 申明业务队列A
 * @return
 */
 @Bean("businessQueueA")
 public Queue businessQueueA(){
 HashMap<String, Object> map = new HashMap<>(2);
 map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
 map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUTING_KEY);
 return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();
 }
 /**
 * 申明业务队列B
 * @return
 */
 @Bean("businessQueueB")
 public Queue businessQueueB(){
 HashMap<String, Object> map = new HashMap<>(2);
 map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
 map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUTING_KEY);
 return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();
 }
 /**
 * 申明死信队列A
 */ @Bean("deadLetterQueueA")
 public Queue deadLetterQueueA(){
 return  new Queue(DEAD_LETTER_QUEUEA_NAME);
 }
 /**
 * 申明死信队列B
 */ @Bean("deadLetterQueueB")
 public Queue deadLetterQueueB(){
 return  new Queue(DEAD_LETTER_QUEUEB_NAME);
 }
 /**
 * 申明业务队列A绑定关系
 */
 @Bean
 public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
 @Qualifier("businessExchange")FanoutExchange exchange){
 return BindingBuilder.bind(queue).to(exchange);
 }
 /**
 * 申明业务队列B绑定关系
 */
 @Bean
 public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
 @Qualifier("businessExchange")FanoutExchange exchange){
 return BindingBuilder.bind(queue).to(exchange);
 }
 /**
 * 申明死信队列A绑定关系
 */
 @Bean
 public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue,
 @Qualifier("deadLetterExchange")DirectExchange exchange)   {
 return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
 }
 /**
 * 申明死信队列B绑定关系
 */
 @Bean
 public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue,
 @Qualifier("deadLetterExchange")DirectExchange exchange)   {
 return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
 }
}

2.4、controller(MQ生产者)

@RestController
public class SendMessageController {
 @Autowired
 private RabbitMQconfig rabbitMQconfig;
 @Autowired
 private RabbitTemplate rabbitTemplate;
 public static final String BUSINESS_EXCHANGE_NAME="dead.letter.demo.simple.business.exchange";
 @GetMapping("/sendMsg")
 public String sendMsg(String msg){
 System.out.println("msg{}"+msg);
 rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"",msg);
 return "success";
 }
}

2.5、业务音讯消费者

@Component
public class BusinessMessageReceiver {
 /**
 *    //两个业务队列
 */
 public static final String BUSINESS_QUEUEA_NAME="dead.letter.demo.simple.business.queuea";
 public static final String BUSINESS_QUEUEB_NAME="dead.letter.demo.simple.business.queueb";
 /**
 * 生产音讯
 */
 @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
 public void receiveA(Message message, Channel channel) throws IOException {
 String msg=new String(message.getBody());
 System.out.println("BusinessMessageA{}"+msg);
 boolean ack=true;
 Exception exception=null;
 try {
 if (msg.contains("deadletter")){
 throw new RuntimeException("dead letter exception");
 }
 } catch (RuntimeException e) {
 ack=false;
 exception=e;
 }
 if (!ack){
 System.out.println("error msg{ }"+exception.getMessage());
 //设置死信音讯
 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
 }else {
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }
 }
 /**
 * 音讯信息B
 */ @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
 public void receiveB(Message message,Channel channel) throws IOException {
 String msg = new String(message.getBody());
 System.out.println("BusinessMessageB{ }"+msg);
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }
}

2.6、死信音讯消费者

@Component
public class DeadMessageReceiver {
 /**
 * 两个死信队列
 */
 public static final String DEAD_LETTER_QUEUEA_NAME="dead.letter.demo.simple.deadletter.queuea";
 public static final String DEAD_LETTER_QUEUEB_NAME="dead.letter.demo.simple.deadletter.queueb";
 @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
 public void receiveA(Message message, Channel channel) throws IOException {
 System.out.println("DeadMessageA{}"+new String(message.getBody()));
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }
 @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
 public void receiveB(Message message, Channel channel) throws IOException {
 System.out.println("DeadMessageB{}"+new String(message.getBody()));
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }
}

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理