1.死信队列的概念

2.死信产生的起因

3.死信队列的架构图

4.音讯过期的死信

5.队列达到最大长度的死信

6.音讯被回绝的死信

1.死信队列的概念
死信,就是无奈被生产的音讯。生产者把音讯发送个broker,然而因为某些起因,queue中的音讯无奈被生产,这样的音讯被称之为死信,而这种音讯的后续解决,有专门的队列解决这类音讯,称之为死信队列

2.死信产生的起因

个别会有上面三种起因:

音讯TTL工夫过期。

队列达到最大长度。

音讯被回绝。

3.死信队列的架构图

4.音讯过期的死信

消费者1:

public class Consumer1 {    private final static String EXCHANGE_NAME = "test_exchange";    private final static String NORMAL_QUEUE = "normal_queue";    private final static String DEAD_EXCHANGE_NAME = "dead_test";    private final static String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        //申明一般交换机        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        Map<String, Object> params = new HashMap<>();        //失常队列设置死信交换机 参数 key 是固定值        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);        //失常队列设置死信 routing-key 参数 key 是固定值        params.put("x-dead-letter-routing-key", "dead");        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");        System.out.println("消费者1期待接管音讯.........");        //音讯如何进行生产的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody());            System.out.println("消费者1控制台接管并打印消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("音讯生产被中断");        };        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);    }}

消费者2:

public class Consumer2 {    private final static String DEAD_EXCHANGE_NAME = "dead_test";    private final static String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        //申明交换机        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");        System.out.println("死信队列期待接管音讯.........");        //音讯如何进行生产的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody());            System.out.println("死信队列控制台接管并打印消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("死信音讯生产被中断");        };        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);    }}

生产者:

public class Producer {    private final static String EXCHANGE_NAME = "test_exchange";    public static void main(String[] args) throws Exception {        try (Channel channel = RabbitMqUtils.getChannel()) {            /**             * 申明一个 exchange             * 1.exchange 的名称             * 2.exchange 的类型             */            channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);            //设置音讯的三十秒过期工夫            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();            Scanner sc = new Scanner(System.in);            System.out.println("请输出信息");            while (sc.hasNext()) {                String message = sc.nextLine();                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));                System.out.println("生产者收回音讯" + message);            }        }    }}

生产者先发送三条音讯,发现失常队列都收到了音讯:

而后把消费者1关掉,模仿收不到音讯,生产者发现音讯过期了,就会把音讯发到死信队列外面:

5.队列达到最大长度的死信

先把之前创立的队列和交换机都删掉:

消费者1:

public class Consumer1 {    private final static String EXCHANGE_NAME = "test_exchange";    private final static String NORMAL_QUEUE = "normal_queue";    private final static String DEAD_EXCHANGE_NAME = "dead_test";    private final static String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        //申明交换机        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        Map<String, Object> params = new HashMap<>();        //失常队列设置死信交换机 参数 key 是固定值        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);        //失常队列设置死信 routing-key 参数 key 是固定值        params.put("x-dead-letter-routing-key", "dead");        //设置队列最大长度        params.put("x-max-length", 6);        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");        System.out.println("消费者1期待接管音讯.........");        //音讯如何进行生产的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody());            System.out.println("消费者1控制台接管并打印消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("音讯生产被中断");        };        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);    }}

消费者2:

public class Consumer2 {    private final static String DEAD_EXCHANGE_NAME = "dead_test";    private final static String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        //申明交换机        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");        System.out.println("死信队列期待接管音讯.........");        //音讯如何进行生产的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody());            System.out.println("死信队列控制台接管并打印消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("死信音讯生产被中断");        };        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);    }}

生产者:

public class Producer {    private final static String EXCHANGE_NAME = "test_exchange";    public static void main(String[] args) throws Exception {        try (Channel channel = RabbitMqUtils.getChannel()) {            /**             * 申明一个 exchange             * 1.exchange 的名称             * 2.exchange 的类型             */            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);            //设置音讯的三十秒过期工夫            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();            System.out.println("请输出信息");            //主动发信息,发送速度比拟快,可造成队列达到最大长度            for (int i = 0; i <= 1000; i++) {                String message = i + "";                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));                System.out.println("生产者收回音讯" + message);            }        }    }}

生产者发送音讯:

消费者1失常生产:

死信消费者2生产队列放不下的数据:

6.音讯被回绝的死信

先把之前创立的队列和交换机都删掉:

消费者1:

public class Consumer1 {    private final static String EXCHANGE_NAME = "test_exchange";    private final static String NORMAL_QUEUE = "normal_queue";    private final static String DEAD_EXCHANGE_NAME = "dead_test";    private final static String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        //申明交换机        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        Map<String, Object> params = new HashMap<>();        //失常队列设置死信交换机 参数 key 是固定值        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);        //失常队列设置死信 routing-key 参数 key 是固定值        params.put("x-dead-letter-routing-key", "dead");        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");        System.out.println("消费者1期待接管音讯.........");        //音讯如何进行生产的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody());            //假如当初三位数的音讯,咱们都设置为生产失败,退回去 requeue为true ,如果为false,间接进入死信队列            if(message.length() >= 3){                channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);            }else{                System.out.println("消费者1控制台接管并打印消息:"+message);                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);            }        };        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("音讯生产被中断");        };        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);    }}

消费者2:

public class Consumer2 {    private final static String DEAD_EXCHANGE_NAME = "dead_test";    private final static String DEAD_QUEUE = "dead_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMqUtils.getChannel();        //申明交换机        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");        System.out.println("死信队列期待接管音讯.........");        //音讯如何进行生产的业务逻辑        DeliverCallback deliverCallback = (consumerTag, delivery) -> {            String message = new String(delivery.getBody());            System.out.println("死信队列控制台接管并打印消息:"+message);            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);        };        //勾销生产的一个回调接口 如在生产的时候队列被删除掉了        CancelCallback cancelCallback = (consumerTag) -> {            System.out.println("死信音讯生产被中断");        };        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);    }}

生产者:

public class Producer {    private final static String EXCHANGE_NAME = "test_exchange";    public static void main(String[] args) throws Exception {        try (Channel channel = RabbitMqUtils.getChannel()) {            /**             * 申明一个 exchange             * 1.exchange 的名称             * 2.exchange 的类型             */            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);            //设置音讯的三十秒过期工夫            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();            System.out.println("请输出信息");            for (int i = 0; i <= 1000; i++) {                String message = i + "";                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));                System.out.println("生产者收回音讯" + message);            }        }    }}

生产者发送音讯:

消费者1失常生产音讯,并把长度不符合要求的音讯退回到队列:

等到音讯工夫到了当前,死信队列收到音讯,生产那些被回绝生产的音讯: