目前常见的应用软件都有音讯的提早推送的影子,利用也极为宽泛,例如:
淘宝七天主动确认收货。在咱们签收商品后,物流零碎会在七天后延时发送一个音讯给领取零碎,告诉领取零碎将款打给商家,这个过程继续七天,就是应用了消息中间件的提早推送性能。
12306 购票领取确认页面。咱们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30 分钟内订单不确认的话将会主动勾销订单。其实在下订单那一刻开始购票业务零碎就会发送一个延时音讯给订单零碎,延时 30 分钟,通知订单零碎订单未实现,如果咱们在 30 分钟内实现了订单,则能够通过逻辑代码判断来疏忽掉收到的音讯。
在下面两种场景中,如果咱们应用上面两种传统解决方案无疑大大降低了零碎的整体性能和吞吐量:
- 应用 redis 给订单设置过期工夫,最初通过判断 redis 中是否还有该订单来决定订单是否曾经实现。这种解决方案相较于音讯的提早推送性能较低,因为咱们晓得 redis 都是存储于内存中,咱们遇到歹意下单或者刷单的将会给内存带来微小压力。
- 应用传统的数据库轮询来判断数据库表中订单的状态,这无疑减少了 IO 次数,性能极低。
- 应用 jvm 原生的 DelayQueue,也是大量占用内存,而且没有长久化策略,零碎宕机或者重启都会失落订单信息。
音讯提早推送的实现
在 RabbitMQ 3.6.x 之前咱们个别采纳死信队列 +TTL 过期工夫来实现提早队列,咱们这里不做过多介绍。
在 RabbitMQ 3.6.x 开始,RabbitMQ 官网提供了提早队列的插件,能够下载搁置到 RabbitMQ 根目录下的 plugins 下。
图片
首先咱们创立交换机和音讯队列,application.properties 中配置与上一篇文章雷同。
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQConfig {
public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
public static final String LAZY_QUEUE = "MQ.LazyQueue";
public static final String LAZY_KEY = "lazy.#";
@Bean
public TopicExchange lazyExchange(){//Map<String, Object> pros = new HashMap<>();
// 设置交换机反对提早音讯推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Queue lazyQueue(){return new Queue(LAZY_QUEUE, true);
}
@Bean
public Binding lazyBinding(){return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
}
}
咱们在 Exchange 的申明中能够设置 exchange.setDelayed(true)
来开启提早队列,也能够设置为以下内容传入交换机申明的办法中,因为第一种形式的底层就是通过这种形式来实现的。
//Map<String, Object> pros = new HashMap<>();
// 设置交换机反对提早音讯推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
发送音讯时咱们须要指定提早推送的工夫,咱们这里在发送音讯的办法中传入参数 new MessagePostProcessor()
是为了取得 Message
对象,因为须要借助 Message
对象的 api 来设置延迟时间。
import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//confirmCallback returnCallback 代码省略,请参照上一篇
public void sendLazy(Object message){rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 工夫戳 全局惟一
CorrelationData correlationData = new CorrelationData("12345678909"+new Date());
// 发送音讯时指定 header 延迟时间
rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置音讯长久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//message.getMessageProperties().setHeader("x-delay", "6000");
message.getMessageProperties().setDelay(6000);
return message;
}
}, correlationData);
}
}
咱们能够察看 setDelay(Integer i)
底层代码,也是在 header 中设置 x-delay。等同于咱们手动设置 header
message.getMessageProperties().setHeader("x-delay", "6000");
/**
* Set the x-delay header.
* @param delay the delay.
* @since 1.6
*/
public void setDelay(Integer delay) {if (delay == null || delay < 0) {this.headers.remove(X_DELAY);
}
else {this.headers.put(X_DELAY, delay);
}
}
生产端进行生产
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class MQReceiver {@RabbitListener(queues = "MQ.LazyQueue")
@RabbitHandler
public void onLazyMessage(Message msg, Channel channel) throws IOException{long deliveryTag = msg.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, true);
System.out.println("lazy receive" + new String(msg.getBody()));
}
```
## 测试后果[#](https://www.cnblogs.com/haixiang/p/10966985.html#3724420099)
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {
@Autowired
private MQSender mqSender;
@Test
public void sendLazy() throws Exception {
String msg = “hello spring boot”;
mqSender.sendLazy(msg + “:”);
}
}
果然在 6 秒后收到了音讯 `lazy receive hello spring boot:`