DefaultMQPushConsumer 的结构
实例代码
// 设置 group
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
// 注册生产监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {System.out.println("Receive New Messages:" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动
consumer.start();
consumer 结构
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
// 设置调配策略
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
defaultMQPushConsumerImpl 的结构
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
// 有一个 pull 的产生异样后提早多久再接着解决的一个工夫
this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();}
总结
consumer 结构次要是结构外部的 defaultMQPushConsumerImpl,而后须要理解一下调配的策略,有一个细节,就是这个 pull 过程中产生异样了的一个提早解决的工夫