乐趣区

关于rocketmq:RocketMQ学习九Broker分析

一,Broker 缓存的数据

Broker 次要缓存了路由信息,蕴含 producer 表,consumer 表,consumerGroup 表和 topic 表。这些信息是在 ProducerManager,ConsumerManager,SubscriptionGroupManager,TopicConfigManager 这几个类里进行治理的。

ProducerManager
//producer 列表
HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable
  • groupChannelTable: 各 ProducerGroup 中别离有哪些存活的 Producer 连贯;每个连贯的 Producer 最初一次发来心跳的工夫
ConsumerManager
//consumer 列表
ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable
  • consumerTable: 每个 ConsumerGroup 中别离有哪些存活的 Consumer 连贯,别离订阅了哪些 Topic,订阅的每个 Topic 应用什么过滤条件(TAG)。
SubscriptionGroupManager
//ConsumerGroup 表
ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable
  • subscriptionGroupTable: 各 ConsumerGroup 的消费行为特点,例如:生产失败后的最大重试次数;重试队列个数;如果从 MasterBroker 生产迟缓,切换到哪个 Slave Broker 进行生产
TopicConfigManager
//topic 列表
ConcurrentMap<String, TopicConfig> topicConfigTable
  • topicConfigTable: 散布在以后 Broker 上的各 Topic 分片的配置信息,如:蕴含的读 / 写 Queue 的数量;是否有读 / 写权限

二,Broker 启动设计

  1. 创立 BrokerController
    创立 BrokerController 类是在 BrokerStartup#createBrokerController 办法里进行的。先是进行参考解析,完了创立 BrokerController 类,紧接着调用其 initialize 办法,外面的逻辑次要有:
    1)加载 topic,consumer 生产进度,订阅关系与 consumer 过滤的配置,并会加载音讯的日志文件
    2)再创立一个 netty 服务监听 10909 这个 VIP 端口
    3)初始化一系列线程池,而后在 registerProcessor 办法里将这些线程池与处理器进行关联,为当前不同的业务应用不同的线程池,也就是线程隔离
    4)启动一些定时工作,比方记录 Broker 状态,生产进度长久化等
    5)最初进行权限校验初始化和 Rpc 调用钩子相干服务,这些服务加载形式是 Java 的 SPI 形式进行的。
  2. 启动 Broker

     public void start() throws Exception {
         // 启动音讯存储相干的工作
         if (this.messageStore != null) {this.messageStore.start();
         }
         // 启动 broker 服务器
         if (this.remotingServer != null) {this.remotingServer.start();
         }
         // 启动给音讯发送者应用的 netty 服务
         if (this.fastRemotingServer != null) {this.fastRemotingServer.start();
         }
         // 启动监控 SSL 连贯文件的服务
         if (this.fileWatchService != null) {this.fileWatchService.start();
         }
         // 启动内部 API 的客户端
         if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();
         }
         // 启动 pull 模式相干的服务
         if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();
         }
         // 启动心跳检测服务
         if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();
         }
         // 启动音讯过滤服务
         if (this.filterServerManager != null) {this.filterServerManager.start();
         }
         // 如果没启动 DLegerCommitLog,就将 Broker 注册到 NameServer 上
         if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());
             handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
         }
    
         /* 向 namesrv 注册 */
         this.registerBrokerAll(true, false, true);
    
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
             @Override
             public void run() {
                 try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                 } catch (Throwable e) {log.error("registerBrokerAll Exception", e);
                 }
             }
         }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    
         if (this.brokerStatsManager != null) {this.brokerStatsManager.start();
         }
    
         if (this.brokerFastFailure != null) {this.brokerFastFailure.start();
         }
    
    
     }
  • messageStore 服务:解决音讯的存储相干的日志,比方 CommitLog,ConsumeQueue 等
  • remotingServer 服务:解决客户端 producer&consumer 的申请
  • fastRemotingServer 服务:默认端口可能存在多用,可能会造成业务阻塞。新开一个 VIP 端口专门进行音讯解决。不过 4.5 版本之后默认已敞开,是为了妆容之前版本。
  • fileWatchService 服务:启动监控服务连贯时用到的 SSL 连贯文件的服务
  • brokerOuterAPI 服务:RocketMQ 控制台跟 Broker 交互时候的客户端
  • pullRequestHoldService 服务:解决 push 模式生产,或者提早生产的服务
  • clientHousekeepingService 服务:心跳连贯用的服务
  • filterServerManager 服务:过滤音讯服务
  • transactionalMessageCheckService 服务:定期检查和处理事务音讯服务
  • slaveSynchronize 服务:主从路由信息同步服务
  1. netty 服务端的启动
    这里能够参考之前文章三大点 4 小点里的服务端的创立

参考文章:
Broker 局部之 Broker 启动过程 BrokerStartup(2)

退出移动版