安装ActiveMQ

到Apache官方网站下载最新的ActiveMQ的安装包,并解压到本地目录下,下载链接如下:http://activemq.apache.org/do...。

进入bin 目录,如果我们是32位的机器,就双击 win32 目录下的 activemq.bat,如果是64位机器,则双击 win64 目录下的 activemq.bat ,运行结果如下:
成功之后在浏览器输入 http://127.0.0.1:8161/ 地址,可以看到 ActiveMQ 的管理页面,用户名和密码默认都是 admin

Spring Boot 整合 ActiveMQ

工程结构

添加 pom 依赖

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-activemq</artifactId></dependency>

config 配置

# activemqspring.activemq.broker-url=tcp://localhost:61616spring.activemq.user=adminspring.activemq.password=admin#默认为true表示使用内存的activeMQ,不需要安装activeMQ serverspring.activemq.in-memory=true #如果此处设置为true,需要加如下的依赖包#          <groupId>org.apache.activemq</groupId>#          <artifactId>activemq-pool</artifactId># 否则会自动配置失败,报JmsMessagingTemplate注入失败spring.activemq.pool.enabled=false

队列模式


创建 消息提供者:Producer.java

import javax.jms.Destination; import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsMessagingTemplate;import org.springframework.stereotype.Service; @Servicepublic class Producer {     @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装    private JmsMessagingTemplate jmsTemplate;     // 发送消息,destination是发送到的队列,message是待发送的消息    public void sendMessage(Destination destination, final String message){        jmsTemplate.convertAndSend(destination, message);    }}

创建消费者一: Consumer.java

import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component; @Componentpublic class Consumer {     // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息    @JmsListener(destination = "mytest.queue")    public void receiveQueue(String text) {        System.out.println("Consumer收到的报文为:"+text);    }}

创建消费者二:Consumer1 .java

import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component; @Componentpublic class Consumer1 {     // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息    @JmsListener(destination = "mytest.queue")    public void receiveQueue(String text) {         System.out.println("Consumer1收到的报文为:"+text);    }}

测试

创建一个 ActivceMQQueue 对象,表示队列模式,下面会介绍主题模式。

import org.apache.activemq.command.ActiveMQQueue;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner; import javax.jms.Destination; @RunWith(SpringRunner.class)@SpringBootTestpublic class JmsApplicationTests {     @Autowired    private Producer producer;     @Test    public void contextLoads() {         Destination destination = new ActiveMQQueue("mytest.queue");         for(int i=0; i<100; i++){            producer.sendMessage(destination, "myname is chhliu!!!");        }    }}

双向队列

使用 @SendTo 注解可以将方法的返回值重新放入到消息队列中,供其他消费者消费

这里我们修改 Consumer1.java 增加注解 @SendTo

public class Consumer1 {     // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息    @JmsListener(destination = "mytest.queue")    @SendTo("out.queue")    public String receiveQueue(String text) {         System.out.println("Consumer1收到的报文为:"+text);         return "return message" + text;    }     @JmsListener(destination = "out.queue")    public void outQueue(String text){        System.out.println("Consumer1 outQueue:"+text);    }}

主题模式

配置文件中需要增加

# 启用 topic 模式spring.jms.pub-sub-domain=true

修改 Consumer1 增加对主题的监听

import org.springframework.jms.annotation.JmsListener;import org.springframework.messaging.handler.annotation.SendTo;import org.springframework.stereotype.Component; @Componentpublic class Consumer1 {     // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息    @JmsListener(destination = "mytest.queue")    @SendTo("out.queue")    public String receiveQueue(String text) {         System.out.println("Consumer1 收到的报文为:"+text);         return "return message" + text;    }     @JmsListener(destination = "out.queue")    public void outQueue(String text){        System.out.println("Consumer1 outQueue:"+text);    }     @JmsListener(destination = "mytest.topic")    public void topicQueue(String text){        System.out.println("Consumer 接收到的 topic 消息:" + text);    }}

修改测试类

增加发送主题消息

import org.apache.activemq.command.ActiveMQQueue;import org.apache.activemq.command.ActiveMQTopic;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner; import javax.jms.Destination; @RunWith(SpringRunner.class)@SpringBootTestpublic class JmsApplicationTests {     @Autowired    private Producer producer;     @Test    public void contextLoads() {         Destination destination = new ActiveMQQueue("mytest.queue");        Destination topicDestination = new ActiveMQTopic("mytest.topic");         for(int i=0; i<10; i++){            producer.sendMessage(destination, "Queue Message......");            producer.sendMessage(topicDestination, "Topic Message!!!");        }    }}

消息的结果为:

没有出现 队列消息。此时需要修改我们的监听,指定出是哪种类型的

增加配置类

这个配置类没有在那个工程图上面出现,添加一个类就好。

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.jms.annotation.EnableJms;import org.springframework.jms.config.DefaultJmsListenerContainerFactory;import org.springframework.jms.config.JmsListenerContainerFactory; import javax.jms.ConnectionFactory; @Configuration@EnableJmspublic class JmsConfig {    @Bean    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();        factory.setPubSubDomain(true);        factory.setConnectionFactory(connectionFactory);        return factory;    }     @Bean    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();        factory.setPubSubDomain(false);        factory.setConnectionFactory(connectionFactory);        return factory;    } }

修改消费者 Consumer1.java

import org.springframework.jms.annotation.JmsListener;import org.springframework.messaging.handler.annotation.SendTo;import org.springframework.stereotype.Component; @Componentpublic class Consumer1 {     // 使用JmsListener配置消费者监听的队列,其中text是接收到的消息    @JmsListener(destination = "mytest.queue", containerFactory = "queueListenerFactory")    @SendTo("out.queue")    public String receiveQueue(String text) {         System.out.println("Consumer1 收到的报文为:"+text);         return "return message" + text;    }     @JmsListener(destination = "out.queue", containerFactory = "queueListenerFactory")    public void outQueue(String text){        System.out.println("Consumer1 outQueue:"+text);    }     @JmsListener(destination = "mytest.topic", containerFactory = "topicListenerFactory")    public void topicQueue(String text){        System.out.println("Consumer 接收到的 topic 消息:" + text);    }}

重新执行