关于rabbitmq:RabbitMQ如何保证消息的可靠性

36次阅读

共计 4303 个字符,预计需要花费 11 分钟才能阅读完成。

一条生产胜利被生产经验了生产者 ->MQ-> 消费者,因而在这三个步骤中都有可能造成音讯失落。

一 音讯生产者没有把音讯胜利发送到 MQ

1.1 事务机制

AMQP协定提供了事务机制,在投递音讯时开启事务反对,如果音讯投递失败,则回滚事务。

自定义事务管理器

@Configuration
public class RabbitTranscation {
    
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){return new RabbitTemplate(connectionFactory);
    }
}

批改 yml

spring:
  rabbitmq:
    # 音讯在未被队列收到的状况下返回
    publisher-returns: true

开启事务反对

rabbitTemplate.setChannelTransacted(true);

音讯未接管时调用 ReturnCallback

rabbitTemplate.setMandatory(true);

生产者投递音讯

@Service
public class ProviderTranscation implements RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        // 设置 channel 开启事务
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setReturnCallback(this);
    }
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("这条音讯发送失败了"+message+", 请解决");
    }
    
    @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
    public void publishMessage(String message) throws Exception {rabbitTemplate.setMandatory(true);
        rabbitTemplate.convertAndSend("javatrip",message);
    }
}

然而,很少有人这么干,因为这是同步操作,一条音讯发送之后会使发送端阻塞,以期待 RabbitMQ-Server 的回应,之后能力持续发送下一条音讯,生产者生产音讯的吞吐量和性能都会大大降低。

1.2 发送方确认机制

发送音讯时将信道设置为 confirm 模式,音讯进入该信道后,都会被指派给一个惟一 ID,一旦音讯被投递到所匹配的队列后,RabbitMQ就会发送给生产者一个确认。

开启音讯确认机制

spring:
  rabbitmq:
    # 音讯在未被队列收到的状况下返回
    publisher-returns: true
    # 开启音讯确认机制
    publisher-confirm-type: correlated

音讯未接管时调用 ReturnCallback

rabbitTemplate.setMandatory(true);

生产者投递音讯

@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("确认了这条音讯:"+correlationData);
        }else{System.out.println("确认失败了:"+correlationData+";出现异常:"+cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("这条音讯发送失败了"+message+", 请解决");
    }

    public void publisMessage(String message){rabbitTemplate.setMandatory(true);
        rabbitTemplate.convertAndSend("javatrip",message);
    }
}

如果音讯确认失败后,咱们能够进行音讯弥补,也就是音讯的重试机制。当未收到确认信息时进行音讯的从新投递。设置如下配置即可实现。

spring:
  rabbitmq:
    # 反对音讯发送失败后重返队列
    publisher-returns: true
    # 开启音讯确认机制
    publisher-confirm-type: correlated
    listener:
      simple:
        retry:
          # 开启重试
          enabled: true
          # 最大重试次数
          max-attempts: 5
          # 重试工夫距离
          initial-interval: 3000

二 音讯发送到 MQ 后,MQ 宕机导致内存中的音讯失落

音讯在 MQ 中有可能产生失落,这时候咱们就须要将队列和音讯都进行长久化。

@Queue 注解为咱们提供了队列相干的一些属性,具体如下:

  1. name: 队列的名称;
  2. durable: 是否长久化;
  3. exclusive: 是否独享、排外的;
  4. autoDelete: 是否主动删除;
  5. arguments:队列的其余属性参数,有如下可选项,可参看图 2 的 arguments:

    • x-message-ttl:音讯的过期工夫,单位:毫秒;
    • x-expires:队列过期工夫,队列在多长时间未被拜访将被删除,单位:毫秒;
    • x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除音讯;
    • x-max-length-bytes:队列音讯内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除音讯;
    • x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时音讯会产生什么。有效值是 drop-head、reject-publish 或 reject-publish-dlx。仲裁队列类型仅反对 drop-head;
    • x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的音讯可指定发送到该交换器中;
    • x-dead-letter-routing-key:死信音讯路由键,在音讯发送到死信交换器时会应用该路由键,如果不设置,则应用音讯的原来的路由键值
    • x-single-active-consumer:示意队列是否是繁多流动消费者,true 时,注册的生产组内只有一个消费者生产音讯,其余被疏忽,false 时音讯循环分发给所有消费者(默认 false)
    • x-max-priority:队列要反对的最大优先级数; 如果未设置,队列将不反对音讯优先级;
    • x-queue-mode(Lazy mode):将队列设置为提早模式,在磁盘上保留尽可能多的音讯,以缩小 RAM 的应用; 如果未设置,队列将保留内存缓存以尽可能快地传递音讯;
    • x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

长久化队列

创立队列的时候将长久化属性 durable 设置为 true,同时要将 autoDelete 设置为 false

@Queue(value = "javatrip",durable = "false",autoDelete = "false")

长久化音讯

发送音讯的时候将音讯的 deliveryMode 设置为 2,在 Spring Boot 中音讯默认就是长久化的。

三 消费者生产音讯的时候,未生产结束就呈现了异样

消费者刚生产了音讯,还没有解决业务,后果产生异样。这时候就须要敞开主动确认,改为手动确认音讯。

批改 yml 为手动签收模式

spring:
  rabbitmq:
    listener:
      simple:
        # 手动签收模式
        acknowledge-mode: manual
        # 每次签收一条音讯
        prefetch: 1

消费者手动签收

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {

    @RabbitHandler
    public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{System.out.println(message);
        // 惟一的音讯 ID
        Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 确认该条音讯
        if(...){channel.basicAck(deliverTag,false);
        }else{
            // 生产失败,音讯重返队列
            channel.basicNack(deliverTag,false,true);
        }
      
    }
}

四 总结

音讯失落的起因?

生产者、MQ、消费者都有可能造成音讯失落

如何保障音讯的可靠性?

  • 发送方采取发送者确认模式
  • MQ 进行队列及音讯的长久化
  • 消费者生产胜利后手动确认音讯

正文完
 0