下面是从官网上找的一张图,producer发送音讯后,由broker生成CommitLog与ConsumerQueue文件,而后consumer依据ConsumerQueue里获取音讯在commitLog里的起始物理地址+message size后,拿到音讯进行生产。
上面别离介绍下CommitLog,ConsumerQueue与IndexFile这三个文件。
一,CommitLog文件
存储Producer端写入的音讯主体内容,音讯内容不是定长的。单个文件大小默认1G ,文件名长度为20位,右边补零,残余为起始偏移量,这里的偏移量也就是文件大小的字节示意,这样的益处是只有咱们晓得了音讯的偏移量就能很快晓得该音讯在哪个文件里了。假如某音讯物理偏移量是1073741830,则绝对的偏移量是6(6 = 1073741830 - 1073741824),于是判断出该音讯位于第二个commitLog文件上。
它有上面几个个性:
- 程序写入,一个文件写满再写下一个
- 刷盘前会有一个mappedFile内存映射文件,音讯是先写入到这个内存映射文件中,而后依据刷盘策略写到硬盘中
- header+body格局,其中header是定长的,记录了音讯的长度
- 读取音讯时先解析出header,从中获取到音讯长度,接着读取音讯体
二,ConsumerQueue文件
在介绍ConsumerQueue文件之前须要先提一下生产进度,它是Broker治理每个一个消费者生产topic的进度。这个进度有可能是失常生产后产生的进度,也可能是重置的生产进度,这两种情景下,消费者都会上报进度而后Broker进行记录。之所以要治理生产进度是为了确保消费者在失常状态,重启,异样敞开状况下能精确的接着上一次生产的进度进行生产,也就是确保音讯能够‘至多生产一次’,所以消费者须要有幂等措施确保不会反复生产,这个后续再提。
再回到ConsumerQueue文件上,这个文件里记录着某个音讯投递到某个队列里的地位信息,咱们晓得音讯是存在CommitLog文件里的,但必须要先获取音讯的偏移量而后再依据偏移量去CommitLog里进行查问,而音讯的偏移量是记录在ConsumerQueue文件里的,也能够这样了解:ConsumerQueue是CommitLog的一个索引文件。
ConsumerQueue是依照topic维度存储的,每个topic默认4个队列,外面寄存的consumequeue文件。
外面记录了一个Topic下的队列里音讯在CommitLog中的起始物理偏移量offset,音讯大小size和音讯Tag的HashCode值。
它有如下几个个性:
- 每个topic默认为4个队列
- 单个队列下最大可有30W个条目,每个ConsumeQueue文件(条目)大小约5.72M
- consumequeue文件采取定长设计,共20个字节。使之能够应用相似拜访数组的形式疾速定位数据。
ConsumerQueue不存音讯的tag而是存tag的hashCode次要是为了保障条目标固定长度。
这样咱们定位一条音讯的流程就有2步:
- 先读ConsumeQueue失去音讯在CommitLog中所在的offset
- 再通过offset找到CommitLog对应的音讯内容
消费者通过broker保留的offset(offsetTable.offset json文件中保留的ConsumerQueue的下标)能够在ConsumeQueue中获取音讯,下一章再具体写。
ConsumerQueue除了根本的音讯检索外还有两个作用:
- 通过tag过滤音讯。过滤tag是也是通过遍历ConsumeQueue来实现的(先比拟hash(tag)符合条件的再到consumer比拟tag原文)
- ConsumeQueue还能借助于操作系统的PageCache进行缓存晋升检索性能
三,IndexFile文件
音讯还能够通过key或工夫进行检索,当然咱们也不可能间接从CommitLog里查寻,而是须要借助IndexFile。
IndexFile里的文件名fileName是以创立时的工夫戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile能够保留 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap构造,故rocketmq的索引文件其底层实现为hash索引。
- IndexFile生成原理
每当一个新的音讯的index进来,首先取MessageKey的hashCode,而后用hashCode对slot总数取模,失去应该放到哪个slot中,slot总数零碎默认500W个。只有是取hash就必然面临hash抵触的问题,跟HashMap一样,IndexFile也是应用一个链表构造来解决hash抵触。只是这里跟HashMap略微有点区别的中央是,slot中放的是最新index的指针,也就是发生冲突后最新的放在slot里,这个是因为个别查问的时候必定是优先查最近的音讯。
每个slot中放的指针值是索引在indexFile中的偏移量,每个索引大小是20字节,所以依据以后索引是这个文件中的第几个(偏移量),就很容易定位到索引的地位。而后每个索引都保留了跟它同一个slot的前一个索引的地位,以此类推造成一个链表的构造。 - IndexFil组成
IndexFile由三局部组成:
1)索引文件由索引文件头IndexHeader。头文件由40个字节的数据组成,次要内容有:
//8位 该索引文件的第一个音讯(Message)的存储工夫(落盘工夫)this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());//8位 该索引文件的最初一个音讯(Message)的存储工夫(落盘工夫)this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());//8位 该索引文件第一个音讯(Message)的在CommitLog(音讯存储文件)的物理地位偏移量(能够通过该物理偏移间接获取到该音讯)this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());//8位 该索引文件最初一个音讯(Message)的在CommitLog(音讯存储文件)的物理地位偏移量this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());//4位 该索引文件目前的hash slot的个数this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());//4位 索引文件目前的索引个数this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());
2)槽位Slot
紧临着IndexHeader,默认slot是500万个,每个固定大小为4byte,slot中存着一个int值,示意以后slot下最新的一个index序号。
在计算对应的槽位时,会先算出MessageKey的hashCode,而后用Hashcode对slot的总数进行取模,决定该音讯key的地位,slot的总数默认是500W个。
只有取hash就必然面临着hash抵触的问题,indexfile也是采纳链表构造来解决hash抵触(留神,500w个slot很大,另外抵触的情景个别不会很大,所以没有应用红黑树)。slot的值对应以后slot下最新的那个index的序号,index中存储了以后slot下、以后index的前一个index序号,这就把slot下的所有index链起来了
//slot的数据寄存地位 40 + keyHash %(500W)* 4int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;//Slot Table 4字节//记录该slot以后index,如果hash抵触(即absSlotPos统一)作为下一次该slot新增的前置indexthis.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
3)音讯的索引内容
//Index Linked list//topic+message key的hash值this.mappedByteBuffer.putInt(absIndexPos, keyHash);//音讯在CommitLog的物理文件地址, 能够间接查问到该音讯(索引的外围机制)this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//音讯的落盘工夫与header里的beginTimestamp的差值(为了节俭存储空间,如果间接存message的落盘工夫就得8bytes)this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//9、记录该slot上一个index//hash抵触解决的要害之处, 雷同hash值上一个音讯索引的index(如果以后音讯索引是该hash值的第一个索引,则prevIndex=0, 也是音讯索引查找时的进行条件),每个slot地位的第一个音讯的prevIndex就是0的this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
四,查问流程
因为indexHeader,slot,index都是固定大小,所以:
- 公式1:第n个slot在indexFile中的起始地位是这样:40+(n-1)*4
- 公式2:第s个index在indexFile中的起始地位是这样:40+50000004+(s-1)20
- 查问的传入值除了key外,还蕴含一个工夫起始值以及截止值,为啥还要传工夫范畴呢?
一个indexFile写完一个会持续写下一个,仅仅一个key无奈定位到具体的indexFile,工夫范畴就为了更准确的定位到具体的indexFile,放大查找的范畴,indexFile文件名是一个工夫戳,依据这个日期就能够定位到传入的日期范畴对应在哪个或者哪些indexFile中。
所以整体的查问流程如下:
key-->计算hash值-->hash值%500w,算出对应的slot序号-->依据40+(n-1)4(公式1)算出该slot在文件中的地位-->读取slot值,也就是index序号-->依据40+50000004+(s-1)*20(公式2)算出该index在文件中的地位-->读取该index-->将key的hash值以及传入的工夫范畴与index的keyHash值以及timeDiff值进行比对。不满足则依据index中的preIndexNo找到上一个index,持续上一步;满足则依据index中的phyOffset拿到commitLog中的音讯
为啥比对时还要带上工夫范畴呢?只比key不行吗?答案是不行,因为key可能会反复,producer在音讯生产时能够指定音讯的key,这个key显然无奈保障唯一性,那主动生成的msgId呢?也不能保障惟一。
五,构建两个索引文件过程
IndexFile两个索引文件的构建是放在同一个后台任务中ReputMessageService中的,具体流程如下:
indexFile的索引构建流程如下:
- 拿到音讯的msgId-->hash值计算-->对500万取余计算对应的slot序号-->依据40+(n-1)*4算出该slot的文件地位-->读取slot值,也就是index序号
- 追加写入一条Index数据,keyHash、phyOffset、timeDiff不必多说,preIndexNo咱们说了是前一个index的序号,也就是slot的以后值,如果没有值,阐明该slot下没有index
- 更新slot值为插入的index的序号(更新前存的是上一个Index的序号或者空,更新后存的是新插入的Index的序号)
- 更新IndexHeader中的endTimestamp、endPhyOffset、indexCount、hashSlotCount(这一项可能不会更新)
如果音讯设置了一个或多个key属性,则反复下面的过程,构建索引。
1)构建index索引代码如下:
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { //1. 判断该索引文件的索引数小于最大的索引数,如果>=最大索引数,IndexService就会尝试新建一个索引文件 if (this.indexHeader.getIndexCount() < this.indexNum) { //2. 计算该message key的hash值 int keyHash = indexKeyHashMethod(key); //3. 依据message key的hash值散列到某个hash slot里 int slotPos = keyHash % this.hashSlotNum; //4. 计算失去该hash slot的理论文件地位Position int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; try { //5. 依据该hash slot的理论文件地位absSlotPos失去slot里的值 //这里有两种状况: //1). slot=0, 以后message的key是该hash值第一个音讯索引 //2). slot>0, 该key hash值上一个音讯索引的地位 int slotValue = this.mappedByteBuffer.getInt(absSlotPos); //6. 数据校验及修改 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; } long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); timeDiff = timeDiff / 1000; if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; } //7. 计算以后音讯索引具体的存储地位(Append模式) int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; //8. 存入该音讯索引 this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); //9. 要害之处:在该key hash slot处存入以后音讯索引的地位,下次通过该key进行搜寻时 //会找到该key hash slot -> slot value -> curIndex -> //if(curIndex.prevIndex>0) pre index (始终循环 直至该curIndex.prevIndex==0就进行) this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); } this.indexHeader.incHashSlotCount(); this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); } return false; }
2)indexfile的索引搜寻代码如下:
搜寻的是音讯的物理偏移量
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { //1. 计算该key的hash int keyHash = indexKeyHashMethod(key); //2. 计算该hash value 对应的hash slot地位 int slotPos = keyHash % this.hashSlotNum; //3. 计算该hash value 对应的hash slot物理文件地位 int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { if (lock) { // fileLock = this.fileChannel.lock(absSlotPos, // hashSlotSize, true); } //4. 取出该hash slot 的值 int slotValue = this.mappedByteBuffer.getInt(absSlotPos); // if (fileLock != null) { // fileLock.release(); // fileLock = null; // } //5. 该slot value <= 0 就代表没有该key对应的音讯索引,间接完结搜寻 //该slot value > maxIndexCount 就代表该key对应的音讯索引超过最大 //限度,数据有误,间接完结搜寻 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { } else { 6. 从以后slot value 开始搜寻 for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; } 7. 找到以后slot value(也就是index count)物理文件地位 int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; 8. 读取音讯索引数据 int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); //9. 获取该音讯索引的上一个音讯索引index(能够看成链表的prev 指向上一个链节点的援用) int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); 10. 数据校验 if (timeDiff < 0) { break; } timeDiff *= 1000L; long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end); //10. 数据校验比对 hash值和落盘工夫 if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); } //当prevIndex <= 0 或prevIndex > maxIndexCount 或prevIndexRead == nextIndexToRead 或 timeRead < begin 进行搜寻 if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; } nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } this.mappedFile.release(); } } }
参考文章:通过这三个文件彻底搞懂rocketmq的存储原理
rocketMq存储模型_indexFile