乐趣区

关于java:RocketMQ-消费者2客户端设计和启动流程详解-源码解析

1. 背景

本文是 RocketMQ 消费者系列的第二篇,介绍消费者相干类与调用关系,同时蕴含消费者启动流程。
看完本文可能对音讯生产波及到的相干类和生产流程有大体的理解。

2. 概要设计

2.1 消费者客户端设计

先看一下 RocketMQ 客户端代码中消费者相干的类图。

其中 DefaultMQPullConsumerDefaultMQPushConsumer 就是咱们理论生产中须要新建的消费者对象。它们别离实现了消费者接口,扩大了客户端配置类。

新建 DefaultXXXXConsumer 对象时会在外部一个创立 DefaultMQXXXXConsumerImpl 对象。这里应用了代理模式,DefaultXXXXConsumer 对象只是一个壳,外部的大部分办法都通过调用代理 DefaultMQXXXXConsumerImpl 来执行。

DefaultMQXXXXConsumerImpl 实现类中蕴含了客户端实例 MQClientInstnace,每个客户端过程个别只有一个这玩意。它的用途很多,比方保留路由和客户端信息,向 Broker 发送申请等。

2.2 消费者客户端启动

消费者的启动次要波及下面讲到的 DefaultMQXXXXConsumerDefaultMQXXXXConsumerImplMQClientInstnace 这三个类。

2.2.1 新建消费者

  • 新建消费者时结构 DefaultMQXXXXConsumer 对象,指定队列负载算法,外部结构一个 DefaultMQXXXXConsumerImpl 对象。
  • DefaultMQXXXXConsumerImpl 设为刚创立状态,并新建重均衡服务 RebalanceService
  • 在首次启动前,DefaultMQXXXXConsumerImpl 对象中的 MQClientInstance 对象还没有被创立进去。

2.2.2 消费者启动

  • 启动命令也是在 DefaultMQXXXXConsumer 调用并代理到 DefaultMQXXXXConsumerImpl
  • 此时 DefaultMQXXXXConsumerImpl 会初始化一些服务和参数,而后创立一个 MQClientInstance 对象。
  • MQClientInstance 对象启动客户端的各种服务(Broker 通信、定时工作、音讯拉取、重均衡……)

3. 具体设计

3.1 消费者客户端类设计

3.1.1 整体类图


3.1.2 消费者接口

因为须要反对拉和推两种生产模式,所以按通常的想法,消费者类的设计中将会有一个 消费者接口 ,而后 推消费者 拉消费者接口 别离扩大 消费者接口。消费者接口提供一些共用办法,拉和推消费者实现拉生产和推生产办法。RocketMQ 就是这样做的。其中 MQConsumer 即消费者接口,扩大 MQAdmin 在这显得有些多余。

  • MQAdmin 接口提供了客户端的一些根本的治理接口,生产者、消费者和命令工具都扩大了它。
  • MQConsumer 接口很简略,次要提供了通过 Topic 获取读队列的办法 Set<MessageQueue> fetchSubscribeMessageQueues(final String topic)

3.1.3 拉 & 推模式消费者接口

接下来是拉消费者和推消费者接口。

如果咱们本人来设计拉 & 推模式消费者接口,须要定义哪些办法?能够设想一下生产时要做的操作,就能够定义出相应的办法。

  • 拉模式消费者的生产步骤为:拉取音讯,执行生产逻辑,上报生产进度,如果有需要的话对于生产失败的音讯还须要发回 Broker 从新生产。
  • 推模式消费者生产步骤更简略,只须要订阅一个 Topic,而后指定生产回调函数,即可在收到音讯时主动生产。

RocketMQ 的拉 & 推模式消费者接口就定义了这些办法,先来看一下类图:

MQPullConsumer

  • void registerMessageQueueListener(final String topic, final MessageQueueListener listener) 办法注册音讯队列变更时的回调办法。
  • PullResult pull 从 RocketMQ 服务器拉取一批音讯。

    • MessageQueue:拉取的队列
    • MessageSelector:音讯过滤器
    • offset:拉取的音讯在生产队列中的偏移量
    • maxNums:最大拉取音讯条数
    • timeout:拉取超时工夫
  • void pull 为异步拉取办法,拉取胜利后调用 PullCallback
  • updateConsumeOffset 更新音讯生产偏移量
  • fetchConsumeOffset 获取音讯生产偏移量
  • sendMessageBack 对于生产失败的音讯,发回 Broker 从新生产

MQPushConsumer

  • subscribe:订阅主题,订阅之后能够收到来自该主题的音讯。

    • topic:订阅的主题,能够屡次调用该办法来订阅多个主题
    • subExpression:音讯过滤表达式
    • messageSelector:音讯选择器,提供了 SQL92 和 Tag 模式的过滤抉择性能
  • unsubscribe:勾销订阅
  • registerMessageListener:用来注册生产监听器,蕴含两种生产模式:并发生产和程序生产

3.1.4 消费者实现

DefaultMQXXXXConsumer 是拉消费者接口 MQXXXXConsumer 的默认实现。这里用到了代理模式,将具体的办法实现都实现在 DefaultMQXXXXConsumerImpl 中,DefaultMQXXXXConsumer 保留了一个 DefaultMQXXXXConsumerImpl 的代理。

DefaultMQXXXXConsumerImpl 实现了 MQConsumerInner 接口,提供了消费者实现的一些专用办法。

DefaultMQXXXXConsumerImpl 中有一个客户端实例的援用 MQClientInstance mqClientFactory,用来与 Broker 通信、保留元数据。

MQClientInstnace:客户端实例,每个客户端过程个别只有一个这玩意。它的用途很多,很多操作最终都是调用它来做的。

  • 保留路由信息
  • 保留生产者消费者组信息
  • 向 Broker 发送申请
  • 启动重均衡

3.1.5 推模式消费者实现

拉模式消费者须要手动拉取音讯进行生产,平平无奇。推模式消费者主动监听推送过去的音讯并进行生产,着重解说。

推模式消费者理论外部也是通过拉取音讯的形式进行音讯拉取,只不过封装了订阅和监听器这样的对外接口,让用户在应用时感觉像 Broker 被动推送音讯到消费者。

在拉消费者背地,有一个线程默默被动拉取音讯,能力将拉转换为推,它就是 PullMessageService。此外,推消费者还反对并发生产和程序生产,RocketMQ 定义了 ConsumeMessageService 接口来执行音讯生产,ConsumeMessageConcurrentlyServiceConsumeMessageOrderlyService 别离是并发生产和程序生产的实现。它们外部都定义了一个生产线程池 consumeExecutor 来执行最终的音讯生产逻辑。而用户真正编写的只有最终的生产逻辑,即实现 MessageListener 接口的 consumeMessage 办法。

推模式消费者实现相干的类图如下所示:

在图中,展现了音讯生产整个流程的调用关系。在系列前面的文章中会具体解说。

  1. 客户端实例中的重均衡服务进行重均衡,生成一个 PullRequest 并调用拉消费者实现类的 executePullRequestImmediately 办法
  2. DefaultMQPushConsumerImpl 调用 PullMessageService 线程的 executePullRequestImmediately 办法,
  3. 该办法将 PullRequest 放入待执行的拉取申请队列
  4. PullMessageService 线程阻塞期待申请队列中的拉取申请
  5. 收到拉去申请 PullRequest 后就执行拉取音讯拉取办法 pullMessage 从 Broker 拉取音讯,拉取后执行生产音讯逻辑
  6. 生产音讯逻辑会调用 ConsumeMessageServicesubmitConsumeRequest 办法
  7. 该办法将生产音讯的申请提交到生产线程池 consumeExecutor
  8. 生产线程池执行真正的音讯生产逻辑,调用 MessageListener 接口的 consumeMessage 办法
  9. 拉取一批音讯胜利后,将拉取申请 PullRequest 的拉取偏移量更新后再次调用 executePullRequestImmediately 办法,放入拉取队列,从新拉取

3.2 消费者启动

因为拉模式和推模式消费者的启动流程大致相同,所以只介绍推模式消费者的启动流程。

DefaultMQPushConsumer 的启动办法外部理论是调用其代理类 DefaultMQPushConsumerImpl 的启动办法,他自身的启动办法并没有什么逻辑。

DefaultMQPushConsumerImpl 的启动办法执行的动作如下:

  1. 查看是否是刚创立状态,如果是才持续走启动流程
  2. 查看消费者配置信息是否非法
  3. 将用户的 Topic 订阅信息和重试 Topic 的订阅信息增加到 rebalanceImpl 中的 Map 中
  4. 创立和初始化一些对象

    1. 创立或获取曾经创立的客户端实例 MQClientInstance
    2. 初始化消费者的重均衡实现 RebalanceImpl
    3. 创立拉取音讯接口调用包装类 PullApiWrapper
    4. 注册音讯过滤钩子函数列表(如果有的话)
  5. 初始化生产进度

    • 播送模式,生产进度保留在消费者本地 LocalFileOffsetStore
    • 集群模式,生产进度保留在 Broker RemoteBrokerOffsetStore
  6. 初始化音讯生产服务,生产服务外部保护一个线程池,负责音讯生产
  7. 将消费者注册到客户端实例对象
  8. 启动客户端实例对象
  9. 从 Name server 更新 Topic 路由信息(如果路由信息有变动)
  10. 将客户端的信息(ID、生产者、消费者信息)上报给 Broker
  11. 唤醒重均衡线程 RebalanceService 立刻执行重均衡
  12. 重均衡后调用拉取音讯办法,生成拉取申请 PullRequest 并放入 PullMessageService,开始生产流程

客户端实例 MQClientInstance 的启动流程如下:

  1. 更新 Namesrv 地址
  2. 启动通信模块 MQClientAPIImpl
  3. 启动定时工作(从 Namesrv 拉取路由、向 Broker 发送心跳等)
  4. 启动拉取音讯服务 PullMessageService
  5. 启动重均衡线程 RebalanceService
  6. 启动默认生产者(用于将生产失败的音讯从新生产到 Broker)

4. 源码解析

4.1 DefaultMQProducerImpl 启动

// DefaultMQProducerImpl
/**
 * Push 消费者启动
 *
 * @throws MQClientException
 */
public synchronized void start() throws MQClientException {switch (this.serviceState) {
            // 查看消费者状态。只有第一次启动才执行,如果二次调用 start 办法会报错
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                     this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

            // 查看消费者配置是否非法
            this.checkConfig();

            // 将用户的 Topic 订阅信息和重试 Topic 的订阅信息增加到 RebalanceImpl 的容器中
            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            // 创立客户端实例
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            // 初始化 RebalanceImpl
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            // 创立拉取音讯接口调用包装类
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            // 注册音讯过滤钩子函数列表
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            // 初始化生产进度
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        // 播送模式,生产进度保留在消费者本地
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        // 集群模式,生产进度保留在 Broker
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();

            // 初始化音讯生产服务
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();

            // 注册消费者到客户端实例
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                            null);
            }

            // 启动客户端实例
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once,"
                                        + this.serviceState
                                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                        null);
        default:
            break;
    }

    // 从 Namesrv 更新路由信息
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    // 将客户端信息上报给 Broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // 唤醒重均衡线程,立刻执行重均衡
    this.mQClientFactory.rebalanceImmediately();}

4.2 MQClientInstance 启动

// MQClientInstance.java
/**
 * 启动客户端代理
 *
 * @throws MQClientException
 */
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // 启动通信模块
                this.mQClientAPIImpl.start();
                // 启动定时工作(从 Namesrv 拉取路由、向 Broker 发送心跳等)this.startScheduledTask();
                // 启动拉取音讯服务
                this.pullMessageService.start();
                // 启动重均衡线程
                this.rebalanceService.start();
                // 启动默认生产者(用于将生产失败的音讯从新生产到 Broker)this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

欢送关注公众号【消息中间件】(middleware-mq),更新消息中间件的源码解析和最新动静!

本文由博客一文多发平台 OpenWrite 公布!

退出移动版