关于java:SpringBoot集成ActiveMQ实例详解

56次阅读

共计 6171 个字符,预计需要花费 16 分钟才能阅读完成。

在我的项目开发的过程中咱们常常会遇到相似的业务场景:用户申请提现,后盾进行账务解决、发送提现短信、调用银行打款通道。

在这个过程中调用三方通道(短信或银行通道)都比拟耗时,同时账务解决可能也是由专门的账务零碎进行解决。那么,为了进步并发和相应速度,前面的三个操作都能够通过异步进行解决。这就用到了音讯队列。

音讯队列中间件是分布式系统中重要的组件,次要解决利用耦合、异步音讯、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可短少的中间件。

市面上比拟常见的音讯队列有:ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。

在 Spring Boot 的 starter 中专门集成了 ActiveMQ,因而,本篇文章咱们就来讲讲对 ActiveMQ 的集成。

JMS 标准

JMS 即 Java 音讯服务(Java Message Service)利用程序接口,是一个 Java 平台中对于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送音讯,进行异步通信。Java 音讯服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供反对。

JMS 的音讯机制有 2 种模型,一种是队列的模式(Point to Point—)发送的音讯只能被一个消费者生产;一种是订阅(Topic)模式,能够被多个订阅者订阅,订阅者都会接管到同样的音讯。

而 ActiveMQ 就是对 JMS 的实现之一。

ActiveMQ 介绍

ActiveMQ 是一种开源的基于 JMS(Java Message Servie)标准的一种消息中间件的实现,ActiveMQ 的设计指标是提供规范的、面向音讯的、可能逾越多语言和多零碎的利用集成音讯通信中间件。

它为企业应用中消息传递提供高可用、杰出性能、可扩大、稳固和平安保障。

ActiveMQ 实现 JMS 标准并在此之上提供大量额定的个性。ActiveMQ 反对队列和订阅两种模式的音讯发送。

AcitveMQ 的数据传送流程如下图:

ActiveMQ 的两种消息传递类型:

(1)点对点传输,即一个生产者对应一个消费者,生产者向 broke 推送数据,数据存储在 broke 的一个队列中,当消费者承受该条队列里的数据。

(2)基于公布 / 订阅模式的传输,即依据订阅话题来接管相应数据,一个生产者可向多个消费者推送数据,与 MQTT 协定的实现是相似的。

两种消息传递类型的不同,点对点传输消费者能够接管到在连贯之前生产者所推送的数据,而基于公布 / 订阅模式的传输方式消费者只能接管到连贯之后生产者推送的数据。

Spring Boot 集成 ActiveMQ

Spring Boot 针对 ActiveMQ 专门提供了 spring-boot-starter-activemq,用来反对 ActiveMQ 在 Spring Boot 的主动集成配置。在此基础上咱们能够很轻易的进行集成和应用。

创立我的项目并引入依赖

创立规范的 Spring Boot 我的项目,并在我的项目中引入以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

此时如果不须要 web 或其余相干解决,只引入该依赖即可。如果应用 pool 的话, 就须要在 pom 中退出以下依赖:

<dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-pool</artifactId>
</dependency>

配置文件

在 application.properties 中增加如下配置:

# 基于内存的 ActiveMQ
spring.activemq.in-memory=true
# 不应用连接池,如果应用连接池还需在 pom 中增加 activemq-pool 的依赖
spring.activemq.pool.enabled=false

# 独立装置的 ActiveMQ
#spring.activemq.broker-url=tcp://127.0.0.1:61616
#spring.activemq.user=admin
#spring.activemq.password=admin

上述配置中有两套配置,Spring Boot 反对基于内存 ActiveMQ 和基于独立装置的 ActiveMQ。失常申请基于内存的模式是为了不便测试而应用,基于独立装置的模式才是真正用于生产环境。此处为了解说性能,不便测试,采纳基于内存的模式。

队列模式实例

首先,咱们来实现基于队列(Queue)模式的实现。这里须要用到两个类 ActiveMQQueue 和 JmsMessagingTemplate。前者是由 ActiveMQ 对 javax.jms.Queue 的接口实现。后者为 Spring 提供发送音讯的工具类,联合 Queue 对音讯进行发送。

JmsMessagingTemplate 默认曾经被实例化,间接拿来应用即可。而 ActiveMQQueue 则须要咱们进行实例化,并传入音讯队列的名称。

@Configuration
public class MyMqConfig {

    @Bean
    public Queue queue() {return new ActiveMQQueue("sms.queue");
    }
}

Spring Boot 中很惯例的实例化操作,不再赘述。当实例化完 ActiveMQQueue 之后,咱们的队列便创立实现,上面创立对应的生产者和消费者。

生产者对应代码如下:

@Component
public class Producer {

    @Resource
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Resource
    private Queue queue;

    public void sendMsg(String msg) {System.out.println("发送音讯内容 :" + msg);
        this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
    }

}

此处用到 JmsMessagingTemplate 和 Queue,下面曾经提到,这两个类都曾经实现了初始化。消费者对应的配置如下:

@Component
public class Consumer {@JmsListener(destination = "sms.queue")
    public void receiveMsg(String text) {System.out.println("接管到音讯 :"+text);
    }
}

Spring 提供了注解式监听器端点:应用 @JmsListener。应用 @JmsListener 托管 bean 的带正文办法对其进行订阅。在 Java8 中,@JmsListener 是一个可反复的注解,能够关联多个 JMS destinations 到同一个办法中。而在 Java 6 和 7 中,能够应用 @JmsListeners 注解。

其中 destination 指定监控的音讯队列名称为“sms.queue”。当队列 sms.queue 中有音讯发送时会触发此办法的执行,text 为音讯内容。

下面实现了队列初始化、生产者和消费者代码的编写,上面通过单元测试来验证是否可能正确发送和解决音讯。

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActiveMqTests {

    @Autowired
    private Producer producer;

    @Test
    public void sendSimpleQueueMessage() {this.producer.sendMsg("提现 200.00 元");
    }
}

执行单元测试,会发现在日志中打印如下信息:

 发送音讯内容 : 提现 200.00 元
接管到音讯 : 提现 200.00 元 

阐明音讯能够失常发送和接管。如果是基于内存模式,在执行单元测试时会打印出“javax.jms.JMSException: peer (vm://localhost#1) stopped.”异样日志,这是 Info 级别的谬误,是 ActiveMQ 的一个 bug。

订阅模式实例

播送发送的音讯,能够被多个消费者接管。这里咱们就在原有的根底上进行播送音讯的增加。

首先,Spring Boot 集成 ActiveMQ 时默认只反对队列或者播送之一,通过配置项 spring.jms.pub-sub-domain 来指定,true 为播送模式,false 为队列模式,默认状况下反对队列模式。

此时要应用播送模式,则需在配置文件中增加如下配置:

spring.jms.pub-sub-domain=true

须要留神的是,此时队列模式不可失常工作。

而后在 MyMqConfig 中增加:

@Bean
public Topic topic() {return new ActiveMQTopic("sms.topic");
}

这里创立了 ActiveMQTopic,并将 topic 的名称指定为 sms.topic。

Producer 中新增如下代码:

@Resource
private Topic topic;

public void sendTopic(String msg) {System.out.println("发送 Topic 音讯内容 :"+msg);
    this.jmsMessagingTemplate.convertAndSend(this.topic, msg);
}

为了演示多个播送接收者,在 Comsumer 中新增两个消费者:

@JmsListener(destination = "sms.topic")
public void receiveTopic1(String text) {System.out.println("receiveTopic1 接管到 Topic 音讯 :" + text);
}

@JmsListener(destination = "sms.topic")
public void receiveTopic2(String text) {System.out.println("receiveTopic2 接管到 Topic 音讯 :" + text);
}

单元测试类中新增如下测试:

@Test
public void sendSimpleTopicMessage() {this.producer.sendTopic("提现 200.00 元");
}

此时,执行单元测试,便可看到如下日志信息:

 发送 Topic 音讯内容 : 提现 200.00 元
receiveTopic2 接管到 Topic 音讯 : 提现 200.00 元
receiveTopic1 接管到 Topic 音讯 : 提现 200.00 元 

阐明音讯发送胜利。

同时反对两种模式

在下面的实例中,要么反对队列模式要么反对播送模式,如果在生产环境中两者都须要反对,那么就须要自定义 JmsListenerContainerFactory 实例。当然,如果 Spring Boot 默认的配置无奈满足需要,也能够自定义该类,这里只是其中场景之一。

根本配置和应用步骤:通过 DefaultJmsListenerContainerFactory 创立自定义的 JmsListenerContainerFactory 实例,在 @JmsListener 注解中通过 containerFactory 属性进行援用。

在 MyMqConfig 配置类中新增如下配置:

@Bean("queueListenerFactory")
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setPubSubDomain(false);
    return factory;
}

@Bean("topicListenerFactory")
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // 设置为公布订阅形式, 默认状况下应用的生产消费者形式
    factory.setPubSubDomain(true);
    return factory;
}

这里别离实例化了基于队列和订阅的工厂类。而后别离在对应的消费者办法上增加 containerFactory 属性。示例代码如下:

@JmsListener(destination = "sms.queue", containerFactory = "queueListenerFactory")
public void receiveMsg(String text) {System.out.println("接管到音讯 :" + text);
}

@JmsListener(destination = "sms.topic", containerFactory = "topicListenerFactory")
public void receiveTopic1(String text) {System.out.println("receiveTopic1 接管到 Topic 音讯 :" + text);
}

别离执行两种模式的音讯,发现都失常互利。同时,此时配置文件中的项 spring.jms.pub-sub-domain 也有效了。

其余事项

1、activeMq 的端口号是 61616;

2、应用 topic, 须要配置 spring.jms.pub-sub-domain=true;

3、queue 如果没有消费者,会将信息存储到 queue 中;

4、发送的音讯为对象的时候,须要将对象序列化;消费者接管对象信息时须要应用 ObjectMessage 进行转化;

5、应用 JmsListener 注解中的 containerFactory 属性,能够配置 spring.jms.pub-sub 属性,实现同时接管 queque 和 topic;

6、queue 为点对点模式;tipic 为公布订阅模式;

7、示例中的音讯队列名称(sms.queue 和 sms.topic)可依据须要设置成配置属性;

源码地址:https://github.com/secbr/spri…

参考文章:<br/>
https://www.cnblogs.com/xigua… <br/>
https://blog.csdn.net/bihansh…

<center> 程序新视界 :精彩和成长都不容错过 </center>

正文完
 0