关于java:RabbitMQ-高可用之如何确保消息成功消费

4次阅读

共计 5242 个字符,预计需要花费 14 分钟才能阅读完成。

@[toc]
后面一篇文章松哥和大家聊了 MQ 高可用之如何确保音讯胜利发送,各种配置齐上阵,最终确保了音讯的胜利发送,甚至在一些极其状况下还可能产生同一条音讯反复发送的状况,不论怎么样,音讯总算发送进来了,如果小伙伴们还没看过上篇文章,倡议先看看,再来学习本文:

  • 四种策略确保 RabbitMQ 音讯发送可靠性!你用哪种?

明天咱们就来聊一聊音讯生产的问题,看看如何确保音讯生产胜利,并且确保幂等性。

1. 两种生产思路

RabbitMQ 的音讯生产,整体上来说有两种不同的思路:

  • 推(push):MQ 被动将音讯 推送 给消费者,这种形式须要消费者设置一个缓冲区去缓存音讯,对于消费者而言,内存中总是有一堆须要解决的音讯,所以这种形式的效率比拟高,这也是目前大多数利用采纳的生产形式。
  • 拉(pull):消费者被动从 MQ 拉取 音讯,这种形式效率并不是很高,不过有的时候如果服务端须要批量拉取音讯,倒是能够采纳这种形式。

两种形式我都举个例子看下。

先来看推(push):

这种形式大家比拟常见,就是通过 @RabbitListener 注解去标记消费者,如下:

@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
    public void handle(String msg) {System.out.println("msg =" + msg);
    }
}

当监听的队列中有音讯时,就会触发该办法。

再来看拉(pull):

@Test
public void test01() throws UnsupportedEncodingException {Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.JAVABOY_QUEUE_NAME);
    System.out.println("o =" + new String(((byte[]) o),"UTF-8"));
}

调用 receiveAndConvert 办法,办法参数为队列名称,办法执行实现后,会从 MQ 上拉取一条音讯下来,如果该办法返回值为 null,示意该队列上没有音讯了。receiveAndConvert 办法有一个重载办法,能够在重载办法中传入一个期待超时工夫,例如 3 秒。此时,假如队列中没有音讯了,则 receiveAndConvert 办法会阻塞 3 秒,3 秒内如果队列中有了新音讯就返回,3 秒后如果队列中还是没有新音讯,就返回 null,这个期待超时工夫要是不设置的话,默认为 0。

这是音讯两种不同的生产模式。

如果须要从音讯队列中继续取得音讯,就能够应用推模式;如果只是单纯的生产一条音讯,则应用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅音讯,这会重大影响 RabbitMQ 的性能。

2. 确保生产胜利两种思路

在上篇文章中,咱们想尽办法确保音讯可能发送胜利,对于音讯生产胜利,其实官网提供了相干的机制,咱们一起来看下。

为了保障音讯可能牢靠的达到音讯消费者,RabbitMQ 中提供了音讯生产确认机制。当消费者去生产音讯的时候,能够通过指定 autoAck 参数来示意音讯生产的确认形式。

  • 当 autoAck 为 false 的时候,此时即便消费者曾经收到音讯了,RabbitMQ 也不会立马将音讯移除,而是期待消费者显式的回复确认信号后,才会将音讯打上删除标记,而后再删除。
  • 当 autoAck 为 true 的时候,此时音讯消费者就会主动把发送进来的音讯设置为确认,而后将音讯移除(从内存或者磁盘中),即便这些音讯并没有达到消费者。

咱们来看一张图:

如上图所示,在 RabbitMQ 的 web 治理页面:

  • Ready 示意待生产的音讯数量。
  • Unacked 示意曾经发送给消费者然而还没收到消费者 ack 的音讯数量。

这是咱们能够从 UI 层面察看音讯的生产状况确认状况。

当咱们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,生产分成了两个局部:

  • 待生产的音讯
  • 曾经投递给消费者,然而还没有被消费者确认的音讯

换句话说,当设置 autoAck 为 false 的时候,消费者就变得十分从容了,它将有足够的工夫去解决这条音讯,当音讯失常解决实现后,再手动 ack,此时 RabbitMQ 才会认为这条音讯生产胜利了。如果 RabbitMQ 始终没有收到客户端的反馈,并且此时客户端也曾经断开连接了,那么 RabbitMQ 就会将刚刚的音讯从新放回队列中,期待下一次被生产。

综上所述,确保音讯被胜利生产,无非就是手动 Ack 或者主动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致音讯被反复生产,所以一般来说咱们还须要在解决音讯时,解决幂等性问题。

3. 音讯回绝

当客户端收到音讯时,能够抉择生产这条音讯,也能够抉择回绝这条音讯。咱们来看下回绝的形式:

@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
    public void handle(Channel channel, Message message) {
        // 获取音讯编号
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 回绝音讯
            channel.basicReject(deliveryTag, true);
        } catch (IOException e) {e.printStackTrace();
        }
    }
}

消费者收到音讯之后,能够抉择回绝生产该条音讯,回绝的步骤分两步:

  1. 获取音讯编号 deliveryTag。
  2. 调用 basicReject 办法回绝音讯。

调用 basicReject 办法时,第二个参数是 requeue,即是否从新入队。如果第二个参数为 true,则这条被回绝的音讯会从新进入到音讯队列中,期待下一次被生产;如果第二个参数为 false,则这条被回绝的音讯就会被丢掉,不会有新的消费者去生产它了。

须要留神的是,basicReject 办法一次只能回绝一条音讯。

4. 音讯确认

音讯确认分为主动确认和手动确认,咱们别离来看。

4.1 主动确认

先来看看主动确认,在 Spring Boot 中,默认状况下,音讯生产就是主动确认的。

咱们来看如下一个音讯生产办法:

@Component
public class ConsumerDemo {@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
    public void handle2(String msg) {System.out.println("msg =" + msg);
        int i = 1 / 0;
    }
}

通过 @Componet 注解将以后类注入到 Spring 容器中,而后通过 @RabbitListener 注解来标记一个音讯生产办法,默认状况下,音讯生产办法自带事务,即如果该办法在执行过程中抛出异样,那么被生产的音讯会从新回到队列中期待下一次被生产,如果该办法失常执行完没有抛出异样,则这条音讯就算是被生产了。

4.2 手动确认

手动确认我又把它分为两种:推模式手动确认与拉模式手动确认。

4.2.1 推模式手动确认

要开启手动确认,须要咱们首先敞开主动确认,敞开形式如下:

spring.rabbitmq.listener.simple.acknowledge-mode=manual

这个配置示意将音讯的确认模式改为手动确认。

接下来咱们来看下消费者中的代码:

@RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
public void handle3(Message message,Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        // 音讯生产的代码写到这里
        String s = new String(message.getBody());
        System.out.println("s =" + s);
        // 生产实现后,手动 ack
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        // 手动 nack
        try {channel.basicNack(deliveryTag, false, true);
        } catch (IOException ex) {ex.printStackTrace();
        }
    }
}

将消费者要做的事件放到一个 try..catch 代码块中。

如果音讯失常生产胜利,则执行 basicAck 实现确认。

如果音讯生产失败,则执行 basicNack 办法,通知 RabbitMQ 音讯生产失败。

这里波及到两个办法:

  • basicAck:这个是手动确认音讯曾经胜利生产,该办法有两个参数:第一个参数示意音讯的 id;第二个参数 multiple 如果为 false,示意仅确认以后音讯生产胜利,如果为 true,则示意以后音讯之前所有未被以后消费者确认的音讯都生产胜利。
  • basicNack:这个是通知 RabbitMQ 以后音讯未被胜利生产,该办法有三个参数:第一个参数示意音讯的 id;第二个参数 multiple 如果为 false,示意仅回绝以后音讯的生产,如果为 true,则示意回绝以后音讯之前所有未被以后消费者确认的音讯;第三个参数 requeue 含意和后面所说的一样,被回绝的音讯是否从新入队。

当 basicNack 中最初一个参数设置为 false 的时候,还波及到一个死信队列的问题,这个松哥当前再专门写文章和大家细聊。

4.2.2 拉模式手动确认

拉模式手动 ack 比拟麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的办法,所以咱们得用原生的方法,如下:

public void receive2() {Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
    long deliveryTag = 0L;
    try {GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
        deliveryTag = getResponse.getEnvelope().getDeliveryTag();
        System.out.println("o =" + new String((getResponse.getBody()), "UTF-8"));
        channel.basicAck(deliveryTag, false);
    } catch (IOException e) {
        try {channel.basicNack(deliveryTag, false, true);
        } catch (IOException ex) {ex.printStackTrace();
        }
    }
}

这里波及到的 basicAck 和 basicNack 办法跟后面的一样,我就不再赘述。

5. 幂等性问题

最初咱们再来说说音讯的幂等性问题。

大家构想上面一个场景:

消费者在生产完一条音讯后,向 RabbitMQ 发送一个 ack 确认,此时因为网络断开或者其余起因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条音讯删除,当从新建设起连贯后,消费者还是会再次收到该条音讯,这就造成了音讯的反复生产。同时,因为相似的起因,音讯在发送的时候,同一条音讯也可能会发送两次(参见四种策略确保 RabbitMQ 音讯发送可靠性!你用哪种?)。种种原因导致咱们在生产音讯时,肯定要解决好幂等性问题。

幂等性问题的解决倒也不难,基本上都是从业务上来解决,我来大略说说思路。

采纳 Redis,在消费者生产音讯之前,现将音讯的 id 放到 Redis 中,存储形式如下:

  • id-0(正在执行业务)
  • id-1(执行业务胜利)

如果 ack 失败,在 RabbitMQ 将音讯交给其余的消费者时,先执行 setnx,如果 key 曾经存在(阐明之前有人生产过该音讯),获取他的值,如果是 0,以后消费者就什么都不做,如果是 1,间接 ack。

极其状况:第一个消费者在执行业务时,呈现了死锁,在 setnx 的根底上,再给 key 设置一个生存工夫。生产者,发送音讯时,指定 messageId。

当然这只是一个简略思路供大家参考。

松哥在 vhr 我的项目中也解决了音讯幂等性问题,感兴趣的小伙伴能够查看 vhr 源码(https://github.com/lenve/vhr),代码在 mailserver 中。

6. 小结

好啦,明天就和小伙伴们聊了下 RabbitMQ 中和音讯生产相干的几个话题,感兴趣的小伙伴能够实际下哦~

复制文章题目并在公众号后盾回复,能够下载本文案例~

正文完
 0