乐趣区

关于rocketmq:RoketMq源码Consumer的构造

DefaultMQPushConsumer 的结构

实例代码

        // 设置 group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
        // 注册生产监听
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {System.out.println("Receive New Messages:" + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    // 启动
    consumer.start();

consumer 结构

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
    this.consumerGroup = consumerGroup;
    this.namespace = namespace;
    // 设置调配策略
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

defaultMQPushConsumerImpl 的结构

public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
    this.defaultMQPushConsumer = defaultMQPushConsumer;
    this.rpcHook = rpcHook;
    // 有一个 pull 的产生异样后提早多久再接着解决的一个工夫
    this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();}

总结

consumer 结构次要是结构外部的 defaultMQPushConsumerImpl,而后须要理解一下调配的策略,有一个细节,就是这个 pull 过程中产生异样了的一个提早解决的工夫

退出移动版