关于java:RabbitMQ的死信队列

28次阅读

共计 6717 个字符,预计需要花费 17 分钟才能阅读完成。

1、死信队列

1.1、DeadLetter: 是 RabbitMQ 中的一种音讯机制

呈现死信音讯的可能状况如下

  1. 音讯被否定确认,channel.basicNack 或 channel.basicReject, requeue=false
  2. 音讯在队列的存活工夫超过设置的 TTL 工夫
  3. 音讯队列的音讯总数曾经超过最大的队列长度
1.2、Dead Letter Pattern 死信模式

当消费者不能解决接管到的音讯时候,将这个音讯从新公布到另一个队列中,这个过程中的 Exchange 称为死信交换机 (DLX)、Queue 就是死信队列

1.3、死信队列解决流程
  1. 死信音讯实际上就是来设置队列的属性( 配置死信队列
  2. 队列中若有死信音讯,RabbitMQ 会主动将这个音讯从新公布到设置的 Exchange 上,进而路由到另外一个队列。
  3. 监听这个队列中的音讯做解决( 解决死信队列
1.4、配置死信队列
  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由 key
  3. 为死信交换机配置死信队列
1.5、死信交换机的申明周期
  1. 业务音讯被投入业务队列
  2. 消费者生产业务队列的音讯,因为处理过程中产生异样,于是进行了 nck 或者 reject 操作
  3. 被 nck 或 reject 的音讯由 RabbitMQ 投递到死信交换机中
  4. 死信交换机将音讯投入相应的死信队列
  5. 死信队列的消费者生产死信音讯

2、实例代码(boot-rabbit)

2.1、pom.xml

<dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit-test</artifactId>
 <scope>test</scope>
 </dependency>
</dependencies>

2.2、application.properties

spring.application.name=springboot-rabbitmq
server.port=8080
#默认地址就是 127.0.0.1:5672,如果是服务器的 rabbitmq 就改下
spring.rabbitmq.host=192.168.174.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.type=simple
#设置为 false,会抛弃音讯或者从新发步到死信队列
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#虚拟主机目录
spring.rabbitmq.virtual-host=/

2.3、RabbitMQConfig

@Configuration
public class RabbitMQconfig {
 /**
 * // 业务 Exchange
 */ public static final String BUSINESS_EXCHANGE_NAME="dead.letter.demo.simple.business.exchange";
 /**
 *    // 两个业务队列
 */
 public static final String BUSINESS_QUEUEA_NAME="dead.letter.demo.simple.business.queuea";
 public static final String BUSINESS_QUEUEB_NAME="dead.letter.demo.simple.business.queueb";
 /**
 * 死信 Exchange
 */ public static final String DEAD_LETTER_EXCHANGE="dead.letter.demo.simple.deadletter.exchange";
 /**
 * 路由 key
 */ public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY="dead.letter.demo.simple.deadletter.queuea";
 public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY="dead.letter.demo.simple.deadletter.queueb";
 /**
 * 两个死信队列
 */
 public static final String DEAD_LETTER_QUEUEA_NAME="dead.letter.demo.simple.deadletter.queuea";
 public static final String DEAD_LETTER_QUEUEB_NAME="dead.letter.demo.simple.deadletter.queueb";
 /**
 * 申明业务 Exchange
 */ @Bean("businessExchange")
 public FanoutExchange businessExchange(){
 // 播送模式
 return  new FanoutExchange((BUSINESS_EXCHANGE_NAME));
 }
 /**
 * 申明死信 Exchange
 */ @Bean("deadLetterExchange")
 public DirectExchange deadLetterExchange(){
 // 点对点模式模式
 return  new DirectExchange((DEAD_LETTER_EXCHANGE));
 }
 /**
 * 申明业务队列 A
 * @return
 */
 @Bean("businessQueueA")
 public Queue businessQueueA(){HashMap<String, Object> map = new HashMap<>(2);
 map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
 map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUTING_KEY);
 return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();}
 /**
 * 申明业务队列 B
 * @return
 */
 @Bean("businessQueueB")
 public Queue businessQueueB(){HashMap<String, Object> map = new HashMap<>(2);
 map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
 map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUTING_KEY);
 return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();}
 /**
 * 申明死信队列 A
 */ @Bean("deadLetterQueueA")
 public Queue deadLetterQueueA(){return  new Queue(DEAD_LETTER_QUEUEA_NAME);
 }
 /**
 * 申明死信队列 B
 */ @Bean("deadLetterQueueB")
 public Queue deadLetterQueueB(){return  new Queue(DEAD_LETTER_QUEUEB_NAME);
 }
 /**
 * 申明业务队列 A 绑定关系
 */
 @Bean
 public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
 @Qualifier("businessExchange")FanoutExchange exchange){return BindingBuilder.bind(queue).to(exchange);
 }
 /**
 * 申明业务队列 B 绑定关系
 */
 @Bean
 public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
 @Qualifier("businessExchange")FanoutExchange exchange){return BindingBuilder.bind(queue).to(exchange);
 }
 /**
 * 申明死信队列 A 绑定关系
 */
 @Bean
 public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue,
 @Qualifier("deadLetterExchange")DirectExchange exchange)   {return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
 }
 /**
 * 申明死信队列 B 绑定关系
 */
 @Bean
 public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue,
 @Qualifier("deadLetterExchange")DirectExchange exchange)   {return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
 }
}

2.4、controller(MQ 生产者)

@RestController
public class SendMessageController {
 @Autowired
 private RabbitMQconfig rabbitMQconfig;
 @Autowired
 private RabbitTemplate rabbitTemplate;
 public static final String BUSINESS_EXCHANGE_NAME="dead.letter.demo.simple.business.exchange";
 @GetMapping("/sendMsg")
 public String sendMsg(String msg){System.out.println("msg{}"+msg);
 rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"",msg);
 return "success";
 }
}

2.5、业务音讯消费者

@Component
public class BusinessMessageReceiver {
 /**
 *    // 两个业务队列
 */
 public static final String BUSINESS_QUEUEA_NAME="dead.letter.demo.simple.business.queuea";
 public static final String BUSINESS_QUEUEB_NAME="dead.letter.demo.simple.business.queueb";
 /**
 * 生产音讯
 */
 @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
 public void receiveA(Message message, Channel channel) throws IOException {String msg=new String(message.getBody());
 System.out.println("BusinessMessageA{}"+msg);
 boolean ack=true;
 Exception exception=null;
 try {if (msg.contains("deadletter")){throw new RuntimeException("dead letter exception");
 }
 } catch (RuntimeException e) {
 ack=false;
 exception=e;
 }
 if (!ack){System.out.println("error msg{}"+exception.getMessage());
 // 设置死信音讯
 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
 }else {channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }
 }
 /**
 * 音讯信息 B
 */ @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
 public void receiveB(Message message,Channel channel) throws IOException {String msg = new String(message.getBody());
 System.out.println("BusinessMessageB{}"+msg);
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }
}

2.6、死信音讯消费者

@Component
public class DeadMessageReceiver {
 /**
 * 两个死信队列
 */
 public static final String DEAD_LETTER_QUEUEA_NAME="dead.letter.demo.simple.deadletter.queuea";
 public static final String DEAD_LETTER_QUEUEB_NAME="dead.letter.demo.simple.deadletter.queueb";
 @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
 public void receiveA(Message message, Channel channel) throws IOException {System.out.println("DeadMessageA{}"+new String(message.getBody()));
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }
 @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
 public void receiveB(Message message, Channel channel) throws IOException {System.out.println("DeadMessageB{}"+new String(message.getBody()));
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }
}

正文完
 0