消息队列概述及ActiveMQ使用

8次阅读

共计 7529 个字符,预计需要花费 19 分钟才能阅读完成。

消息中间件定义

一般认为,消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。

为什么要用消息中间件

随着系统的发展,各个模块越来越庞大、业务逻辑越来越复杂,必然要做服务化和业务拆分的, 这个时候各个系统之间的交互,RPC 是首选。但是随着系统的继续发展, 一些功能涉及几十个服务的调用,这时候需要消息中间件来解决问题。

消息中间件主要解决分布式系统之间消息的传递,同时为分布式系统中其他子系统提供了伸缩性和扩展性。为系统带来了:
1. 低耦合,不管是程序还是模块之间,使用消息中间件进行间接通信。
2. 异步通信能力,使得子系统之间得以充分执行自己的逻辑而无需等待。
3. 高并发能力,将高峰期大量的请求存储下来慢慢交给后台进行处理,比如适用于秒杀业务。

和 RPC 区别

RPC 和消息中间件的场景的差异很大程度上在于就是“依赖性”和“同步性”:
RPC 是强依赖,典型的同步方式,像本地调用。消息中间件方式属于异步方式。消息队列是系统级、模块级的通信。RPC 是对象级、函数级通信。

业务上的必须环节一般用 RPC,对于一些不影响流程的不是强依赖的可以考虑消息队列, 如发送短信, 统计数据, 解耦应用。

消息队列应用场景

1. 异步处理;2. 应用解耦;3. 限流;4. 日志处理;5 消息通讯


常用的消息中间件比较


JMS 规范(ActiveMQ 基于 JMS 规范)

JMS 规范包含以下 6 个要素
1. 连接工厂;2.JMS 连接;3.JMS 会话;4.JMS 目的(Broker);5.JMS 生产者;6.JMS 消费者


JMS 规范的消息
JMS 消息由以下三部分组成:

  • 消息头。每个消息头字段都有相应的 getter 和 setter 方法。
  • 消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。
  • 消息体。JMS 定义的消息类型有 TextMessage、MapMessage、BytesMessage、StreamMessage(BIO) 和 ObjectMessage。ActiveMQ 也有对应的实现。

注意 1: 生产者及消费者的消息类型必须一致才能接受到消息。
注意 2: 一般来说不用对象消息类型, 传输对象消息,对象得序列化 (实现 Serializable 接口),JDK 本身的序列化效率低, 产生的字节码数组大。可把对象序列化成 JSON 串。
注意 3: 社区查阅: 经过性能测试消息建议不超过 1K(1024 字节),大消息如大于 1M 的选择 kafka 性能好

JMS 消息模型

1.Point-to-Point 点对点:
生产者发布消息到队列 queue 上, 若没有对应的消费者则消息保留; 若 queue 上有多个消费者的时候, 消息只会被一个消费者消费。

2.Topic/ 主题 (发布与订阅)(广播):
生产者发布消息到主题, 主题会向所有消费者 (订阅者) 发送消息; 若没有消费者在线, 则消息丢失也就类似广播, 没了就没了。


ActiveMQ 安装

官网 http://activemq.apache.org/ac…:8161/admin 为后台管理平台可查询队列情况及消息条数等等。

查看 activemq.xml 可查看 ActiveMQ 应用的缺省端口为 61616,8161 为管理平台端口。

ActiveMQ 的使用

一:原生 API 编程(最灵活, 重要)

消费者:看代码很明显是基于 JMS 规范的要素来编程的, 要通信必须要建立连接。

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {
    /* 默认连接用户名 */
    private static final String USERNAME
            = ActiveMQConnection.DEFAULT_USER;
    /* 默认连接密码 */
    private static final String PASSWORD
            = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 默认连接地址 */
    private static final String BROKEURL
            = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        /* 连接工厂 */
        ConnectionFactory connectionFactory;
        /* 连接 */
        Connection connection = null;
        /* 会话 */
        Session session;
        /* 消息的目的地 */
        Destination destination;
        /* 消息的消费者 */
        MessageConsumer messageConsumer;

        /* 实例化连接工厂 */
        connectionFactory
                = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);

        try {
            /* 通过连接工厂获取连接 */
            connection = connectionFactory.createConnection();
            /* 启动连接 */
            connection.start();
            /* 创建 session*/
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /* 创建一个名为 HelloWorldQueue 消息队列 */
            //destination = session.createTopic("HelloWorldTopic");
            destination = session.createQueue("HelloWorldQueue");
            /* 创建消息消费者 */
            messageConsumer = session.createConsumer(destination);
            Message message;
            while((message = messageConsumer.receive())!=null){System.out.println("收到消息"+((TextMessage)message).getText());
            }

        } catch (JMSException e) {e.printStackTrace();
        }finally {if(connection!=null){
                try {connection.close();
                } catch (JMSException e) {e.printStackTrace();
                }
            }
        }

    }
}



生产者: 同样的生产者也是基于 JMS 规范要素。

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProducer {

    /* 默认连接用户名 */
    private static final String USERNAME
            = ActiveMQConnection.DEFAULT_USER;
    /* 默认连接密码 */
    private static final String PASSWORD
            = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 默认连接地址 */
    private static final String BROKEURL
            = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final int SENDNUM = 5;

    public static void main(String[] args) {
        /* 连接工厂 */
        ConnectionFactory connectionFactory;
        /* 连接 */
        Connection connection = null;
        /* 会话 */
        Session session;
        /* 消息的目的地 */
        Destination destination;
        /* 消息的生产者 */
        MessageProducer messageProducer;

        /* 实例化连接工厂 */
        connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,
                BROKEURL);
        try {
            /* 通过连接工厂获取连接 */
            connection = connectionFactory.createConnection();
            /* 启动连接 */
            connection.start();
            /* 创建 session
            * 第一个参数表示是否使用事务,第二次参数表示是否自动确认 */
            session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            /* 创建一个名为 HelloWorldQueue 消息队列 */
            //destination = session.createTopic("HelloWorldTopic");
            destination = session.createQueue("HelloWorldQueue");
            /* 创建消息生产者 */
            messageProducer = session.createProducer(destination);
            /* 循环发送消息 */
            for(int i=0;i<SENDNUM;i++){String msg = "发送消息"+i+" "+System.currentTimeMillis();
                TextMessage textMessage = session.createTextMessage(msg);
                System.out.println("标准用法:"+msg);
                messageProducer.send(textMessage);
            }
        } catch (Exception e) {e.printStackTrace();
        }finally {if(connection!=null){
                try {connection.close();
                } catch (JMSException e) {e.printStackTrace();
                }
            }

        }
    }
}

先启动 ActiveMQ 再执行代码 demo 就可以了解对应的特性, 如上代码为点对点模型, 不管消费者在生产者前后启动的都能接受到消息, 毕竟生产者发布的消息无对应的消费者消费时, 队列会保存消息。
同样的也可以测试下主题模式, 如上放开注释即可。


二:Spring 整合

生产者配置:

<!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
         brokerURL="tcp://127.0.0.1:61616" userName=""password="" />

<!-- Spring Caching 连接工厂 -->
<!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="sessionCacheSize" value="100"></property>
</bean>


<!-- Spring JmsTemplate 的消息生产者 start-->
<!-- 定义 JmsTemplate 的 Queue 类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"></constructor-arg>
    <!-- 队列模式 -->
    <property name="pubSubDomain" value="false"></property>
</bean>

<!-- 定义 JmsTemplate 的 Topic 类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory"></constructor-arg>
    <!-- 发布订阅模式 -->
    <property name="pubSubDomain" value="true"></property>
</bean>

<!--Spring JmsTemplate 的消息生产者 end-->

Queue 生产者: 直接注入队列模式的 bean 即可使用

@Component
public class QueueSender {

    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName,final String message){jmsTemplate.send(queueName, new MessageCreator() {public Message createMessage(Session session) throws JMSException {Message msg = session.createTextMessage(message);
                //TODO  应答
                return msg;
            }
        }); 
        
    }
      
}

Topic 生产者:

@Component
public class TopicSender {

    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    public void send(String queueName,final String message){jmsTemplate.send(queueName, new MessageCreator() {public Message createMessage(Session session) throws JMSException {TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
    }
}


消费者配置:

<!-- ActiveMQ 连接工厂 -->
<amq:connectionFactory id="amqConnectionFactory"
         brokerURL="tcp://127.0.0.1:61616" userName=""password="" />

<!-- Spring Caching 连接工厂 -->
<!-- Spring 用于管理真正的 ConnectionFactory 的 ConnectionFactory -->
<bean id="connectionFactory"
      class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <property name="sessionCacheSize" value="100"></property>
</bean>


<!-- 消息消费者 start-->

<!-- 定义 Topic 监听器 -->
<jms:listener-container destination-type="topic" container-type="default"
                        connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
    <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
</jms:listener-container>

<!-- 定义 Queue 监听器 -->
<jms:listener-container destination-type="queue" container-type="default"
                        connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
    <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
</jms:listener-container>
<!-- 消息消费者 end -->

队列消费者:

@Component
public class QueueReceiver1 implements MessageListener {public void onMessage(Message message) {
        try {String textMsg = ((TextMessage)message).getText();
            System.out.println("QueueReceiver1 accept msg :"+textMsg);
        } catch (JMSException e) {e.printStackTrace();
        }
    }
}

主题消费者:

@Component
public class TopicReceiver1 implements MessageListener {public void onMessage(Message message) {
        try {System.out.println(((TextMessage)message).getText());
        } catch (JMSException e) {e.printStackTrace();
        }
    }
}




SpringBoot 整合可在官网查询对应配置

正文完
 0