在我的项目开发的过程中咱们常常会遇到相似的业务场景:用户申请提现,后盾进行账务解决、发送提现短信、调用银行打款通道。
在这个过程中调用三方通道(短信或银行通道)都比拟耗时,同时账务解决可能也是由专门的账务零碎进行解决。那么,为了进步并发和相应速度,前面的三个操作都能够通过异步进行解决。这就用到了音讯队列。
音讯队列中间件是分布式系统中重要的组件,次要解决利用耦合、异步音讯、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可短少的中间件。
市面上比拟常见的音讯队列有:ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。
在Spring Boot的starter中专门集成了ActiveMQ,因而,本篇文章咱们就来讲讲对ActiveMQ的集成。
JMS标准
JMS即Java音讯服务(Java Message Service)利用程序接口,是一个Java平台中对于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送音讯,进行异步通信。Java音讯服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供反对。
JMS的音讯机制有2种模型,一种是队列的模式(Point to Point—)发送的音讯只能被一个消费者生产;一种是订阅(Topic)模式,能够被多个订阅者订阅,订阅者都会接管到同样的音讯。
而ActiveMQ就是对JMS的实现之一。
ActiveMQ介绍
ActiveMQ是一种开源的基于JMS(Java Message Servie)标准的一种消息中间件的实现,ActiveMQ的设计指标是提供规范的、面向音讯的、可能逾越多语言和多零碎的利用集成音讯通信中间件。
它为企业应用中消息传递提供高可用、杰出性能、可扩大、稳固和平安保障。
ActiveMQ实现JMS标准并在此之上提供大量额定的个性。ActiveMQ反对队列和订阅两种模式的音讯发送。
AcitveMQ的数据传送流程如下图:
ActiveMQ的两种消息传递类型:
(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者承受该条队列里的数据。
(2)基于公布/订阅模式的传输,即依据订阅话题来接管相应数据,一个生产者可向多个消费者推送数据,与MQTT协定的实现是相似的。
两种消息传递类型的不同,点对点传输消费者能够接管到在连贯之前生产者所推送的数据,而基于公布/订阅模式的传输方式消费者只能接管到连贯之后生产者推送的数据。
Spring Boot集成ActiveMQ
Spring Boot针对ActiveMQ专门提供了spring-boot-starter-activemq,用来反对ActiveMQ在Spring Boot的主动集成配置。在此基础上咱们能够很轻易的进行集成和应用。
创立我的项目并引入依赖
创立规范的Spring Boot我的项目,并在我的项目中引入以下依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId></dependency>
此时如果不须要web或其余相干解决,只引入该依赖即可。如果应用pool的话, 就须要在pom中退出以下依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId></dependency>
配置文件
在application.properties中增加如下配置:
# 基于内存的ActiveMQspring.activemq.in-memory=true# 不应用连接池,如果应用连接池还需在pom中增加activemq-pool的依赖spring.activemq.pool.enabled=false# 独立装置的ActiveMQ#spring.activemq.broker-url=tcp://127.0.0.1:61616#spring.activemq.user=admin#spring.activemq.password=admin
上述配置中有两套配置,Spring Boot反对基于内存ActiveMQ和基于独立装置的ActiveMQ。失常申请基于内存的模式是为了不便测试而应用,基于独立装置的模式才是真正用于生产环境。此处为了解说性能,不便测试,采纳基于内存的模式。
队列模式实例
首先,咱们来实现基于队列(Queue)模式的实现。这里须要用到两个类ActiveMQQueue和JmsMessagingTemplate。前者是由ActiveMQ对javax.jms.Queue的接口实现。后者为Spring提供发送音讯的工具类,联合Queue对音讯进行发送。
JmsMessagingTemplate默认曾经被实例化,间接拿来应用即可。而ActiveMQQueue则须要咱们进行实例化,并传入音讯队列的名称。
@Configurationpublic class MyMqConfig { @Bean public Queue queue() { return new ActiveMQQueue("sms.queue"); }}
Spring Boot中很惯例的实例化操作,不再赘述。当实例化完ActiveMQQueue之后,咱们的队列便创立实现,上面创立对应的生产者和消费者。
生产者对应代码如下:
@Componentpublic class Producer { @Resource private JmsMessagingTemplate jmsMessagingTemplate; @Resource private Queue queue; public void sendMsg(String msg) { System.out.println("发送音讯内容 :" + msg); this.jmsMessagingTemplate.convertAndSend(this.queue, msg); }}
此处用到JmsMessagingTemplate和Queue,下面曾经提到,这两个类都曾经实现了初始化。消费者对应的配置如下:
@Componentpublic class Consumer { @JmsListener(destination = "sms.queue") public void receiveMsg(String text) { System.out.println("接管到音讯 : "+text); }}
Spring提供了注解式监听器端点:应用@JmsListener。应用@JmsListener托管bean的带正文办法对其进行订阅。在Java8中,@JmsListener是一个可反复的注解,能够关联多个JMS destinations到同一个办法中。而在Java 6和7中,能够应用@JmsListeners注解。
其中destination指定监控的音讯队列名称为“sms.queue”。当队列sms.queue中有音讯发送时会触发此办法的执行,text为音讯内容。
下面实现了队列初始化、生产者和消费者代码的编写,上面通过单元测试来验证是否可能正确发送和解决音讯。
@RunWith(SpringRunner.class)@SpringBootTestpublic class ActiveMqTests { @Autowired private Producer producer; @Test public void sendSimpleQueueMessage() { this.producer.sendMsg("提现200.00元"); }}
执行单元测试,会发现在日志中打印如下信息:
发送音讯内容 :提现200.00元接管到音讯 : 提现200.00元
阐明音讯能够失常发送和接管。如果是基于内存模式,在执行单元测试时会打印出“javax.jms.JMSException: peer (vm://localhost#1) stopped.”异样日志,这是Info级别的谬误,是ActiveMQ的一个bug。
订阅模式实例
播送发送的音讯,能够被多个消费者接管。这里咱们就在原有的根底上进行播送音讯的增加。
首先,Spring Boot集成ActiveMQ时默认只反对队列或者播送之一,通过配置项spring.jms.pub-sub-domain来指定,true 为播送模式,false为队列模式,默认状况下反对队列模式。
此时要应用播送模式,则需在配置文件中增加如下配置:
spring.jms.pub-sub-domain=true
须要留神的是,此时队列模式不可失常工作。
而后在MyMqConfig中增加:
@Beanpublic Topic topic() { return new ActiveMQTopic("sms.topic");}
这里创立了ActiveMQTopic,并将topic的名称指定为sms.topic。
Producer中新增如下代码:
@Resourceprivate Topic topic;public void sendTopic(String msg) { System.out.println("发送Topic音讯内容 :"+msg); this.jmsMessagingTemplate.convertAndSend(this.topic, msg);}
为了演示多个播送接收者,在Comsumer中新增两个消费者:
@JmsListener(destination = "sms.topic")public void receiveTopic1(String text) { System.out.println("receiveTopic1接管到Topic音讯 : " + text);}@JmsListener(destination = "sms.topic")public void receiveTopic2(String text) { System.out.println("receiveTopic2接管到Topic音讯 : " + text);}
单元测试类中新增如下测试:
@Testpublic void sendSimpleTopicMessage() { this.producer.sendTopic("提现200.00元");}
此时,执行单元测试,便可看到如下日志信息:
发送Topic音讯内容 :提现200.00元receiveTopic2接管到Topic音讯 : 提现200.00元receiveTopic1接管到Topic音讯 : 提现200.00元
阐明音讯发送胜利。
同时反对两种模式
在下面的实例中,要么反对队列模式要么反对播送模式,如果在生产环境中两者都须要反对,那么就须要自定义JmsListenerContainerFactory实例。当然,如果Spring Boot默认的配置无奈满足需要,也能够自定义该类,这里只是其中场景之一。
根本配置和应用步骤:通过DefaultJmsListenerContainerFactory创立自定义的JmsListenerContainerFactory实例,在@JmsListener注解中通过containerFactory属性进行援用。
在MyMqConfig配置类中新增如下配置:
@Bean("queueListenerFactory")public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setPubSubDomain(false); return factory;}@Bean("topicListenerFactory")public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //设置为公布订阅形式, 默认状况下应用的生产消费者形式 factory.setPubSubDomain(true); return factory;}
这里别离实例化了基于队列和订阅的工厂类。而后别离在对应的消费者办法上增加containerFactory属性。示例代码如下:
@JmsListener(destination = "sms.queue", containerFactory = "queueListenerFactory")public void receiveMsg(String text) { System.out.println("接管到音讯 : " + text);}@JmsListener(destination = "sms.topic", containerFactory = "topicListenerFactory")public void receiveTopic1(String text) { System.out.println("receiveTopic1接管到Topic音讯 : " + text);}
别离执行两种模式的音讯,发现都失常互利。同时,此时配置文件中的项spring.jms.pub-sub-domain也有效了。
其余事项
1、activeMq的端口号是61616;
2、应用topic,须要配置spring.jms.pub-sub-domain=true;
3、queue如果没有消费者,会将信息存储到queue中;
4、发送的音讯为对象的时候,须要将对象序列化;消费者接管对象信息时须要应用ObjectMessage进行转化;
5、应用JmsListener注解中的containerFactory属性,能够配置spring.jms.pub-sub属性,实现同时接管queque和topic;
6、queue为点对点模式;tipic为公布订阅模式;
7、示例中的音讯队列名称(sms.queue和sms.topic)可依据须要设置成配置属性;
源码地址:https://github.com/secbr/spri...
参考文章: <br/>
https://www.cnblogs.com/xigua... <br/>
https://blog.csdn.net/bihansh...
<center>程序新视界:精彩和成长都不容错过</center>