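Background
Recently we noticed duplicate traceIds when consuming MQ messages in production. Different messages should never be consumed under the same traceId, so why does it happen intermittently?
Here is the consuming code: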
```java
protected ConsumeStatus consumeMsgSingle(MessageExt ext) {
    log.debug("AbstractMessageListener-consumeMessage() msgId:{}, body:{}", ext.getMsgId(), new String(ext.getBody()));
    String message = new String(ext.getBody());
    // build the lookup key from topic and tag
    String key = RocketMQUtils.concatKey(ext.getTopic(), ext.getTags());
    // look up our processor in handleMap by key, falling back to the topic alone
    MessageProcessor messageProcessor = handleMap.get(key);
    if (Objects.isNull(messageProcessor)) {
        messageProcessor = handleMap.get(ext.getTopic());
    }
    Optional.ofNullable(messageProcessor).orElseThrow(() -> new RRException(
            String.format("No message processor found, topic:%s, tag:%s", ext.getTopic(), ext.getTags())));
    Object obj = null;
    try {
        // deserialize the String message into the corresponding object
        obj = messageProcessor.transferMessage(message);
        if (obj instanceof MqMetaInfo) {
            MqMetaInfo meta = (MqMetaInfo) obj;
            MqMetaInfoConverter.fromExt(meta, ext);
        }
        // bind the message's traceId to the current thread
        generateMDC(ext);
    } catch (Exception e) {
        StringBuilder errMsg = new StringBuilder("Object deserialization failed, ")
                .append("messageId: ").append(ext.getMsgId()).append("\n")
                .append("msgBody: ").append(new String(ext.getBody())).append("\n")
                .append("messageExt ").append(ext).append("\n")
                .append("stackTrace: ").append(JSON.toJSONString(e.getStackTrace()));
        log.error("AbstractMessageListener-consumeMessage() error:{}, msgId:{}, message:{}, errMsg:{}",
                e, ext.getMsgId(), new String(ext.getBody()), errMsg.toString());
        throw new RRException(errMsg.toString());
    }
    // process the message
    boolean result = messageProcessor.handleMessage(obj);
    if (!result) {
        // note: this condition is never true, since an int cannot exceed Integer.MAX_VALUE
        if (ext.getReconsumeTimes() > Integer.MAX_VALUE) {
            return ConsumeStatus.SUCCESS;
        }
        return ConsumeStatus.FAIL;
    }
    // note: nothing on any path clears the traceId bound by generateMDC
    return ConsumeStatus.SUCCESS;
}
```
The generateMDC method is as follows:
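Root Cause Analysis
As you can see, if the message carries a traceId, generateMDC binds it to the current thread and logs it. However, nothing clears the traceId after the method finishes. RocketMQ's consumer runs on a thread pool, so when a thread goes back to the pool the traceId is still bound to it. The next message that thread consumes, unless it overwrites the value, is logged with the same traceId. Whether that happens depends on which pooled thread picks up which message, which is why the duplicates appear only intermittently.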
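The mechanism can be reproduced in plain Java without RocketMQ or a logging library. The sketch below assumes, as common MDC implementations do, that the context lives in thread-local storage. `FAKE_MDC` and `generateMdc` are hypothetical stand-ins, not SLF4J's API and not the article's generateMDC. A single-thread executor plays the role of the consumer's thread pool.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MdcLeakDemo {
    // Hypothetical stand-in for an MDC: one map per thread.
    static final ThreadLocal<Map<String, String>> FAKE_MDC =
            ThreadLocal.withInitial(HashMap::new);

    // Hypothetical generateMDC: binds traceId only if the message carries one.
    static void generateMdc(String traceIdOrNull) {
        if (traceIdOrNull != null) {
            FAKE_MDC.get().put("traceId", traceIdOrNull);
        }
    }

    // Consumes one message WITHOUT any cleanup afterwards and
    // returns the traceId a log line would show.
    static String consumeWithoutCleanup(String traceIdOrNull) {
        generateMdc(traceIdOrNull);
        return FAKE_MDC.get().get("traceId");
    }

    public static void main(String[] args) throws Exception {
        // One pooled thread, reused for every task.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> first = pool.submit(() -> consumeWithoutCleanup("trace-A"));
            Future<String> second = pool.submit(() -> consumeWithoutCleanup(null));
            System.out.println("first:  " + first.get());
            System.out.println("second: " + second.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

Running it prints `trace-A` for both tasks. The second message carries no traceId, yet it inherits the first one because the same pooled thread still holds it.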
Reproduction
Consumer
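```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

// MessageProcessor is the project's own interface (its package is not shown in the original)
@Slf4j
@Service(value = "multiConsumerDemoProcessor")
public class MultiConsumerDemoProcessor implements MessageProcessor<String> {

    @Override
    public boolean handleMessage(String orderNo) {
        log.info("Start consuming: {}", orderNo);
        try {
            // simulate 500 ms of processing
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
        return true;
    }

    @Override
    public Class<String> getClazz() {
        return String.class;
    }

    @Override
    public String transferMessage(String message) {
        return message;
    }
}
```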
Producer
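```java
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("ip"); // placeholder: set to your NameServer address
        producer.start();
        // send 10 messages to topic "multi-consumer-demo" with tag "demo"
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("multi-consumer-demo", "demo", "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}
```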
Output:
As you can see, the same traceId appears on different messages.
Fix
Add a finally block that clears the traceId once the message has been processed.
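A minimal sketch of the fix, using a thread-local map as a stand-in for MDC. `FAKE_MDC` and `generateMdc` are hypothetical names, and `traceId` is an assumed key. The cleanup sits in a `finally` block, so it runs whether processing succeeds, returns false, or throws.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MdcCleanupDemo {
    // Hypothetical stand-in for an MDC: one map per thread.
    static final ThreadLocal<Map<String, String>> FAKE_MDC =
            ThreadLocal.withInitial(HashMap::new);

    // Hypothetical generateMDC: binds traceId only if the message carries one.
    static void generateMdc(String traceIdOrNull) {
        if (traceIdOrNull != null) {
            FAKE_MDC.get().put("traceId", traceIdOrNull);
        }
    }

    // Consumes one message and always removes the traceId afterwards.
    static String consumeWithCleanup(String traceIdOrNull) {
        try {
            generateMdc(traceIdOrNull);
            // stands in for handleMessage(obj) and the log lines it writes
            return FAKE_MDC.get().get("traceId");
        } finally {
            // runs on success, on a false result, and on exceptions alike
            FAKE_MDC.get().remove("traceId");
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> first = pool.submit(() -> consumeWithCleanup("trace-A"));
            Future<String> second = pool.submit(() -> consumeWithCleanup(null));
            System.out.println("first:  " + first.get());  // trace-A
            System.out.println("second: " + second.get()); // null
        } finally {
            pool.shutdown();
        }
    }
}
```

In the real consumer, the equivalent would be wrapping the body of consumeMsgSingle in try/finally and calling SLF4J's `MDC.remove("traceId")` in the finally block, assuming `traceId` is the key generateMDC uses. `MDC.clear()` also works, but it removes every MDC entry on the thread, which matters if other code puts its own keys there.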