Construction of DefaultMQPushConsumer

Example code

    // Set the consumer group
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
    // Register the listener that handles consumed messages
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.println(" Receive New Messages: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Start the consumer (a real consumer would also subscribe to a topic first; omitted here)
    consumer.start();

Constructing the consumer

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    // Set the message queue allocation strategy
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
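
The allocation strategy passed to this constructor decides which message queues each consumer in the group receives. As a rough illustration of what an averaging strategy could look like, the sketch below splits a list of queue ids as evenly as possible among consumer ids. The class `AverageAllocationSketch` and its `allocate` method are hypothetical names made up for this example, and they work on plain integers and strings rather than RocketMQ types; this is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AverageAllocationSketch {

    // Hypothetical helper: returns the queue ids assigned to currentCid
    // when queueIds are divided evenly among consumerIds.
    static List<Integer> allocate(String currentCid, List<Integer> queueIds,
        List<String> consumerIds) {
        List<Integer> result = new ArrayList<>();
        int index = consumerIds.indexOf(currentCid);
        if (index < 0 || queueIds.isEmpty()) {
            return result; // unknown consumer, or nothing to assign
        }
        int base = queueIds.size() / consumerIds.size();
        int remainder = queueIds.size() % consumerIds.size();
        // The first `remainder` consumers each take one extra queue.
        int size = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);
        for (int i = 0; i < size; i++) {
            result.add(queueIds.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
        List<String> consumers = Arrays.asList("c0", "c1", "c2");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(cid, queues, consumers));
        }
        // prints:
        // c0 -> [0, 1, 2]
        // c1 -> [3, 4, 5]
        // c2 -> [6, 7]
    }
}
```

With more consumers than queues, the consumers at the end of the list receive an empty assignment in this sketch, which is why adding consumers beyond the queue count would not increase parallelism under such a scheme.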

Constructing defaultMQPushConsumerImpl

public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
    this.defaultMQPushConsumer = defaultMQPushConsumer;
    this.rpcHook = rpcHook;
    // How long to wait before continuing after an exception occurs during a pull
    this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
}
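
The two constructors above follow a facade-and-implementation pattern: the outer object creates its inner implementation and hands itself over, and the implementation copies the delay value at that moment. The sketch below mimics that shape with hypothetical stand-in classes (`Facade`, `Impl`) and an assumed default of 1000 ms chosen purely for illustration; it is not RocketMQ code.

```java
public class ConstructionSketch {

    /** Hypothetical stand-in for the outer consumer object. */
    static class Facade {
        // Assumed default, for illustration only.
        private long pullDelayMillis = 1000L;
        private final Impl impl;

        Facade() {
            // The facade builds its implementation and passes itself in.
            this.impl = new Impl(this);
        }

        long getPullDelayMillis() {
            return pullDelayMillis;
        }

        void setPullDelayMillis(long pullDelayMillis) {
            this.pullDelayMillis = pullDelayMillis;
        }

        Impl getImpl() {
            return impl;
        }
    }

    /** Hypothetical stand-in for the inner implementation object. */
    static class Impl {
        private final Facade facade;
        private final long pullDelayMillis;

        Impl(Facade facade) {
            this.facade = facade;
            // The delay is copied once, at construction time.
            this.pullDelayMillis = facade.getPullDelayMillis();
        }

        long getPullDelayMillis() {
            return pullDelayMillis;
        }
    }

    public static void main(String[] args) {
        Facade facade = new Facade();
        facade.setPullDelayMillis(5000L);
        System.out.println(facade.getPullDelayMillis());           // prints 5000
        System.out.println(facade.getImpl().getPullDelayMillis()); // prints 1000
    }
}
```

In this sketch a value set on the facade after construction does not reach the implementation's copy. Whether the real classes behave the same way depends on the RocketMQ version and is worth checking against its source.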

Summary

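Constructing the consumer mainly means constructing the internal defaultMQPushConsumerImpl. Beyond that, the allocation strategy is worth understanding. One detail to note: the constructor also records a delay, which is how long to wait before continuing after an exception occurs during a pull.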
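
To make the delay-after-exception idea concrete, here is a minimal sketch of a pull task that reschedules itself after a fixed delay whenever it fails. The class `DelayedPullRetrySketch`, the method `pullUntilSuccess`, and the simulated failure are all hypothetical and rely only on `java.util.concurrent`; how RocketMQ actually schedules the retry is not shown in the code above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DelayedPullRetrySketch {

    // Hypothetical helper: simulates a pull that fails `failuresBeforeSuccess`
    // times, waiting `delayMillis` before each retry. Returns the attempt count.
    static int pullUntilSuccess(final int failuresBeforeSuccess, final long delayMillis) {
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final AtomicInteger attempts = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);

        Runnable pull = new Runnable() {
            @Override
            public void run() {
                int attempt = attempts.incrementAndGet();
                try {
                    if (attempt <= failuresBeforeSuccess) {
                        throw new IllegalStateException("simulated pull failure");
                    }
                    done.countDown(); // pull succeeded
                } catch (IllegalStateException e) {
                    // On failure, try again only after the configured delay.
                    scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                }
            }
        };

        scheduler.execute(pull);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        // Two simulated failures, then success on the third attempt.
        System.out.println(pullUntilSuccess(2, 50L)); // prints 3
    }
}
```

The delay keeps a failing pull from spinning in a tight loop, which would otherwise put extra load on both the consumer and the server it is pulling from.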