共计 3166 个字符,预计需要花费 8 分钟才能阅读完成。
ActiveMq 事务
ActiveMq 事务的作用就是在发送、接收处理消息过程中,如果出现问题,可以回滚。
ActiveMq 异步 / 同步发送
以下摘抄自 https://blog.csdn.net/songhai…
同步发送:消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者 (ProducerAck),这个确认消息暗示 broker 已经成功接收到消息并把消息保存到二级存储中。
异步发送如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到 broker 的确认之前一直阻塞 Producer.send 方法。
当发送方法在一个事务上下文中时,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。
想要使用异步,在 brokerURL 中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true 如果设置了 alwaysSyncSend=true 系统将会忽略 useAsyncSend 设置的值都采用同步 1) 当 alwaysSyncSend=false 时,“NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送”2) 当 alwaysSyncSend=false 时,如果指定了 useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。如果 useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送。总结:默认情况 (alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送。jms.sendTimeout: 发送超时时间,默认等于 0,如果 jms.sendTimeout>0 将会忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)所有的消息都是用同步发送!
官方连接:http://activemq.apache.org/as…
配置使用异步发送方式
1. 在连接上配置 cf = new ActiveMQConnectionFactory(“tcp://locahost:61616?jms.useAsyncSend=true”);
2. 通过 ConnectionFactory((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
3. 通过 connection((ActiveMQConnection)connection).setUseAsyncSend(true);
SpringBoot JMS 实现异步发送
1. 如果在配置中使用了连接池,那么 SpringBoot 默认会使用 PooledConnectionFactory,ActiveMQConnectionFactory 的 useAsyncSend 默认会 true。使用连接池配置如下
activemq:
in-memory: true
broker-url: tcp://127.0.0.1:61616
pool:
enabled: true
max-connections: 5
user:
password:
2. 修改 JmsTemplate 默认参数
JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
// 设备为 true,deliveryMode, priority, timeToLive 等设置才会起作用
template.setExplicitQosEnabled(true);
// 设为非持久化模式
template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
完整代码如下:
@Slf4j
@Configuration
public class ActiveConfig {
/**
* 配置用于异步发送的非持久化 JmsTemplate
*/
@Autowired
@Bean
@Primary
public JmsTemplate asynJmsTemplate(PooledConnectionFactory pooledConnectionFactory) {
JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
template.setExplicitQosEnabled(true);
template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
log.info(“jsmtemplate ————->sessionTransacted:{}”,template.isSessionTransacted());
log.info(“jsmtemplate ————->ExplicitQosEnabled:{}”,template.isExplicitQosEnabled());
return template;
}
/**
* 配置用于同步发送的持久化 JmsTemplate
*/
@Autowired
@Bean
public JmsTemplate synJmsTemplate(PooledConnectionFactory pooledConnectionFactory) {
JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
log.info(“jsmtemplate ————->sessionTransacted:{}”,template.isSessionTransacted());
log.info(“jsmtemplate ————->ExplicitQosEnabled:{}”,template.isExplicitQosEnabled());
return template;
}
// 如果对于 SpringBoot 自动生成的 PooledConnectionFactory 需要调优,可以自己生 PooledConnectionFactory 调优参数
// private PooledConnectionFactory getPooledConnectionFactory(String userName,String password,String brokerURL) {
// ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName,password,brokerURL);
// ActiveMQPrefetchPolicy activeMQPrefetchPolicy = new ActiveMQPrefetchPolicy();
// activeMQConnectionFactory.setPrefetchPolicy(activeMQPrefetchPolicy);
// PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
// pooledConnectionFactory.setMaxConnections(5);
// return pooledConnectionFactory;
// }