乐趣区

关于rabbitmq:系统学习消息队列RabbitMQ的死信队列

1. 死信队列的概念

2. 死信产生的起因

3. 死信队列的架构图

4. 音讯过期的死信

5. 队列达到最大长度的死信

6. 音讯被回绝的死信

1. 死信队列的概念
死信 ,就是 无奈被生产的音讯 。生产者把音讯发送个 broker,然而因为某些起因,queue 中的音讯无奈被生产,这样的音讯被称之为死信,而这种音讯的后续解决,有 专门的队列解决这类音讯 ,称之为 死信队列

2. 死信产生的起因

个别会有上面三种起因:

音讯 TTL 工夫过期。

队列达到最大长度。

音讯被回绝。

3. 死信队列的架构图

4. 音讯过期的死信

消费者 1:

public class Consumer1 {
    private final static String EXCHANGE_NAME = "test_exchange";
    private final static String NORMAL_QUEUE = "normal_queue";

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明一般交换机
        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        Map<String, Object> params = new HashMap<>();
        // 失常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // 失常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "dead");

        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);

        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");

        System.out.println("消费者 1 期待接管音讯.........");
        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            System.out.println("消费者 1 控制台接管并打印消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
        };

        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
    }
}

消费者 2:

public class Consumer2 {

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);

        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");

        System.out.println("死信队列期待接管音讯.........");

        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            System.out.println("死信队列控制台接管并打印消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("死信音讯生产被中断");
        };

        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);

    }
}

生产者:

public class Producer {
    private final static String EXCHANGE_NAME = "test_exchange";



    public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个 exchange
             * 1.exchange 的名称
             * 2.exchange 的类型
             */
            channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);
            // 设置音讯的三十秒过期工夫
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();
            Scanner sc = new Scanner(System.in);
            System.out.println("请输出信息");
            while (sc.hasNext()) {String message = sc.nextLine();
                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));
                System.out.println("生产者收回音讯" + message);
            }
        }
    }
}

生产者先发送三条音讯,发现失常队列都收到了音讯:

而后把消费者 1 关掉,模仿收不到音讯,生产者发现音讯过期了,就会把音讯发到死信队列外面:

5. 队列达到最大长度的死信

先把之前创立的队列和交换机都删掉:

消费者 1:

public class Consumer1 {
    private final static String EXCHANGE_NAME = "test_exchange";
    private final static String NORMAL_QUEUE = "normal_queue";

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        Map<String, Object> params = new HashMap<>();
        // 失常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // 失常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "dead");
        // 设置队列最大长度
        params.put("x-max-length", 6);

        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);

        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");

        System.out.println("消费者 1 期待接管音讯.........");
        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            System.out.println("消费者 1 控制台接管并打印消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
        };

        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
    }
}

消费者 2:

public class Consumer2 {

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);

        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");

        System.out.println("死信队列期待接管音讯.........");

        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            System.out.println("死信队列控制台接管并打印消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("死信音讯生产被中断");
        };

        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);

    }
}

生产者:

public class Producer {
    private final static String EXCHANGE_NAME = "test_exchange";


    public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个 exchange
             * 1.exchange 的名称
             * 2.exchange 的类型
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 设置音讯的三十秒过期工夫
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();
            System.out.println("请输出信息");
            // 主动发信息,发送速度比拟快,可造成队列达到最大长度
            for (int i = 0; i <= 1000; i++) {
                String message = i + "";
                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));
                System.out.println("生产者收回音讯" + message);
            }
        }
    }
}

生产者发送音讯:

消费者 1 失常生产:

死信消费者 2 生产队列放不下的数据:

6. 音讯被回绝的死信

先把之前创立的队列和交换机都删掉:

消费者 1:

public class Consumer1 {
    private final static String EXCHANGE_NAME = "test_exchange";
    private final static String NORMAL_QUEUE = "normal_queue";

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        Map<String, Object> params = new HashMap<>();
        // 失常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // 失常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "dead");

        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);

        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");

        System.out.println("消费者 1 期待接管音讯.........");
        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            // 假如当初三位数的音讯,咱们都设置为生产失败,退回去 requeue 为 true,如果为 false,间接进入死信队列
            if(message.length() >= 3){channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);
            }else{System.out.println("消费者 1 控制台接管并打印消息:"+message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
        };

        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
    }
}

消费者 2:

public class Consumer2 {

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);

        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");

        System.out.println("死信队列期待接管音讯.........");

        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            System.out.println("死信队列控制台接管并打印消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("死信音讯生产被中断");
        };

        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);

    }
}

生产者:

public class Producer {
    private final static String EXCHANGE_NAME = "test_exchange";


    public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个 exchange
             * 1.exchange 的名称
             * 2.exchange 的类型
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 设置音讯的三十秒过期工夫
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();
            System.out.println("请输出信息");
            for (int i = 0; i <= 1000; i++) {
                String message = i + "";
                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));
                System.out.println("生产者收回音讯" + message);
            }
        }
    }
}

生产者发送音讯:

消费者 1 失常生产音讯,并把长度不符合要求的音讯退回到队列:

等到音讯工夫到了当前,死信队列收到音讯,生产那些被回绝生产的音讯:

退出移动版