关于java:RocketMQ-消费者2客户端设计和启动流程详解-源码解析

34次阅读

共计 8836 个字符,预计需要花费 23 分钟才能阅读完成。

1. 背景

本文是 RocketMQ 消费者系列的第二篇,介绍消费者相干类与调用关系,同时蕴含消费者启动流程。
看完本文可能对音讯生产波及到的相干类和生产流程有大体的理解。

2. 概要设计

2.1 消费者客户端设计

先看一下 RocketMQ 客户端代码中消费者相干的类图。

其中 DefaultMQPullConsumerDefaultMQPushConsumer 就是咱们理论生产中须要新建的消费者对象。它们别离实现了消费者接口,扩大了客户端配置类。

新建 DefaultXXXXConsumer 对象时会在外部一个创立 DefaultMQXXXXConsumerImpl 对象。这里应用了代理模式,DefaultXXXXConsumer 对象只是一个壳,外部的大部分办法都通过调用代理 DefaultMQXXXXConsumerImpl 来执行。

DefaultMQXXXXConsumerImpl 实现类中蕴含了客户端实例 MQClientInstnace,每个客户端过程个别只有一个这玩意。它的用途很多,比方保留路由和客户端信息,向 Broker 发送申请等。

2.2 消费者客户端启动

消费者的启动次要波及下面讲到的 DefaultMQXXXXConsumerDefaultMQXXXXConsumerImplMQClientInstnace 这三个类。

2.2.1 新建消费者

  • 新建消费者时结构 DefaultMQXXXXConsumer 对象,指定队列负载算法,外部结构一个 DefaultMQXXXXConsumerImpl 对象。
  • DefaultMQXXXXConsumerImpl 设为刚创立状态,并新建重均衡服务 RebalanceService
  • 在首次启动前,DefaultMQXXXXConsumerImpl 对象中的 MQClientInstance 对象还没有被创立进去。

2.2.2 消费者启动

  • 启动命令也是在 DefaultMQXXXXConsumer 调用并代理到 DefaultMQXXXXConsumerImpl
  • 此时 DefaultMQXXXXConsumerImpl 会初始化一些服务和参数,而后创立一个 MQClientInstance 对象。
  • MQClientInstance 对象启动客户端的各种服务(Broker 通信、定时工作、音讯拉取、重均衡……)

3. 具体设计

3.1 消费者客户端类设计

3.1.1 整体类图


3.1.2 消费者接口

因为须要反对拉和推两种生产模式,所以按通常的想法,消费者类的设计中将会有一个 消费者接口 ,而后 推消费者 拉消费者接口 别离扩大 消费者接口。消费者接口提供一些共用办法,拉和推消费者实现拉生产和推生产办法。RocketMQ 就是这样做的。其中 MQConsumer 即消费者接口,扩大 MQAdmin 在这显得有些多余。

  • MQAdmin 接口提供了客户端的一些根本的治理接口,生产者、消费者和命令工具都扩大了它。
  • MQConsumer 接口很简略,次要提供了通过 Topic 获取读队列的办法 Set<MessageQueue> fetchSubscribeMessageQueues(final String topic)

3.1.3 拉 & 推模式消费者接口

接下来是拉消费者和推消费者接口。

如果咱们本人来设计拉 & 推模式消费者接口,须要定义哪些办法?能够设想一下生产时要做的操作,就能够定义出相应的办法。

  • 拉模式消费者的生产步骤为:拉取音讯,执行生产逻辑,上报生产进度,如果有需要的话对于生产失败的音讯还须要发回 Broker 从新生产。
  • 推模式消费者生产步骤更简略,只须要订阅一个 Topic,而后指定生产回调函数,即可在收到音讯时主动生产。

RocketMQ 的拉 & 推模式消费者接口就定义了这些办法,先来看一下类图:

MQPullConsumer

  • void registerMessageQueueListener(final String topic, final MessageQueueListener listener) 办法注册音讯队列变更时的回调办法。
  • PullResult pull 从 RocketMQ 服务器拉取一批音讯。

    • MessageQueue:拉取的队列
    • MessageSelector:音讯过滤器
    • offset:拉取的音讯在生产队列中的偏移量
    • maxNums:最大拉取音讯条数
    • timeout:拉取超时工夫
  • void pull 为异步拉取办法,拉取胜利后调用 PullCallback
  • updateConsumeOffset 更新音讯生产偏移量
  • fetchConsumeOffset 获取音讯生产偏移量
  • sendMessageBack 对于生产失败的音讯,发回 Broker 从新生产

MQPushConsumer

  • subscribe:订阅主题,订阅之后能够收到来自该主题的音讯。

    • topic:订阅的主题,能够屡次调用该办法来订阅多个主题
    • subExpression:音讯过滤表达式
    • messageSelector:音讯选择器,提供了 SQL92 和 Tag 模式的过滤抉择性能
  • unsubscribe:勾销订阅
  • registerMessageListener:用来注册生产监听器,蕴含两种生产模式:并发生产和程序生产

3.1.4 消费者实现

DefaultMQXXXXConsumer 是拉消费者接口 MQXXXXConsumer 的默认实现。这里用到了代理模式,将具体的办法实现都实现在 DefaultMQXXXXConsumerImpl 中,DefaultMQXXXXConsumer 保留了一个 DefaultMQXXXXConsumerImpl 的代理。

DefaultMQXXXXConsumerImpl 实现了 MQConsumerInner 接口,提供了消费者实现的一些专用办法。

DefaultMQXXXXConsumerImpl 中有一个客户端实例的援用 MQClientInstance mqClientFactory,用来与 Broker 通信、保留元数据。

MQClientInstnace:客户端实例,每个客户端过程个别只有一个这玩意。它的用途很多,很多操作最终都是调用它来做的。

  • 保留路由信息
  • 保留生产者消费者组信息
  • 向 Broker 发送申请
  • 启动重均衡

3.1.5 推模式消费者实现

拉模式消费者须要手动拉取音讯进行生产,平平无奇。推模式消费者主动监听推送过去的音讯并进行生产,着重解说。

推模式消费者理论外部也是通过拉取音讯的形式进行音讯拉取,只不过封装了订阅和监听器这样的对外接口,让用户在应用时感觉像 Broker 被动推送音讯到消费者。

在拉消费者背地,有一个线程默默被动拉取音讯,能力将拉转换为推,它就是 PullMessageService。此外,推消费者还反对并发生产和程序生产,RocketMQ 定义了 ConsumeMessageService 接口来执行音讯生产,ConsumeMessageConcurrentlyServiceConsumeMessageOrderlyService 别离是并发生产和程序生产的实现。它们外部都定义了一个生产线程池 consumeExecutor 来执行最终的音讯生产逻辑。而用户真正编写的只有最终的生产逻辑,即实现 MessageListener 接口的 consumeMessage 办法。

推模式消费者实现相干的类图如下所示:

在图中,展现了音讯生产整个流程的调用关系。在系列前面的文章中会具体解说。

  1. 客户端实例中的重均衡服务进行重均衡,生成一个 PullRequest 并调用拉消费者实现类的 executePullRequestImmediately 办法
  2. DefaultMQPushConsumerImpl 调用 PullMessageService 线程的 executePullRequestImmediately 办法,
  3. 该办法将 PullRequest 放入待执行的拉取申请队列
  4. PullMessageService 线程阻塞期待申请队列中的拉取申请
  5. 收到拉去申请 PullRequest 后就执行拉取音讯拉取办法 pullMessage 从 Broker 拉取音讯,拉取后执行生产音讯逻辑
  6. 生产音讯逻辑会调用 ConsumeMessageServicesubmitConsumeRequest 办法
  7. 该办法将生产音讯的申请提交到生产线程池 consumeExecutor
  8. 生产线程池执行真正的音讯生产逻辑,调用 MessageListener 接口的 consumeMessage 办法
  9. 拉取一批音讯胜利后,将拉取申请 PullRequest 的拉取偏移量更新后再次调用 executePullRequestImmediately 办法,放入拉取队列,从新拉取

3.2 消费者启动

因为拉模式和推模式消费者的启动流程大致相同,所以只介绍推模式消费者的启动流程。

DefaultMQPushConsumer 的启动办法外部理论是调用其代理类 DefaultMQPushConsumerImpl 的启动办法,他自身的启动办法并没有什么逻辑。

DefaultMQPushConsumerImpl 的启动办法执行的动作如下:

  1. 查看是否是刚创立状态,如果是才持续走启动流程
  2. 查看消费者配置信息是否非法
  3. 将用户的 Topic 订阅信息和重试 Topic 的订阅信息增加到 rebalanceImpl 中的 Map 中
  4. 创立和初始化一些对象

    1. 创立或获取曾经创立的客户端实例 MQClientInstance
    2. 初始化消费者的重均衡实现 RebalanceImpl
    3. 创立拉取音讯接口调用包装类 PullApiWrapper
    4. 注册音讯过滤钩子函数列表(如果有的话)
  5. 初始化生产进度

    • 播送模式,生产进度保留在消费者本地 LocalFileOffsetStore
    • 集群模式,生产进度保留在 Broker RemoteBrokerOffsetStore
  6. 初始化音讯生产服务,生产服务外部保护一个线程池,负责音讯生产
  7. 将消费者注册到客户端实例对象
  8. 启动客户端实例对象
  9. 从 Name server 更新 Topic 路由信息(如果路由信息有变动)
  10. 将客户端的信息(ID、生产者、消费者信息)上报给 Broker
  11. 唤醒重均衡线程 RebalanceService 立刻执行重均衡
  12. 重均衡后调用拉取音讯办法,生成拉取申请 PullRequest 并放入 PullMessageService,开始生产流程

客户端实例 MQClientInstance 的启动流程如下:

  1. 更新 Namesrv 地址
  2. 启动通信模块 MQClientAPIImpl
  3. 启动定时工作(从 Namesrv 拉取路由、向 Broker 发送心跳等)
  4. 启动拉取音讯服务 PullMessageService
  5. 启动重均衡线程 RebalanceService
  6. 启动默认生产者(用于将生产失败的音讯从新生产到 Broker)

4. 源码解析

4.1 DefaultMQProducerImpl 启动

// DefaultMQProducerImpl
/**
 * Push 消费者启动
 *
 * @throws MQClientException
 */
public synchronized void start() throws MQClientException {switch (this.serviceState) {
            // 查看消费者状态。只有第一次启动才执行,如果二次调用 start 办法会报错
        case CREATE_JUST:
            log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                     this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
            this.serviceState = ServiceState.START_FAILED;

            // 查看消费者配置是否非法
            this.checkConfig();

            // 将用户的 Topic 订阅信息和重试 Topic 的订阅信息增加到 RebalanceImpl 的容器中
            this.copySubscription();

            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();
            }

            // 创立客户端实例
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

            // 初始化 RebalanceImpl
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            // 创立拉取音讯接口调用包装类
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            // 注册音讯过滤钩子函数列表
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            // 初始化生产进度
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        // 播送模式,生产进度保留在消费者本地
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        // 集群模式,生产进度保留在 Broker
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();

            // 初始化音讯生产服务
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService =
                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService =
                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
            }

            this.consumeMessageService.start();

            // 注册消费者到客户端实例
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                            null);
            }

            // 启动客户端实例
            mQClientFactory.start();
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once,"
                                        + this.serviceState
                                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                        null);
        default:
            break;
    }

    // 从 Namesrv 更新路由信息
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    this.mQClientFactory.checkClientInBroker();
    // 将客户端信息上报给 Broker
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // 唤醒重均衡线程,立刻执行重均衡
    this.mQClientFactory.rebalanceImmediately();}

4.2 MQClientInstance 启动

// MQClientInstance.java
/**
 * 启动客户端代理
 *
 * @throws MQClientException
 */
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // 启动通信模块
                this.mQClientAPIImpl.start();
                // 启动定时工作(从 Namesrv 拉取路由、向 Broker 发送心跳等)this.startScheduledTask();
                // 启动拉取音讯服务
                this.pullMessageService.start();
                // 启动重均衡线程
                this.rebalanceService.start();
                // 启动默认生产者(用于将生产失败的音讯从新生产到 Broker)this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

欢送关注公众号【消息中间件】(middleware-mq),更新消息中间件的源码解析和最新动静!

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0