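I. Background

A recent project of mine needed RabbitMQ to listen to message queues. The queues differ in virtual host (virtualHost) and queue name (queueName), but the messages they deliver share the same format. New queue listeners may also have to be added dynamically while the application keeps running. We therefore need a way to configure RabbitMQ dynamically from our own code.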

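II. Requirements

We have two RabbitMQ configurations. When the application starts, both are configured dynamically and a message listener is set up for each.

RabbitMQ configuration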

host            port  username         password         virtualHost       queueName
47.101.130.164  5672  rabbit-multi-01  rabbit-multi-01  /rabbit-multi-01  queue-rabbit-multi-01
47.101.130.164  5672  rabbit-multi-02  rabbit-multi-02  /rabbit-multi-02  queue-rabbit-multi-02
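
The two rows above map naturally onto a small keyed lookup. The sketch below is a plain-Java illustration, with no Spring involved, of holding both configurations under the keys first and second and deriving per-key bean names by appending a component name to the key, which is the naming scheme the configuration class in step 3 uses. The class MqConfigTable, its Row type, and the helper methods are hypothetical names introduced only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MqConfigTable {

    /** One row of the configuration table (hypothetical holder type). */
    public static final class Row {
        public final String host;
        public final int port;
        public final String username;
        public final String password;
        public final String virtualHost;
        public final String queueName;

        public Row(String host, int port, String username, String password,
                   String virtualHost, String queueName) {
            this.host = host;
            this.port = port;
            this.username = username;
            this.password = password;
            this.virtualHost = virtualHost;
            this.queueName = queueName;
        }
    }

    /** Returns both rows, keyed by a short logical name, in insertion order. */
    public static Map<String, Row> rows() {
        Map<String, Row> rows = new LinkedHashMap<>();
        rows.put("first", new Row("47.101.130.164", 5672,
                "rabbit-multi-01", "rabbit-multi-01",
                "/rabbit-multi-01", "queue-rabbit-multi-01"));
        rows.put("second", new Row("47.101.130.164", 5672,
                "rabbit-multi-02", "rabbit-multi-02",
                "/rabbit-multi-02", "queue-rabbit-multi-02"));
        return rows;
    }

    /** Derives a bean name such as "firstRabbitTemplate" from a key and a component name. */
    public static String beanName(String key, String component) {
        return String.format("%s%s", key, component);
    }

    public static void main(String[] args) {
        // Print the connection factory bean name and target queue for each row.
        rows().forEach((key, row) -> System.out.println(
                beanName(key, "ConnectionFactory") + " -> "
                        + row.virtualHost + " / " + row.queueName));
    }
}
```

Keeping the key separate from the connection details means a third broker only needs one more entry in the map; every bean name follows from the key.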

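III. Implementation approach

1. Configure RabbitMQ dynamically

This covers ConnectionFactory, RabbitAdmin, RabbitTemplate, and SimpleMessageListenerContainer.

2. Register the beans configured above in the Spring container, since they may be needed later

Ways to register a bean in the Spring container: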

DefaultListableBeanFactory#registerSingleton
DefaultListableBeanFactory#registerBeanDefinition
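
The two methods behave differently, and a small experiment makes the difference visible. The sketch below assumes spring-beans is on the classpath (it comes in with the starters used in this article). Probe and BeanRegistrationDemo are hypothetical names used only for this illustration. A bean registered through a definition is created by the container, which also runs initialization callbacks; an object passed to registerSingleton is stored as-is and is expected to be fully initialized already.

```java
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;

public class BeanRegistrationDemo {

    /** Hypothetical bean that records whether the container initialized it. */
    public static class Probe implements InitializingBean {
        private String label;
        private boolean initialized;

        public void setLabel(String label) { this.label = label; }
        public String getLabel() { return label; }
        public boolean isInitialized() { return initialized; }

        @Override
        public void afterPropertiesSet() {
            // Called by the container only for beans it creates itself.
            initialized = true;
        }
    }

    public static DefaultListableBeanFactory buildFactory() {
        DefaultListableBeanFactory factory = new DefaultListableBeanFactory();

        // Variant 1: register a definition; the container instantiates the bean,
        // applies the property values, and runs afterPropertiesSet().
        factory.registerBeanDefinition("definedProbe",
                BeanDefinitionBuilder.genericBeanDefinition(Probe.class)
                        .addPropertyValue("label", "defined")
                        .getBeanDefinition());

        // Variant 2: register an existing instance; the container stores it
        // without running any initialization callback.
        Probe manual = new Probe();
        manual.setLabel("manual");
        factory.registerSingleton("manualProbe", manual);

        return factory;
    }

    public static void main(String[] args) {
        DefaultListableBeanFactory factory = buildFactory();
        Probe defined = factory.getBean("definedProbe", Probe.class);
        Probe manual = factory.getBean("manualProbe", Probe.class);
        System.out.println("definedProbe initialized by container: " + defined.isInitialized());
        System.out.println("manualProbe initialized by container: " + manual.isInitialized());
        System.out.println("manualProbe has a bean definition: "
                + factory.containsBeanDefinition("manualProbe"));
    }
}
```

In practice this suggests using registerBeanDefinition when the container should build and initialize the object, and registerSingleton when a fully configured instance already exists.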

IV. Implementation steps

1. Add the Maven dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2. Create RabbitProperties to hold the RabbitMQ configuration

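import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

// Connection settings for one RabbitMQ virtual host plus the queue to listen on.
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RabbitProperties {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String virtualHost;
    private String queueName;
}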

3. Configure RabbitMQ

Configure ConnectionFactory, RabbitAdmin, RabbitTemplate, SimpleMessageListenerContainer, and so on, and register them in the Spring container dynamically.

// Package names match Spring Boot 2.x / Spring AMQP 2.x.
// On Spring Boot 3, use jakarta.annotation.PostConstruct instead.
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.annotation.Configuration;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MultiRabbitMqConfig {

    private final DefaultListableBeanFactory defaultListableBeanFactory;

    // The two RabbitMQ configurations, keyed by a short name that prefixes every bean name.
    private static final Map<String, RabbitProperties> multiMqPropertiesMap = new HashMap<String, RabbitProperties>() {
        {
            put("first", RabbitProperties.builder()
                    .host("47.101.130.164")
                    .port(5672)
                    .username("rabbit-multi-01")
                    .password("rabbit-multi-01")
                    .virtualHost("/rabbit-multi-01")
                    .queueName("queue-rabbit-multi-01").build());
            put("second", RabbitProperties.builder()
                    .host("47.101.130.164")
                    .port(5672)
                    .username("rabbit-multi-02")
                    .password("rabbit-multi-02")
                    .virtualHost("/rabbit-multi-02")
                    .queueName("queue-rabbit-multi-02").build());
        }
    };

    @PostConstruct
    public void initRabbitmq() {
        multiMqPropertiesMap.forEach((key, rabbitProperties) -> {
            // ConnectionFactory: registered as a bean definition so the container creates it.
            AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class)
                    .addPropertyValue("cacheMode", CachingConnectionFactory.CacheMode.CHANNEL)
                    .addPropertyValue("host", rabbitProperties.getHost())
                    .addPropertyValue("port", rabbitProperties.getPort())
                    .addPropertyValue("username", rabbitProperties.getUsername())
                    .addPropertyValue("password", rabbitProperties.getPassword())
                    .addPropertyValue("virtualHost", rabbitProperties.getVirtualHost())
                    .getBeanDefinition();
            String connectionFactoryName = String.format("%s%s", key, "ConnectionFactory");
            defaultListableBeanFactory.registerBeanDefinition(connectionFactoryName, beanDefinition);
            CachingConnectionFactory connectionFactory = defaultListableBeanFactory.getBean(connectionFactoryName, CachingConnectionFactory.class);

            // RabbitAdmin: also registered as a bean definition.
            String rabbitAdminName = String.format("%s%s", key, "RabbitAdmin");
            AbstractBeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RabbitAdmin.class)
                    .addConstructorArgValue(connectionFactory)
                    .addPropertyValue("autoStartup", true)
                    .getBeanDefinition();
            defaultListableBeanFactory.registerBeanDefinition(rabbitAdminName, rabbitAdminBeanDefinition);
            RabbitAdmin rabbitAdmin = defaultListableBeanFactory.getBean(rabbitAdminName, RabbitAdmin.class);
            log.info("rabbitAdmin:[{}]", rabbitAdmin);

            // RabbitTemplate: built by hand and registered as a ready-made singleton.
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "RabbitTemplate"), rabbitTemplate);

            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            // Queue to listen on
            simpleMessageListenerContainer.setQueueNames(rabbitProperties.getQueueName());
            // Number of concurrent consumers to create (default 1); raise it under high load,
            // and raise the maximum below along with it
            simpleMessageListenerContainer.setConcurrentConsumers(3);
            // Maximum number of concurrent consumers
            simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
            // Whether rejected messages are requeued by default
            simpleMessageListenerContainer.setDefaultRequeueRejected(false);
            // Acknowledgement mode
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // Non-exclusive consumer
            simpleMessageListenerContainer.setExclusive(false);
            // Maximum number of unacknowledged messages per consumer
            simpleMessageListenerContainer.setPrefetchCount(1);
            // Message listener
            simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                try {
                    log.info("============> Thread:[{}] received message:[{}] ", Thread.currentThread().getName(),
                            new String(message.getBody(), StandardCharsets.UTF_8));
                    log.info("====>connection:[{}]", channel.getConnection());
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    // Exceptions must be caught here; the message is nacked and requeued
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                }
            });
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "SimpleMessageListenerContainer"), simpleMessageListenerContainer);
        });

        // Send a test message after a short delay to exercise the first configuration.
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and skip the test message.
                Thread.currentThread().interrupt();
                return;
            }
            RabbitTemplate firstRabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean("firstRabbitTemplate");
            firstRabbitTemplate.convertAndSend("exchange-rabbit-multi-01", "", "first queue message");
            log.info("over...");
        }).start();
    }
}
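
The background section also mentions adding a listener for a new queue while the application keeps running, which the startup-time configuration above does not show. The following is a minimal sketch of one way that might look, assuming Spring AMQP 2.x on the classpath. RuntimeListenerRegistrar and its methods are hypothetical and are not part of the original project. Because the container is created after the application context has already started, the sketch starts it explicitly rather than relying on the context to do so. It uses a plain MessageListener with AUTO acknowledgement to stay short; the manual-ack listener above could be passed in the same way.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;

public class RuntimeListenerRegistrar {

    private final DefaultListableBeanFactory beanFactory;

    public RuntimeListenerRegistrar(DefaultListableBeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    /** Builds a connection factory; no connection is opened until it is first used. */
    public static CachingConnectionFactory connectionFactory(
            String host, int port, String username, String password, String virtualHost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }

    /**
     * Registers a connection factory and a listener container under names derived
     * from the key, then starts the container. The queue is assumed to exist already.
     */
    public SimpleMessageListenerContainer addListener(
            String key, CachingConnectionFactory connectionFactory,
            String queueName, MessageListener listener) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queueName);
        // AUTO: the container acks after the listener returns normally.
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(listener);

        // Same naming scheme as the startup configuration: key + component name.
        beanFactory.registerSingleton(key + "ConnectionFactory", connectionFactory);
        beanFactory.registerSingleton(key + "SimpleMessageListenerContainer", container);

        // Start consuming now; this connects to the broker.
        container.start();
        return container;
    }
}
```

A caller, for example a web endpoint, could build the connection factory from submitted settings and then call addListener with a new key such as "third"; the key must not collide with a bean name that is already registered.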

V. Result

VI. Code

https://gitee.com/huan1993/ra...