共计 8836 个字符,预计需要花费 23 分钟才能阅读完成。
1. 背景
本文是 RocketMQ 消费者系列的第二篇,介绍消费者相干类与调用关系,同时蕴含消费者启动流程。
看完本文可能对音讯生产波及到的相干类和生产流程有大体的理解。
2. 概要设计
2.1 消费者客户端设计
先看一下 RocketMQ 客户端代码中消费者相干的类图。
其中 DefaultMQPullConsumer
和 DefaultMQPushConsumer
就是咱们理论生产中须要新建的消费者对象。它们别离实现了消费者接口,扩大了客户端配置类。
新建 DefaultXXXXConsumer
对象时会在外部一个创立 DefaultMQXXXXConsumerImpl
对象。这里应用了代理模式,DefaultXXXXConsumer
对象只是一个壳,外部的大部分办法都通过调用代理 DefaultMQXXXXConsumerImpl
来执行。
DefaultMQXXXXConsumerImpl
实现类中蕴含了客户端实例 MQClientInstnace
,每个客户端过程个别只有一个这玩意。它的用途很多,比方保留路由和客户端信息,向 Broker 发送申请等。
2.2 消费者客户端启动
消费者的启动次要波及下面讲到的 DefaultMQXXXXConsumer
、DefaultMQXXXXConsumerImpl
和 MQClientInstnace
这三个类。
2.2.1 新建消费者
- 新建消费者时结构
DefaultMQXXXXConsumer
对象,指定队列负载算法,外部结构一个DefaultMQXXXXConsumerImpl
对象。 DefaultMQXXXXConsumerImpl
设为刚创立状态,并新建重均衡服务RebalanceService
- 在首次启动前,
DefaultMQXXXXConsumerImpl
对象中的MQClientInstance
对象还没有被创立进去。
2.2.2 消费者启动
- 启动命令也是在
DefaultMQXXXXConsumer
调用并代理到DefaultMQXXXXConsumerImpl
。 - 此时
DefaultMQXXXXConsumerImpl
会初始化一些服务和参数,而后创立一个MQClientInstance
对象。 MQClientInstance
对象启动客户端的各种服务(Broker 通信、定时工作、音讯拉取、重均衡……)
3. 具体设计
3.1 消费者客户端类设计
3.1.1 整体类图
3.1.2 消费者接口
因为须要反对拉和推两种生产模式,所以按通常的想法,消费者类的设计中将会有一个 消费者接口 ,而后 推消费者 和拉消费者接口 别离扩大 消费者接口。消费者接口提供一些共用办法,拉和推消费者实现拉生产和推生产办法。RocketMQ 就是这样做的。其中 MQConsumer 即消费者接口,扩大 MQAdmin 在这显得有些多余。
- MQAdmin 接口提供了客户端的一些根本的治理接口,生产者、消费者和命令工具都扩大了它。
- MQConsumer 接口很简略,次要提供了通过 Topic 获取读队列的办法
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic)
。
3.1.3 拉 & 推模式消费者接口
接下来是拉消费者和推消费者接口。
如果咱们本人来设计拉 & 推模式消费者接口,须要定义哪些办法?能够设想一下生产时要做的操作,就能够定义出相应的办法。
- 拉模式消费者的生产步骤为:拉取音讯,执行生产逻辑,上报生产进度,如果有需要的话对于生产失败的音讯还须要发回 Broker 从新生产。
- 推模式消费者生产步骤更简略,只须要订阅一个 Topic,而后指定生产回调函数,即可在收到音讯时主动生产。
RocketMQ 的拉 & 推模式消费者接口就定义了这些办法,先来看一下类图:
MQPullConsumer
void registerMessageQueueListener(final String topic, final MessageQueueListener listener)
办法注册音讯队列变更时的回调办法。-
PullResult pull
从 RocketMQ 服务器拉取一批音讯。- MessageQueue:拉取的队列
- MessageSelector:音讯过滤器
- offset:拉取的音讯在生产队列中的偏移量
- maxNums:最大拉取音讯条数
- timeout:拉取超时工夫
void pull
为异步拉取办法,拉取胜利后调用PullCallback
updateConsumeOffset
更新音讯生产偏移量fetchConsumeOffset
获取音讯生产偏移量sendMessageBack
对于生产失败的音讯,发回 Broker 从新生产
MQPushConsumer
-
subscribe
:订阅主题,订阅之后能够收到来自该主题的音讯。- topic:订阅的主题,能够屡次调用该办法来订阅多个主题
- subExpression:音讯过滤表达式
- messageSelector:音讯选择器,提供了 SQL92 和 Tag 模式的过滤抉择性能
unsubscribe
:勾销订阅registerMessageListener
:用来注册生产监听器,蕴含两种生产模式:并发生产和程序生产
3.1.4 消费者实现
DefaultMQXXXXConsumer
是拉消费者接口 MQXXXXConsumer
的默认实现。这里用到了代理模式,将具体的办法实现都实现在 DefaultMQXXXXConsumerImpl
中,DefaultMQXXXXConsumer
保留了一个 DefaultMQXXXXConsumerImpl
的代理。
DefaultMQXXXXConsumerImpl
实现了 MQConsumerInner
接口,提供了消费者实现的一些专用办法。
DefaultMQXXXXConsumerImpl
中有一个客户端实例的援用 MQClientInstance mqClientFactory
,用来与 Broker 通信、保留元数据。
MQClientInstnace:客户端实例,每个客户端过程个别只有一个这玩意。它的用途很多,很多操作最终都是调用它来做的。
- 保留路由信息
- 保留生产者消费者组信息
- 向 Broker 发送申请
- 启动重均衡
3.1.5 推模式消费者实现
拉模式消费者须要手动拉取音讯进行生产,平平无奇。推模式消费者主动监听推送过去的音讯并进行生产,着重解说。
推模式消费者理论外部也是通过拉取音讯的形式进行音讯拉取,只不过封装了订阅和监听器这样的对外接口,让用户在应用时感觉像 Broker 被动推送音讯到消费者。
在拉消费者背地,有一个线程默默被动拉取音讯,能力将拉转换为推,它就是 PullMessageService
。此外,推消费者还反对并发生产和程序生产,RocketMQ 定义了 ConsumeMessageService
接口来执行音讯生产,ConsumeMessageConcurrentlyService
和 ConsumeMessageOrderlyService
别离是并发生产和程序生产的实现。它们外部都定义了一个生产线程池 consumeExecutor
来执行最终的音讯生产逻辑。而用户真正编写的只有最终的生产逻辑,即实现 MessageListener
接口的 consumeMessage
办法。
推模式消费者实现相干的类图如下所示:
在图中,展现了音讯生产整个流程的调用关系。在系列前面的文章中会具体解说。
- 客户端实例中的重均衡服务进行重均衡,生成一个
PullRequest
并调用拉消费者实现类的executePullRequestImmediately
办法 DefaultMQPushConsumerImpl
调用PullMessageService
线程的executePullRequestImmediately
办法,- 该办法将
PullRequest
放入待执行的拉取申请队列 PullMessageService
线程阻塞期待申请队列中的拉取申请- 收到拉去申请
PullRequest
后就执行拉取音讯拉取办法pullMessage
从 Broker 拉取音讯,拉取后执行生产音讯逻辑 - 生产音讯逻辑会调用
ConsumeMessageService
的submitConsumeRequest
办法 - 该办法将生产音讯的申请提交到生产线程池
consumeExecutor
- 生产线程池执行真正的音讯生产逻辑,调用
MessageListener
接口的consumeMessage
办法 - 拉取一批音讯胜利后,将拉取申请
PullRequest
的拉取偏移量更新后再次调用executePullRequestImmediately
办法,放入拉取队列,从新拉取
3.2 消费者启动
因为拉模式和推模式消费者的启动流程大致相同,所以只介绍推模式消费者的启动流程。
DefaultMQPushConsumer
的启动办法外部理论是调用其代理类 DefaultMQPushConsumerImpl
的启动办法,他自身的启动办法并没有什么逻辑。
DefaultMQPushConsumerImpl
的启动办法执行的动作如下:
- 查看是否是刚创立状态,如果是才持续走启动流程
- 查看消费者配置信息是否非法
- 将用户的 Topic 订阅信息和重试 Topic 的订阅信息增加到
rebalanceImpl
中的 Map 中 -
创立和初始化一些对象
- 创立或获取曾经创立的客户端实例
MQClientInstance
- 初始化消费者的重均衡实现
RebalanceImpl
- 创立拉取音讯接口调用包装类
PullApiWrapper
- 注册音讯过滤钩子函数列表(如果有的话)
- 创立或获取曾经创立的客户端实例
-
初始化生产进度
- 播送模式,生产进度保留在消费者本地
LocalFileOffsetStore
- 集群模式,生产进度保留在 Broker
RemoteBrokerOffsetStore
- 播送模式,生产进度保留在消费者本地
- 初始化音讯生产服务,生产服务外部保护一个线程池,负责音讯生产
- 将消费者注册到客户端实例对象
- 启动客户端实例对象
- 从 Name server 更新 Topic 路由信息(如果路由信息有变动)
- 将客户端的信息(ID、生产者、消费者信息)上报给 Broker
- 唤醒重均衡线程
RebalanceService
立刻执行重均衡 - 重均衡后调用拉取音讯办法,生成拉取申请
PullRequest
并放入PullMessageService
,开始生产流程
客户端实例 MQClientInstance
的启动流程如下:
- 更新 Namesrv 地址
- 启动通信模块
MQClientAPIImpl
- 启动定时工作(从 Namesrv 拉取路由、向 Broker 发送心跳等)
- 启动拉取音讯服务
PullMessageService
- 启动重均衡线程
RebalanceService
- 启动默认生产者(用于将生产失败的音讯从新生产到 Broker)
4. 源码解析
4.1 DefaultMQProducerImpl
启动
// DefaultMQProducerImpl
/**
* Push 消费者启动
*
* @throws MQClientException
*/
public synchronized void start() throws MQClientException {switch (this.serviceState) {
// 查看消费者状态。只有第一次启动才执行,如果二次调用 start 办法会报错
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
// 查看消费者配置是否非法
this.checkConfig();
// 将用户的 Topic 订阅信息和重试 Topic 的订阅信息增加到 RebalanceImpl 的容器中
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 创立客户端实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 初始化 RebalanceImpl
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 创立拉取音讯接口调用包装类
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 注册音讯过滤钩子函数列表
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
// 初始化生产进度
if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 播送模式,生产进度保留在消费者本地
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
// 集群模式,生产进度保留在 Broker
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
// 初始化音讯生产服务
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
// 注册消费者到客户端实例
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 启动客户端实例
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once,"
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 从 Namesrv 更新路由信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
// 将客户端信息上报给 Broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒重均衡线程,立刻执行重均衡
this.mQClientFactory.rebalanceImmediately();}
4.2 MQClientInstance
启动
// MQClientInstance.java
/**
* 启动客户端代理
*
* @throws MQClientException
*/
public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();
}
// 启动通信模块
this.mQClientAPIImpl.start();
// 启动定时工作(从 Namesrv 拉取路由、向 Broker 发送心跳等)this.startScheduledTask();
// 启动拉取音讯服务
this.pullMessageService.start();
// 启动重均衡线程
this.rebalanceService.start();
// 启动默认生产者(用于将生产失败的音讯从新生产到 Broker)this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
欢送关注公众号【消息中间件】(middleware-mq),更新消息中间件的源码解析和最新动静!
本文由博客一文多发平台 OpenWrite 公布!