设计(design)


1 音讯存储

音讯存储是RocketMQ中最为简单和最为重要的一部分,本节将别离从RocketMQ的音讯存储整体架构、PageCache与Mmap内存映射以及RocketMQ中两种不同的刷盘形式三方面来别离开展叙述。

1.1 音讯存储整体架构

音讯存储架构图中次要有上面三个跟音讯存储相干的文件形成。

(1) CommitLog:音讯主体以及元数据的存储主体,存储Producer端写入的音讯主体内容,音讯内容不是定长的。单个文件大小默认1G ,文件名长度为20位,右边补零,残余为起始偏移量,比方00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。音讯次要是程序写入日志文件,当文件满了,写入下一个文件;

(2) ConsumeQueue:音讯生产队列,引入的目标次要是进步音讯生产的性能,因为RocketMQ是基于主题topic的订阅模式,音讯生产是针对主题进行的,如果要遍历commitlog文件中依据topic检索音讯是十分低效的。Consumer即可依据ConsumeQueue来查找待生产的音讯。其中,ConsumeQueue(逻辑生产队列)作为生产音讯的索引,保留了指定Topic下的队列音讯在CommitLog中的起始物理偏移量offset,音讯大小size和音讯Tag的HashCode值。consumequeue文件能够看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织形式如下:topic/queue/file三层组织构造,具体存储门路为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,别离为8字节的commitlog物理偏移量、4字节的音讯长度、8字节tag hashcode,单个文件由30W个条目组成,能够像数组一样随机拜访每一个条目,每个ConsumeQueue文件大小约5.72M;

(3) IndexFile:IndexFile(索引文件)提供了一种能够通过key或工夫区间来查问音讯的办法。Index文件的存储地位是:$HOME \store\index\${fileName},文件名fileName是以创立时的工夫戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile能够保留 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap构造,故rocketmq的索引文件其底层实现为hash索引。

在下面的RocketMQ的音讯存储整体架构图中能够看出,RocketMQ采纳的是混合型的存储构造,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。RocketMQ的混合型存储构造(多个Topic的音讯实体内容都存储于一个CommitLog中)针对Producer和Consumer别离采纳了数据和索引局部相拆散的存储构造,Producer发送音讯至Broker端,而后Broker端应用同步或者异步的形式对音讯刷盘长久化,保留至CommitLog中。只有音讯被刷盘长久化至磁盘文件CommitLog中,那么Producer发送的音讯就不会失落。正因为如此,Consumer也就必定有机会去生产这条音讯。当无奈拉取到音讯后,能够等下一次音讯拉取,同时服务端也反对长轮询模式,如果一个音讯拉取申请未拉取到音讯,Broker容许期待30s的工夫,只有这段时间内有新音讯达到,将间接返回给生产端。这里,RocketMQ的具体做法是,应用Broker端的后盾服务线程—ReputMessageService不停地散发申请并异步构建ConsumeQueue(逻辑生产队列)和IndexFile(索引文件)数据。

1.2 页缓存与内存映射

页缓存(PageCache)是OS对文件的缓存,用于减速对文件的读写。一般来说,程序对文件进行程序读写的速度简直靠近于内存的读写速度,次要起因就是因为OS应用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的形式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时呈现未命中PageCache的状况,OS从物理磁盘上拜访读取文件的同时,会程序对其余相邻块的数据文件进行预读取。

在RocketMQ中,ConsumeQueue逻辑生产队列存储的数据较少,并且是程序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能简直靠近读内存,即便在有音讯沉积状况下也不会影响性能。而对于CommitLog音讯存储的日志数据文件来说,读取音讯内容时候会产生较多的随机拜访读取,重大影响性能。如果抉择适合的零碎IO调度算法,比方设置调度算法为“Deadline”(此时块存储采纳SSD的话),随机读的性能也会有所晋升。

另外,RocketMQ次要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件间接映射到用户态的内存地址中(这种Mmap的形式缩小了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为间接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为须要应用内存映射机制,故RocketMQ的文件存储都应用定长构造来存储,不便一次将整个文件映射至内存)。

1.3 音讯刷盘

(1) 同步刷盘:如上图所示,只有在音讯真正长久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个胜利的ACK响应。同步刷盘对MQ音讯可靠性来说是一种不错的保障,然而性能上会有较大影响,个别实用于金融业务利用该模式较多。

(2) 异步刷盘:可能充分利用OS的PageCache的劣势,只有音讯写入PageCache即可将胜利的ACK返回给Producer端。音讯刷盘采纳后盾异步线程提交的形式进行,升高了读写提早,进步了MQ的性能和吞吐量。

2 通信机制

RocketMQ音讯队列集群次要包含NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,根本通信流程如下:

(1) Broker启动后须要实现一次将本人注册至NameServer的操作;随后每隔30s工夫定时向NameServer上报Topic路由信息。

(2) 音讯生产者Producer作为客户端发送音讯时候,须要依据音讯的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上从新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息。

(3) 音讯生产者Producer依据2)中获取的路由信息抉择一个队列(MessageQueue)进行音讯发送;Broker作为音讯的接收者接管音讯并落盘存储。

(4) 音讯消费者Consumer依据2)中获取的路由信息,并再实现客户端的负载平衡后,抉择其中的某一个或者某几个音讯队列来拉取音讯并进行生产。

从下面1)~3)中能够看出在音讯生产者, Broker和NameServer之间都会产生通信(这里只说了MQ的局部通信),因而如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的音讯传输能力与最终的性能。

rocketmq-remoting 模块是 RocketMQ音讯队列中负责网络通信的模块,它简直被其余所有须要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和援用。为了实现客户端与服务器之间高效的数据申请与接管,RocketMQ音讯队列自定义了通信协议并在Netty的根底之上扩大了通信模块。

2.1 Remoting通信类构造

2.2 协定设计与编解码

在Client和Server之间实现一次音讯发送时,须要对发送的音讯进行一个协定约定,因而就有必要自定义RocketMQ的音讯协定。同时,为了高效地在网络中传输音讯和对收到的音讯读取,就须要对音讯进行编解码。在RocketMQ中,RemotingCommand这个类在音讯传输过程中对所有数据内容的封装,岂但蕴含了所有的数据结构,还蕴含了编码解码操作。

Header字段类型Request阐明Response阐明
codeint申请操作码,应答方依据不同的申请码进行不同的业务解决应答响应码。0示意胜利,非0则示意各种谬误
languageLanguageCode申请方实现的语言应答方实现的语言
versionint申请方程序的版本应答方程序的版本
opaqueint相当于requestId,在同一个连贯上的不同申请标识码,与响应音讯中的绝对应应答不做批改间接返回
flagint辨别是一般RPC还是onewayRPC得标记辨别是一般RPC还是onewayRPC得标记
remarkString传输自定义文本信息传输自定义文本信息
extFieldsHashMap<String, String>申请自定义扩大信息响应自定义扩大信息

可见传输内容次要能够分为以下4局部:

(1) 音讯长度:总长度,四个字节存储,占用一个int类型;

(2) 序列化类型&音讯头长度:同样占用一个int类型,第一个字节示意序列化类型,前面三个字节示意音讯头长度;

(3) 音讯头数据:通过序列化后的音讯头数据;

(4) 音讯主体数据:音讯主体的二进制字节数据内容;

2.3 音讯的通信形式和流程

在RocketMQ音讯队列中反对通信的形式次要有同步(sync)、异步(async)、单向(oneway)
三种。其中“单向”通信模式绝对简略,个别用在发送心跳包场景下,无需关注其Response。这里,次要介绍RocketMQ的异步通信流程。

2.4 Reactor多线程设计

RocketMQ的RPC通信采纳Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩大和优化。

下面的框图中能够大抵理解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为下面的1)负责监听 TCP网络连接申请,建设好连贯,创立SocketChannel,并注册到selector上。RocketMQ的源码中会主动依据OS的类型抉择NIO和Epoll,也能够通过参数配置),而后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为下面的“N”,源码中默认设置为3),在真正执行业务逻辑之前须要进行SSL验证、编解码、闲暇查看、网络连接治理,这些工作交给defaultEventExecutorGroup(即为下面的“M1”,源码中默认设置为8)去做。而解决业务操作放在业务线程池中执行,依据 RomotingCommand 的业务申请码code去processorTable这个本地缓存变量中找到对应的 processor,而后封装成task工作后,提交给对应的业务processor解决线程池来执行(sendMessageExecutor,以发送音讯为例,即为下面的 “M2”)。从入口到业务逻辑的几个步骤中线程池始终再减少,这跟每一步逻辑复杂性相干,越简单,须要的并发通道越宽。

线程数线程名线程具体阐明
1NettyBoss_%dReactor 主线程
NNettyServerEPOLLSelector_%d_%dReactor 线程池
M1NettyServerCodecThread_%dWorker线程池
M2RemotingExecutorThread_%d业务processor解决线程池

3 音讯过滤

RocketMQ分布式音讯队列的音讯过滤形式有别于其它MQ中间件,是在Consumer端订阅音讯时再做音讯过滤的。RocketMQ这么做是在于其Producer端写入音讯和Consumer端订阅音讯采纳拆散存储的机制来实现的,Consumer端订阅音讯是须要通过ConsumeQueue这个音讯生产的逻辑队列拿到一个索引,而后再从CommitLog外面读取真正的音讯实体内容,所以说到底也是还绕不开其存储构造。其ConsumeQueue的存储构造如下,能够看到其中有8个字节存储的Message Tag的哈希值,基于Tag的音讯过滤正式基于这个字段值的。

次要反对如下2种的过滤形式
(1) Tag过滤形式:Consumer端在订阅音讯时除了指定Topic还能够指定TAG,如果一个音讯有多个TAG,能够用||分隔。其中,Consumer端会将这个订阅申请构建成一个 SubscriptionData,发送一个Pull音讯的申请给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,而后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的音讯tag hash值去做过滤,因为在服务端只是依据hashcode进行判断,无奈准确对tag原始字符串进行过滤,故在音讯生产端拉取到音讯后,还须要对音讯的原始tag字符串进行比对,如果不同,则抛弃该音讯,不进行音讯生产。

(2) SQL92的过滤形式:这种形式的大抵做法和下面的Tag过滤形式一样,只是在Store层的具体过滤过程不太一样,真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。每次过滤都去执行SQL表达式会影响效率,所以RocketMQ应用了BloomFilter防止了每次都去执行。SQL92的表达式上下文为音讯的属性。

4 负载平衡

RocketMQ中的负载平衡都在Client端实现,具体来说的话,次要能够分为Producer端发送音讯时候的负载平衡和Consumer端订阅音讯的负载平衡。

4.1 Producer的负载平衡

Producer端在发送音讯的时候,会先依据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认形式下selectOneMessageQueue()办法会从TopicPublishInfo中的messageQueueList中抉择一个队列(MessageQueue)进行发送音讯。具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的根底上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按肯定的工夫做退却。例如,如果上次申请的latency超过550Lms,就退却3000Lms;超过1000L,就退却60000L;如果敞开,采纳随机递增取模的形式抉择一个队列(MessageQueue)来发送音讯,latencyFaultTolerance机制是实现音讯发送高可用的外围关键所在。

4.2 Consumer的负载平衡

在RocketMQ中,Consumer端的两种生产模式(Push/Pull)都是基于拉模式来获取音讯的,而在Push模式只是对pull模式的一种封装,其本质实现为音讯拉取线程在从服务器拉取到一批音讯后,而后提交到音讯生产线程池后,又“快马加鞭”的持续向服务器再次尝试拉取音讯。如果未拉取到音讯,则提早一下又持续拉取。在两种基于拉模式的生产形式(Push/Pull)中,均须要Consumer端在晓得从Broker端的哪一个音讯队列—队列中去获取音讯。因而,有必要在Consumer端来做负载平衡,即Broker端中多个MessageQueue调配给同一个ConsumerGroup中的哪些Consumer生产。

1、Consumer端的心跳包发送

在Consumer启动后,它就会通过定时工作一直地向RocketMQ集群中的所有Broker实例发送心跳包(其中蕴含了,音讯生产分组名称、订阅关系汇合、音讯通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳音讯后,会将它保护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保留在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载平衡提供能够根据的元数据信息。

2、Consumer端实现负载平衡的外围类—RebalanceImpl

在Consumer实例的启动流程中的启动MQClientInstance实例局部,会实现负载平衡服务线程—RebalanceService的启动(每隔20s执行一次)。通过查看源码能够发现,RebalanceService线程的run()办法最终调用的是RebalanceImpl类的rebalanceByTopic()办法,该办法是实现Consumer端负载平衡的外围。这里,rebalanceByTopic()办法会依据消费者通信类型为“播送模式”还是“集群模式”做不同的逻辑解决。这里次要来看下集群模式下的次要解决流程:

(1) 从rebalanceImpl实例的本地缓存变量—topicSubscribeInfoTable中,获取该Topic主题下的音讯生产队列汇合(mqSet);

(2) 依据topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()办法向Broker端发送获取该生产组下消费者Id列表的RPC通信申请(Broker端基于后面Consumer端上报的心跳包数据而构建的consumerTable做出响应返回,业务申请码:GET_CONSUMER_LIST_BY_GROUP);

(3) 先对Topic下的音讯生产队列、消费者Id排序,而后用音讯队列调配策略算法(默认为:音讯队列的平均分配算法),计算出待拉取的音讯队列。这里的平均分配算法,相似于分页的算法,将所有MessageQueue排好序相似于记录,将所有生产端Consumer排好序相似页数,并求出每一页须要蕴含的均匀size和每个页面记录的范畴range,最初遍历整个range而计算出以后Consumer端应该调配到的记录(这里即为:MessageQueue)。

(4) 而后,调用updateProcessQueueTableInRebalance()办法,具体的做法是,先将调配到的音讯队列汇合(mqSet)与processQueueTable做一个过滤比对。

  • 上图中processQueueTable标注的红色局部,示意与调配到的音讯队列汇合mqSet互不蕴含。将这些队列设置Dropped属性为true,而后查看这些队列是否能够移除出processQueueTable缓存变量,这里具体执行removeUnnecessaryMessageQueue()办法,即每隔1s 查看是否能够获取以后生产解决队列的锁,拿到的话返回true。如果期待1s后,依然拿不到以后生产解决队列的锁则返回false。如果返回true,则从processQueueTable缓存变量中移除对应的Entry;
  • 上图中processQueueTable的绿色局部,示意与调配到的音讯队列汇合mqSet的交加。判断该ProcessQueue是否曾经过期了,在Pull模式的不必管,如果是Push模式的,设置Dropped属性为true,并且调用removeUnnecessaryMessageQueue()办法,像下面一样尝试移除Entry;

最初,为过滤后的音讯队列汇合(mqSet)中的每个MessageQueue创立一个ProcessQueue对象并存入RebalanceImpl的processQueueTable队列中(其中调用RebalanceImpl实例的computePullFromWhere(MessageQueue mq)办法获取该MessageQueue对象的下一个进度生产值offset,随后填充至接下来要创立的pullRequest对象属性中),并创立拉取申请对象—pullRequest增加到拉取列表—pullRequestList中,最初执行dispatchPullRequest()办法,将Pull音讯的申请对象PullRequest顺次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,待该服务线程取出后向Broker端发动Pull音讯的申请。其中,能够重点比照下,RebalancePushImpl和RebalancePullImpl两个实现类的dispatchPullRequest()办法不同,RebalancePullImpl类外面的该办法为空,这样子也就答复了上一篇中最初的那道思考题了。

音讯生产队列在同一生产组不同消费者之间的负载平衡,其外围设计理念是在一个音讯生产队列在同一时间只容许被同一生产组内的一个消费者生产,一个音讯消费者能同时生产多个音讯队列。

5 事务音讯

Apache RocketMQ在4.3.0版中曾经反对分布式事务音讯,这里RocketMQ采纳了2PC的思维来实现了提交事务音讯,同时减少一个弥补逻辑来解决二阶段超时或者失败的音讯,如下图所示。

5.1 RocketMQ事务音讯流程概要

上图阐明了事务音讯的大抵计划,其中分为两个流程:失常事务音讯的发送及提交、事务音讯的弥补流程。

1.事务音讯发送及提交:

(1) 发送音讯(half音讯)。

(2) 服务端响应音讯写入后果。

(3) 依据发送后果执行本地事务(如果写入失败,此时half音讯对业务不可见,本地逻辑不执行)。

(4) 依据本地事务状态执行Commit或者Rollback(Commit操作生成音讯索引,音讯对消费者可见)

2.弥补流程:

(1) 对没有Commit/Rollback的事务音讯(pending状态的音讯),从服务端发动一次“回查”

(2) Producer收到回查音讯,查看回查音讯对应的本地事务的状态

(3) 依据本地事务状态,从新Commit或者Rollback

其中,弥补阶段用于解决音讯Commit或者Rollback产生超时或者失败的状况。

5.2 RocketMQ事务音讯设计

1.事务音讯在一阶段对用户不可见

在RocketMQ事务音讯的次要流程中,一阶段的音讯如何对用户不可见。其中,事务音讯绝对一般音讯最大的特点就是一阶段发送的音讯对用户是不可见的。那么,如何做到写入音讯然而对用户不可见呢?RocketMQ事务音讯的做法是:如果音讯是half音讯,将备份原音讯的主题与音讯生产队列,而后扭转主题为RMQ_SYS_TRANS_HALF_TOPIC。因为生产组未订阅该主题,故生产端无奈生产half类型的音讯,而后RocketMQ会开启一个定时工作,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取音讯进行生产,依据生产者组获取一个服务提供者发送回查事务状态申请,依据事务状态来决定是提交或回滚音讯。

在RocketMQ中,音讯在服务端的存储构造如下,每条音讯都会有对应的索引信息,Consumer通过ConsumeQueue这个二级索引来读取音讯实体内容,其流程如下:

RocketMQ的具体实现策略是:写入的如果事务音讯,对音讯的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到音讯的属性中,正因为音讯主题被替换,故音讯并不会转发到该原主题的音讯生产队列,消费者无奈感知音讯的存在,不会生产。其实扭转音讯主题是RocketMQ的罕用“套路”,回忆一下延时音讯的实现机制。

2.Commit和Rollback操作以及Op音讯的引入

在实现一阶段写入一条对用户不可见的音讯后,二阶段如果是Commit操作,则须要让音讯对用户可见;如果是Rollback则须要撤销一阶段的音讯。先说Rollback的状况。对于Rollback,自身一阶段的音讯对用户是不可见的,其实不须要真正撤销音讯(实际上RocketMQ也无奈去真正的删除一条音讯,因为是程序写文件的)。然而区别于这条音讯没有确定状态(Pending状态,事务悬而未决),须要一个操作来标识这条音讯的最终状态。RocketMQ事务音讯计划中引入了Op音讯的概念,用Op音讯标识事务音讯曾经确定的状态(Commit或者Rollback)。如果一条事务音讯没有对应的Op音讯,阐明这个事务的状态还无奈确定(可能是二阶段失败了)。引入Op音讯后,事务音讯无论是Commit或者Rollback都会记录一个Op操作。Commit绝对于Rollback只是在写入Op音讯前创立Half音讯的索引。

3.Op音讯的存储和对应关系

RocketMQ将Op音讯写入到全局一个特定的Topic中通过源码中的办法—TransactionalMessageUtil.buildOpTopic();这个Topic是一个外部的Topic(像Half音讯的Topic一样),不会被用户生产。Op音讯的内容为对应的Half音讯的存储的Offset,这样通过Op音讯能索引到Half音讯进行后续的回查操作。

4.Half音讯的索引构建

在执行二阶段Commit操作时,须要构建出Half音讯的索引。一阶段的Half音讯因为是写到一个非凡的Topic,所以二阶段构建索引时须要读取出Half音讯,并将Topic和Queue替换成真正的指标的Topic和Queue,之后通过一次一般音讯的写入操作来生成一条对用户可见的音讯。所以RocketMQ事务音讯二阶段其实是利用了一阶段存储的音讯的内容,在二阶段时复原出一条残缺的一般音讯,而后走一遍音讯写入流程。

5.如何解决二阶段失败的音讯?

如果在RocketMQ事务音讯的二阶段过程中失败了,例如在做Commit操作时,呈现网络问题导致Commit失败,那么须要通过肯定的策略使这条音讯最终被Commit。RocketMQ采纳了一种弥补机制,称为“回查”。Broker端对未确定状态的音讯发动回查,将音讯发送到对应的Producer端(同一个Group的Producer),由Producer依据音讯来查看本地事务的状态,进而执行Commit或者Rollback。Broker端通过比照Half音讯和Op音讯进行事务音讯的回查并且推动CheckPoint(记录那些事务音讯的状态是确定的)。

值得注意的是,rocketmq并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无奈得悉事务状态,rocketmq默认回滚该音讯。

6 音讯查问

RocketMQ反对依照上面两种维度(“依照Message Id查问音讯”、“依照Message Key查问音讯”)进行音讯查问。

6.1 依照MessageId查问音讯

RocketMQ中的MessageId的长度总共有16字节,其中蕴含了音讯存储主机地址(IP地址和端口),音讯Commit Log offset。“依照MessageId查问音讯”在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC申请后通过Remoting通信层发送(业务申请码:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,读取音讯的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个残缺的音讯返回。

6.2 依照Message Key查问音讯

“依照Message Key查问音讯”,次要是基于RocketMQ的IndexFile索引文件来实现的。RocketMQ的索引文件逻辑构造,相似JDK中HashMap的实现。索引文件的具体构造如下:

IndexFile索引文件为用户提供通过“依照Message Key查问音讯”的音讯索引查问服务,IndexFile文件的存储地位是:$HOME\store\index\${fileName},文件名fileName是以创立时的工夫戳命名的,文件大小是固定的,等于40+500W*4+2000W*20= 420000040个字节大小。如果音讯的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果音讯设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

其中的索引数据蕴含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即后面读出来的 slotValue,如果有 hash抵触,就能够用这个字段将所有抵触的索引用链表的形式串起来了。Timestamp记录的是音讯storeTimestamp之间的差,并不是一个相对的工夫。整个Index File的构造如图,40 Byte 的Header用于保留一些总的统计信息,4*500W的 Slot Table并不保留真正的索引数据,而是保留每个槽位对应的单向链表的头。20*2000W 是真正的索引数据,即一个 Index File 能够保留 2000W个索引。

“依照Message Key查问音讯”的形式,RocketMQ的具体做法是,次要通过Broker端的QueryMessageProcessor业务处理器来查问,读取音讯的过程就是用topic和key找到IndexFile索引文件中的一条记录,依据其中的commitLog offset从CommitLog文件中读取音讯的实体内容。

本文由mdnice多平台公布