在我的项目开发的过程中咱们常常会遇到相似的业务场景:用户申请提现,后盾进行账务解决、发送提现短信、调用银行打款通道。

在这个过程中调用三方通道(短信或银行通道)都比拟耗时,同时账务解决可能也是由专门的账务零碎进行解决。那么,为了进步并发和相应速度,前面的三个操作都能够通过异步进行解决。这就用到了音讯队列。

音讯队列中间件是分布式系统中重要的组件,次要解决利用耦合、异步音讯、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可短少的中间件。

市面上比拟常见的音讯队列有:ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。

在Spring Boot的starter中专门集成了ActiveMQ,因而,本篇文章咱们就来讲讲对ActiveMQ的集成。

JMS标准

JMS即Java音讯服务(Java Message Service)利用程序接口,是一个Java平台中对于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送音讯,进行异步通信。Java音讯服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供反对。

JMS的音讯机制有2种模型,一种是队列的模式(Point to Point—)发送的音讯只能被一个消费者生产;一种是订阅(Topic)模式,能够被多个订阅者订阅,订阅者都会接管到同样的音讯。

而ActiveMQ就是对JMS的实现之一。

ActiveMQ介绍

ActiveMQ是一种开源的基于JMS(Java Message Servie)标准的一种消息中间件的实现,ActiveMQ的设计指标是提供规范的、面向音讯的、可能逾越多语言和多零碎的利用集成音讯通信中间件。

它为企业应用中消息传递提供高可用、杰出性能、可扩大、稳固和平安保障。

ActiveMQ实现JMS标准并在此之上提供大量额定的个性。ActiveMQ反对队列和订阅两种模式的音讯发送。

AcitveMQ的数据传送流程如下图:

ActiveMQ的两种消息传递类型:

(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者承受该条队列里的数据。

(2)基于公布/订阅模式的传输,即依据订阅话题来接管相应数据,一个生产者可向多个消费者推送数据,与MQTT协定的实现是相似的。

两种消息传递类型的不同,点对点传输消费者能够接管到在连贯之前生产者所推送的数据,而基于公布/订阅模式的传输方式消费者只能接管到连贯之后生产者推送的数据。

Spring Boot集成ActiveMQ

Spring Boot针对ActiveMQ专门提供了spring-boot-starter-activemq,用来反对ActiveMQ在Spring Boot的主动集成配置。在此基础上咱们能够很轻易的进行集成和应用。

创立我的项目并引入依赖

创立规范的Spring Boot我的项目,并在我的项目中引入以下依赖:

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-activemq</artifactId></dependency>

此时如果不须要web或其余相干解决,只引入该依赖即可。如果应用pool的话, 就须要在pom中退出以下依赖:

<dependency>     <groupId>org.apache.activemq</groupId>     <artifactId>activemq-pool</artifactId></dependency>

配置文件

在application.properties中增加如下配置:

# 基于内存的ActiveMQspring.activemq.in-memory=true# 不应用连接池,如果应用连接池还需在pom中增加activemq-pool的依赖spring.activemq.pool.enabled=false# 独立装置的ActiveMQ#spring.activemq.broker-url=tcp://127.0.0.1:61616#spring.activemq.user=admin#spring.activemq.password=admin

上述配置中有两套配置,Spring Boot反对基于内存ActiveMQ和基于独立装置的ActiveMQ。失常申请基于内存的模式是为了不便测试而应用,基于独立装置的模式才是真正用于生产环境。此处为了解说性能,不便测试,采纳基于内存的模式。

队列模式实例

首先,咱们来实现基于队列(Queue)模式的实现。这里须要用到两个类ActiveMQQueue和JmsMessagingTemplate。前者是由ActiveMQ对javax.jms.Queue的接口实现。后者为Spring提供发送音讯的工具类,联合Queue对音讯进行发送。

JmsMessagingTemplate默认曾经被实例化,间接拿来应用即可。而ActiveMQQueue则须要咱们进行实例化,并传入音讯队列的名称。

@Configurationpublic class MyMqConfig {    @Bean    public Queue queue() {        return new ActiveMQQueue("sms.queue");    }}

Spring Boot中很惯例的实例化操作,不再赘述。当实例化完ActiveMQQueue之后,咱们的队列便创立实现,上面创立对应的生产者和消费者。

生产者对应代码如下:

@Componentpublic class Producer {    @Resource    private JmsMessagingTemplate jmsMessagingTemplate;    @Resource    private Queue queue;    public void sendMsg(String msg) {        System.out.println("发送音讯内容 :" + msg);        this.jmsMessagingTemplate.convertAndSend(this.queue, msg);    }}

此处用到JmsMessagingTemplate和Queue,下面曾经提到,这两个类都曾经实现了初始化。消费者对应的配置如下:

@Componentpublic class Consumer {    @JmsListener(destination = "sms.queue")    public void receiveMsg(String text) {        System.out.println("接管到音讯 : "+text);    }}

Spring提供了注解式监听器端点:应用@JmsListener。应用@JmsListener托管bean的带正文办法对其进行订阅。在Java8中,@JmsListener是一个可反复的注解,能够关联多个JMS destinations到同一个办法中。而在Java 6和7中,能够应用@JmsListeners注解。

其中destination指定监控的音讯队列名称为“sms.queue”。当队列sms.queue中有音讯发送时会触发此办法的执行,text为音讯内容。

下面实现了队列初始化、生产者和消费者代码的编写,上面通过单元测试来验证是否可能正确发送和解决音讯。

@RunWith(SpringRunner.class)@SpringBootTestpublic class ActiveMqTests {    @Autowired    private Producer producer;    @Test    public void sendSimpleQueueMessage() {        this.producer.sendMsg("提现200.00元");    }}

执行单元测试,会发现在日志中打印如下信息:

发送音讯内容 :提现200.00元接管到音讯 : 提现200.00元

阐明音讯能够失常发送和接管。如果是基于内存模式,在执行单元测试时会打印出“javax.jms.JMSException: peer (vm://localhost#1) stopped.”异样日志,这是Info级别的谬误,是ActiveMQ的一个bug。

订阅模式实例

播送发送的音讯,能够被多个消费者接管。这里咱们就在原有的根底上进行播送音讯的增加。

首先,Spring Boot集成ActiveMQ时默认只反对队列或者播送之一,通过配置项spring.jms.pub-sub-domain来指定,true 为播送模式,false为队列模式,默认状况下反对队列模式。

此时要应用播送模式,则需在配置文件中增加如下配置:

spring.jms.pub-sub-domain=true

须要留神的是,此时队列模式不可失常工作。

而后在MyMqConfig中增加:

@Beanpublic Topic topic() {    return new ActiveMQTopic("sms.topic");}

这里创立了ActiveMQTopic,并将topic的名称指定为sms.topic。

Producer中新增如下代码:

@Resourceprivate Topic topic;public void sendTopic(String msg) {    System.out.println("发送Topic音讯内容 :"+msg);    this.jmsMessagingTemplate.convertAndSend(this.topic, msg);}

为了演示多个播送接收者,在Comsumer中新增两个消费者:

@JmsListener(destination = "sms.topic")public void receiveTopic1(String text) {    System.out.println("receiveTopic1接管到Topic音讯 : " + text);}@JmsListener(destination = "sms.topic")public void receiveTopic2(String text) {    System.out.println("receiveTopic2接管到Topic音讯 : " + text);}

单元测试类中新增如下测试:

@Testpublic void sendSimpleTopicMessage() {    this.producer.sendTopic("提现200.00元");}

此时,执行单元测试,便可看到如下日志信息:

发送Topic音讯内容 :提现200.00元receiveTopic2接管到Topic音讯 : 提现200.00元receiveTopic1接管到Topic音讯 : 提现200.00元

阐明音讯发送胜利。

同时反对两种模式

在下面的实例中,要么反对队列模式要么反对播送模式,如果在生产环境中两者都须要反对,那么就须要自定义JmsListenerContainerFactory实例。当然,如果Spring Boot默认的配置无奈满足需要,也能够自定义该类,这里只是其中场景之一。

根本配置和应用步骤:通过DefaultJmsListenerContainerFactory创立自定义的JmsListenerContainerFactory实例,在@JmsListener注解中通过containerFactory属性进行援用。

在MyMqConfig配置类中新增如下配置:

@Bean("queueListenerFactory")public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();    factory.setConnectionFactory(connectionFactory);    factory.setPubSubDomain(false);    return factory;}@Bean("topicListenerFactory")public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();    factory.setConnectionFactory(connectionFactory);    //设置为公布订阅形式, 默认状况下应用的生产消费者形式    factory.setPubSubDomain(true);    return factory;}

这里别离实例化了基于队列和订阅的工厂类。而后别离在对应的消费者办法上增加containerFactory属性。示例代码如下:

@JmsListener(destination = "sms.queue", containerFactory = "queueListenerFactory")public void receiveMsg(String text) {    System.out.println("接管到音讯 : " + text);}@JmsListener(destination = "sms.topic", containerFactory = "topicListenerFactory")public void receiveTopic1(String text) {    System.out.println("receiveTopic1接管到Topic音讯 : " + text);}

别离执行两种模式的音讯,发现都失常互利。同时,此时配置文件中的项spring.jms.pub-sub-domain也有效了。

其余事项

1、activeMq的端口号是61616;

2、应用topic,须要配置spring.jms.pub-sub-domain=true;

3、queue如果没有消费者,会将信息存储到queue中;

4、发送的音讯为对象的时候,须要将对象序列化;消费者接管对象信息时须要应用ObjectMessage进行转化;

5、应用JmsListener注解中的containerFactory属性,能够配置spring.jms.pub-sub属性,实现同时接管queque和topic;

6、queue为点对点模式;tipic为公布订阅模式;

7、示例中的音讯队列名称(sms.queue和sms.topic)可依据须要设置成配置属性;

源码地址:https://github.com/secbr/spri...

参考文章: <br/>
https://www.cnblogs.com/xigua... <br/>
https://blog.csdn.net/bihansh...


<center>程序新视界:精彩和成长都不容错过</center>