共计 2677 个字符,预计需要花费 7 分钟才能阅读完成。
背景
最近发现生产上 mq 的 traceId 有反复景象,实践上不同音讯生产,tracdId 不应该雷同,但为什么有肯定的概率会呈现呢?
查问代码如下:
protected ConsumeStatus consumeMsgSingle(MessageExt ext) {log.debug("AbstractMessageListener-consumeMessage() msgId:{}, body:{}", ext.getMsgId(), new String(ext.getBody()));
String message = new String(ext.getBody());
// 获取到 key
String key = RocketMQUtils.concatKey(ext.getTopic(), ext.getTags());
// 依据 key 从 handleMap 里获取到咱们的解决类
MessageProcessor messageProcessor = handleMap.get(key);
if (Objects.isNull(messageProcessor)) {messageProcessor = handleMap.get(ext.getTopic());
}
Optional.ofNullable(messageProcessor).orElseThrow(() -> new RRException(String.format("未找到音讯解决类, topic:%s, tag:%s", ext.getTopic(), ext.getTags())));
Object obj = null;
try {
// 将 String 类型的 message 反序列化成对应的对象。obj = messageProcessor.transferMessage(message);
if (obj instanceof MqMetaInfo) {MqMetaInfo meta = (MqMetaInfo) obj;
MqMetaInfoConverter.fromExt(meta, ext);
}
generateMDC(ext);
} catch (Exception e) {StringBuilder errMsg = new StringBuilder("对象反序列化失败,")
.append("messageId:")
.append(ext.getMsgId()).append("\n")
.append("msgBody:")
.append(new String(ext.getBody())).append("\n")
.append("messageExt")
.append(ext).append("\n")
.append("stackTrace:")
.append(JSON.toJSONString(e.getStackTrace()));
log.error("AbstractMessageListener-consumeMessage() error:{}, msgId:{}, message:{}, errMsg:{}"
, e, ext.getMsgId(), new String(ext.getBody()), errMsg.toString());
throw new RRException(errMsg.toString());
}
// 解决音讯
boolean result = messageProcessor.handleMessage(obj);
if (!result) {if (ext.getReconsumeTimes() > Integer.MAX_VALUE) {return ConsumeStatus.SUCCESS;}
return ConsumeStatus.FAIL;
}
return ConsumeStatus.SUCCESS;
}
generateMDC 办法如下:
起因剖析
能够看到如果 message 中有 traceId,则把 traceId 关联到该线程,并打印进去。但发现最终该办法执行实现后未做清理 traceId 的动作,即 RocketMq 的消费者用的是线程池,而线程回收后 traceId 仍然绑定在该线程上,如果下次有音讯过去生产则会有同样 traceId 呈现
重现
消费者
@Slf4j
@Service(value = "multiConsumerDemoProcessor")
public class MultiConsumerDemoProcessor implements MessageProcessor<String> {
@Override
public boolean handleMessage(String orderNo) {log.info("开始生产:{}", orderNo);
try {Thread.sleep(500);
} catch (InterruptedException e) {e.printStackTrace();
}
return true;
}
@Override
public Class<String> getClazz() {return null;}
@Override
public String transferMessage(String message) {return message;}
}
生产者
public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("ip");
producer.start();
for (int i = 0; i < 10; i++)
try {
{
Message msg = new Message("multi-consumer-demo",
"demo",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {e.printStackTrace();
}
//producer.shutdown();}
}
运行后果:
能够看到 traceId 是有反复的
解决
加上 finally 语句,开释 traceId
解决后果
正文完