最近得闲,探索了一下ActiveMQ。

ActiveMQ消息队列,信息收发的容器,作用有异步消息,流量削锋,应用耦合。
同行还有 Kafka、RabbitMQ、RocketMQ、ZeroMQ、MetaMQ 。

安装

下载地址:http://activemq.apache.org/co...

window版本的解压后双击/bin/activemq.bat 即可启动

它有自己的可视化页面:http://localhost:8161/admin/

默认访问密码是:admin/admin
如果需要修改在:/conf/jetty-realm.properties 中修改

JmsTemplate

springboot上整合的,使用spring 的JmsTemplate来操作ActiveMQ
一、首先在pom文件中导入所需的jar包坐标:

    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-activemq</artifactId>    </dependency>    <dependency>        <groupId>org.apache.activemq</groupId>        <artifactId>activemq-pool</artifactId>    </dependency>

二、新增一个ActiveMQ的配置文件spring-jms.xml

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">    <!-- 配置JMS连接工厂 -->    <bean id="innerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">        <property name="brokerURL" value="${spring.activemq.broker-url}" />    </bean>    <!--配置连接池-->    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"        destroy-method="stop">        <property name="connectionFactory" ref="innerConnectionFactory" />        <property name="maxConnections" value="100"></property>    </bean>    <!-- 配置JMS模板,Spring提供的JMS工具类 -->    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">        <property name="connectionFactory" ref="pooledConnectionFactory" />        <property name="defaultDestination" ref="JmsSenderDestination" />        <property name="receiveTimeout" value="10000" />    </bean></beans>

三、在启动类上配置以生效

@ImportResource(locations={"classpath:/config/spring-jms.xml"})

四、在application.properties中配置ActiveMQ 的连接地址

spring.activemq.broker-url=tcp://localhost:61616
准备就绪;开始写生产者和消费者,我这里把生产者和消费者写在一个项目里面。在这之前需要明白两个概念
队列(Queue)和主题(Topic)

传递模型

队列(Queue)和主题(Topic)是JMS支持的两种消息传递模型:

  1. 点对点(point-to-point,简称PTP)Queue消息传递模型:
    一个消息生产者对应一个消费者
  2. 发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型:
    一个消息生产者对应多个个消费者

QUEUE

  1. 先在spring-jms.xml里添加配置一个队列名称Queue_love

    <bean id="JmsSenderDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg>    <value>Queue_love</value></constructor-arg>

    </bean>

  2. 创建一个生产者来发送消息;@Qualifier("JmsSenderDestination")指定了发送到上面配置的Queue_love队列

    @Componentpublic class JmsSender {@Autowiredprivate JmsTemplate jmsTemplate;@Qualifier("JmsSenderDestination")@Autowiredprotected Destination destination;public void sendMessage(final String msg) {    logger.info("QUEUE destination :" + destination.toString() + ", 发送消息:" + msg);    jmsTemplate.send(destination, new MessageCreator() {        @Override        public Message createMessage(final Session session) throws JMSException {            return session.createTextMessage(msg);        }    });}}
  3. 创建一个消费者来消费消息:

    @Componentpublic class JmsTemplateListener implements MessageListener {@Overridepublic void onMessage(Message message) {    final TextMessage tm = (TextMessage) message;    try {        logger.info("QUEUE接收信息==="+tm.getText());    } catch (JMSException e) {        e.printStackTrace();    }}}
  4. 消费者需要在spring-jms.xml配置一下该消费者需要消费哪个队列的消息

    <!-- 配置消息队列监听者 --><bean id="JmsListener" class="com.mashu.activeMq.jmsTemplate.JmsTemplateListener" /><!-- 使用spring进行配置 监听 --><bean id="JmsTemplateListenerContainer"  class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="pooledConnectionFactory"></property><property name="destination" ref="JmsSenderDestination"></property><property name="messageListener" ref="JmsListener"></property><property name="sessionTransacted" value="false"></property><property name="concurrentConsumers" value="6"></property><property name="concurrency" value="2-4"></property><property name="maxConcurrentConsumers" value="10"></property></bean>

这样一个简单的消息队列收发程序已经写好了,看看结果

发出的消息立马就被消费了,我们可以先把消费者注释掉,只用生产者发送消息就可以在可视化页面上看到消息内容。

Topic

Topic的方式和Queue类似,只需要在定义队列的时候calss=org.apache.activemq.command.ActiveMQTopic即可

<bean id="JmsSenderTDestination" class="org.apache.activemq.command.ActiveMQTopic">    <constructor-arg>        <value>Topic_love</value>    </constructor-arg></bean>

Message

除了使用createTextMessage()方法发送纯字符串消息,还有

  1. 序列化对象的形式
    session.createObjectMessage();
  2. 流的形式,可以用来传递文件
    session.createStreamMessage();
  3. 字节的形式
    session.createBytesMessage();
  4. map的形式
    session.createMapMessage();

安全配置

ActiveMQ在使用的时候和MySQL一样,也可以配置用户名密码,默认不没有,我们可以打开:

  1. 在conf/activemq.xml添加以下信息(务必在<systemUsage>标签上面)

        <plugins>         <simpleAuthenticationPlugin>             <users>                 <authenticationUser username="${activemq.username}"                  password="${activemq.password}" groups="users,admins"/>             </users>         </simpleAuthenticationPlugin>     </plugins>
  2. 对应的用户名密码在/conf/credentials.properties中配置

    activemq.username=adminactivemq.password=123456guest.password=password
  3. 那么我们在项目中的application.properties需要加上也要用户名密码:

    spring.activemq.broker-url=tcp://localhost:61616spring.activemq.user=adminspring.activemq.password=123456

消息持久化

默认保存的消息在\data\kahadb目录下;也支持保存到MySQL里面;
生产者发送消息存储到数据库,消费者消费后消息从数据库消失。

  1. 修改/conf/activemq.xml
    将:

        <persistenceAdapter>            <kahaDB directory="${activemq.data}/kahadb"/>    </persistenceAdapter>

    修改为:

    <persistenceAdapter>    <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/></persistenceAdapter>
  2. 添加/conf/activemq.xml 配置MySQL连接信息

    <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://localhost:3306/db_activemq?relaxAutoCommit=true"/><property name="username" value="root"/><property name="password" value="123456"/><property name="poolPreparedStatements" value="true"/></bean>
  3. 添加 mysql-connector-java.jar 到/bin目录
  4. 新建数据库db_activemq

重启ActiveMQ后数据库产生三个表activemq_acksactivemq_lockactivemq_msgs