乐趣区

关于java:RabbitMQ-结合业务实现消息确认

唠个嗑

网络上收罗了屡次想晓得 RabbitMQ 事实业务种怎么实现音讯的可靠性的,然而大多都不太现实,站在各位大佬伟人的肩膀上钻研了一段时间,我也整顿了一套简略可行性的计划,包含音讯异样解决。这篇文章想次要讲一些业务解决计划,我的项目中退出 RabbitMQ 中间件很简略,然而依据具体业务实现音讯的可靠性,这个须要多加思考。当然上面也会通过测试代码来剖析,文末也会附上源码地址。

1、筹备

1.1、环境筹备

之前博客上写过一篇编译装置的办法 地址,大家能够参考,因为 RabbitMQ 底层语言的起因可能略微麻烦点,那就没有方法了吗?如果你是先搞测试,再在我的项目中应用的话,那能够应用 docker 装置,2 行代码,如下

docker pull rabbitmq

docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

1.2、实践筹备

RabbitMQ 的类型包含:direct、topic、fanout、headers、system(翻源码看到的)

这里次要通过 topic 来剖析,bindingkey 能够通过通配符 # 和 * 来匹配多个 路由键(routingKey),
bindingkey 是绑定交换机(exchange)和队列(queue)的,生产者(publisher)发消息的时候会携带 routingKey、exchange 和 音讯发送给 RabbitMQ,
连贯胜利后理论是组件 exchange 接管了生产者的音讯,而后通过 bindingkey 匹配 routingKey,决定送给哪个 queue,每个消费者都会有 queue,所以 queue 接管到音讯 后就能够确保消费者接能够收到音讯了,最初消费者再生产。

再具体的内容能够查看大佬 erlie 的总结 地址

2、音讯确认

RabbitMQ 根底配置

pom

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.5.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
    
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

spring:
  rabbitmq:
    host: 192.168.1.105
    port: 5672
    username: guest
    password: guest
    virtual-host: /

# 交换机、队列和绑定键申明
test:
  exchange: test.exchange
  one:
    queue: one.test.queue
    binding-key: one.test.key

consumer

@RabbitListener(bindings=@QueueBinding(
                // 配置交换器
                exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),
                // 配置路由键
                key="${test.one.binding-key}",
                // 配置队列名称
                value=@Queue(value="${test.one.queue}",autoDelete="true")
))
public void test(String msg) {log.info("test 收到的音讯为:[{}]", msg);
    // 业务代码...
}

publisher

@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${test.exchange}")
private String exchange;
@Value("${test.one.binding-key}")
private String routingKey;

public void test() {rabbitTemplate.convertAndSend(exchange, routingKey, "test msg");
}

通过下面默认的配置基本上就能够应用 RabbitMQ 了, 然而这不是本篇的重点. 咱们要晓得音讯发送和到生产的过程中呈现问题怎么办?这就须要咱们分段确认音讯是否接管胜利,如果失败了该如何解决,先想想能够怎么做。
咱们先捋一下思路,音讯发送给 RabbitMQ,如果连贯 RabbitMQ 失败,则记录该音讯,如果连贯胜利然而 exchange 接管失败则记录下该音讯,如果 exchange 接管胜利然而 queue 接管 exchange 的音讯失败则记录下该音讯,音讯从生产者到 queue 有 3 个地位可能因网络抖动或其余起因呈现问题,那咱们在这三个地位记录下问题后,对立通过打算定时获取记录的音讯并且从新发送,如果重发三次还没有胜利则标记该记录为异样音讯。

2.1、生产者音讯确认和回调

这里分两步:

  1. 音讯确认指的是 RabbitMQ(exchange)确认接管到了消费者发送的音讯
  2. 音讯回调指的是 queue 接管 exchange 的音讯失败,则回调通知 RabbitMQ 失败的音讯

2.1.1、音讯确认

开启配置

spring:
  rabbitmq:
    #publisher-confirms: true #已过期
    publisher-confirm-type: correlated #开启生产者音讯确认 

还有另外 2 种模式:

  1. none 值是禁用公布确认模式,是默认值;
  2. simple 值经测试有两种成果,其一成果和 correlated 值一样会触发回调办法,其二在公布音讯胜利后应用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 办法期待 broker 节点返回发送后果,依据返回后果来断定下一步的逻辑,要留神的点是 waitForConfirmsOrDie 办法如果返回 false 则会敞开 channel,则接下来无奈发送音讯到 broker。

当然还需新建一个类实现 RabbitTemplate.ConfirmCallback,重写办法 confirm,该办法有三个参数 correlationData、ack、cause,次要说下 ack,值为 true 示意 exchange 胜利接管到音讯,false 示意 exchange 接管音讯失败,这里 2 种后果能够别离解决,比方 false 能够把接管失败的音讯入库,而后通过定时器来解决,比拟懂的同学当初可能就有疑难了,correlationData 只能失去 msgId, 基本没有具体的音讯,这里能够施展你杰出的想象力,能够通过对象封装失去,也能够通过存内存或者磁盘存储失去,办法总比艰难多。

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {String msgId = correlationData.getId();
   if (ack) {log.info("胜利发送给 mq, msgId:[{}]", msgId);
   } else {log.error("发送给 mq 失败, msgId:[{}], 失败理由 cause:[{}]", msgId, cause);
       // 音讯从生产者没有到 exchange,那存库
       saveToDB(msgId,...);
   }
}

这里还差一步,就是原生的 rabbitTemplate 怎么晓得音讯确认时应用刚建的类呢,注入即可
rabbitTemplate.setConfirmCallback(刚建的类); 如有疑难可看文末源码。

2.1.2、音讯回调

spring:
  rabbitmq:
    publisher-returns: true #开启生产者音讯回调 

同上,须要新建类并实现 RabbitTemplate.ReturnCallback, 并且重写办法 returnedMessage,最初须要注入如下内容

rabbitTemplate.setReturnCallback(刚建的类);
// 要想使 returnCallback 失效,必须设置为 true
rabbitTemplate.setMandatory(true);
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
    String msg = new String(message.getBody());
    log.error("音讯回调 msgId:[{}], msg:[{}] 不能被正确路由,replyCode:[{}], replyText:[{}], exchange:[{}], routingKey:[{}]", msgId, msg, replyCode, replyText, exchange, routingKey);
    // 音讯从 exchange 没有到 queue,那存库
    saveToDB(msgId, exchange, routingKey, msg);
}

如果音讯发送时走到了回调办法 returnedMessage 中,阐明目前的音讯有问题是须要解决的,同上,入库。定时器来解决。

当然音讯的发送办法 rabbitTemplate.convertAndSend() 会多一个参数 correlationData

具体解决办法能够参考源码,这里只提供思路。

2.2、消费者音讯确认

消费者音讯确认是确认生产了队列中的音讯,如果呈现问题 RabbitMQ 会有重试机制,长时间失败则须要人工干预,这个和生产者的确认是先后关系,理论是没有关联关系的,说到这,说下我之前转的牛角尖,始终想寻找 exchange 如何确认消费者胜利生产音讯,然而无果,起初细想,RabbitMQ 应该设计的就是消费者和 queue 交互,没必要和 exchange 交互。如有大佬晓得 exchange 如何 ack 消费者生产音讯能够通知小弟,不胜感激。
圆规正转,上消费者代码

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #开启消费者音讯确认; none:主动确认、auto:依据状况确认 
@RabbitListener(bindings=@QueueBinding(
                    // 配置交换器
                    exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),
                    // 配置路由键
                    key="${test.one.binding-key}",
                    // 配置队列名称
                    value=@Queue(value="${test.one.queue}",autoDelete="true")
))
    public void test(Message message, Channel channel) {String msg = new String(message.getBody());
        log.info("test 收到的音讯为:[{}], msgId:[{}]", msg, message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
        try {
            // 业务解决
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {e.printStackTrace();
            try {channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (IOException ioException) {ioException.printStackTrace();
            }
        }
    }

次要说 3 个办法:

1、basicAck 是确认音讯,须要传递两个参数

  1. deliveryTag(惟一标识 ID):当一个消费者向 RabbitMQ 注册后,会建设起一个 Channel,RabbitMQ 会用 basic.deliver 办法向消费者推送音讯,这个办法携带了一个 delivery tag,它代表了 RabbitMQ 向该 Channel 投递的这条音讯的惟一标识 ID,是一个枯燥递增的正整数,delivery tag 的范畴仅限于 Channel
  2. multiple:为了缩小网络流量,手动确认能够被批处理,当该参数为 true 时,则能够一次性确认 delivery_tag 小于等于传入值的所有音讯,false 则只确认传入值等于 delivery_tag 的音讯

2、basicNack 是回绝音讯,能够回绝多条,比 basicAck 多一个布尔值的参数,如果为 true,被 nack 后从新入队列而后从新生产生产;如果为 false 被 nack 就丢了。

3、basicReject 只能回绝一条音讯,reject 后音讯间接丢了。

总结

这里简略实现 RabbitMQ 音讯牢靠的形式是通过把音讯发送时呈现问题后间接入库,而后通过打算定时查问再从新发送给 RabbitMQ, 如果 exchange 胜利 ack 后则标记为重发胜利,如果重发 3 次还是失败则标记异样,须要人工解决。

探讨

这种解决其实不算是最优的计划,技术上还能够有如下计划

  1. 生产者发消息时记录该条音讯,并设该记录 1 分钟后失效,留 1 分钟给 exchange 确认并间接标记该音讯记录为胜利,而后打算工作定时扫无效且未确认的音讯,再发送给 RabbitMQ,如果确认后则标记为胜利,否则 3 次后标记为失败。
  2. 还和小编写的计划相似,记录失败的音讯,然而定时工作获取到失败的音讯后,间接调用消费者的服务解决,不通过 RabbitMQ,然而这就须要保护音讯和消费者服务的关系了,略微简单些。

最初心愿能够帮到看官,如果记录的不对烦请评论指出,一起探讨

https://github.com/charmsongo/springboot-samples/tree/master/springboot-rabbitmq-songo

退出移动版