一,Broker缓存的数据

Broker次要缓存了路由信息,蕴含producer表,consumer表,consumerGroup表和topic表。这些信息是在ProducerManager,ConsumerManager,SubscriptionGroupManager,TopicConfigManager这几个类里进行治理的。

ProducerManager//producer列表HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable
  • groupChannelTable:各ProducerGroup中别离有哪些存活的Producer连贯;每个连贯的Producer最初一次发来心跳的工夫
ConsumerManager//consumer列表ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable
  • consumerTable:每个ConsumerGroup中别离有哪些存活的Consumer连贯,别离订阅了哪些Topic,订阅的每个Topic应用什么过滤条件(TAG)。
SubscriptionGroupManager//ConsumerGroup表ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable
  • subscriptionGroupTable:各ConsumerGroup的消费行为特点,例如:生产失败后的最大重试次数;重试队列个数;如果从MasterBroker生产迟缓,切换到哪个Slave Broker进行生产
TopicConfigManager//topic列表ConcurrentMap<String, TopicConfig> topicConfigTable
  • topicConfigTable:散布在以后Broker上的各Topic分片的配置信息,如:蕴含的读/写Queue的数量;是否有读/写权限

二,Broker启动设计

  1. 创立BrokerController
    创立BrokerController类是在BrokerStartup#createBrokerController办法里进行的。先是进行参考解析,完了创立BrokerController类,紧接着调用其initialize办法,外面的逻辑次要有:
    1)加载topic,consumer生产进度,订阅关系与consumer过滤的配置,并会加载音讯的日志文件
    2)再创立一个netty服务监听10909这个VIP端口
    3)初始化一系列线程池,而后在registerProcessor办法里将这些线程池与处理器进行关联,为当前不同的业务应用不同的线程池,也就是线程隔离
    4)启动一些定时工作,比方记录Broker状态,生产进度长久化等
    5)最初进行权限校验初始化和Rpc调用钩子相干服务,这些服务加载形式是Java的SPI形式进行的。
  2. 启动Broker

     public void start() throws Exception {     //启动音讯存储相干的工作     if (this.messageStore != null) {         this.messageStore.start();     }     //启动broker服务器     if (this.remotingServer != null) {         this.remotingServer.start();     }     //启动给音讯发送者应用的netty服务     if (this.fastRemotingServer != null) {         this.fastRemotingServer.start();     }     //启动监控SSL连贯文件的服务     if (this.fileWatchService != null) {         this.fileWatchService.start();     }     //启动内部API的客户端     if (this.brokerOuterAPI != null) {         this.brokerOuterAPI.start();     }     //启动pull模式相干的服务     if (this.pullRequestHoldService != null) {         this.pullRequestHoldService.start();     }     //启动心跳检测服务     if (this.clientHousekeepingService != null) {         this.clientHousekeepingService.start();     }     //启动音讯过滤服务     if (this.filterServerManager != null) {         this.filterServerManager.start();     }     //如果没启动DLegerCommitLog ,就将Broker注册到NameServer上     if (!messageStoreConfig.isEnableDLegerCommitLog()) {         startProcessorByHa(messageStoreConfig.getBrokerRole());         handleSlaveSynchronize(messageStoreConfig.getBrokerRole());     }     /*向namesrv注册*/     this.registerBrokerAll(true, false, true);     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {         @Override         public void run() {             try {                 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());             } catch (Throwable e) {                 log.error("registerBrokerAll Exception", e);             }         }     }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);     if (this.brokerStatsManager != null) {         this.brokerStatsManager.start();     }     if (this.brokerFastFailure != null) {         this.brokerFastFailure.start();     } }
  • messageStore服务:解决音讯的存储相干的日志,比方CommitLog,ConsumeQueue等
  • remotingServer服务:解决客户端producer&consumer的申请
  • fastRemotingServer服务:默认端口可能存在多用,可能会造成业务阻塞。新开一个VIP端口专门进行音讯解决。不过4.5版本之后默认已敞开,是为了妆容之前版本。
  • fileWatchService服务:启动监控服务连贯时用到的SSL连贯文件的服务
  • brokerOuterAPI服务:RocketMQ控制台跟Broker交互时候的客户端
  • pullRequestHoldService服务:解决push模式生产,或者提早生产的服务
  • clientHousekeepingService服务:心跳连贯用的服务
  • filterServerManager服务:过滤音讯服务
  • transactionalMessageCheckService服务:定期检查和处理事务音讯服务
  • slaveSynchronize服务:主从路由信息同步服务
  1. netty服务端的启动
    这里能够参考之前文章三大点4小点里的服务端的创立

参考文章:
Broker局部之Broker启动过程BrokerStartup(2)