乐趣区

关于前端:RocketMq中的traceId重复问题

背景

最近发现生产上 mq 的 traceId 有反复景象,实践上不同音讯生产,tracdId 不应该雷同,但为什么有肯定的概率会呈现呢?

查问代码如下:

 protected ConsumeStatus consumeMsgSingle(MessageExt ext) {log.debug("AbstractMessageListener-consumeMessage() msgId:{}, body:{}", ext.getMsgId(), new String(ext.getBody()));
       String message = new String(ext.getBody());
       // 获取到 key
       String key = RocketMQUtils.concatKey(ext.getTopic(), ext.getTags());
       // 依据 key 从 handleMap 里获取到咱们的解决类
       MessageProcessor messageProcessor = handleMap.get(key);
       if (Objects.isNull(messageProcessor)) {messageProcessor = handleMap.get(ext.getTopic());
       }
       Optional.ofNullable(messageProcessor).orElseThrow(() -> new RRException(String.format("未找到音讯解决类, topic:%s, tag:%s", ext.getTopic(), ext.getTags())));
       Object obj = null;
       try {
           // 将 String 类型的 message 反序列化成对应的对象。obj = messageProcessor.transferMessage(message);
           if (obj instanceof MqMetaInfo) {MqMetaInfo meta = (MqMetaInfo) obj;
               MqMetaInfoConverter.fromExt(meta, ext);
           }
           generateMDC(ext);
       } catch (Exception e) {StringBuilder errMsg = new StringBuilder("对象反序列化失败,")
                   .append("messageId:")
                   .append(ext.getMsgId()).append("\n")
                   .append("msgBody:")
                   .append(new String(ext.getBody())).append("\n")
                   .append("messageExt")
                   .append(ext).append("\n")
                   .append("stackTrace:")
                   .append(JSON.toJSONString(e.getStackTrace()));

           log.error("AbstractMessageListener-consumeMessage() error:{}, msgId:{},  message:{}, errMsg:{}"
                   , e, ext.getMsgId(), new String(ext.getBody()), errMsg.toString());
           throw new RRException(errMsg.toString());
       }
       // 解决音讯
       boolean result = messageProcessor.handleMessage(obj);
       if (!result) {if (ext.getReconsumeTimes() > Integer.MAX_VALUE) {return ConsumeStatus.SUCCESS;}
           return ConsumeStatus.FAIL;
       }
       return ConsumeStatus.SUCCESS;
   }

generateMDC 办法如下:

起因剖析

能够看到如果 message 中有 traceId,则把 traceId 关联到该线程,并打印进去。但发现最终该办法执行实现后未做清理 traceId 的动作,即 RocketMq 的消费者用的是线程池,而线程回收后 traceId 仍然绑定在该线程上,如果下次有音讯过去生产则会有同样 traceId 呈现

重现

消费者

@Slf4j
@Service(value = "multiConsumerDemoProcessor")
public class MultiConsumerDemoProcessor implements MessageProcessor<String> {

    @Override
    public boolean handleMessage(String orderNo) {log.info("开始生产:{}", orderNo);
        try {Thread.sleep(500);
        } catch (InterruptedException e) {e.printStackTrace();
        }
        return true;
    }

    @Override
    public Class<String> getClazz() {return null;}

    @Override
    public String transferMessage(String message) {return message;}
}

生产者

public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

        producer.setNamesrvAddr("ip");
        producer.start();

        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("multi-consumer-demo",
                        "demo",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {e.printStackTrace();
            }

        //producer.shutdown();}
}
运行后果:


能够看到 traceId 是有反复的

解决

加上 finally 语句,开释 traceId

解决后果

退出移动版