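I. Background
A recent project of mine needed RabbitMQ to listen to message queues. The queues being monitored differ in virtual host (virtualHost) and queue name (queueName), but the messages they deliver share the same format. Listeners for new queues may also have to be added dynamically without stopping the application. We therefore need to implement a way to configure RabbitMQ dynamically in our own code.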
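II. Requirements
We have 2 RabbitMQ configurations. When the application starts, both are configured dynamically so that messages from each one can be consumed.
RabbitMQ configuration details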
host | port | username | password | virtualHost | queueName |
---|---|---|---|---|---|
47.101.130.164 | 5672 | rabbit-multi-01 | rabbit-multi-01 | /rabbit-multi-01 | queue-rabbit-multi-01 |
47.101.130.164 | 5672 | rabbit-multi-02 | rabbit-multi-02 | /rabbit-multi-02 | queue-rabbit-multi-02 |
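III. Approach
1. Dynamically configure RabbitMQ, including ConnectionFactory, RabbitAdmin, RabbitTemplate, SimpleMessageListenerContainer, and so on.
2. Register the beans configured above in the Spring container, since they may be needed later.
Methods for registering a bean in the Spring container: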
DefaultListableBeanFactory#registerSingleton
DefaultListableBeanFactory#registerBeanDefinition
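The difference between the two registration methods can be sketched with a standalone bean factory. This is a minimal sketch, assuming spring-beans is on the classpath (the Spring Boot starters pull it in transitively); the `Greeter` class and the bean names are hypothetical and exist only for illustration.

```java
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;

public class RegistrationSketch {

    /** Hypothetical bean type, used only for this illustration. */
    public static class Greeter {
        private String name = "default";

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public static DefaultListableBeanFactory buildFactory() {
        DefaultListableBeanFactory factory = new DefaultListableBeanFactory();

        // registerBeanDefinition: hand Spring a recipe. Spring instantiates the bean,
        // applies the property values and runs the usual initialization callbacks.
        AbstractBeanDefinition definition = BeanDefinitionBuilder
                .genericBeanDefinition(Greeter.class)
                .addPropertyValue("name", "fromDefinition")
                .getBeanDefinition();
        factory.registerBeanDefinition("definedGreeter", definition);

        // registerSingleton: hand Spring an instance that is already fully built.
        // Spring stores it as-is and does not run initialization callbacks on it.
        Greeter ready = new Greeter();
        ready.setName("fromSingleton");
        factory.registerSingleton("readyGreeter", ready);

        return factory;
    }

    public static void main(String[] args) {
        DefaultListableBeanFactory factory = buildFactory();
        System.out.println(factory.getBean("definedGreeter", Greeter.class).getName()); // fromDefinition
        System.out.println(factory.getBean("readyGreeter", Greeter.class).getName());   // fromSingleton
        // A manually registered singleton has no bean definition behind it.
        System.out.println(factory.containsBeanDefinition("readyGreeter"));             // false
    }
}
```

As a rule of thumb, registerBeanDefinition fits objects that Spring should construct and initialize itself, while registerSingleton fits objects that have already been constructed and configured by hand.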
IV. Implementation steps
1. Add the Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
2. Create RabbitProperties to hold the RabbitMQ configuration
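import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/** Connection and queue settings for a single RabbitMQ instance. */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RabbitProperties {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String virtualHost;
    private String queueName;
}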
3. Configure RabbitMQ
Configure ConnectionFactory, RabbitAdmin, RabbitTemplate, SimpleMessageListenerContainer, and so on, and register them dynamically in the Spring container.
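import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
// javax.* applies to Spring Boot 2.x; on Spring Boot 3+ use jakarta.annotation.PostConstruct
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
// Package used by Spring AMQP 2.x and later
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.annotation.Configuration;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MultiRabbitMqConfig {
    private final DefaultListableBeanFactory defaultListableBeanFactory;
    // Hard-coded for the demo: one entry per RabbitMQ instance; the key becomes the bean-name prefix
    private static final Map<String, RabbitProperties> multiMqPropertiesMap = new HashMap<String, RabbitProperties>() {
        {
            put("first", RabbitProperties.builder()
                    .host("47.101.130.164")
                    .port(5672)
                    .username("rabbit-multi-01")
                    .password("rabbit-multi-01")
                    .virtualHost("/rabbit-multi-01")
                    .queueName("queue-rabbit-multi-01").build());
            put("second", RabbitProperties.builder()
                    .host("47.101.130.164")
                    .port(5672)
                    .username("rabbit-multi-02")
                    .password("rabbit-multi-02")
                    .virtualHost("/rabbit-multi-02")
                    .queueName("queue-rabbit-multi-02").build());
        }
    };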
    @PostConstruct
    public void initRabbitmq() {
        multiMqPropertiesMap.forEach((key, rabbitProperties) -> {
            // ConnectionFactory: registered as a bean definition so that Spring creates and manages it
            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class)
                    .addPropertyValue("cacheMode", CachingConnectionFactory.CacheMode.CHANNEL)
                    .addPropertyValue("host", rabbitProperties.getHost())
                    .addPropertyValue("port", rabbitProperties.getPort())
                    .addPropertyValue("username", rabbitProperties.getUsername())
                    .addPropertyValue("password", rabbitProperties.getPassword())
                    .addPropertyValue("virtualHost", rabbitProperties.getVirtualHost())
                    .getBeanDefinition();
            String connectionFactoryName = String.format("%s%s", key, "ConnectionFactory");
            defaultListableBeanFactory.registerBeanDefinition(connectionFactoryName, beanDefinition);
            CachingConnectionFactory connectionFactory = defaultListableBeanFactory.getBean(connectionFactoryName, CachingConnectionFactory.class);
            // RabbitAdmin: also registered as a bean definition, built on the connection factory above
            String rabbitAdminName = String.format("%s%s", key, "RabbitAdmin");
            AbstractBeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RabbitAdmin.class)
                    .addConstructorArgValue(connectionFactory)
                    .addPropertyValue("autoStartup", true)
                    .getBeanDefinition();
            defaultListableBeanFactory.registerBeanDefinition(rabbitAdminName, rabbitAdminBeanDefinition);
            RabbitAdmin rabbitAdmin = defaultListableBeanFactory.getBean(rabbitAdminName, RabbitAdmin.class);
            log.info("rabbitAdmin:[{}]", rabbitAdmin);
            // RabbitTemplate: built by hand and registered as a ready-made singleton
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "RabbitTemplate"), rabbitTemplate);
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            // Queue to listen on
            simpleMessageListenerContainer.setQueueNames(rabbitProperties.getQueueName());
            // Number of concurrent consumers to create (default 1); raise it under heavy load, together with the max below
            simpleMessageListenerContainer.setConcurrentConsumers(3);
            // Maximum number of concurrent consumers
            simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
            // Do not requeue messages that the container itself rejects
            simpleMessageListenerContainer.setDefaultRequeueRejected(false);
            // Acknowledge messages manually
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // Non-exclusive consumer
            simpleMessageListenerContainer.setExclusive(false);
            // Maximum number of unacknowledged messages per consumer
            simpleMessageListenerContainer.setPrefetchCount(1);
            // Message listener
            simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                try {
                    // Assumes the message body is UTF-8 text
                    log.info("============> Thread:[{}] received message:[{}]", Thread.currentThread().getName(), new String(message.getBody(), StandardCharsets.UTF_8));
                    log.info("====>connection:[{}]", channel.getConnection());
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    // Exceptions must be caught here so the message can be nacked.
                    // requeue=true puts it back on the queue, so a message that always fails will be redelivered repeatedly.
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                }
            });
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "SimpleMessageListenerContainer"), simpleMessageListenerContainer);
        });
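        // Demo only: after a short delay, send a test message through the first RabbitTemplate
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and skip the test message
                Thread.currentThread().interrupt();
                return;
            }
            RabbitTemplate firstRabbitTemplate = defaultListableBeanFactory.getBean("firstRabbitTemplate", RabbitTemplate.class);
            // Assumes exchange-rabbit-multi-01 already exists on the broker and routes to the first queue
            firstRabbitTemplate.convertAndSend("exchange-rabbit-multi-01", "", "first queue message");
            log.info("over...");
        }).start();
    }
}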
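The background mentions adding listeners for new queues without restarting the application, whereas the configuration above only registers beans at startup. Here is a minimal sketch of how a listener might be added at runtime, under several assumptions: the class `DynamicListenerRegistrar` and its method names are hypothetical, the queue already exists on the broker, and messages are acknowledged automatically to keep the example short. A container registered with registerSingleton after the application context has started is not started by Spring's lifecycle processing, so the sketch starts it explicitly.

```java
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;

/** Hypothetical helper that registers a new listener while the application is running. */
public class DynamicListenerRegistrar {

    private final DefaultListableBeanFactory beanFactory;

    public DynamicListenerRegistrar(DefaultListableBeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    /** Builds a connection factory; no connection is opened until it is first used. */
    public static CachingConnectionFactory buildConnectionFactory(
            String host, int port, String username, String password, String virtualHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }

    /** Builds a listener container for one queue; the container is not started yet. */
    public static SimpleMessageListenerContainer buildContainer(
            CachingConnectionFactory connectionFactory, String queueName) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queueName);
        // AUTO lets the container ack after the listener returns normally (simplification for the sketch)
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener((MessageListener) message ->
                System.out.println("received: " + new String(message.getBody(), StandardCharsets.UTF_8)));
        return container;
    }

    /**
     * Registers the beans under the same naming scheme as the startup configuration and starts listening.
     * registerSingleton throws IllegalStateException if a bean with the same name already exists.
     */
    public void addListener(String key, String host, int port, String username,
                            String password, String virtualHost, String queueName) {
        CachingConnectionFactory connectionFactory =
                buildConnectionFactory(host, port, username, password, virtualHost);
        SimpleMessageListenerContainer container = buildContainer(connectionFactory, queueName);
        beanFactory.registerSingleton(key + "ConnectionFactory", connectionFactory);
        beanFactory.registerSingleton(key + "SimpleMessageListenerContainer", container);
        // The context is already running, so the container has to be started by hand.
        container.start();
    }
}
```

A caller could then invoke something like `registrar.addListener("third", host, 5672, user, password, "/some-vhost", "some-queue")` from an admin endpoint or a configuration-change handler; the key and argument values here are placeholders.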
V. Result
VI. Code
https://gitee.com/huan1993/ra…