乐趣区

RockMQ源码学习consumer-获取消息1

最近闲来无事,down 了 RocketMQ 4.5.2 的源码学习一下,ok, 进入主题

consumer.start()对应的调用如下图所示:

由上图可知,在 DefaultMQPushConsumer.start()中 调用 MQClientInstance 的构造方法,递归调用 PullMessageService 的构造方法。由此说明了消息传递少不了这个 PullMessageService 帮忙。

DefaultMQPushConsumer.start()中调用了 DefaultMQPushConsumerImpl.start(),如下图所示

DefaultMQPushConsumerImpl.start() 调用了 mQClientFactory.start(),MQClientInstance.start()中调用了 PullMessageService.start();

至此 PullMessageService.run()就被执行。最终调用 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);
我们再看看 code 里具体了些什么?

ConsumeRequest 被在线程池中执行,而 ConsumeRequest 本身就是 Runnable, 在 ta 的 run()中调用 MessageListenerConcurrently 接口的 consumeMessage(),code 如下

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
try {if (msgs != null && !msgs.isEmpty()) {for (MessageExt msg : msgs) {MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
        }
    }
    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
 } catch (Throwable e) {log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
        RemotingHelper.exceptionSimpleDesc(e),
        ConsumeMessageConcurrentlyService.this.consumerGroup,
        msgs,
        messageQueue);
    hasException = true;
}

我只提出来主要的 code, 完整细节大家自行阅读吧。Ok, 我们再回到生命开始的地方:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_5", getAclRPCHook(), new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20180422221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                printBody(msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");

注意这句printBody(msgs),太棒了,消息终于被打印出来了。

退出移动版