Constructing DefaultMQPushConsumer
Example code
// Set the consumer group
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
// Register the message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println(" Receive New Messages: " + msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// Start the consumer
consumer.start();
The consumer constructor
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    // Set the message queue allocation strategy
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    // Build the internal implementation that does the actual work
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
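To give a feel for what an allocation strategy decides, here is a minimal sketch of an averaging scheme: every consumer in the group sees the same sorted queue list and client list, and each one takes a contiguous slice. The class `AverageAllocationSketch`, its `allocate` method, and the use of plain integers as queue ids are all assumptions made for illustration. This is not RocketMQ's actual `AllocateMessageQueueStrategy` code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for an averaging queue allocation strategy. */
class AverageAllocationSketch {

    /**
     * Returns the slice of queueIds assigned to currentClientId.
     * Both lists must be ordered identically on every consumer so that
     * all group members compute the same, non-overlapping result.
     */
    static List<Integer> allocate(String currentClientId, List<Integer> queueIds, List<String> clientIds) {
        List<Integer> result = new ArrayList<>();
        int index = clientIds.indexOf(currentClientId);
        if (index < 0 || queueIds.isEmpty()) {
            return result; // unknown client or nothing to allocate
        }
        int base = queueIds.size() / clientIds.size();
        int remainder = queueIds.size() % clientIds.size();
        // The first `remainder` clients take one extra queue each.
        int size = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);
        for (int i = 0; i < size; i++) {
            result.add(queueIds.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = List.of(0, 1, 2, 3, 4);
        List<String> clients = List.of("client-a", "client-b");
        System.out.println(allocate("client-a", queues, clients)); // [0, 1, 2]
        System.out.println(allocate("client-b", queues, clients)); // [3, 4]
    }
}
```

When there are more consumers than queues, the surplus consumers in this sketch simply receive an empty list.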
The DefaultMQPushConsumerImpl constructor
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
    this.defaultMQPushConsumer = defaultMQPushConsumer;
    this.rpcHook = rpcHook;
    // How long to wait before resuming pulls after a pull throws an exception
    this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
}
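The effect of that delay can be pictured with a small sketch: the implementation copies the value from the outer consumer once at construction time, then uses it to decide how long to wait before the next pull whenever a pull fails. `ConsumerConfigSketch`, `PullLoopSketch`, the `pullOnce` method, and the 3000 ms value are hypothetical names and numbers chosen for illustration. The sketch records delays instead of scheduling real work, and it assumes a successful pull is followed immediately by the next one.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the public consumer that holds configuration. */
class ConsumerConfigSketch {
    private final long pullTimeDelayMillsWhenException;

    ConsumerConfigSketch(long pullTimeDelayMillsWhenException) {
        this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
    }

    long getPullTimeDelayMillsWhenException() {
        return pullTimeDelayMillsWhenException;
    }
}

/** Hypothetical stand-in for the internal implementation. */
class PullLoopSketch {
    private final long pullTimeDelayMillsWhenException;
    // Delays are recorded rather than slept, which keeps the sketch deterministic.
    final List<Long> scheduledDelays = new ArrayList<>();

    PullLoopSketch(ConsumerConfigSketch config) {
        // Copied once at construction time, mirroring the constructor above.
        this.pullTimeDelayMillsWhenException = config.getPullTimeDelayMillsWhenException();
    }

    /** Runs one pull and records how long to wait before the next one. */
    void pullOnce(Runnable pullAction) {
        try {
            pullAction.run();
            scheduledDelays.add(0L); // assumption: pull again immediately on success
        } catch (RuntimeException e) {
            scheduledDelays.add(pullTimeDelayMillsWhenException); // back off after a failure
        }
    }

    public static void main(String[] args) {
        PullLoopSketch loop = new PullLoopSketch(new ConsumerConfigSketch(3000L));
        loop.pullOnce(() -> { });
        loop.pullOnce(() -> { throw new IllegalStateException("broker unreachable"); });
        System.out.println(loop.scheduledDelays); // [0, 3000]
    }
}
```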
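Summary
Constructing the consumer mainly comes down to constructing the internal defaultMQPushConsumerImpl. Beyond that, it is worth understanding the queue allocation strategy passed to the constructor. One detail to note: the implementation stores a delay, pullTimeDelayMillsWhenException, that controls how long it waits before continuing when an exception occurs during a pull.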