前情提要:rabbitmq 治理界面查看姿态

一、疾速搭建/根本信息发送和生产

1、引入依赖

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、application.yml

spring:  rabbitmq:    host: ipXXX    port: 5672    username: 账户XXX    password: 明码XXX    virtual-host: /wen  # 交换器名称

以 direct模式为例

1、配置文件import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/@Configurationpublic class RabbitConfig {    //队列 起名:TestDirectQueue    @Bean    public Queue emailQueue() {        // durable:是否长久化,默认是false,长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效        // exclusive:默认也是false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于durable        // autoDelete:是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。        //   return new Queue("TestDirectQueue",true,true,false);        //个别设置一下队列的长久化就好,其余两个就是默认false        return new Queue("email.fanout.queue", true);    }    @Bean    public Queue smsQueue() {        // durable:是否长久化,默认是false,长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效        // exclusive:默认也是false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于durable        // autoDelete:是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。        //   return new Queue("TestDirectQueue",true,true,false);        //个别设置一下队列的长久化就好,其余两个就是默认false        return new Queue("sms.fanout.queue", true);    }    @Bean    public Queue weixinQueue() {        // durable:是否长久化,默认是false,长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效        // exclusive:默认也是false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于durable        // autoDelete:是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。        //   return new Queue("TestDirectQueue",true,true,false);        //个别设置一下队列的长久化就好,其余两个就是默认false        return new Queue("weixin.fanout.queue", true);    }    @Bean    public Queue TTLQueue() {        Map<String, Object> map = new HashMap<>(16);        map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则30秒后过期        return new Queue("TTL_QUEUE", true, false, false, map);    }    @Bean    public DirectExchange TTLExchange() {        return new DirectExchange("TTL_EXCHANGE", true, false);    }    //Direct交换机 起名:TestDirectExchange    @Bean    public DirectExchange fanoutOrderExchange() {        //  return new DirectExchange("TestDirectExchange",true,true);        return new DirectExchange("fanout_exchange", true, false);    }    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting    @Bean    public Binding bindingDirect() {        return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");    }    @Bean    public Binding bindingDirect1() {        return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");    }    @Bean    public Binding bindingDirect2() {        return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");    }    @Bean    public Binding bindingDirect3() {        return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");    }}2、生产者package com.pit.barberShop.common.MQ.Rabbit.fanout;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @author :wenye * @date :Created in 2021/6/15 21:41 * @description:播送模式 * @version: $ */@RestController@RequestMapping("/rabbitmq")public class ProducerFanout {    @Autowired    private RabbitTemplate rabbitTemplate;    // 1: 定义交换机    private String exchangeName = "fanout_exchange";    // 2: 路由key    private String routeKey = "";    @RequestMapping("/fanout")    public void markerFanout() {        String message ="shua";        // 发送音讯        rabbitTemplate.convertAndSend(exchangeName, routeKey, message);    }    @RequestMapping("/ttl")    public String testTTL() {        MessageProperties messageProperties = new MessageProperties();        messageProperties.setExpiration("20000"); // 设置过期工夫,单位:毫秒        byte[] msgBytes = "测试音讯主动过期".getBytes();        Message message = new Message(msgBytes, messageProperties);        rabbitTemplate.convertAndSend("TTL_EXCHANGE", "TTL", message);        return "ok";    }}3、消费者package com.pit.barberShop.common.MQ.Rabbit.fanout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.core.ExchangeTypes;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.stereotype.Component;/** * @author :wenye * @date :Created in 2021/6/15 22:07 * @description:fanout消费者 * @version: $ */@Componentpublic class ConsumerFanout {    @RabbitListener(bindings =@QueueBinding(            // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。            value = @Queue(value = "sms.fanout.queue",autoDelete = "false"),            // order.fanout 交换机的名字 必须和生产者保持一致            exchange = @Exchange(value = "fanout_exchange",                    // 这里是确定的rabbitmq模式是:fanout 是以播送模式 、 公布订阅模式                    type = ExchangeTypes.DIRECT)    ))    public void messagerevice(String message){        // 此处省略发邮件的逻辑        System.out.println("sms-two111------------->" + message);    }    @RabbitListener(bindings =@QueueBinding(            // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。            value = @Queue(value = "weixin.fanout.queue",autoDelete = "false"),            // order.fanout 交换机的名字 必须和生产者保持一致            exchange = @Exchange(value = "fanout_exchange",                    // 这里是确定的rabbitmq模式是:fanout 是以播送模式 、 公布订阅模式                    type = ExchangeTypes.DIRECT)    ))    public void messageWXrevice(String message){        // 此处省略发邮件的逻辑        System.out.println("weixin----two---------->" + message);    }}

二、过期工夫

1、生产者发送音讯时设置过期工夫    public String testTTL() {        MessageProperties messageProperties = new MessageProperties();        messageProperties.setExpiration("20000"); // 设置过期工夫,单位:毫秒        byte[] msgBytes = "测试音讯主动过期".getBytes();        Message message = new Message(msgBytes, messageProperties);        rabbitTemplate.convertAndSend("TTL_EXCHANGE", "", message);        return "ok";    }2、队列中的所有音讯设置过期工夫配置中增加@Bean    public Queue TTLQueue() {        Map<String, Object> map = new HashMap<>();        map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则30秒后过期        return new Queue("TTL_QUEUE", true, false, false, map);    }  @Bean    public Queue TTLQueue() {        Map<String, Object> map = new HashMap<>();        map.put("x-message-ttl", 30000); // 队列中的音讯未被生产则30秒后过期        return new Queue("TTL_QUEUE", true, false, false, map);    }    @Bean    public DirectExchange TTLExchange() {        return new DirectExchange("TTL_EXCHANGE", true, false);    }    @Bean    public Binding bindingDirect() {        return BindingBuilder.bind(TTLQueue()).to(TTLExchange()).with("TTL");    }

三、音讯确认机制配置
参考:https://blog.csdn.net/qq33098...
默认是自动应答

spring:  rabbitmq:    # 开启发送确认    publisher-confirms: true    # 开启发送失败退回    publisher-returns: true
目前回调存在ConfirmCallback和ReturnCallback两者。他们的区别在于
如果音讯没有到exchange,则ConfirmCallback回调,ack=false,
如果音讯达到exchange,则ConfirmCallback回调,ack=true

exchange到queue胜利,则不回调ReturnCallback
rabbitMQ 音讯生产者发送音讯的流程

import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;@Slf4j@Componentpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {    /**    * correlationData:对象外部只有一个 id 属性,用来示意以后音讯的唯一性。    * ack:音讯投递到broker 的状态,true示意胜利。    * cause:示意投递失败的起因。    **/    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause){        if (!ack) {            log.error("音讯发送异样!");        } else {            log.info("发送者爸爸曾经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);        }    }}import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;@Slf4j@Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback {    //重写 returnedMessage() 办法,办法有五个参数message(音讯体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey路由(队列)    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);    }}配置文件1、避免反复签发ack须要在配置类中重写  @Bean    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {        RabbitTemplate template = new RabbitTemplate(connectionFactory);        template.setMessageConverter(new Jackson2JsonMessageConverter());        return template;    }    @Bean    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        //此处也设置为手动ack        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);        factory.setConnectionFactory(connectionFactory);        factory.setMessageConverter(new Jackson2JsonMessageConverter());        return factory;    }2、从新创立设置交换器和队列属性@Bean   public Queue chongfuQueue() {    // durable:是否长久化,默认是false,长久化队列:会被存储在磁盘上,当音讯代理重启时依然存在,暂存队列:以后连贯无效    // exclusive:默认也是false,只能被以后创立的连贯应用,而且当连贯敞开后队列即被删除。此参考优先级高于durable    // autoDelete:是否主动删除,当没有生产者或者消费者应用此队列,该队列会主动删除。    //   return new Queue("TestDirectQueue",true,true,false);    //个别设置一下队列的长久化就好,其余两个就是默认false    return new Queue("chongfu.fanout.queue", true);} //Direct交换机 起名:TestDirectExchange@Beanpublic DirectExchange chongfuExchange() {    //  return new DirectExchange("TestDirectExchange",true,true);    return new DirectExchange("chongfu_exchange", true, false);}@Beanpublic Binding bindingDirect4() {    return BindingBuilder.bind(chongfuQueue()).to(chongfuExchange()).with("");}生产者public void markerchongfu() {        /**         * 确保音讯发送失败后能够从新返回到队列中         * 留神:yml须要配置 publisher-returns: true         */        rabbitTemplate.setMandatory(true);        /**         * 消费者确认收到音讯后,手动ack回执回调解决         */        rabbitTemplate.setConfirmCallback(confirmCallbackService);        /**         * 音讯投递到队列失败回调解决         */        rabbitTemplate.setReturnCallback(returnCallbackService);        /**         * 发送音讯         */        String s = UUID.randomUUID().toString();        rabbitTemplate.convertAndSend("chongfu_exchange", routeKey, "帅哥",                message -> {                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);                    return message;                },                new CorrelationData(s));    }消费者@RabbitListener(bindings =@QueueBinding(            // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。            value = @Queue(value = "chongfu.fanout.queue",autoDelete = "false"),            // order.fanout 交换机的名字 必须和生产者保持一致            exchange = @Exchange(value = "chongfu_exchange",                    // 这里是确定的rabbitmq模式是:fanout 是以播送模式 、 公布订阅模式                    type = ExchangeTypes.DIRECT)    ))    public void processHandler(String msg, Channel channel, Message message) throws IOException {        try {            log.info("小富收到音讯:{}", msg);//            log.info("序号:{}", message.getMessageProperties().getDeliveryTag());//            System.out.println(msg);            //TODO 具体业务            // 收到音讯 basicAck()            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }  catch (Exception e) {            if (message.getMessageProperties().getRedelivered()) {                log.error("音讯已反复解决失败,回绝再次接管...");                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 回绝音讯            } else {                log.error("音讯行将再次返回队列解决...");                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);            }        }    }
生产音讯有三种回执办法

1、basicAck

basicAck:示意胜利确认,应用此回执办法后,音讯会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple) 

  • deliveryTag:示意音讯投递序号,每次生产音讯或者音讯从新投递后,deliveryTag都会减少。手动音讯确认模式下,咱们能够对指定deliveryTag的音讯进行ack、nack、reject等操作。
  • multiple:是否批量确认,值为 true 则会一次性 ack所有小于以后音讯 deliveryTag 的音讯。

举个栗子: 假如我先发送三条音讯deliveryTag别离是5、6、7,可它们都没有被确认,当我发第四条音讯此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的音讯全副进行确认。

2、basicNack

basicNack :示意失败确认,个别在生产音讯业务异样时用到此办法,能够将音讯从新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

  • deliveryTag:示意音讯投递序号。
  • multiple:是否批量确认。
  • requeue:值为 true 音讯将从新入队列。

3、basicReject

basicReject:回绝音讯,与basicNack区别在于不能进行批量操作,其余用法很类似。

void basicReject(long deliveryTag, boolean requeue)

  • deliveryTag:示意音讯投递序号。
  • requeue:值为 true 音讯将从新入队列。

四、死信队列

死信队列其实和一般的队列没啥大的区别,都须要创立本人的Queue、Exchange,而后通过RoutingKey绑定到Exchange下来,只不过死信队列的RoutingKey和Exchange要作为参数,绑定到失常的队列下来,一种利用场景是失常队列外面的音讯被basicNack或者reject时,音讯就会被路由到失常队列绑定的死信队列中,还有一种还有罕用的场景就是开启了主动签收,而后消费者生产音讯时出现异常,超过了重试次数,那么这条音讯也会进入死信队列,如果配置了话,

例子

 //模仿异样用的交换器 ,topic交换器会通配符匹配,当然字符串截然不同也会匹配    @Bean    TopicExchange emailExchange() {        return new TopicExchange("demoTopicExchange");    }    //死信队列    @Bean    public Queue deadLetterQueue() {        return new Queue("demo.dead.letter");    }    //死信交换器    @Bean    TopicExchange deadLetterExchange() {        return new TopicExchange("demoDeadLetterTopicExchange");    }    //绑定死信队列    @Bean    Binding bindingDeadLetterQueue() {        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("demo.dead.letter");    }生产者@RequestMapping("/sixin")    public void sendEmailMessage() {        CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());        rabbitTemplate.convertAndSend("demoTopicExchange","demo.email","11",correlationData);        log.info("---发送 email 音讯---{}---messageId---{}","111",correlationData.getId());    }消费者  /**     * 邮件消费者     * @param message     * @param channel     * @throws IOException     */    @RabbitListener(bindings =@QueueBinding(            // email.fanout.queue 是队列名字,这个名字你能够自定轻易定义。            value = @Queue(value = "demo.email",autoDelete = "false",            arguments = {                    @Argument(name =  "x-dead-letter-exchange", value = "demoDeadLetterTopicExchange"),                    @Argument(name = "x-dead-letter-routing-key",value = "demo.dead.letter"),                    @Argument(name = "x-message-ttl",value = "3000",type = "java.lang.Long")            }),            key = "demo.email",            // order.fanout 交换机的名字 必须和生产者保持一致            exchange = @Exchange(value = "demoTopicExchange",                    // 这里是确定的rabbitmq模式是:fanout 是以播送模式 、 公布订阅模式                    type = ExchangeTypes.TOPIC)    ))    public void handleEmailMessage(Message message, Channel channel,String msg) throws IOException {        try {            log.info("---承受到音讯---{}",msg);            //被动异样            int m=1/0;            //手动签收            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);        }        catch (Exception e) {            //异样,ture 从新入队,或者false,进入死信队列            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);        }    }    /**     * 死信消费者,主动签收开启状态下,超过重试次数,或者手动签收,reject或者Nack     * @param message     */    @RabbitListener(queues = "demo.dead.letter")    public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {        //能够思考数据库记录,每次进来查数量,达到肯定的数量,进行预警,人工染指解决        log.info("接管到死信音讯:---{}---音讯ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));        //回复ack        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);    }

同样也可应用java类配置

 @Bean    public Queue emailQueue() {        Map<String, Object> arguments = new HashMap<>(2);        // 绑定死信交换机        arguments.put("x-dead-letter-exchange", "demoDeadLetterTopicExchange");        // 绑定死信的路由key        arguments.put("x-dead-letter-routing-key", "demo.dead.letter");        arguments.put("x-message-ttl", 3000);        return new Queue(emailQueue,true,false,false,arguments);    }        @Bean    TopicExchange emailExchange() {        return new TopicExchange(topicExchange);    }    @Bean    Binding bindingEmailQueue() {        return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");    }

五、长久化机制和内存磁盘监控

1、长久化
RabbitMQ的长久化队列分为:

1:队列长久化
2:音讯长久化
3:交换机长久化

不论是长久化的音讯还是非长久化的音讯都能够写入到磁盘中,只不过非长久的是等内存不足的状况下才会被写入到磁盘中。

2、内存磁盘监控

六、分布式事务

七、配置详解

 rabbitmq:    addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连贯到的server的地址,多个以逗号分隔(优先取addresses,而后再取host)#    port:    ##集群配置 addresses之间用逗号隔开    # addresses: ip:port,ip:port    password: admin    username: 123456    virtual-host: / # 连贯到rabbitMQ的vhost    requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s    publisher-confirms: #是否启用 公布确认    publisher-reurns: # 是否启用公布返回    connection-timeout: #连贯超时,单位毫秒,0示意无穷大,不超时    cache:      channel.size: # 缓存中放弃的channel数量      channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时工夫,单位毫秒;如果为0,则总是创立一个新channel      connection.size: # 缓存的连接数,只有是CONNECTION模式时失效      connection.mode: # 连贯工厂缓存模式:CHANNEL 和 CONNECTION    listener:      simple.auto-startup: # 是否启动时主动启动容器      simple.acknowledge-mode: # 示意音讯确认形式,其有三种配置形式,别离是none、manual和auto;默认auto      simple.concurrency: # 最小的消费者数量      simple.max-concurrency: # 最大的消费者数量      simple.prefetch: # 指定一个申请能解决多少个音讯,如果有事务的话,必须大于等于transaction数量.      simple.transaction-size: # 指定一个事务处理的音讯数量,最好是小于等于prefetch的数量.      simple.default-requeue-rejected: # 决定被回绝的音讯是否从新入队;默认是true(与参数acknowledge-mode有关系)      simple.idle-event-interval: # 多少长时间公布闲暇容器工夫,单位毫秒      simple.retry.enabled: # 监听重试是否可用      simple.retry.max-attempts: # 最大重试次数      simple.retry.initial-interval: # 第一次和第二次尝试公布或传递音讯之间的距离      simple.retry.multiplier: # 利用于上一重试距离的乘数      simple.retry.max-interval: # 最大重试工夫距离      simple.retry.stateless: # 重试是有状态or无状态    template:      mandatory: # 启用强制信息;默认false      receive-timeout: # receive() 操作的超时工夫      reply-timeout: # sendAndReceive() 操作的超时工夫      retry.enabled: # 发送重试是否可用      retry.max-attempts: # 最大重试次数      retry.initial-interval: # 第一次和第二次尝试公布或传递音讯之间的距离      retry.multiplier: # 利用于上一重试距离的乘数      retry.max-interval: #最大重试工夫距离