背景

最近发现生产上mq的traceId有反复景象,实践上不同音讯生产,tracdId不应该雷同,但为什么有肯定的概率会呈现呢?

查问代码如下:

 protected ConsumeStatus consumeMsgSingle(MessageExt ext) {       log.debug("AbstractMessageListener-consumeMessage() msgId:{}, body:{}", ext.getMsgId(), new String(ext.getBody()));       String message = new String(ext.getBody());       //获取到key       String key = RocketMQUtils.concatKey(ext.getTopic(), ext.getTags());       //依据key从handleMap里获取到咱们的解决类       MessageProcessor messageProcessor = handleMap.get(key);       if (Objects.isNull(messageProcessor)) {           messageProcessor = handleMap.get(ext.getTopic());       }       Optional.ofNullable(messageProcessor).orElseThrow(() -> new RRException(String.format("未找到音讯解决类, topic:%s, tag:%s", ext.getTopic(), ext.getTags())));       Object obj = null;       try {           //将String类型的message反序列化成对应的对象。           obj = messageProcessor.transferMessage(message);           if (obj instanceof MqMetaInfo) {               MqMetaInfo meta = (MqMetaInfo) obj;               MqMetaInfoConverter.fromExt(meta, ext);           }           generateMDC(ext);       } catch (Exception e) {           StringBuilder errMsg = new StringBuilder("对象反序列化失败, ")                   .append("messageId:     ")                   .append(ext.getMsgId()).append("\n")                   .append("msgBody:       ")                   .append(new String(ext.getBody())).append("\n")                   .append("messageExt     ")                   .append(ext).append("\n")                   .append("stackTrace:    ")                   .append(JSON.toJSONString(e.getStackTrace()));           log.error("AbstractMessageListener-consumeMessage() error:{}, msgId:{},  message:{}, errMsg:{}"                   , e, ext.getMsgId(), new String(ext.getBody()), errMsg.toString());           throw new RRException(errMsg.toString());       }       //解决音讯       boolean result = messageProcessor.handleMessage(obj);       if (!result) {           if (ext.getReconsumeTimes() > Integer.MAX_VALUE) {               return ConsumeStatus.SUCCESS;           }           return ConsumeStatus.FAIL;       }       return ConsumeStatus.SUCCESS;   }

generateMDC办法如下:

起因剖析

能够看到如果message中有traceId,则把traceId关联到该线程,并打印进去。但发现最终该办法执行实现后未做清理traceId的动作,即RocketMq的消费者用的是线程池,而线程回收后traceId仍然绑定在该线程上,如果下次有音讯过去生产则会有同样traceId呈现

重现

消费者

@Slf4j@Service(value = "multiConsumerDemoProcessor")public class MultiConsumerDemoProcessor implements MessageProcessor<String> {    @Override    public boolean handleMessage(String orderNo) {        log.info("开始生产:{}", orderNo);        try {            Thread.sleep(500);        } catch (InterruptedException e) {            e.printStackTrace();        }        return true;    }    @Override    public Class<String> getClazz() {        return null;    }    @Override    public String transferMessage(String message) {        return message;    }}

生产者

public class Producer {    public static void main(String[] args) throws MQClientException, InterruptedException {        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");        producer.setNamesrvAddr("ip");        producer.start();        for (int i = 0; i < 10; i++)            try {                {                    Message msg = new Message("multi-consumer-demo",                        "demo",                        "OrderID188",                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));                    SendResult sendResult = producer.send(msg);                    System.out.printf("%s%n", sendResult);                }            } catch (Exception e) {                e.printStackTrace();            }        //producer.shutdown();    }}
运行后果:


能够看到traceId是有反复的

解决

加上finally语句,开释traceId

解决后果