关于rocketmq:记一次RocketMQ消息消费异常

30次阅读

共计 2359 个字符,预计需要花费 6 分钟才能阅读完成。

记一次 RocketMQ 音讯曾经生产然则 cosumer offset 没有更新的问题

发现问题:

开发中在我的项目重启时会反复生产音讯,但其实音讯曾经生产过了。

查找问题:

1.RocketMq console 查看,发现订阅组音讯提早

2. 从音讯看 message Detail 对应的 consumerGroup trackType 为 not conume yet

3. 我的项目日志也没有任何谬误日志,然而依据相干业务查询数据库发现数据曾经解决实现

4. 业务代码断点,没有抛出任何异样,通过 resend message 也能失常生产

5. 狐疑是不是 rocketMq 更新 offset 的定时工作没有启动

然而通过源码断点 MQClientInstance 定时工作失常,只是每次更新的 offset 都是原 offet

6. 看看是不是生产的时候出了问题

因为是用的 spring-boot 整合的 client, 跟踪 consumer 源码,代码在 DefaultRocketMQListenerContainer.handleMessage 办法中

然而一切正常,再往上跟踪到 DefaultMessageListenerConcurrently

public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

@SuppressWarnings(“unchecked”)

@Override

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

for (MessageExt messageExt : msgs) {

log.debug(“received msg: {}”, messageExt);

try {

long now = System.currentTimeMillis();

handleMessage(messageExt);

long costTime = System.currentTimeMillis() – now;

log.debug(“consume {} cost: {} ms”, messageExt.getMsgId(), costTime);

} catch (Exception e) {

log.warn(“consume message failed. messageExt:{}, error:{}”, messageExt, e);

context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

}

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

}

首先在 catch 代码块点打断点看看是不是有问题,后果发现并没有走到这里,这就坑爹了,害我又从其它方面各种查起因,节约了很多工夫。前面一步一步调试,最终在 log.debug(“consume {} cost: {} ms”, messageExt.getMsgId(), costTime); 打日志这一步时抛出了异样,这尼玛打个日志还能异样,还不是 Exception 的异样。。原本松了口气认为找到了起因就好解决了,没想到这才是刚刚开始。

7. 在往下层调用代码 ConsumeMessageConcurrentlyService 里断点查看异样信息

java.lang.NoClassDefFoundError:Could not initialize class org.apache.rocketmq.common.message.MessageClientIDSetter

起因是在 MessageClientExt 类中远程桌面调用 getMsgId 办法里,调用了 MessageClientIDSetter.getUniqID(this) 间接抛出的异样从异样信息来看是 MessageClientIDSetter 在初始化的时候出了问题

8. 查看 MessageClientIDSetter 原码,有一断动态代码块,而后在这里断点跟踪。

static {

byte[] ip;

try {

ip = UtilAll.getIP();

} catch (Exception e) {

ip = createFakeIP();

}

LEN = ip.length + 2 + 4 + 4 + 2;

ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 2 + 4);

tempBuffer.position(0);

tempBuffer.put(ip);

tempBuffer.position(ip.length);

tempBuffer.putInt(UtilAll.getPid());

tempBuffer.position(ip.length + 2);

tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());

FIX_STRING = UtilAll.bytes2string(tempBuffer.array());

setStartTime(System.currentTimeMillis());

COUNTER = new AtomicInteger(0);

}

发面是在 ip = UtilAll.getIP(); 出了问题,然则并没有到 catch 代码块,而是跳到了 DefaultMqPushConsumerImpl 类中,这里又一个坑爹的是异样块没有任何解决,看不到异样信息,好吧只能一步一步持续断点调

正文完
 0