乐趣区

关于springboot:三springBoot对接rabbitMq

前情提要:rabbitmq 治理界面查看姿态

一、疾速搭建 / 根本信息发送和生产

1、引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、application.yml

spring:
  rabbitmq:
    host: ipXXX
    port: 5672
    username: 账户 XXX
    password: 明码 XXX
    virtual-host: /wen  # 交换器名称

以 direct 模式为例

1、配置文件

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author : JCccc
 * @CreateTime : 2019/9/3
 * @Description :
 **/
@Configuration
public class RabbitConfig {
    // 队列 起名:TestDirectQueue
    @Bean
    public Queue emailQueue() {
        // durable: 是否长久化, 默认是 false, 长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效
        // exclusive: 默认也是 false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于 durable
        // autoDelete: 是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。//   return new Queue("TestDirectQueue",true,true,false);
        // 个别设置一下队列的长久化就好, 其余两个就是默认 false
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue smsQueue() {
        // durable: 是否长久化, 默认是 false, 长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效
        // exclusive: 默认也是 false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于 durable
        // autoDelete: 是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。//   return new Queue("TestDirectQueue",true,true,false);
        // 个别设置一下队列的长久化就好, 其余两个就是默认 false
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
        // durable: 是否长久化, 默认是 false, 长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效
        // exclusive: 默认也是 false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于 durable
        // autoDelete: 是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。//   return new Queue("TestDirectQueue",true,true,false);
        // 个别设置一下队列的长久化就好, 其余两个就是默认 false
        return new Queue("weixin.fanout.queue", true);
    }
    @Bean
    public Queue TTLQueue() {Map<String, Object> map = new HashMap<>(16);
        map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则 30 秒后过期
        return new Queue("TTL_QUEUE", true, false, false, map);
    }

    @Bean
    public DirectExchange TTLExchange() {return new DirectExchange("TTL_EXCHANGE", true, false);
    }


    //Direct 交换机 起名:TestDirectExchange
    @Bean
    public DirectExchange fanoutOrderExchange() {//  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("fanout_exchange", true, false);
    }
    // 绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    public Binding bindingDirect() {return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
    }

    @Bean
    public Binding bindingDirect1() {return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect2() {return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect3() {return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
    }
}


2、生产者
package com.pit.barberShop.common.MQ.Rabbit.fanout;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author:wenye
 * @date:Created in 2021/6/15 21:41
 * @description:播送模式
 * @version: $
 */
@RestController
@RequestMapping("/rabbitmq")
public class ProducerFanout {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 1: 定义交换机
    private String exchangeName = "fanout_exchange";
    // 2: 路由 key
    private String routeKey = "";

    @RequestMapping("/fanout")
    public void markerFanout() {
        String message ="shua";
        // 发送音讯
        rabbitTemplate.convertAndSend(exchangeName, routeKey, message);
    }

    @RequestMapping("/ttl")
    public String testTTL() {MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("20000"); // 设置过期工夫,单位:毫秒
        byte[] msgBytes = "测试音讯主动过期".getBytes();
        Message message = new Message(msgBytes, messageProperties);
        rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message);
        return "ok";
    }
}

3、消费者

package com.pit.barberShop.common.MQ.Rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
 * @author:wenye
 * @date:Created in 2021/6/15 22:07
 * @description:fanout 消费者
 * @version: $
 */
@Component
public class ConsumerFanout {

    @RabbitListener(bindings =@QueueBinding(
            // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。value = @Queue(value = "sms.fanout.queue",autoDelete = "false"),
            // order.fanout 交换机的名字 必须和生产者保持一致
            exchange = @Exchange(value = "fanout_exchange",
                    // 这里是确定的 rabbitmq 模式是:fanout 是以播送模式、公布订阅模式
                    type = ExchangeTypes.DIRECT)
    ))
    public void messagerevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("sms-two111------------->" + message);
    }


    @RabbitListener(bindings =@QueueBinding(
            // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"),
            // order.fanout 交换机的名字 必须和生产者保持一致
            exchange = @Exchange(value = "fanout_exchange",
                    // 这里是确定的 rabbitmq 模式是:fanout 是以播送模式、公布订阅模式
                    type = ExchangeTypes.DIRECT)
    ))
    public void messageWXrevice(String message){
        // 此处省略发邮件的逻辑
        System.out.println("weixin----two---------->" + message);
    }
}

二、过期工夫

1、生产者发送音讯时设置过期工夫

    public String testTTL() {MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("20000"); // 设置过期工夫,单位:毫秒
        byte[] msgBytes = "测试音讯主动过期".getBytes();
        Message message = new Message(msgBytes, messageProperties);
        rabbitTemplate.convertAndSend("TTL_EXCHANGE", "", message);
        return "ok";
    }

2、队列中的所有音讯设置过期工夫

配置中增加
@Bean
    public Queue TTLQueue() {Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则 30 秒后过期
        return new Queue("TTL_QUEUE", true, false, false, map);
    }

  @Bean
    public Queue TTLQueue() {Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则 30 秒后过期
        return new Queue("TTL_QUEUE", true, false, false, map);
    }

    @Bean
    public DirectExchange TTLExchange() {return new DirectExchange("TTL_EXCHANGE", true, false);
    }

    @Bean
    public Binding bindingDirect() {return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");
    }

三、音讯确认机制配置
参考:https://blog.csdn.net/qq33098…
默认是自动应答

spring:
  rabbitmq:
    # 开启发送确认
    publisher-confirms: true
    # 开启发送失败退回
    publisher-returns: true

目前回调存在 ConfirmCallback 和 ReturnCallback 两者。他们的区别在于
如果音讯没有到 exchange, 则 ConfirmCallback 回调,ack=false,
如果音讯达到 exchange, 则 ConfirmCallback 回调,ack=true

exchange 到 queue 胜利, 则不回调 ReturnCallback
rabbitMQ 音讯生产者发送音讯的流程

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    /**
    * correlationData:对象外部只有一个 id 属性,用来示意以后音讯的唯一性。* ack:音讯投递到 broker 的状态,true 示意胜利。* cause:示意投递失败的起因。**/
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause){if (!ack) {log.error("音讯发送异样!");
        } else {log.info("发送者爸爸曾经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }

    }


}

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {// 重写 returnedMessage() 办法,办法有五个参数 message(音讯体)、replyCode(响应 code)、replyText(响应内容)、exchange(交换机)、routingKey 路由(队列)@Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

配置文件
1、避免反复签发 ack 须要在配置类中重写
  @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // 此处也设置为手动 ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

2、从新创立设置交换器和队列属性
@Bean
   public Queue chongfuQueue() {
    // durable: 是否长久化, 默认是 false, 长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效
    // exclusive: 默认也是 false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于 durable
    // autoDelete: 是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。//   return new Queue("TestDirectQueue",true,true,false);
    // 个别设置一下队列的长久化就好, 其余两个就是默认 false
    return new Queue("chongfu.fanout.queue", true);
}

 //Direct 交换机 起名:TestDirectExchange
@Bean
public DirectExchange chongfuExchange() {//  return new DirectExchange("TestDirectExchange",true,true);
    return new DirectExchange("chongfu_exchange", true, false);
}

@Bean
public Binding bindingDirect4() {return BindingBuilder.bind(chongfuQueue()).to(chongfuExchange()).with("");
}


生产者

public void markerchongfu() {
        /**
         * 确保音讯发送失败后能够从新返回到队列中
         * 留神:yml 须要配置 publisher-returns: true
         */
        rabbitTemplate.setMandatory(true);

        /**
         * 消费者确认收到音讯后,手动 ack 回执回调解决
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * 音讯投递到队列失败回调解决
         */
        rabbitTemplate.setReturnCallback(returnCallbackService);

        /**
         * 发送音讯
         */
        String s = UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend("chongfu_exchange", routeKey, "帅哥",
                message -> {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(s));
    }

消费者

@RabbitListener(bindings =@QueueBinding(
            // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。value = @Queue(value = "chongfu.fanout.queue",autoDelete = "false"),
            // order.fanout 交换机的名字 必须和生产者保持一致
            exchange = @Exchange(value = "chongfu_exchange",
                    // 这里是确定的 rabbitmq 模式是:fanout 是以播送模式、公布订阅模式
                    type = ExchangeTypes.DIRECT)
    ))
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {log.info("小富收到音讯:{}", msg);
//            log.info("序号:{}", message.getMessageProperties().getDeliveryTag());
//            System.out.println(msg);
            //TODO 具体业务
            // 收到音讯 basicAck()
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {log.error("音讯已反复解决失败, 回绝再次接管...");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 回绝音讯
            } else {log.error("音讯行将再次返回队列解决...");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }

生产音讯有三种回执办法

1、basicAck

basicAck:示意胜利确认,应用此回执办法后,音讯会被 rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple) 

  • deliveryTag:示意音讯投递序号,每次生产音讯或者音讯从新投递后,deliveryTag 都会减少。手动音讯确认模式下,咱们能够对指定 deliveryTag 的音讯进行 ack、nack、reject 等操作。
  • multiple:是否批量确认,值为 true 则会一次性 ack 所有小于以后音讯 deliveryTag 的音讯。

举个栗子:假如我先发送三条音讯 deliveryTag 别离是 5、6、7,可它们都没有被确认,当我发第四条音讯此时 deliveryTag 为 8,multiple 设置为 true,会将 5、6、7、8 的音讯全副进行确认。

2、basicNack

basicNack:示意失败确认,个别在生产音讯业务异样时用到此办法,能够将音讯从新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:示意音讯投递序号。
  • multiple:是否批量确认。
  • requeue:值为 true 音讯将从新入队列。

3、basicReject

basicReject:回绝音讯,与 basicNack 区别在于不能进行批量操作,其余用法很类似。

void basicReject(long deliveryTag, boolean requeue)

  • deliveryTag:示意音讯投递序号。
  • requeue:值为 true 音讯将从新入队列。

四、死信队列

死信队列其实和一般的队列没啥大的区别,都须要创立本人的 Queue、Exchange, 而后通过 RoutingKey 绑定到 Exchange 下来,只不过死信队列的 RoutingKey 和 Exchange 要作为参数,绑定到失常的队列下来,一种利用场景是失常队列外面的音讯被 basicNack 或者 reject 时,音讯就会被路由到失常队列绑定的死信队列中,还有一种还有罕用的场景就是开启了主动签收,而后消费者生产音讯时出现异常,超过了重试次数,那么这条音讯也会进入死信队列,如果配置了话,

例子

 // 模仿异样用的交换器,topic 交换器会通配符匹配,当然字符串截然不同也会匹配
    @Bean
    TopicExchange emailExchange() {return new TopicExchange("demoTopicExchange");
    }

    // 死信队列
    @Bean
    public Queue deadLetterQueue() {return new Queue("demo.dead.letter");
    }
    // 死信交换器
    @Bean
    TopicExchange deadLetterExchange() {return new TopicExchange("demoDeadLetterTopicExchange");
    }
    // 绑定死信队列
    @Bean
    Binding bindingDeadLetterQueue() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("demo.dead.letter");
    }

生产者
@RequestMapping("/sixin")
    public void sendEmailMessage() {CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("demoTopicExchange","demo.email","11",correlationData);
        log.info("--- 发送 email 音讯 ---{}---messageId---{}","111",correlationData.getId());
    }

消费者

  /**
     * 邮件消费者
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(bindings =@QueueBinding(
            // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。value = @Queue(value = "demo.email",autoDelete = "false",
            arguments = {@Argument(name =  "x-dead-letter-exchange", value = "demoDeadLetterTopicExchange"),
                    @Argument(name = "x-dead-letter-routing-key",value = "demo.dead.letter"),
                    @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long")
            }),
            key = "demo.email",
            // order.fanout 交换机的名字 必须和生产者保持一致
            exchange = @Exchange(value = "demoTopicExchange",
                    // 这里是确定的 rabbitmq 模式是:fanout 是以播送模式、公布订阅模式
                    type = ExchangeTypes.TOPIC)
    ))
    public void handleEmailMessage(Message message, Channel channel,String msg) throws IOException {

        try {log.info("--- 承受到音讯 ---{}",msg);
            // 被动异样
            int m=1/0;
            // 手动签收
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
        catch (Exception e) {
            // 异样,ture 从新入队, 或者 false, 进入死信队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

        }
    }

    /**
     * 死信消费者,主动签收开启状态下,超过重试次数,或者手动签收,reject 或者 Nack
     * @param message
     */
    @RabbitListener(queues = "demo.dead.letter")
    public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {

        // 能够思考数据库记录,每次进来查数量,达到肯定的数量,进行预警,人工染指解决
        log.info("接管到死信音讯:---{}--- 音讯 ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
        // 回复 ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

同样也可应用 java 类配置

 @Bean
    public Queue emailQueue() {Map<String, Object> arguments = new HashMap<>(2);
        // 绑定死信交换机
        arguments.put("x-dead-letter-exchange", "demoDeadLetterTopicExchange");
        // 绑定死信的路由 key
        arguments.put("x-dead-letter-routing-key", "demo.dead.letter");
        arguments.put("x-message-ttl", 3000);

        return new Queue(emailQueue,true,false,false,arguments);
    }

    
    @Bean
    TopicExchange emailExchange() {return new TopicExchange(topicExchange);
    }


    @Bean
    Binding bindingEmailQueue() {return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
    }

五、长久化机制和内存磁盘监控

1、长久化
RabbitMQ 的长久化队列分为:

1:队列长久化
2:音讯长久化
3:交换机长久化

不论是长久化的音讯还是非长久化的音讯都能够写入到磁盘中,只不过非长久的是等内存不足的状况下才会被写入到磁盘中。

2、内存磁盘监控

六、分布式事务

七、配置详解

 rabbitmq:
    addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定 client 连贯到的 server 的地址,多个以逗号分隔(优先取 addresses,而后再取 host)
#    port:
    ## 集群配置 addresses 之间用逗号隔开
    # addresses: ip:port,ip:port
    password: admin
    username: 123456
    virtual-host: / # 连贯到 rabbitMQ 的 vhost
    requested-heartbeat: #指定心跳超时,单位秒,0 为不指定;默认 60s
    publisher-confirms: #是否启用 公布确认
    publisher-reurns: # 是否启用公布返回
    connection-timeout: #连贯超时,单位毫秒,0 示意无穷大,不超时
    cache:
      channel.size: # 缓存中放弃的 channel 数量
      channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个 channel 的超时工夫,单位毫秒;如果为 0,则总是创立一个新 channel
      connection.size: # 缓存的连接数,只有是 CONNECTION 模式时失效
      connection.mode: # 连贯工厂缓存模式:CHANNEL 和 CONNECTION
    listener:
      simple.auto-startup: # 是否启动时主动启动容器
      simple.acknowledge-mode: # 示意音讯确认形式,其有三种配置形式,别离是 none、manual 和 auto;默认 auto
      simple.concurrency: # 最小的消费者数量
      simple.max-concurrency: # 最大的消费者数量
      simple.prefetch: # 指定一个申请能解决多少个音讯,如果有事务的话,必须大于等于 transaction 数量.
      simple.transaction-size: # 指定一个事务处理的音讯数量,最好是小于等于 prefetch 的数量.
      simple.default-requeue-rejected: # 决定被回绝的音讯是否从新入队;默认是 true(与参数 acknowledge-mode 有关系)simple.idle-event-interval: # 多少长时间公布闲暇容器工夫,单位毫秒
      simple.retry.enabled: # 监听重试是否可用
      simple.retry.max-attempts: # 最大重试次数
      simple.retry.initial-interval: # 第一次和第二次尝试公布或传递音讯之间的距离
      simple.retry.multiplier: # 利用于上一重试距离的乘数
      simple.retry.max-interval: # 最大重试工夫距离
      simple.retry.stateless: # 重试是有状态 or 无状态
    template:
      mandatory: # 启用强制信息;默认 false
      receive-timeout: # receive() 操作的超时工夫
      reply-timeout: # sendAndReceive() 操作的超时工夫
      retry.enabled: # 发送重试是否可用
      retry.max-attempts: # 最大重试次数
      retry.initial-interval: # 第一次和第二次尝试公布或传递音讯之间的距离
      retry.multiplier: # 利用于上一重试距离的乘数
      retry.max-interval: #最大重试工夫距离
退出移动版