最近闲来无事,down 了 RocketMQ 4.5.2 的源码学习一下,ok, 进入主题
consumer.start()对应的调用如下图所示:
由上图可知,在 DefaultMQPushConsumer.start()中 调用 MQClientInstance 的构造方法,递归调用 PullMessageService 的构造方法。由此说明了消息传递少不了这个 PullMessageService 帮忙。
DefaultMQPushConsumer.start()中调用了 DefaultMQPushConsumerImpl.start(),如下图所示
DefaultMQPushConsumerImpl.start() 调用了 mQClientFactory.start(),MQClientInstance.start()中调用了 PullMessageService.start();
至此 PullMessageService.run()就被执行。最终调用 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);
我们再看看 code 里具体了些什么?
ConsumeRequest 被在线程池中执行,而 ConsumeRequest 本身就是 Runnable, 在 ta 的 run()中调用 MessageListenerConcurrently 接口的 consumeMessage(),code 如下
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
我只提出来主要的 code, 完整细节大家自行阅读吧。Ok, 我们再回到生命开始的地方:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAclRPCHook(), new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// Wrong time format 2017_0422_221800
consumer.setConsumeTimestamp("20180422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
printBody(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
注意这句printBody(msgs),太棒了,消息终于被打印出来了。