MQ老演员了,这篇文章次要是记录ActiveMQ从下载到利用到我的项目中的去的一个配置、编写的过程

让大家疾速上配置到利用

下载并批改配置

首先,下载ActiveMQ,这是官网下载地址:https://activemq.apache.org/c...

下载下来之后,找到config目录里的activemq.xml文件

找到<policyEntries>标签,在标签下里增加配置

 <!--死信队列-->                  <policyEntry topic=">" >                    <deadLetterStrategy>                      <!--                              queuePrefix:设置死信队列前缀                              useQueueForQueueMessages: 设置应用队列保留死信,还能够设置useQueueForTopicMessages,应用Topic来保留死信                      -->                      <individualDeadLetterStrategy   queuePrefix="DLQ." useQueueForQueueMessages="true" processNonPersistent="true" />                    </deadLetterStrategy>                  </policyEntry>

这是配置死信队列

须要用到的Maven依赖

<!--activemq--><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-activemq</artifactId></dependency><!--activemq pool--><dependency>    <groupId>org.apache.activemq</groupId>    <artifactId>activemq-pool</artifactId></dependency>

配置SpringBoot的YML文件

spring:    #ActiveMQ根底配置  activemq:    #通信地址,不是http,是tcp,61616是;连贯地址    broker-url: tcp://127.0.0.1:61616    #账号密码能够在MQ的users.properties配置文件中配置    user: admin    password: admin    pool:      #是否启用        enabled: true      #最大连接数      max-connections: 100

生产者配置应用

生产者——config类

//MQ的配置@Configurationpublic class ActiveMQConfig {//yml配置文件中的那个连贯地址    @Value("${spring.activemq.broker-url}")    private String brokerUrl;//队列的Bean    @Bean    public Queue getQueue(){        return new ActiveMQQueue("ActiveMQQueue");    }//连贯工厂的Bean,brokerUrl连贯地址    @Bean    public ActiveMQConnectionFactory connectionFactory(){        return new ActiveMQConnectionFactory(brokerUrl);    }}

生产者——应用

应用@Scheduled(cron = "0/5 ?")定时工作注解,须要在启动类加@EnableScheduling注解,来扫描

@Autowiredprivate JmsMessagingTemplate messagingTemplate;@Autowiredprivate Queue queue;//定时工作的注解(启动后每隔5秒执行一次)@Scheduled(cron = "0/5 * * * * ?")//事务注解@Transactional(rollbackFor = Exception.class)public void task() {    System.out.println("定时工作执行...");    //放入队列    messagingTemplate.convertAndSend(queue, "MingLog");    System.out.println("已放入队列...");}

消费者配置应用

消费者——config类

//RctiveMQ 消费者的配置@Configurationpublic class ActiveMQConfig {    //yml配置文件中的那个连贯地址    @Value("${spring.activemq.broker-url}")    private String brokerUrl;    //连贯工厂的Bean    @Bean    public ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy){        ActiveMQConnectionFactory activeMQConnectionFactory =        //账号、明码、连贯地址                new ActiveMQConnectionFactory("admin","admin",brokerUrl);                //配置管制音讯在回滚时如何从新传递        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);        return activeMQConnectionFactory;    }    //messageConsumer的配置选项Bean    @Bean    public RedeliveryPolicy getRedeliveryPolicy(){        return new RedeliveryPolicy();    }    //音讯的监听连贯工厂Bean    @Bean    public JmsListenerContainerFactory getJmsListenerContainerFactory(ActiveMQConnectionFactory connectionFactory){        //创立默认音讯监听工厂        DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();        //将连贯工厂set进去        defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);        //设置生产确认形式 1: 主动确认,2: 客户端手动确认,3:主动批量确认,4 事务提交并确认。        defaultJmsListenerContainerFactory.setSessionAcknowledgeMode(2);        return defaultJmsListenerContainerFactory;    }}

消费者——应用

个别应用音讯监听器,来执行工作/办法/业务

音讯胜利,手动确认acknowledge()

音讯生产失败,手动回滚,recover()

单个音讯生产失败6次,该音讯进入死信队列

//音讯监听注解,destination要监听的队列,containerFactory音讯监听的连贯工厂@JmsListener(destination = "ActiveMQQueue", containerFactory = "jmsListenerContainerFactory")//TextMessage 监听到的音讯public void mqListenerEvent(TextMessage textMessage, Session session) throws JMSException {    try {        String text = textMessage.getText();        System.out.println("收到的音讯:" + text);        //业务代码        //音讯生产确认        textMessage.acknowledge();    }catch (Exception e){        System.out.println("异样了...");        e.getMessage();        //音讯回滚        session.recover();    }}

所以,个别咱们会再去独自保护死信队列,将为生产胜利的音讯,做弥补、记录日志等操作

死信队列进行监听,和下面的监听相似,只不过监听的对象不同

//监听死信队列@JmsListener(destination = "ActiveMQ.DLQ")public void receive2(TextMessage textMessage, Session session) throws JMSException {    try {        //做日志记录、弥补策略、记录DB、Redis等等等等        System.out.println("死信队列:"+textMessage.getText());        //记录好,手动确认        textMessage.acknowledge();   }catch (Exception e){        System.out.println("异样了...");        e.getMessage();        //音讯回滚        session.recover();    }     }

好啦,到这就完结了,小伙伴们快去启动试试吧

嘿嘿,大家喜爱的能够关注我的微信公众号哦,据说当初关注的,当前都是尊贵的老粉了