关于java:快速入门使用ActiveMQ

6次阅读

共计 3781 个字符,预计需要花费 10 分钟才能阅读完成。

MQ 老演员了,这篇文章次要是记录 ActiveMQ 从下载到利用到我的项目中的去的一个配置、编写的过程

让大家疾速上配置到利用

下载并批改配置

首先,下载 ActiveMQ,这是官网下载地址:https://activemq.apache.org/c…

下载下来之后,找到 config 目录里的 activemq.xml 文件

找到 <policyEntries> 标签,在标签下里增加配置

 <!-- 死信队列 -->
                  <policyEntry topic=">" >
                    <deadLetterStrategy>
                      <!--
                              queuePrefix: 设置死信队列前缀
                              useQueueForQueueMessages: 设置应用队列保留死信,还能够设置 useQueueForTopicMessages,应用 Topic 来保留死信
                      -->
                      <individualDeadLetterStrategy   queuePrefix="DLQ." useQueueForQueueMessages="true" processNonPersistent="true" />
                    </deadLetterStrategy>
                  </policyEntry>

这是配置死信队列

须要用到的 Maven 依赖

<!--activemq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<!--activemq pool-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

配置 SpringBoot 的 YML 文件

spring:
    #ActiveMQ 根底配置
  activemq:
    #通信地址,不是 http,是 tcp,61616 是;连贯地址
    broker-url: tcp://127.0.0.1:61616
    #账号密码能够在 MQ 的 users.properties 配置文件中配置
    user: admin
    password: admin
    pool:
      #是否启用  
      enabled: true
      #最大连接数
      max-connections: 100

生产者配置应用

生产者——config 类

//MQ 的配置
@Configuration
public class ActiveMQConfig {

//yml 配置文件中的那个连贯地址
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

// 队列的 Bean
    @Bean
    public Queue getQueue(){return new ActiveMQQueue("ActiveMQQueue");
    }
// 连贯工厂的 Bean,brokerUrl 连贯地址
    @Bean
    public ActiveMQConnectionFactory connectionFactory(){return new ActiveMQConnectionFactory(brokerUrl);
    }
}

生产者——应用

应用 @Scheduled(cron = “0/5 ?”) 定时工作注解,须要在启动类加 @EnableScheduling 注解,来扫描

@Autowired
private JmsMessagingTemplate messagingTemplate;

@Autowired
private Queue queue;

// 定时工作的注解(启动后每隔 5 秒执行一次)@Scheduled(cron = "0/5 * * * * ?")
// 事务注解
@Transactional(rollbackFor = Exception.class)
public void task() {System.out.println("定时工作执行...");
    // 放入队列
    messagingTemplate.convertAndSend(queue, "MingLog");
    System.out.println("已放入队列...");
}

消费者配置应用

消费者——config 类

//RctiveMQ 消费者的配置
@Configuration
public class ActiveMQConfig {

    //yml 配置文件中的那个连贯地址
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    // 连贯工厂的 Bean
    @Bean
    public ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy){
        ActiveMQConnectionFactory activeMQConnectionFactory =
        // 账号、明码、连贯地址
                new ActiveMQConnectionFactory("admin","admin",brokerUrl);
                // 配置管制音讯在回滚时如何从新传递
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return activeMQConnectionFactory;
    }

    //messageConsumer 的配置选项 Bean
    @Bean
    public RedeliveryPolicy getRedeliveryPolicy(){return new RedeliveryPolicy();
    }

    // 音讯的监听连贯工厂 Bean
    @Bean
    public JmsListenerContainerFactory getJmsListenerContainerFactory(ActiveMQConnectionFactory connectionFactory){
        // 创立默认音讯监听工厂
        DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
        // 将连贯工厂 set 进去
        defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
        // 设置生产确认形式 1: 主动确认,2:客户端手动确认,3:主动批量确认,4 事务提交并确认。defaultJmsListenerContainerFactory.setSessionAcknowledgeMode(2);
        return defaultJmsListenerContainerFactory;
    }
}

消费者——应用

个别应用音讯监听器,来执行工作 / 办法 / 业务

音讯胜利, 手动确认 acknowledge()

音讯生产失败,手动回滚,recover()

单个音讯生产失败 6 次,该音讯进入死信队列

// 音讯监听注解,destination 要监听的队列,containerFactory 音讯监听的连贯工厂
@JmsListener(destination = "ActiveMQQueue", containerFactory = "jmsListenerContainerFactory")
//TextMessage 监听到的音讯
public void mqListenerEvent(TextMessage textMessage, Session session) throws JMSException {
    try {String text = textMessage.getText();
        System.out.println("收到的音讯:" + text);
        // 业务代码
        // 音讯生产确认
        textMessage.acknowledge();}catch (Exception e){System.out.println("异样了...");
        e.getMessage();
        // 音讯回滚
        session.recover();}
}

所以,个别咱们会再去独自保护死信队列,将为生产胜利的音讯,做弥补、记录日志等操作

死信队列进行监听,和下面的监听相似,只不过监听的对象不同

// 监听死信队列
@JmsListener(destination = "ActiveMQ.DLQ")
public void receive2(TextMessage textMessage, Session session) throws JMSException {
    try {
        // 做日志记录、弥补策略、记录 DB、Redis 等等等等
        System.out.println("死信队列:"+textMessage.getText());
        // 记录好,手动确认
        textMessage.acknowledge();}catch (Exception e){System.out.println("异样了...");
        e.getMessage();
        // 音讯回滚
        session.recover();}     
}

好啦,到这就完结了,小伙伴们快去启动试试吧

嘿嘿,大家喜爱的能够关注我的微信公众号哦,据说当初关注的,当前都是尊贵的老粉了

正文完
 0