1.死信队列的概念
2.死信产生的起因
3.死信队列的架构图
4.音讯过期的死信
5.队列达到最大长度的死信
6.音讯被回绝的死信
1.死信队列的概念
死信,就是无奈被生产的音讯。生产者把音讯发送个broker,然而因为某些起因,queue中的音讯无奈被生产,这样的音讯被称之为死信,而这种音讯的后续解决,有专门的队列解决这类音讯,称之为死信队列。
2.死信产生的起因
个别会有上面三种起因:
音讯TTL工夫过期。
队列达到最大长度。
音讯被回绝。
3.死信队列的架构图
4.音讯过期的死信
消费者1:
public class Consumer1 { private final static String EXCHANGE_NAME = "test_exchange"; private final static String NORMAL_QUEUE = "normal_queue"; private final static String DEAD_EXCHANGE_NAME = "dead_test"; private final static String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //申明一般交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map<String, Object> params = new HashMap<>(); //失常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); //失常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "dead"); channel.queueDeclare(NORMAL_QUEUE,true,false,false,params); channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test"); System.out.println("消费者1期待接管音讯........."); //音讯如何进行生产的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("消费者1控制台接管并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; //勾销生产的一个回调接口 如在生产的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("音讯生产被中断"); }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback); }}
消费者2:
public class Consumer2 { private final static String DEAD_EXCHANGE_NAME = "dead_test"; private final static String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //申明交换机 channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE,true,false,false,null); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead"); System.out.println("死信队列期待接管音讯........."); //音讯如何进行生产的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("死信队列控制台接管并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; //勾销生产的一个回调接口 如在生产的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("死信音讯生产被中断"); }; channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback); }}
生产者:
public class Producer { private final static String EXCHANGE_NAME = "test_exchange"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { /** * 申明一个 exchange * 1.exchange 的名称 * 2.exchange 的类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //设置音讯的三十秒过期工夫 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build(); Scanner sc = new Scanner(System.in); System.out.println("请输出信息"); while (sc.hasNext()) { String message = sc.nextLine(); channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8")); System.out.println("生产者收回音讯" + message); } } }}
生产者先发送三条音讯,发现失常队列都收到了音讯:
而后把消费者1关掉,模仿收不到音讯,生产者发现音讯过期了,就会把音讯发到死信队列外面:
5.队列达到最大长度的死信
先把之前创立的队列和交换机都删掉:
消费者1:
public class Consumer1 { private final static String EXCHANGE_NAME = "test_exchange"; private final static String NORMAL_QUEUE = "normal_queue"; private final static String DEAD_EXCHANGE_NAME = "dead_test"; private final static String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //申明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map<String, Object> params = new HashMap<>(); //失常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); //失常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "dead"); //设置队列最大长度 params.put("x-max-length", 6); channel.queueDeclare(NORMAL_QUEUE,true,false,false,params); channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test"); System.out.println("消费者1期待接管音讯........."); //音讯如何进行生产的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("消费者1控制台接管并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; //勾销生产的一个回调接口 如在生产的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("音讯生产被中断"); }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback); }}
消费者2:
public class Consumer2 { private final static String DEAD_EXCHANGE_NAME = "dead_test"; private final static String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //申明交换机 channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE,true,false,false,null); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead"); System.out.println("死信队列期待接管音讯........."); //音讯如何进行生产的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("死信队列控制台接管并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; //勾销生产的一个回调接口 如在生产的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("死信音讯生产被中断"); }; channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback); }}
生产者:
public class Producer { private final static String EXCHANGE_NAME = "test_exchange"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { /** * 申明一个 exchange * 1.exchange 的名称 * 2.exchange 的类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //设置音讯的三十秒过期工夫 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build(); System.out.println("请输出信息"); //主动发信息,发送速度比拟快,可造成队列达到最大长度 for (int i = 0; i <= 1000; i++) { String message = i + ""; channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8")); System.out.println("生产者收回音讯" + message); } } }}
生产者发送音讯:
消费者1失常生产:
死信消费者2生产队列放不下的数据:
6.音讯被回绝的死信
先把之前创立的队列和交换机都删掉:
消费者1:
public class Consumer1 { private final static String EXCHANGE_NAME = "test_exchange"; private final static String NORMAL_QUEUE = "normal_queue"; private final static String DEAD_EXCHANGE_NAME = "dead_test"; private final static String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //申明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map<String, Object> params = new HashMap<>(); //失常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); //失常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "dead"); channel.queueDeclare(NORMAL_QUEUE,true,false,false,params); channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test"); System.out.println("消费者1期待接管音讯........."); //音讯如何进行生产的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); //假如当初三位数的音讯,咱们都设置为生产失败,退回去 requeue为true ,如果为false,间接进入死信队列 if(message.length() >= 3){ channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true); }else{ System.out.println("消费者1控制台接管并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } }; //勾销生产的一个回调接口 如在生产的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("音讯生产被中断"); }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback); }}
消费者2:
public class Consumer2 { private final static String DEAD_EXCHANGE_NAME = "dead_test"; private final static String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); //申明交换机 channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE,true,false,false,null); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead"); System.out.println("死信队列期待接管音讯........."); //音讯如何进行生产的业务逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println("死信队列控制台接管并打印消息:"+message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; //勾销生产的一个回调接口 如在生产的时候队列被删除掉了 CancelCallback cancelCallback = (consumerTag) -> { System.out.println("死信音讯生产被中断"); }; channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback); }}
生产者:
public class Producer { private final static String EXCHANGE_NAME = "test_exchange"; public static void main(String[] args) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { /** * 申明一个 exchange * 1.exchange 的名称 * 2.exchange 的类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //设置音讯的三十秒过期工夫 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build(); System.out.println("请输出信息"); for (int i = 0; i <= 1000; i++) { String message = i + ""; channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8")); System.out.println("生产者收回音讯" + message); } } }}
生产者发送音讯:
消费者1失常生产音讯,并把长度不符合要求的音讯退回到队列:
等到音讯工夫到了当前,死信队列收到音讯,生产那些被回绝生产的音讯: