
A look at RocketMQ's adjustThreadPoolNumsThreshold

This article takes a look at RocketMQ's adjustThreadPoolNumsThreshold.

DefaultMQPushConsumer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Threshold for dynamic adjustment of the number of thread pool
     */
    private long adjustThreadPoolNumsThreshold = 100000;

    public long getAdjustThreadPoolNumsThreshold() {
        return adjustThreadPoolNumsThreshold;
    }

    public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
        this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
    }

    //......
}
  • DefaultMQPushConsumer defines the adjustThreadPoolNumsThreshold property, which defaults to 100000

DefaultMQPushConsumerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    //......

    public void adjustThreadPool() {
        long computeAccTotal = this.computeAccumulationTotal();
        long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();

        long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);

        long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);

        if (computeAccTotal >= incThreshold) {
            this.consumeMessageService.incCorePoolSize();
        }

        if (computeAccTotal < decThreshold) {
            this.consumeMessageService.decCorePoolSize();
        }
    }

    private long computeAccumulationTotal() {
        long msgAccTotal = 0;
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
        Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            ProcessQueue value = next.getValue();
            msgAccTotal += value.getMsgAccCnt();
        }

        return msgAccTotal;
    }

    //......
}
  • The adjustThreadPool method computes computeAccTotal (the sum of getMsgAccCnt() over every ProcessQueue in processQueueTable), then uses adjustThreadPoolNumsThreshold * 1.0 as incThreshold and adjustThreadPoolNumsThreshold * 0.8 as decThreshold; when computeAccTotal is greater than or equal to incThreshold it calls consumeMessageService.incCorePoolSize(), and when computeAccTotal is less than decThreshold it calls consumeMessageService.decCorePoolSize()
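
The threshold arithmetic described above can be tried out in isolation. What follows is only a minimal sketch: the class AdjustThresholdSketch, the Action enum, and the Map of per-queue backlog counts are hypothetical stand-ins invented for illustration, not part of rocketmq-client. Only the 1.0 and 0.8 factors and the two comparisons are taken from the source shown above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; none of these names belong to rocketmq-client.
final class AdjustThresholdSketch {

    enum Action { INCREASE, DECREASE, NONE }

    // Sums the per-queue backlog counts, in the same spirit as
    // computeAccumulationTotal() summing ProcessQueue.getMsgAccCnt().
    static long computeAccumulationTotal(Map<String, Long> backlogPerQueue) {
        long total = 0;
        for (long count : backlogPerQueue.values()) {
            total += count;
        }
        return total;
    }

    // Applies the same two comparisons as adjustThreadPool(): grow at or above
    // threshold * 1.0, shrink below threshold * 0.8, otherwise leave the pool alone.
    // The early return is a simplification; the original uses two independent ifs.
    static Action decide(long accumulationTotal, long adjustThreadPoolNumsThreshold) {
        long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);
        long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);
        if (accumulationTotal >= incThreshold) {
            return Action.INCREASE;
        }
        if (accumulationTotal < decThreshold) {
            return Action.DECREASE;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // Assumed backlog figures, chosen only to exercise the comparison.
        Map<String, Long> backlog = new LinkedHashMap<>();
        backlog.put("queue-0", 70000L);
        backlog.put("queue-1", 50000L);
        long total = computeAccumulationTotal(backlog);
        System.out.println(total + " -> " + decide(total, 100000L));
    }
}
```

With the default threshold of 100000, this means a total backlog of 100000 or more asks for a larger core pool, a backlog below 80000 asks for a smaller one, and anything in between leaves the pool unchanged.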

MQClientInstance

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public class MQClientInstance {
    //......

    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

    private void startScheduledTask() {

        //......

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    public void adjustThreadPool() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    if (impl instanceof DefaultMQPushConsumerImpl) {
                        DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl;
                        dmq.adjustThreadPool();
                    }
                } catch (Exception e) {
                }
            }
        }
    }

    //......
}
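  • For the CREATE_JUST state, the start method of MQClientInstance calls startScheduledTask(), which registers a scheduled task that runs adjustThreadPool once a minute, starting after an initial delay of one minute; adjustThreadPool iterates over the MQConsumerInner entries in consumerTable and calls adjustThreadPool on each one that is a DefaultMQPushConsumerImpl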
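
The scheduling pattern described above (a fixed-rate task that walks a consumer table and only touches push consumers) can be sketched with the JDK alone. Everything named here, including AdjustSchedulerSketch, ConsumerInner, PushConsumerImpl and PullConsumerImpl, is a hypothetical stand-in for the RocketMQ types; the sketch assumes nothing about RocketMQ beyond what the excerpt above shows.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch; none of these names belong to rocketmq-client.
final class AdjustSchedulerSketch {

    // Stand-in for MQConsumerInner.
    interface ConsumerInner { }

    // Stand-in for DefaultMQPushConsumerImpl; it only counts how often it is asked to adjust.
    static final class PushConsumerImpl implements ConsumerInner {
        final AtomicInteger adjustCalls = new AtomicInteger();

        void adjustThreadPool() {
            adjustCalls.incrementAndGet();
        }
    }

    // Stand-in for any other consumer type, which the loop skips.
    static final class PullConsumerImpl implements ConsumerInner { }

    final Map<String, ConsumerInner> consumerTable = new ConcurrentHashMap<>();

    // Walks the table and adjusts only push consumers, as in the excerpt above.
    void adjustThreadPool() {
        for (ConsumerInner impl : consumerTable.values()) {
            if (impl instanceof PushConsumerImpl) {
                try {
                    ((PushConsumerImpl) impl).adjustThreadPool();
                } catch (Exception e) {
                    // The original swallows the exception; this sketch reports it instead.
                    System.err.println("adjustThreadPool failed: " + e);
                }
            }
        }
    }

    // Registers the periodic task; the excerpt above uses a delay and period of 1 minute.
    ScheduledExecutorService startScheduledTask(long period, TimeUnit unit) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                adjustThreadPool();
            } catch (Exception e) {
                System.err.println("ScheduledTask adjustThreadPool exception: " + e);
            }
        }, period, period, unit);
        return executor;
    }

    public static void main(String[] args) {
        AdjustSchedulerSketch sketch = new AdjustSchedulerSketch();
        PushConsumerImpl push = new PushConsumerImpl();
        sketch.consumerTable.put("push-group", push);
        sketch.consumerTable.put("pull-group", new PullConsumerImpl());

        ScheduledExecutorService executor = sketch.startScheduledTask(1, TimeUnit.MINUTES);
        sketch.adjustThreadPool(); // call once directly instead of waiting a minute
        System.out.println("push consumer adjusted " + push.adjustCalls.get() + " time(s)");
        executor.shutdownNow();
    }
}
```

Catching exceptions inside the Runnable matters with scheduleAtFixedRate: if a run throws, the executor suppresses all later runs of that task, so a single failure would otherwise stop the periodic adjustment for good.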

Summary

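DefaultMQPushConsumer defines the adjustThreadPoolNumsThreshold property, which defaults to 100000. For the CREATE_JUST state, the start method of MQClientInstance calls startScheduledTask(), which registers a scheduled task that runs adjustThreadPool once a minute. That method iterates over the MQConsumerInner entries in consumerTable and, for each DefaultMQPushConsumerImpl, calls its adjustThreadPool method, which compares the accumulated message count against adjustThreadPoolNumsThreshold * 1.0 and adjustThreadPoolNumsThreshold * 0.8 to decide whether to call incCorePoolSize() or decCorePoolSize().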

doc

  • DefaultMQPushConsumer