音讯反复生产的问题

音讯反复生产是各个 MQ 都会产生的常见问题之一,在一些比拟敏感的场景下,反复生产会造成比较严重的结果,比方反复扣款等。

音讯反复生产场景及解决办法

在什么状况下会产生 RocketMQ 的音讯反复生产呢?

生产者反复发送场景

当零碎的调用链路比拟长的时候,比方,零碎 A 调用零碎 B,零碎 B 再把音讯发送到 RocketMQ 中,在零碎 A 调用零碎 B 的时候。

如果零碎 B 解决胜利,然而迟迟没有将调用胜利的后果返回给零碎 A 的时候,零碎 A 就会尝试从新发动申请给零碎 B,造成零碎 B 反复解决,发动多条音讯给 RocketMQ 造成反复生产。

消费者反复发送场景

在零碎 B 发送音讯给 RocketMQ 的时候,也有可能会产生和下面一样的问题,音讯发送超时,后果零碎 B 重试,导致 RocketMQ 接管到了反复的音讯。

消费者反复发送场景

当 RocketMQ 胜利接管到音讯,并将音讯交给消费者解决,如果消费者生产实现后还没来得及提交 offset 给 RocketMQ,本人宕机或者重启了,那么 RocketMQ 没有接管到 offset,就会认为生产失败了,会重发消息给消费者再次生产。

消费者没有立即返回胜利

反复生产的问题的一个可能的问题:消费者生产音讯时产生了异样,并没有返回 CONSUME_SUCCESS 标记。

因为音讯解决异样导致的音讯从新生产,RocketMQ 能够很好的放弃音讯,肯定要生产胜利才能够!

官网对 comsumerMessage 办法
It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure

无论如何,都不要抛出异样,如果须要从新生产,能够返回 RECONSUME_LATER 被动要求从新生产。

catch Exception 根异样来捕捉业务解决的异样:
consumer.registerMessageListener(new MessageListenerConcurrently() {                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,                     ConsumeConcurrentlyContext context) {                    logger.debug(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");                    MessagePack msgpack = new MessagePack();                    for (MessageExt msg : msgs){                        byte[] data = msg.getBody();                        try {                            RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class);                            logger.debug("Receive a message:" + rtmsg);                            anlysisRTMsgPack(rtmsg, engine);                        } catch (IOException e) {                            logger.error("Unpack RTMsg:", e);                        } catch (Exception e1){                            logger.warn("Unexcepted exception.", e1);                        }                    }                    logger.debug("RETURN CONSUME SUCCESS.");                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;                }});

设置 CONSUME_FROM_LAST_OFFSET 的问题

Consumer 在生产时,会设置从哪里开始生产。默认是 CONSUME_FROM_LAST_OFFSET,设置的值如代码所示。

public enum ConsumeFromWhere {    /**     * 一个新的订阅组第一次启动从队列的最初地位开始生产<br>     * 后续再启动接着上次生产的进度开始生产     */    CONSUME_FROM_LAST_OFFSET,    @Deprecated    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,    @Deprecated    CONSUME_FROM_MIN_OFFSET,    @Deprecated    CONSUME_FROM_MAX_OFFSET,    /**     * 一个新的订阅组第一次启动从队列的最前地位开始生产<br>     * 后续再启动接着上次生产的进度开始生产     */    CONSUME_FROM_FIRST_OFFSET,    /**     * 一个新的订阅组第一次启动从指定工夫点开始生产<br>     * 后续再启动接着上次生产的进度开始生产<br>     * 工夫点设置参见DefaultMQPushConsumer.consumeTimestamp参数     */    CONSUME_FROM_TIMESTAMP,}
  • CONSUME_FROM_LAST_OFFSET:从最初的偏移量开始生产,是从该消费者上次生产到的地位开始生产。
  • 如果是一个新的消费者,就要依据这个 client 所属的生产组的状况来判断。
  • 如果所属的消费者组是新上线的,订阅的音讯,最早的音讯都没有过,RocketMQ 的设计者认为,你这是一个新上线的业务,会强制从第一条音讯开始生产。
  • 如果订阅的音讯,曾经产生了过期音讯,那么才会从咱们这个 client 启动的工夫点开始生产。
ConsumeFromWhere 这个参数只对一个新的消费者第一次启动时无效
  • CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始生产,
  • CONSUME_FROM_TIMESTAMP:从某个工夫开始生产。
  • 而判断是不是一个新的 ConsumerGroup 是在 broker 端判断。
  • 生产到哪个 offset 最先是存在 Consumer 本地的,定时和 broker 同步本人的生产 offset。
  • broker 在判断是不是一个新的 consumergroup,就是查 broker 端有没有这个 consumergroup 的 offset 记录。

偏移量有效化

对于一个新的 queue,这个参数也是没用的,都是从 0 开始生产。

所以,这就有了一个问题我曾经设置了 CONSUME_FROM_LAST_OFFSET,为什么还是反复生产了,可能你这不是新的 consumergroup,也可能是个新的 Queue。

重试队列和死信队列

  • 生产端,始终不回传生产的后果。RocketMQ 认为音讯没收到,consumer 下一次拉取,broker 仍然会发送该音讯。
  • 任何异样都要捕捉返回:ConsumeConcurrentlyStatus.RECONSUME_LATER

RocketMQ 会放到重试队列,TOPIC 是:%RETRY%+COnsumerGroup 的名字

  • 重试的音讯在提早的某个工夫点(默认是 10 秒,业务可设置)后,再次投递到这个 ConsumerGroup。
  • 而如果始终这样反复生产都继续失败到肯定次数(默认 16 次),就会投递到 DLQ 死信队列,此时须要人工干预了。
/**Batch consumption size*/private int consumeMessageBatchMaxSize = 1;/**Batch pull size*/private int pullBatchSize = 32;
  • consumeMessageBatchMaxSize 是批量生产的最大条数
  • pullBatchSize 是每次拉取的最大条数

broker 端的

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

参数是设置重试的工夫,即第一次 1s 之后,第二次 5s 之后

生产环境不要改

messageDelayLevel = 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s

16 次之后,多了一个 topic 名为:%DLQ%+consumergroup

这个默认的 16 次,能够改,然而应用 DefaultMQPullConsumer 才能够批改。

DefaultMQPushConsumer 不能批改此值。

consumeMessageBatchMaxSize 这个 size 是消费者注册的回调 listener 一次解决的音讯数,默认是 1,不是每次拉取的音讯数(默认是 32),这个不要搞混。