记一次RocketMQ 音讯曾经生产然则cosumer offset没有更新的问题
发现问题:
开发中在我的项目重启时会反复生产音讯,但其实音讯曾经生产过了。
查找问题:
1.RocketMq console查看,发现订阅组音讯提早
2.从音讯看message Detail 对应的consumerGroup trackType为 not conume yet
3.我的项目日志也没有任何谬误日志,然而依据相干业务查询数据库发现数据曾经解决实现
4.业务代码断点,没有抛出任何异样,通过resend message也能失常生产
5.狐疑是不是rocketMq 更新offset的定时工作没有启动
然而通过源码断点MQClientInstance 定时工作失常,只是每次更新的offset都是原offet
6.看看是不是生产的时候出了问题
因为是用的spring-boot整合的client,跟踪consumer源码,代码在DefaultRocketMQListenerContainer.handleMessage办法中
然而一切正常,再往上跟踪到DefaultMessageListenerConcurrently
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
@SuppressWarnings("unchecked")
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) {
log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
首先在catch代码块点打断点看看是不是有问题,后果发现并没有走到这里,这就坑爹了,害我又从其它方面各种查起因,节约了很多工夫。前面一步一步调试,最终在 log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); 打日志这一步时抛出了异样,这尼玛打个日志还能异样,还不是Exception的异样。。原本松了口气认为找到了起因就好解决了,没想到这才是刚刚开始。
7.在往下层调用代码ConsumeMessageConcurrentlyService里断点查看异样信息
java.lang.NoClassDefFoundError:Could not initialize class org.apache.rocketmq.common.message.MessageClientIDSetter
起因是在MessageClientExt类中远程桌面调用getMsgId办法里,调用了MessageClientIDSetter.getUniqID(this)间接抛出的异样从异样信息来看是MessageClientIDSetter 在初始化的时候出了问题
8.查看MessageClientIDSetter原码,有一断动态代码块,而后在这里断点跟踪。
static {
byte[] ip;
try {
ip = UtilAll.getIP();
} catch (Exception e) {
ip = createFakeIP();
}
LEN = ip.length + 2 + 4 + 4 + 2;
ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);
tempBuffer.position(0);
tempBuffer.put(ip);
tempBuffer.position(ip.length);
tempBuffer.putInt(UtilAll.getPid());
tempBuffer.position(ip.length + 2);
tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
}
发面是在ip = UtilAll.getIP();出了问题,然则并没有到catch代码块,而是跳到了DefaultMqPushConsumerImpl类中,这里又一个坑爹的是异样块没有任何解决,看不到异样信息,好吧只能一步一步持续断点调