关于rabbitmq:系统学习消息队列RabbitMQ的死信队列

44次阅读

共计 8518 个字符,预计需要花费 22 分钟才能阅读完成。

1. 死信队列的概念

2. 死信产生的起因

3. 死信队列的架构图

4. 音讯过期的死信

5. 队列达到最大长度的死信

6. 音讯被回绝的死信

1. 死信队列的概念
死信 ,就是 无奈被生产的音讯 。生产者把音讯发送个 broker,然而因为某些起因,queue 中的音讯无奈被生产,这样的音讯被称之为死信,而这种音讯的后续解决,有 专门的队列解决这类音讯 ,称之为 死信队列

2. 死信产生的起因

个别会有上面三种起因:

音讯 TTL 工夫过期。

队列达到最大长度。

音讯被回绝。

3. 死信队列的架构图

4. 音讯过期的死信

消费者 1:

public class Consumer1 {
    private final static String EXCHANGE_NAME = "test_exchange";
    private final static String NORMAL_QUEUE = "normal_queue";

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明一般交换机
        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        Map<String, Object> params = new HashMap<>();
        // 失常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // 失常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "dead");

        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);

        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");

        System.out.println("消费者 1 期待接管音讯.........");
        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            System.out.println("消费者 1 控制台接管并打印消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
        };

        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
    }
}

消费者 2:

public class Consumer2 {

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);

        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");

        System.out.println("死信队列期待接管音讯.........");

        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            System.out.println("死信队列控制台接管并打印消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("死信音讯生产被中断");
        };

        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);

    }
}

生产者:

public class Producer {
    private final static String EXCHANGE_NAME = "test_exchange";



    public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个 exchange
             * 1.exchange 的名称
             * 2.exchange 的类型
             */
            channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);
            // 设置音讯的三十秒过期工夫
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();
            Scanner sc = new Scanner(System.in);
            System.out.println("请输出信息");
            while (sc.hasNext()) {String message = sc.nextLine();
                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));
                System.out.println("生产者收回音讯" + message);
            }
        }
    }
}

生产者先发送三条音讯,发现失常队列都收到了音讯:

而后把消费者 1 关掉,模仿收不到音讯,生产者发现音讯过期了,就会把音讯发到死信队列外面:

5. 队列达到最大长度的死信

先把之前创立的队列和交换机都删掉:

消费者 1:

public class Consumer1 {
    private final static String EXCHANGE_NAME = "test_exchange";
    private final static String NORMAL_QUEUE = "normal_queue";

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        Map<String, Object> params = new HashMap<>();
        // 失常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // 失常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "dead");
        // 设置队列最大长度
        params.put("x-max-length", 6);

        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);

        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");

        System.out.println("消费者 1 期待接管音讯.........");
        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            System.out.println("消费者 1 控制台接管并打印消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
        };

        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
    }
}

消费者 2:

public class Consumer2 {

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);

        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");

        System.out.println("死信队列期待接管音讯.........");

        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            System.out.println("死信队列控制台接管并打印消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("死信音讯生产被中断");
        };

        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);

    }
}

生产者:

public class Producer {
    private final static String EXCHANGE_NAME = "test_exchange";


    public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个 exchange
             * 1.exchange 的名称
             * 2.exchange 的类型
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 设置音讯的三十秒过期工夫
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();
            System.out.println("请输出信息");
            // 主动发信息,发送速度比拟快,可造成队列达到最大长度
            for (int i = 0; i <= 1000; i++) {
                String message = i + "";
                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));
                System.out.println("生产者收回音讯" + message);
            }
        }
    }
}

生产者发送音讯:

消费者 1 失常生产:

死信消费者 2 生产队列放不下的数据:

6. 音讯被回绝的死信

先把之前创立的队列和交换机都删掉:

消费者 1:

public class Consumer1 {
    private final static String EXCHANGE_NAME = "test_exchange";
    private final static String NORMAL_QUEUE = "normal_queue";

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        Map<String, Object> params = new HashMap<>();
        // 失常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        // 失常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "dead");

        channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);

        channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");

        System.out.println("消费者 1 期待接管音讯.........");
        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            // 假如当初三位数的音讯,咱们都设置为生产失败,退回去 requeue 为 true,如果为 false,间接进入死信队列
            if(message.length() >= 3){channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);
            }else{System.out.println("消费者 1 控制台接管并打印消息:"+message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
        };

        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
    }
}

消费者 2:

public class Consumer2 {

    private final static String DEAD_EXCHANGE_NAME = "dead_test";
    private final static String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
        // 申明交换机
        channel.exchangeDeclare(DEAD_EXCHANGE_NAME,  BuiltinExchangeType.DIRECT);

        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);

        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");

        System.out.println("死信队列期待接管音讯.........");

        // 音讯如何进行生产的业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
            System.out.println("死信队列控制台接管并打印消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {System.out.println("死信音讯生产被中断");
        };

        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);

    }
}

生产者:

public class Producer {
    private final static String EXCHANGE_NAME = "test_exchange";


    public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
            /**
             * 申明一个 exchange
             * 1.exchange 的名称
             * 2.exchange 的类型
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 设置音讯的三十秒过期工夫
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();
            System.out.println("请输出信息");
            for (int i = 0; i <= 1000; i++) {
                String message = i + "";
                channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));
                System.out.println("生产者收回音讯" + message);
            }
        }
    }
}

生产者发送音讯:

消费者 1 失常生产音讯,并把长度不符合要求的音讯退回到队列:

等到音讯工夫到了当前,死信队列收到音讯,生产那些被回绝生产的音讯:

正文完
 0