介绍

死信队列:没有被及时消费的消息存放的队列,消息没有被及时消费有以下几点原因:
1.有消息被拒绝(basic.reject/ basic.nack)并且requeue=false
2.队列达到最大长度
3.消息TTL过期

场景

1.消息进入初始队列,等待30分钟后进入5分钟队列
2.消息等待5分钟后进入执行队列
3.执行失败后重新回到5分钟队列
4.失败5次后,消息进入2小时队列
5.消息等待2小时进入执行队列
6.失败5次后,将消息丢弃或做其他处理

使用

  • 安装MQ

使用docker方式安装,选择带management(管理界面)的版本

docker pull rabbitmq:management
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

访问 localhost:15672,默认账号密码为 guest/guest

  • 项目配置

(1)创建springboot项目
(2)在application.properties配置文件中配置mq连接信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)队列配置

package com.df.ps.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Declares the RabbitMQ queues, topic exchanges, bindings and listener wiring
 * for a delayed-retry pipeline built from per-queue message TTL plus
 * dead-letter exchanges.
 *
 * <p>Routing as declared in this class:
 * <ul>
 *   <li>initial buffered queue --(TTL expiry)--> high-buffered exchange/route</li>
 *   <li>high-buffered queue --(TTL expiry)--> high exchange/route (consumed)</li>
 *   <li>low-buffered queue --(TTL expiry)--> low exchange/route (consumed)</li>
 * </ul>
 * Only the "high" and "low" queues have a consumer attached; the buffered
 * queues exist solely so that messages sit in them until their TTL expires.
 */
@Configuration
public class MqConfig {

    /** Milliseconds per minute, used to convert the configured TTLs. */
    private static final long MILLIS_PER_MINUTE = 60_000L;

    // ---- TTLs, in minutes ----

    // NOTE(review): the default here is 120 minutes, while the original comment
    // on this queue said "30min" - confirm which value is intended.
    @Value("${spring.df.buffered.min:120}")
    private int springdfBufferedTime;
    @Value("${spring.df.high-buffered.min:5}")
    private int springdfHighBufferedTime;
    @Value("${spring.df.low-buffered.min:120}")
    private int springdfLowBufferedTime;

    // ---- Initial buffered queue ----
    @Value("${spring.df.queue:spring-df-buffered-queue}")
    private String springdfBufferedQueue;
    @Value("${spring.df.topic:spring-df-buffered-topic}")
    private String springdfBufferedTopic;
    @Value("${spring.df.route:spring-df-buffered-route}")
    private String springdfBufferedRouteKey;

    // ---- High buffered queue (default TTL 5 minutes) ----
    @Value("${spring.df.high-buffered.queue:spring-df-high-buffered-queue}")
    private String springdfHighBufferedQueue;
    @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
    private String springdfHighBufferedTopic;
    @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
    private String springdfHighBufferedRouteKey;

    // ---- High queue (consumed) ----
    @Value("${spring.df.high.queue:spring-df-high-queue}")
    private String springdfHighQueue;
    @Value("${spring.df.high.topic:spring-df-high-topic}")
    private String springdfHighTopic;
    @Value("${spring.df.high.route:spring-df-high-route}")
    private String springdfHighRouteKey;

    // ---- Low buffered queue (default TTL 120 minutes) ----
    @Value("${spring.df.low-buffered.queue:spring-df-low-buffered-queue}")
    private String springdfLowBufferedQueue;
    @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
    private String springdfLowBufferedTopic;
    @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
    private String springdfLowBufferedRouteKey;

    // ---- Low queue (consumed) ----
    @Value("${spring.df.low.queue:spring-df-low-queue}")
    private String springdfLowQueue;
    @Value("${spring.df.low.topic:spring-df-low-topic}")
    private String springdfLowTopic;
    @Value("${spring.df.low.route:spring-df-low-route}")
    private String springdfLowRouteKey;

    // ------------------------------------------------------------------ queues

    /** Initial buffered queue; expired messages are dead-lettered to the high-buffered exchange. */
    @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedQueue")
    Queue springdfBufferedQueue() {
        return createBufferedQueue(springdfBufferedQueue, springdfHighBufferedTopic,
                springdfHighBufferedRouteKey, minutesToMillis(springdfBufferedTime));
    }

    /** High buffered queue; expired messages are dead-lettered to the high exchange. */
    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedQueue")
    Queue springdfHighBufferedQueue() {
        return createBufferedQueue(springdfHighBufferedQueue, springdfHighTopic,
                springdfHighRouteKey, minutesToMillis(springdfHighBufferedTime));
    }

    /** Durable queue consumed by the listener container. */
    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighQueue")
    Queue springdfHighQueue() {
        return new Queue(springdfHighQueue, true);
    }

    /** Low buffered queue; expired messages are dead-lettered to the low exchange. */
    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedQueue")
    Queue springdfLowBufferedQueue() {
        return createBufferedQueue(springdfLowBufferedQueue, springdfLowTopic,
                springdfLowRouteKey, minutesToMillis(springdfLowBufferedTime));
    }

    /** Durable queue consumed by the listener container. */
    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowQueue")
    Queue springdfLowQueue() {
        return new Queue(springdfLowQueue, true);
    }

    // ------------------------------------------------- exchanges and bindings

    @Bean(autowire = Autowire.BY_NAME, value = "springdfBufferedTopic")
    TopicExchange springdfBufferedTopic() {
        return new TopicExchange(springdfBufferedTopic);
    }

    @Bean
    Binding springBuffereddf(Queue springdfBufferedQueue, TopicExchange springdfBufferedTopic) {
        return BindingBuilder.bind(springdfBufferedQueue)
                .to(springdfBufferedTopic)
                .with(springdfBufferedRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighBufferedTopic")
    TopicExchange springdfHighBufferedTopic() {
        return new TopicExchange(springdfHighBufferedTopic);
    }

    @Bean
    Binding springHighBuffereddf(Queue springdfHighBufferedQueue, TopicExchange springdfHighBufferedTopic) {
        return BindingBuilder.bind(springdfHighBufferedQueue)
                .to(springdfHighBufferedTopic)
                .with(springdfHighBufferedRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfHighTopic")
    TopicExchange springdfHighTopic() {
        return new TopicExchange(springdfHighTopic);
    }

    @Bean
    Binding springHighdf(Queue springdfHighQueue, TopicExchange springdfHighTopic) {
        return BindingBuilder.bind(springdfHighQueue)
                .to(springdfHighTopic)
                .with(springdfHighRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowBufferedTopic")
    TopicExchange springdfLowBufferedTopic() {
        return new TopicExchange(springdfLowBufferedTopic);
    }

    @Bean
    Binding springLowBuffereddf(Queue springdfLowBufferedQueue, TopicExchange springdfLowBufferedTopic) {
        return BindingBuilder.bind(springdfLowBufferedQueue)
                .to(springdfLowBufferedTopic)
                .with(springdfLowBufferedRouteKey);
    }

    @Bean(autowire = Autowire.BY_NAME, value = "springdfLowTopic")
    TopicExchange springdfLowTopic() {
        return new TopicExchange(springdfLowTopic);
    }

    @Bean
    Binding springLowdf(Queue springdfLowQueue, TopicExchange springdfLowTopic) {
        return BindingBuilder.bind(springdfLowQueue)
                .to(springdfLowTopic)
                .with(springdfLowRouteKey);
    }

    // ---------------------------------------------------------------- consumer

    /**
     * Listener container that consumes the high and low queues and hands every
     * message to the given adapter.
     */
    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(springdfHighQueue, springdfLowQueue);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    /**
     * Registers the receiver as a bean. {@link MqReceiver} carries no stereotype
     * annotation, so without this definition there would be nothing to inject
     * into {@link #listenerAdapter(MqReceiver)}.
     */
    @Bean
    MqReceiver mqReceiver(RabbitTemplate rabbitTemplate) {
        return new MqReceiver(rabbitTemplate);
    }

    /**
     * Adapter that dispatches by source queue: messages from the high queue go
     * to {@code MqReceiver.highReceiver}, messages from the low queue go to
     * {@code MqReceiver.lowReceiver}, anything else falls back to
     * {@code MqReceiver.receive}.
     *
     * <p>The method names must match the receiver's public methods exactly;
     * the adapter resolves them reflectively when a message arrives.
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MqReceiver receiver) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(receiver);
        adapter.setDefaultListenerMethod("receive");

        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put(springdfHighQueue, "highReceiver");
        queueOrTagToMethodName.put(springdfLowQueue, "lowReceiver");
        adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        return adapter;
    }

    // ----------------------------------------------------------------- helpers

    /**
     * Converts a TTL expressed in minutes to milliseconds.
     *
     * @param minutes configured TTL in minutes
     * @return the TTL in milliseconds
     * @throws IllegalArgumentException if {@code minutes} is negative or the
     *         result does not fit in an {@code int}
     */
    private static int minutesToMillis(int minutes) {
        long millis = MILLIS_PER_MINUTE * minutes;
        if (minutes < 0 || millis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Invalid queue TTL in minutes: " + minutes);
        }
        return (int) millis;
    }

    /**
     * Builds a durable queue whose messages expire after {@code bufferedTime}
     * milliseconds and are then dead-lettered to {@code topic} with
     * {@code routeKey}.
     *
     * @param queueName    name of the queue to declare
     * @param topic        dead-letter exchange name
     * @param routeKey     dead-letter routing key
     * @param bufferedTime per-queue message TTL in milliseconds
     * @return the queue definition
     */
    private Queue createBufferedQueue(String queueName, String topic, String routeKey, int bufferedTime) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", topic);
        args.put("x-dead-letter-routing-key", routeKey);
        args.put("x-message-ttl", bufferedTime);

        // Whether the queue is durable.
        boolean durable = true;
        // An exclusive queue is private to its creator and is deleted on disconnect.
        boolean exclusive = false;
        // Whether to delete the queue automatically once all consumers have disconnected.
        boolean autoDelete = false;
        return new Queue(queueName, durable, exclusive, autoDelete, args);
    }
}
  • 消费者配置
package com.df.ps.mq;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.util.Map;

/**
 * Message receiver for the high and low queues.
 *
 * <p>On a processing failure the message is re-published to a buffered
 * exchange together with a retry counter stored under the {@code "times"} key
 * of the message map. Once the counter reaches the configured limit the
 * message is escalated (high queue) or given up on (low queue).
 *
 * <p>This class has no stereotype annotation; it has to be registered as a
 * Spring bean elsewhere for the {@code @Value} fields to be populated.
 */
public class MqReceiver {

    private static final Logger logger = LoggerFactory.getLogger(MqReceiver.class);

    /** Shared mapper, created once instead of on every message. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Key in the message map that carries the retry counter. */
    private static final String TIMES_KEY = "times";

    @Value("${high-retry:5}")
    private int highRetry;
    @Value("${low-retry:5}")
    private int lowRetry;

    @Value("${spring.df.high-buffered.topic:spring-df-high-buffered-topic}")
    private String springdfHighBufferedTopic;
    @Value("${spring.df.high-buffered.route:spring-df-high-buffered-route}")
    private String springdfHighBufferedRouteKey;

    @Value("${spring.df.low-buffered.topic:spring-df-low-buffered-topic}")
    private String springdfLowBufferedTopic;
    @Value("${spring.df.low-buffered.route:spring-df-low-buffered-route}")
    private String springdfLowBufferedRouteKey;

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public MqReceiver(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Fallback handler: only logs the message.
     *
     * @param message the received payload
     */
    public void receive(Object message) {
        if (logger.isInfoEnabled()) {
            logger.info("default receiver: " + message);
        }
    }

    /**
     * Handles a message from the high queue.
     *
     * <p>On failure: while the retry counter is below {@code highRetry} the
     * message is re-published to the high-buffered exchange with the counter
     * incremented; otherwise the counter is reset to 0 and the message is
     * published to the low-buffered exchange.
     *
     * @param message the received payload; must be convertible to a map
     * @throws IllegalArgumentException if the payload cannot be converted to a map
     */
    public void highReceiver(Object message) {
        Map<String, Object> msg = toMap(message);
        try {
            logger.info("这里做消息处理...");
        } catch (Exception e) {
            logger.warn("Processing of high-queue message failed", e);
            int times = retryCount(msg);
            // Publish the updated map, not the original payload: the converted
            // map may be a copy, in which case the counter would be lost.
            if (times < highRetry) {
                msg.put(TIMES_KEY, times + 1);
                rabbitTemplate.convertAndSend(springdfHighBufferedTopic, springdfHighBufferedRouteKey, msg);
            } else {
                msg.put(TIMES_KEY, 0);
                rabbitTemplate.convertAndSend(springdfLowBufferedTopic, springdfLowBufferedRouteKey, msg);
            }
        }
    }

    /**
     * Handles a message from the low queue.
     *
     * <p>On failure: while the retry counter is below {@code lowRetry} the
     * message is re-published to the low-buffered exchange with the counter
     * incremented; otherwise it is logged as unconsumable and dropped.
     *
     * @param message the received payload; must be convertible to a map
     * @throws IllegalArgumentException if the payload cannot be converted to a map
     */
    public void lowReceiver(Object message) {
        Map<String, Object> msg = toMap(message);
        try {
            logger.info("这里做消息处理...");
        } catch (Exception e) {
            logger.warn("Processing of low-queue message failed", e);
            int times = retryCount(msg);
            if (times < lowRetry) {
                // Without this increment the limit is never reached and a
                // failing message would cycle through the queue forever.
                msg.put(TIMES_KEY, times + 1);
                rabbitTemplate.convertAndSend(springdfLowBufferedTopic, springdfLowBufferedRouteKey, msg);
            } else {
                logger.info("消息无法被消费...");
            }
        }
    }

    /** Converts the payload to a string-keyed map using the shared mapper. */
    @SuppressWarnings("unchecked") // Map.class carries no type parameters
    private static Map<String, Object> toMap(Object message) {
        return MAPPER.convertValue(message, Map.class);
    }

    /** Reads the retry counter; a missing or non-numeric value counts as 0. */
    private static int retryCount(Map<String, Object> msg) {
        Object value = msg.get(TIMES_KEY);
        return value instanceof Number ? ((Number) value).intValue() : 0;
    }
}