MQ 老演员了,这篇文章次要是记录 ActiveMQ 从下载到利用到我的项目中的去的一个配置、编写的过程
让大家疾速上配置到利用
下载并批改配置
首先,下载 ActiveMQ,这是官网下载地址:https://activemq.apache.org/c…
下载下来之后,找到 config 目录里的 activemq.xml 文件
找到 <policyEntries> 标签,在标签下里增加配置
<!-- 死信队列 -->
<policyEntry topic=">" >
<deadLetterStrategy>
<!--
queuePrefix: 设置死信队列前缀
useQueueForQueueMessages: 设置应用队列保留死信,还能够设置 useQueueForTopicMessages,应用 Topic 来保留死信
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" processNonPersistent="true" />
</deadLetterStrategy>
</policyEntry>
这是配置死信队列
须要用到的 Maven 依赖
<!--activemq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--activemq pool-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
配置 SpringBoot 的 YML 文件
spring:
#ActiveMQ 根底配置
activemq:
#通信地址,不是 http,是 tcp,61616 是;连贯地址
broker-url: tcp://127.0.0.1:61616
#账号密码能够在 MQ 的 users.properties 配置文件中配置
user: admin
password: admin
pool:
#是否启用
enabled: true
#最大连接数
max-connections: 100
生产者配置应用
生产者——config 类
//MQ 的配置
@Configuration
public class ActiveMQConfig {
//yml 配置文件中的那个连贯地址
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
// 队列的 Bean
@Bean
public Queue getQueue(){return new ActiveMQQueue("ActiveMQQueue");
}
// 连贯工厂的 Bean,brokerUrl 连贯地址
@Bean
public ActiveMQConnectionFactory connectionFactory(){return new ActiveMQConnectionFactory(brokerUrl);
}
}
生产者——应用
应用 @Scheduled(cron = “0/5 ?”) 定时工作注解,须要在启动类加 @EnableScheduling 注解,来扫描
@Autowired
private JmsMessagingTemplate messagingTemplate;
@Autowired
private Queue queue;
// 定时工作的注解(启动后每隔 5 秒执行一次)@Scheduled(cron = "0/5 * * * * ?")
// 事务注解
@Transactional(rollbackFor = Exception.class)
public void task() {System.out.println("定时工作执行...");
// 放入队列
messagingTemplate.convertAndSend(queue, "MingLog");
System.out.println("已放入队列...");
}
消费者配置应用
消费者——config 类
//RctiveMQ 消费者的配置
@Configuration
public class ActiveMQConfig {
//yml 配置文件中的那个连贯地址
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
// 连贯工厂的 Bean
@Bean
public ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy){
ActiveMQConnectionFactory activeMQConnectionFactory =
// 账号、明码、连贯地址
new ActiveMQConnectionFactory("admin","admin",brokerUrl);
// 配置管制音讯在回滚时如何从新传递
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
//messageConsumer 的配置选项 Bean
@Bean
public RedeliveryPolicy getRedeliveryPolicy(){return new RedeliveryPolicy();
}
// 音讯的监听连贯工厂 Bean
@Bean
public JmsListenerContainerFactory getJmsListenerContainerFactory(ActiveMQConnectionFactory connectionFactory){
// 创立默认音讯监听工厂
DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
// 将连贯工厂 set 进去
defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
// 设置生产确认形式 1: 主动确认,2:客户端手动确认,3:主动批量确认,4 事务提交并确认。defaultJmsListenerContainerFactory.setSessionAcknowledgeMode(2);
return defaultJmsListenerContainerFactory;
}
}
消费者——应用
个别应用音讯监听器,来执行工作 / 办法 / 业务
音讯胜利, 手动确认 acknowledge()
音讯生产失败,手动回滚,recover()
单个音讯生产失败 6 次,该音讯进入死信队列
// 音讯监听注解,destination 要监听的队列,containerFactory 音讯监听的连贯工厂
@JmsListener(destination = "ActiveMQQueue", containerFactory = "jmsListenerContainerFactory")
//TextMessage 监听到的音讯
public void mqListenerEvent(TextMessage textMessage, Session session) throws JMSException {
try {String text = textMessage.getText();
System.out.println("收到的音讯:" + text);
// 业务代码
// 音讯生产确认
textMessage.acknowledge();}catch (Exception e){System.out.println("异样了...");
e.getMessage();
// 音讯回滚
session.recover();}
}
所以,个别咱们会再去独自保护死信队列,将为生产胜利的音讯,做弥补、记录日志等操作
死信队列进行监听,和下面的监听相似,只不过监听的对象不同
// 监听死信队列
@JmsListener(destination = "ActiveMQ.DLQ")
public void receive2(TextMessage textMessage, Session session) throws JMSException {
try {
// 做日志记录、弥补策略、记录 DB、Redis 等等等等
System.out.println("死信队列:"+textMessage.getText());
// 记录好,手动确认
textMessage.acknowledge();}catch (Exception e){System.out.println("异样了...");
e.getMessage();
// 音讯回滚
session.recover();}
}
好啦,到这就完结了,小伙伴们快去启动试试吧
嘿嘿,大家喜爱的能够关注我的微信公众号哦,据说当初关注的,当前都是尊贵的老粉了