关于java:SpringBoot整合多个RabbitMQ

11次阅读

共计 5446 个字符,预计需要花费 14 分钟才能阅读完成。

一、背景

​ 最近我的项目中须要用到了 RabbitMQ 来监听音讯队列,监听的音讯队列的 虚拟主机 (virtualHost) 和队列名 (queueName) 是不统一的,然而接管到的音讯格局雷同的。而且可能还存在程序不停机的状况下,动静的减少新的队列 (queue) 的监听,因而就须要咱们本人在程序中实现一种办法实现动静配置RabbitMQ

二、需要

咱们有 2RabbitMQ的配置,在程序启动的时候,动静的配置好这 2 个RabbitMQ,实现音讯的监听。

RabbitMQ的配置信息

host port username password virtualHost queueName
47.101.130.164 5672 rabbit-multi-01 rabbit-multi-01 /rabbit-multi-01 queue-rabbit-multi-01
47.101.130.164 5672 rabbit-multi-02 rabbit-multi-02 /rabbit-multi-02 queue-rabbit-multi-02

三、实现思路

1、动静配置 RabbitMQ

包含 ConnectionFactory,RabbitAdmin,RabbitTemplate,SimpleMessageListenerContainer

2、将上方配置好的 Bean 注入到 Spring 容器中,之后可能须要用到

Spring 容器中注入 Bean 的办法

DefaultListableBeanFactory#registerSingleton
DefaultListableBeanFactory#registerBeanDefinition

四、实现步骤

1、引入 maven 依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2、创立 RabbitProperties 用来示意 RabbitMQ 的配置信息

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RabbitProperties {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String virtualHost;
    private String queueName;
}

3、配置 RabbitMQ

配置 ConnectionFactory,RabbitAdmin,RabbitTemplate,SimpleMessageListenerContainer等,并动静注入到 Spring 容器中

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MultiRabbitMqConfig {

    private final DefaultListableBeanFactory defaultListableBeanFactory;

    private static Map<String, RabbitProperties> multiMqPropertiesMap = new HashMap<String, RabbitProperties>() {
        {put("first", RabbitProperties.builder()
                    .host("47.101.130.164")
                    .port(5672)
                    .username("rabbit-multi-01")
                    .password("rabbit-multi-01")
                    .virtualHost("/rabbit-multi-01")
                    .queueName("queue-rabbit-multi-01").build());
            put("second", RabbitProperties.builder()
                    .host("47.101.130.164")
                    .port(5672)
                    .username("rabbit-multi-02")
                    .password("rabbit-multi-02")
                    .virtualHost("/rabbit-multi-02")
                    .queueName("queue-rabbit-multi-02").build());
        }
    };

    @PostConstruct
    public void initRabbitmq() {multiMqPropertiesMap.forEach((key, rabbitProperties) -> {AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class)
                    .addPropertyValue("cacheMode", CachingConnectionFactory.CacheMode.CHANNEL)
                    .addPropertyValue("host", rabbitProperties.getHost())
                    .addPropertyValue("port", rabbitProperties.getPort())
                    .addPropertyValue("username", rabbitProperties.getUsername())
                    .addPropertyValue("password", rabbitProperties.getPassword())
                    .addPropertyValue("virtualHost", rabbitProperties.getVirtualHost())
                    .getBeanDefinition();
            String connectionFactoryName = String.format("%s%s", key, "ConnectionFactory");
            defaultListableBeanFactory.registerBeanDefinition(connectionFactoryName, beanDefinition);
            CachingConnectionFactory connectionFactory = defaultListableBeanFactory.getBean(connectionFactoryName, CachingConnectionFactory.class);

            String rabbitAdminName = String.format("%s%s", key, "RabbitAdmin");
            AbstractBeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RabbitAdmin.class)
                    .addConstructorArgValue(connectionFactory)
                    .addPropertyValue("autoStartup", true)
                    .getBeanDefinition();
            defaultListableBeanFactory.registerBeanDefinition(rabbitAdminName, rabbitAdminBeanDefinition);
            RabbitAdmin rabbitAdmin = defaultListableBeanFactory.getBean(rabbitAdminName, RabbitAdmin.class);
            log.info("rabbitAdmin:[{}]", rabbitAdmin);

            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "RabbitTemplate"), rabbitTemplate);

            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
            // 设置监听的队列
            simpleMessageListenerContainer.setQueueNames(rabbitProperties.getQueueName());
            // 指定要创立的并发使用者的数量, 默认值是 1, 当并发高时能够减少这个的数值,同时下方 max 的数值也要减少
            simpleMessageListenerContainer.setConcurrentConsumers(3);
            // 最大的并发消费者
            simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
            // 设置是否重回队列
            simpleMessageListenerContainer.setDefaultRequeueRejected(false);
            // 设置签收模式
            simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // 设置非独占模式
            simpleMessageListenerContainer.setExclusive(false);
            // 设置 consumer 未被 ack 的音讯个数
            simpleMessageListenerContainer.setPrefetchCount(1);
            // 设置音讯监听器
            simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                try {log.info("============> Thread:[{}] 接管到音讯:[{}]", Thread.currentThread().getName(), new String(message.getBody()));
                    log.info("====>connection:[{}]", channel.getConnection());
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {log.error(e.getMessage(), e);
                    // 产生异样此处须要捕捉到
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                }
            });
            defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "SimpleMessageListenerContainer"), simpleMessageListenerContainer);
        });
        new Thread(() -> {
            try {TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {e.printStackTrace();
            }
            RabbitTemplate firstRabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean("firstRabbitTemplate");
            firstRabbitTemplate.convertAndSend("exchange-rabbit-multi-01", "","first queue message");
            log.info("over...");
        }).start();}
}

五、实现成果

六、代码

https://gitee.com/huan1993/ra…

正文完
 0