共计 4414 个字符,预计需要花费 12 分钟才能阅读完成。
一 什么是死信队列
当一条音讯在队列中呈现以下三种状况的时候,该音讯就会变成一条死信。
- 音讯被回绝(basic.reject / basic.nack),并且 requeue = false
- 音讯 TTL 过期
- 队列达到最大长度
当音讯在一个队列中变成一个死信之后,如果配置了死信队列,它将被从新 publish 到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。
二 实现死信队列
2.1 原理图
2.2 创立消费者
创立一个消费者,绑定生产队列及死信交换机,交换机默认为 direct
模型,死信交换机也是,arguments 绑定死信交换机和 key。(注解反对的具体参数文末会附上)
public class DirectConsumer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "javatrip",arguments =
{@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
@Argument(name="x-dead-letter-routing-key",value = "deadKey")
}),
exchange = @Exchange(value="javatripDirect"),
key = {"info","error","warning"}
)
})
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{System.out.println("消费者 1"+message);
}
2.3 创立生产者
public void publishMessage(String message){rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("javatripDirect","info",message);
}
三 造成死信的三种状况
3.1 回绝音讯,并且禁止从新入队
- 设置 yml 为手动签收模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
- 设置回绝音讯并禁止从新入队
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicNack(deliverTag,false,false);
- 绑定死信队列
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "javatripDead"),
exchange = @Exchange(value = "deadExchange"),
key = "deadKey"
)
})
public void receive2(String message){System.out.println("我是一条死信:"+message);
}
3.2 音讯 TTL 过期
绑定业务队列的时候,减少音讯的过期时长,当音讯过期后,音讯将被转发到死信队列中。
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "javatrip",arguments =
{@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
@Argument(name="x-dead-letter-routing-key",value = "deadKey"),
@Argument(name = "x-message-ttl",value = "3000")
}),
exchange = @Exchange(value="javatripDirect"),
key = {"info","error","warning"}
)
})
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{System.out.println("消费者 1"+message);
}
3.3 队列达到最大长度
设置音讯队列长度,当队列中的音讯达到最大长度后,持续发送音讯,音讯将被转发到死信队列中。
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "javatrip",arguments =
{@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
@Argument(name="x-dead-letter-routing-key",value = "deadKey"),
@Argument(name = "x-max-length",value = "3")
}),
exchange = @Exchange(value="javatripDirect"),
key = {"info","error","warning"}
)
})
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{System.out.println("消费者 1"+message);
}
四 Spring Boot 整合 RabbitMQ 用到的几个注解
- @QueueBinding 作用就是将队列和交换机进行绑定,次要有以下三个参数:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueBinding {
/**
* @return the queue.
*/
Queue value();
/**
* @return the exchange.
*/
Exchange exchange();
/**
* @return the routing key or pattern for the binding.
* Multiple elements will result in multiple bindings.
*/
String[] key() default {};}
- @Queue 是申明队列及队列的一些属性,主要参数如下:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {
/**
* @return the queue name or "" for a generated queue name (default).
*/
@AliasFor("name")
String value() default "";
/**
* @return the queue name or "" for a generated queue name (default).
* @since 2.0
*/
@AliasFor("value")
String name() default "";
/**
* 是否长久化
*/
String durable() default "";
/**
* 是否独享、排外的.
*/
String exclusive() default "";
/**
* 是否主动删除;*/
String autoDelete() default "";
/**
* 队列的其余属性参数
*(1)x-message-ttl:音讯的过期工夫,单位:毫秒;*(2)x-expires:队列过期工夫,队列在多长时间未被拜访将被删除,单位:毫秒;*(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除音讯;*(4)x-max-length-bytes:队列音讯内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除音讯;*(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时音讯会产生什么。有效值是 drop-head、reject-publish 或 reject-publish-dlx。仲裁队列类型仅反对 drop-head;*(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的音讯可指定发送到该交换器中;*(7)x-dead-letter-routing-key:死信音讯路由键,在音讯发送到死信交换器时会应用该路由键,如果不设置,则应用音讯的原来的路由键值
*(8)x-single-active-consumer:示意队列是否是繁多流动消费者,true 时,注册的生产组内只有一个消费者生产音讯,其余被疏忽,false 时音讯循环分发给所有消费者(默认 false)
*(9)x-max-priority:队列要反对的最大优先级数; 如果未设置,队列将不反对音讯优先级;*(10)x-queue-mode(Lazy mode):将队列设置为提早模式,在磁盘上保留尽可能多的音讯,以缩小 RAM 的应用; 如果未设置,队列将保留内存缓存以尽可能快地传递音讯;*(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。*/
Argument[] arguments() default {};}
- @Exchange 是申明替换及交换机的一些属性,
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {
String TRUE = "true";
String FALSE = "false";
/**
* @return the exchange name.
*/
@AliasFor("name")
String value() default "";
/**
* @return the exchange name.
* @since 2.0
*/
@AliasFor("value")
String name() default "";
/**
* 交换机类型,默认 DIRECT
*/
String type() default ExchangeTypes.DIRECT;
/**
* 是否长久化
*/
String durable() default TRUE;
/**
* 是否主动删除
*/
String autoDelete() default FALSE;
/**
* @return the arguments to apply when declaring this exchange.
* @since 1.6
*/
Argument[] arguments() default {};}
> 如果感觉文章不错,欢送点赞、留言
> 关注公众号《Java 旅途》, 每日推送精品文章
正文完