共计 8518 个字符,预计需要花费 22 分钟才能阅读完成。
1. 死信队列的概念
2. 死信产生的起因
3. 死信队列的架构图
4. 音讯过期的死信
5. 队列达到最大长度的死信
6. 音讯被回绝的死信
1. 死信队列的概念
死信 ,就是 无奈被生产的音讯 。生产者把音讯发送个 broker,然而因为某些起因,queue 中的音讯无奈被生产,这样的音讯被称之为死信,而这种音讯的后续解决,有 专门的队列解决这类音讯 ,称之为 死信队列。
2. 死信产生的起因
个别会有上面三种起因:
音讯 TTL 工夫过期。
队列达到最大长度。
音讯被回绝。
3. 死信队列的架构图
4. 音讯过期的死信
消费者 1:
public class Consumer1 {
private final static String EXCHANGE_NAME = "test_exchange";
private final static String NORMAL_QUEUE = "normal_queue";
private final static String DEAD_EXCHANGE_NAME = "dead_test";
private final static String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明一般交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String, Object> params = new HashMap<>();
// 失常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 失常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);
channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");
System.out.println("消费者 1 期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("消费者 1 控制台接管并打印消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
}
}
消费者 2:
public class Consumer2 {
private final static String DEAD_EXCHANGE_NAME = "dead_test";
private final static String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DEAD_QUEUE,true,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");
System.out.println("死信队列期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("死信队列控制台接管并打印消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("死信音讯生产被中断");
};
channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);
}
}
生产者:
public class Producer {
private final static String EXCHANGE_NAME = "test_exchange";
public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 设置音讯的三十秒过期工夫
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();
Scanner sc = new Scanner(System.in);
System.out.println("请输出信息");
while (sc.hasNext()) {String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));
System.out.println("生产者收回音讯" + message);
}
}
}
}
生产者先发送三条音讯,发现失常队列都收到了音讯:
而后把消费者 1 关掉,模仿收不到音讯,生产者发现音讯过期了,就会把音讯发到死信队列外面:
5. 队列达到最大长度的死信
先把之前创立的队列和交换机都删掉:
消费者 1:
public class Consumer1 {
private final static String EXCHANGE_NAME = "test_exchange";
private final static String NORMAL_QUEUE = "normal_queue";
private final static String DEAD_EXCHANGE_NAME = "dead_test";
private final static String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String, Object> params = new HashMap<>();
// 失常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 失常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "dead");
// 设置队列最大长度
params.put("x-max-length", 6);
channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);
channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");
System.out.println("消费者 1 期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("消费者 1 控制台接管并打印消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
}
}
消费者 2:
public class Consumer2 {
private final static String DEAD_EXCHANGE_NAME = "dead_test";
private final static String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DEAD_QUEUE,true,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");
System.out.println("死信队列期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("死信队列控制台接管并打印消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("死信音讯生产被中断");
};
channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);
}
}
生产者:
public class Producer {
private final static String EXCHANGE_NAME = "test_exchange";
public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 设置音讯的三十秒过期工夫
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();
System.out.println("请输出信息");
// 主动发信息,发送速度比拟快,可造成队列达到最大长度
for (int i = 0; i <= 1000; i++) {
String message = i + "";
channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));
System.out.println("生产者收回音讯" + message);
}
}
}
}
生产者发送音讯:
消费者 1 失常生产:
死信消费者 2 生产队列放不下的数据:
6. 音讯被回绝的死信
先把之前创立的队列和交换机都删掉:
消费者 1:
public class Consumer1 {
private final static String EXCHANGE_NAME = "test_exchange";
private final static String NORMAL_QUEUE = "normal_queue";
private final static String DEAD_EXCHANGE_NAME = "dead_test";
private final static String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String, Object> params = new HashMap<>();
// 失常队列设置死信交换机 参数 key 是固定值
params.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 失常队列设置死信 routing-key 参数 key 是固定值
params.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare(NORMAL_QUEUE,true,false,false,params);
channel.queueBind(NORMAL_QUEUE,EXCHANGE_NAME,"test");
System.out.println("消费者 1 期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
// 假如当初三位数的音讯,咱们都设置为生产失败,退回去 requeue 为 true,如果为 false,间接进入死信队列
if(message.length() >= 3){channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);
}else{System.out.println("消费者 1 控制台接管并打印消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("音讯生产被中断");
};
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
}
}
消费者 2:
public class Consumer2 {
private final static String DEAD_EXCHANGE_NAME = "dead_test";
private final static String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();
// 申明交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DEAD_QUEUE,true,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE_NAME,"dead");
System.out.println("死信队列期待接管音讯.........");
// 音讯如何进行生产的业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());
System.out.println("死信队列控制台接管并打印消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
// 勾销生产的一个回调接口 如在生产的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {System.out.println("死信音讯生产被中断");
};
channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);
}
}
生产者:
public class Producer {
private final static String EXCHANGE_NAME = "test_exchange";
public static void main(String[] args) throws Exception {try (Channel channel = RabbitMqUtils.getChannel()) {
/**
* 申明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 设置音讯的三十秒过期工夫
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("30000").build();
System.out.println("请输出信息");
for (int i = 0; i <= 1000; i++) {
String message = i + "";
channel.basicPublish(EXCHANGE_NAME, "test", properties, message.getBytes("UTF-8"));
System.out.println("生产者收回音讯" + message);
}
}
}
}
生产者发送音讯:
消费者 1 失常生产音讯,并把长度不符合要求的音讯退回到队列:
等到音讯工夫到了当前,死信队列收到音讯,生产那些被回绝生产的音讯:
正文完