本篇文章将讲述
springboot
整合rabbitmq
过程,实用于初学者疾速入门。内容包含对几种常见的队列模型阐述以及实际(路由模式、公布订阅 (播送) 模式、主题模式);因为这几种模式的队列后果类似,区别在于交换机类型的不同,因而交换机的类型决定了它们之间的工作模式。交换机类型别离对应:直连交换机(Direct Exchange)、扇型交换机(Fanout Exchange)、Topic Exchange(主题交换机)。因而以下波及的常识和代码多围绕交换机
去开展阐述。
三种模式
路由模式
生产者将携带路由键的音讯发送给直连交换机,交换机依据路由键值转发到队列名为该值的队列
订阅模式(播送)
生产者将音讯发送给扇型交换机,交换机将音讯分发给所有绑定的队列,但留神同一队列多个消费者只有其中一个生产了音讯。
主题模式
与直连交换机相似,但它的 路由建(routing key) 和 绑定值(key)是有规定的;领有下面两种交换机性能。
规定:*:必须存在一个单词;#:存在零个或者多个单词。
理解了以上三种模式流程之后,上面将对三种模式进行代码实战,以加深印象。
代码实战
目录构造
│ RabbitmqApplication.java # 我的项目启动文件
│
├─config
│ RabbitConfig.java # 生产者确认音讯发送配置
│ RabbitConstants.java # 交换机、路由键、队列名常量
│
├─controller
│ MessageController.java # 音讯测试 API
│
├─message
│ DirectMessage.java # 路由模式音讯
│ FanoutMessage.java # 播送音讯
│ RabbitMessage.java # 音讯基类
│ TopicMessage.java # 主题音讯
│
├─receiver
│ MessageReceiver.java # 音讯接管类
│
└─service
│ RabbitService.java
│
└─impl
RabbitServiceImpl.java # 发送音讯业务
外围代码解析
- 音讯基类:
RabbitMessage.java
/**
* 音讯内容
*/
private T data;
public RabbitMessage<T> setData(T data) {
this.data = data;
return this;
}
/**
* 音讯惟一编号
*/
private String messageId = UUID.randomUUID().toString();
/**
* 音讯创立工夫
*/
private String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
/**
* 交换机
* @return
*/
public abstract String exchange();
/**
* 路由键
* @return
*/
public abstract String routingKey();
- 发送音讯:
RabbitServiceImpl.java
@Override
public void send(RabbitMessage message) {log.info("【生产者】发送音讯:ID:{},交换机:{},路由键:{},内容:{}", message.getMessageId(), message.exchange(), message.routingKey(), message.getData());
if (null == message.exchange()) {rabbitTemplate.convertAndSend(message.routingKey(), message, new CorrelationData(message.getMessageId()));
} else {rabbitTemplate.convertAndSend(message.exchange(), message.routingKey(), message, new CorrelationData(message.getMessageId()));
}
}
- 接管音讯:
MessageReceiver.java
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = QUEUE_DIRECT, durable = "true"),
exchange = @Exchange(name = EXCHANGE_DIRECT, type = ExchangeTypes.DIRECT)
)
)
@RabbitHandler
public void direct(DirectMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {log.info("【消费者】收到音讯:ID:{},交换机:{},路由键:{},内容:{}", message.getMessageId(), message.exchange(), message.routingKey(), message.getData());
channel.basicAck(tag, false);
}
- 依赖:
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 具体请看源码,记得批改
application.yml
配置哦!!!
拓展
无论是播送还是主题模式,在多节点利用中,当音讯发送至同一队列多个消费者时,只有其中一个节点生产了音讯,如何拓展为播送成果?
在 fanout 模式下,应用 redis 的公布订阅模式来告诉其它节点
如何避免音讯失落?
1、生产者确认音讯发送
2、交换机、路由、队列设置长久化模式,保障音讯不会失落
3、消费者确认生产音讯,手动 ack
如何避免音讯积压?
不举荐应用channel.basicReject(deliveryTag, false)
,第二个参数为是否从新放回队列,如果使用不当会导致音讯始终在生产 - 入列中循环。
可依据业务设置缓存,当缓存达到一定量时再发送音讯。
好,这篇 springboot
整合 rabbitmq
教程就暂且到此。
始终在谋求思路的传递而非代码的 COPY