共计 6717 个字符,预计需要花费 17 分钟才能阅读完成。
1、死信队列
1.1、DeadLetter: 是 RabbitMQ 中的一种音讯机制
呈现死信音讯的可能状况如下
- 音讯被否定确认,channel.basicNack 或 channel.basicReject, requeue=false
- 音讯在队列的存活工夫超过设置的 TTL 工夫
- 音讯队列的音讯总数曾经超过最大的队列长度
1.2、Dead Letter Pattern 死信模式
当消费者不能解决接管到的音讯时候,将这个音讯从新公布到另一个队列中,这个过程中的 Exchange 称为死信交换机 (DLX)、Queue 就是死信队列
1.3、死信队列解决流程
- 死信音讯实际上就是来设置队列的属性( 配置死信队列 )
- 队列中若有死信音讯,RabbitMQ 会主动将这个音讯从新公布到设置的 Exchange 上,进而路由到另外一个队列。
- 监听这个队列中的音讯做解决( 解决死信队列 )
1.4、配置死信队列
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由 key
- 为死信交换机配置死信队列
1.5、死信交换机的申明周期
- 业务音讯被投入业务队列
- 消费者生产业务队列的音讯,因为处理过程中产生异样,于是进行了 nck 或者 reject 操作
- 被 nck 或 reject 的音讯由 RabbitMQ 投递到死信交换机中
- 死信交换机将音讯投入相应的死信队列
- 死信队列的消费者生产死信音讯
2、实例代码(boot-rabbit)
2.1、pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.2、application.properties
spring.application.name=springboot-rabbitmq
server.port=8080
#默认地址就是 127.0.0.1:5672,如果是服务器的 rabbitmq 就改下
spring.rabbitmq.host=192.168.174.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.type=simple
#设置为 false,会抛弃音讯或者从新发步到死信队列
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#虚拟主机目录
spring.rabbitmq.virtual-host=/
2.3、RabbitMQConfig
@Configuration
public class RabbitMQconfig {
/**
* // 业务 Exchange
*/ public static final String BUSINESS_EXCHANGE_NAME="dead.letter.demo.simple.business.exchange";
/**
* // 两个业务队列
*/
public static final String BUSINESS_QUEUEA_NAME="dead.letter.demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME="dead.letter.demo.simple.business.queueb";
/**
* 死信 Exchange
*/ public static final String DEAD_LETTER_EXCHANGE="dead.letter.demo.simple.deadletter.exchange";
/**
* 路由 key
*/ public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY="dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY="dead.letter.demo.simple.deadletter.queueb";
/**
* 两个死信队列
*/
public static final String DEAD_LETTER_QUEUEA_NAME="dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME="dead.letter.demo.simple.deadletter.queueb";
/**
* 申明业务 Exchange
*/ @Bean("businessExchange")
public FanoutExchange businessExchange(){
// 播送模式
return new FanoutExchange((BUSINESS_EXCHANGE_NAME));
}
/**
* 申明死信 Exchange
*/ @Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
// 点对点模式模式
return new DirectExchange((DEAD_LETTER_EXCHANGE));
}
/**
* 申明业务队列 A
* @return
*/
@Bean("businessQueueA")
public Queue businessQueueA(){HashMap<String, Object> map = new HashMap<>(2);
map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(map).build();}
/**
* 申明业务队列 B
* @return
*/
@Bean("businessQueueB")
public Queue businessQueueB(){HashMap<String, Object> map = new HashMap<>(2);
map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(map).build();}
/**
* 申明死信队列 A
*/ @Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
/**
* 申明死信队列 B
*/ @Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
/**
* 申明业务队列 A 绑定关系
*/
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange")FanoutExchange exchange){return BindingBuilder.bind(queue).to(exchange);
}
/**
* 申明业务队列 B 绑定关系
*/
@Bean
public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
@Qualifier("businessExchange")FanoutExchange exchange){return BindingBuilder.bind(queue).to(exchange);
}
/**
* 申明死信队列 A 绑定关系
*/
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue,
@Qualifier("deadLetterExchange")DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
/**
* 申明死信队列 B 绑定关系
*/
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue,
@Qualifier("deadLetterExchange")DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}
2.4、controller(MQ 生产者)
@RestController
public class SendMessageController {
@Autowired
private RabbitMQconfig rabbitMQconfig;
@Autowired
private RabbitTemplate rabbitTemplate;
public static final String BUSINESS_EXCHANGE_NAME="dead.letter.demo.simple.business.exchange";
@GetMapping("/sendMsg")
public String sendMsg(String msg){System.out.println("msg{}"+msg);
rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME,"",msg);
return "success";
}
}
2.5、业务音讯消费者
@Component
public class BusinessMessageReceiver {
/**
* // 两个业务队列
*/
public static final String BUSINESS_QUEUEA_NAME="dead.letter.demo.simple.business.queuea";
public static final String BUSINESS_QUEUEB_NAME="dead.letter.demo.simple.business.queueb";
/**
* 生产音讯
*/
@RabbitListener(queues = BUSINESS_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {String msg=new String(message.getBody());
System.out.println("BusinessMessageA{}"+msg);
boolean ack=true;
Exception exception=null;
try {if (msg.contains("deadletter")){throw new RuntimeException("dead letter exception");
}
} catch (RuntimeException e) {
ack=false;
exception=e;
}
if (!ack){System.out.println("error msg{}"+exception.getMessage());
// 设置死信音讯
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}else {channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
/**
* 音讯信息 B
*/ @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
public void receiveB(Message message,Channel channel) throws IOException {String msg = new String(message.getBody());
System.out.println("BusinessMessageB{}"+msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
2.6、死信音讯消费者
@Component
public class DeadMessageReceiver {
/**
* 两个死信队列
*/
public static final String DEAD_LETTER_QUEUEA_NAME="dead.letter.demo.simple.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME="dead.letter.demo.simple.deadletter.queueb";
@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {System.out.println("DeadMessageA{}"+new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {System.out.println("DeadMessageB{}"+new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
正文完