Preface
This article takes a look at adjustThreadPoolNumsThreshold in RocketMQ.
DefaultMQPushConsumer
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    private final InternalLogger log = ClientLogger.getLog();
    //......
    /**
     * Threshold for dynamic adjustment of the number of thread pool
     */
    private long adjustThreadPoolNumsThreshold = 100000;

    public long getAdjustThreadPoolNumsThreshold() {
        return adjustThreadPoolNumsThreshold;
    }

    public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
        this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
    }
    //......
}
- DefaultMQPushConsumer defines the adjustThreadPoolNumsThreshold property, which defaults to 100000.
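As a minimal usage sketch of the getter and setter quoted above: it assumes rocketmq-client is on the classpath, and the group name and the value 50000 are placeholders chosen for illustration, not recommendations.

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

class ThresholdConfigSketch {
    public static void main(String[] args) {
        // "demo_consumer_group" is a placeholder group name for this sketch.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");

        // Reads the field's initial value (100000 in the source quoted above).
        System.out.println(consumer.getAdjustThreadPoolNumsThreshold());

        // 50000 is an arbitrary illustrative value.
        consumer.setAdjustThreadPoolNumsThreshold(50_000L);
        System.out.println(consumer.getAdjustThreadPoolNumsThreshold());
    }
}
```

The consumer is never started here; the sketch only exercises the property accessors.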
DefaultMQPushConsumerImpl
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    //......
    public void adjustThreadPool() {
        long computeAccTotal = this.computeAccumulationTotal();
        long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
        long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);
        long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);
        if (computeAccTotal >= incThreshold) {
            this.consumeMessageService.incCorePoolSize();
        }
        if (computeAccTotal < decThreshold) {
            this.consumeMessageService.decCorePoolSize();
        }
    }

    private long computeAccumulationTotal() {
        long msgAccTotal = 0;
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
        Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            ProcessQueue value = next.getValue();
            msgAccTotal += value.getMsgAccCnt();
        }
        return msgAccTotal;
    }
    //......
}
- The adjustThreadPool method first computes computeAccTotal, then uses adjustThreadPoolNumsThreshold * 1.0 as incThreshold and adjustThreadPoolNumsThreshold * 0.8 as decThreshold. If computeAccTotal is greater than or equal to incThreshold, it calls consumeMessageService.incCorePoolSize(); if computeAccTotal is less than decThreshold, it calls consumeMessageService.decCorePoolSize().
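The two comparisons leave a band between 80% and 100% of the threshold in which neither method is called. The following is a minimal, library-free sketch of that decision, assuming a positive threshold. The class name, the Action enum, and the queue names are hypothetical and exist only for illustration; the real method calls incCorePoolSize() or decCorePoolSize() directly.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class AdjustThresholdSketch {

    /** Hypothetical result type used only by this sketch. */
    enum Action { INCREASE, DECREASE, NONE }

    /** Sums per-queue backlog counts, in the spirit of computeAccumulationTotal(). */
    static long accumulationTotal(Map<String, Long> backlogPerQueue) {
        long total = 0;
        for (long count : backlogPerQueue.values()) {
            total += count;
        }
        return total;
    }

    /** Mirrors the two comparisons in adjustThreadPool(), assuming threshold > 0. */
    static Action decide(long accTotal, long threshold) {
        long incThreshold = (long) (threshold * 1.0);
        long decThreshold = (long) (threshold * 0.8);
        if (accTotal >= incThreshold) {
            return Action.INCREASE;
        }
        if (accTotal < decThreshold) {
            return Action.DECREASE;
        }
        // Neither branch fires when decThreshold <= accTotal < incThreshold.
        return Action.NONE;
    }

    public static void main(String[] args) {
        Map<String, Long> backlog = new LinkedHashMap<>();
        backlog.put("queue-0", 60_000L); // made-up backlog figures
        backlog.put("queue-1", 45_000L);
        long total = accumulationTotal(backlog);

        System.out.println(decide(total, 100_000L));   // 105000 >= 100000: INCREASE
        System.out.println(decide(90_000L, 100_000L)); // inside the band: NONE
        System.out.println(decide(79_999L, 100_000L)); // below 80000: DECREASE
    }
}
```

With the default threshold of 100000, the band spans accumulated counts from 80000 up to 99999.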
MQClientInstance
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public class MQClientInstance {
    //......
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

    private void startScheduledTask() {
        //......
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    public void adjustThreadPool() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    if (impl instanceof DefaultMQPushConsumerImpl) {
                        DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl;
                        dmq.adjustThreadPool();
                    }
                } catch (Exception e) {
                }
            }
        }
    }
    //......
}
- For the CREATE_JUST state, the start method of MQClientInstance calls startScheduledTask(), which registers a scheduled task that runs adjustThreadPool once a minute, after an initial delay of one minute. adjustThreadPool iterates over the MQConsumerInner entries in consumerTable and calls adjustThreadPool on each one that is a DefaultMQPushConsumerImpl.
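The scheduling and dispatch pattern can be sketched with the JDK alone. In the sketch below, ConsumerInner, PushConsumer, and PullConsumer are hypothetical stand-ins for MQConsumerInner, DefaultMQPushConsumerImpl, and any other consumer type. The period is a parameter so the demo can run in milliseconds rather than the one minute used in the source.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AdjustSchedulerSketch {

    /** Hypothetical stand-in for MQConsumerInner. */
    interface ConsumerInner { }

    /** Hypothetical stand-in for DefaultMQPushConsumerImpl; it only counts calls. */
    static class PushConsumer implements ConsumerInner {
        final AtomicInteger adjustCalls = new AtomicInteger();

        void adjustThreadPool() {
            adjustCalls.incrementAndGet();
        }
    }

    /** Hypothetical stand-in for a consumer type that the loop skips. */
    static class PullConsumer implements ConsumerInner { }

    /** Walks the table and adjusts only push consumers, isolating failures per consumer. */
    static void adjustAll(Map<String, ConsumerInner> consumerTable) {
        for (ConsumerInner impl : consumerTable.values()) {
            if (impl instanceof PushConsumer) {
                try {
                    ((PushConsumer) impl).adjustThreadPool();
                } catch (Exception e) {
                    // The quoted source swallows this exception; printing it is this sketch's choice.
                    System.err.println("adjustThreadPool failed: " + e);
                }
            }
        }
    }

    /** Registers the periodic task with the same value for initial delay and period. */
    static void schedule(ScheduledExecutorService executor,
                         Map<String, ConsumerInner> consumerTable,
                         long period, TimeUnit unit) {
        executor.scheduleAtFixedRate(() -> {
            try {
                adjustAll(consumerTable);
            } catch (Exception e) {
                // An exception escaping the task would suppress all later runs.
                System.err.println("ScheduledTask adjustThreadPool exception: " + e);
            }
        }, period, period, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, ConsumerInner> table = new ConcurrentHashMap<>();
        PushConsumer push = new PushConsumer();
        table.put("push-group", push);
        table.put("pull-group", new PullConsumer());

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        schedule(executor, table, 100, TimeUnit.MILLISECONDS); // shortened from 1 minute for the demo
        Thread.sleep(350);
        executor.shutdownNow();
        System.out.println("adjust calls: " + push.adjustCalls.get());
    }
}
```

The try/catch inside the scheduled task matters because scheduleAtFixedRate stops rescheduling a task once one of its executions throws.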
Summary
DefaultMQPushConsumer defines the adjustThreadPoolNumsThreshold property, which defaults to 100000. For the CREATE_JUST state, the start method of MQClientInstance calls startScheduledTask(), which registers a scheduled task that runs adjustThreadPool once a minute. adjustThreadPool iterates over the MQConsumerInner entries in consumerTable and calls adjustThreadPool on each one that is a DefaultMQPushConsumerImpl.
doc
- DefaultMQPushConsumer