乐趣区

关于rocketmq:RocketMQ学习十消息存储


下面是从官网上找的一张图,producer 发送音讯后,由 broker 生成 CommitLog 与 ConsumerQueue 文件,而后 consumer 依据 ConsumerQueue 里获取音讯在 commitLog 里的起始物理地址 +message size 后,拿到音讯进行生产。
上面别离介绍下 CommitLog,ConsumerQueue 与 IndexFile 这三个文件。

一,CommitLog 文件

存储 Producer 端写入的音讯主体内容, 音讯内容不是定长的。单个文件大小默认 1G,文件名长度为 20 位,右边补零,残余为起始偏移量,这里的偏移量也就是文件大小的字节示意,这样的益处是只有咱们晓得了音讯的偏移量就能很快晓得该音讯在哪个文件里了。假如某音讯物理偏移量是 1073741830,则绝对的偏移量是 6(6 = 1073741830 – 1073741824),于是判断出该音讯位于第二个 commitLog 文件上。

它有上面几个个性:

  • 程序写入,一个文件写满再写下一个
  • 刷盘前会有一个 mappedFile 内存映射文件,音讯是先写入到这个内存映射文件中,而后依据刷盘策略写到硬盘中
  • header+body 格局,其中 header 是定长的,记录了音讯的长度
  • 读取音讯时先解析出 header,从中获取到音讯长度,接着读取音讯体

二,ConsumerQueue 文件

在介绍 ConsumerQueue 文件之前须要先提一下 生产进度 ,它是 Broker 治理每个一个消费者生产 topic 的进度。 这个进度有可能是失常生产后产生的进度,也可能是重置的生产进度 ,这两种情景下,消费者都会上报进度而后 Broker 进行记录。之所以要治理生产进度是为了确保消费者在失常状态,重启,异样敞开状况下能精确的接着上一次生产的进度进行生产,也就是确保音讯能够‘至多生产一次’,所以消费者须要有幂等措施确保不会反复生产,这个后续再提。
再回到 ConsumerQueue 文件上,这个文件里记录着某个音讯投递到某个队列里的地位信息,咱们晓得音讯是存在 CommitLog 文件里的,但必须要先获取音讯的偏移量而后再依据偏移量去 CommitLog 里进行查问,而音讯的偏移量是记录在 ConsumerQueue 文件里的,也能够这样了解:ConsumerQueue 是 CommitLog 的一个索引文件。

ConsumerQueue 是依照 topic 维度存储的,每个 topic 默认 4 个队列,外面寄存的 consumequeue 文件。

外面记录了一个 Topic 下的队列里音讯在 CommitLog 中的起始物理偏移量 offset,音讯大小 size 和音讯 Tag 的 HashCode 值。

它有如下几个个性:

  • 每个 topic 默认为 4 个队列
  • 单个队列下最大可有 30W 个条目,每个 ConsumeQueue 文件 (条目) 大小约 5.72M
  • consumequeue 文件采取定长设计,共 20 个字节。使之能够应用相似拜访数组的形式疾速定位数据。
    ConsumerQueue 不存音讯的 tag 而是存 tag 的 hashCode 次要是为了保障条目标固定长度。

这样咱们定位一条音讯的流程就有 2 步:

  1. 先读 ConsumeQueue 失去音讯在 CommitLog 中所在的 offset
  2. 再通过 offset 找到 CommitLog 对应的音讯内容

消费者通过 broker 保留的 offset(offsetTable.offset json 文件中保留的 ConsumerQueue 的下标)能够在 ConsumeQueue 中获取音讯,下一章再具体写。

ConsumerQueue 除了根本的音讯检索外还有两个作用:

  1. 通过 tag 过滤音讯。过滤 tag 是也是通过遍历 ConsumeQueue 来实现的(先比拟 hash(tag)符合条件的再到 consumer 比拟 tag 原文)
  2. ConsumeQueue 还能借助于操作系统的 PageCache 进行缓存晋升检索性能

三,IndexFile 文件
音讯还能够通过 key 或工夫进行检索,当然咱们也不可能间接从 CommitLog 里查寻,而是须要借助 IndexFile。
IndexFile 里的文件名 fileName 是以创立时的工夫戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 能够保留 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 构造,故 rocketmq 的索引文件其底层实现为 hash 索引。

  1. IndexFile 生成原理
    每当一个新的音讯的 index 进来,首先取 MessageKey 的 hashCode,而后用 hashCode 对 slot 总数取模,失去应该放到哪个 slot 中,slot 总数零碎 默认 500W 个 。只有是取 hash 就必然面临 hash 抵触的问题,跟 HashMap 一样,IndexFile 也是应用一个链表构造来解决 hash 抵触。只是这里跟 HashMap 略微有点区别的中央是,slot 中放的是最新 index 的指针,也就是发生冲突后最新的放在 slot 里,这个是因为个别查问的时候必定是优先查最近的音讯。
    每个 slot 中放的指针值是索引在 indexFile 中的偏移量,每个索引大小是 20 字节,所以依据以后索引是这个文件中的第几个(偏移量),就很容易定位到索引的地位。而后每个索引都保留了跟它同一个 slot 的前一个索引的地位,以此类推造成一个链表的构造。
  2. IndexFil 组成

IndexFile 由三局部组成:
1)索引文件由索引文件头 IndexHeader。头文件由 40 个字节的数据组成,次要内容有:

// 8 位 该索引文件的第一个音讯 (Message) 的存储工夫(落盘工夫)
this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());
// 8 位 该索引文件的最初一个音讯 (Message) 的存储工夫(落盘工夫)
this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());
// 8 位 该索引文件第一个音讯 (Message) 的在 CommitLog(音讯存储文件)的物理地位偏移量(能够通过该物理偏移间接获取到该音讯)
this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());
// 8 位 该索引文件最初一个音讯 (Message) 的在 CommitLog(音讯存储文件)的物理地位偏移量
this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());
// 4 位 该索引文件目前的 hash slot 的个数
this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());
// 4 位 索引文件目前的索引个数
this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());

2)槽位 Slot
紧临着 IndexHeader,默认 slot 是 500 万个,每个固定大小为 4byte,slot 中存着一个 int 值,示意以后 slot 下最新的一个 index 序号。
在计算对应的槽位时,会先算出 MessageKey 的 hashCode, 而后用 Hashcode 对 slot 的总数进行取模,决定该音讯 key 的地位,slot 的总数默认是 500W 个。
只有取 hash 就必然面临着 hash 抵触的问题,indexfile 也是采纳链表构造来解决 hash 抵触(留神,500w 个 slot 很大,另外抵触的情景个别不会很大,所以没有应用红黑树)。slot 的值对应以后 slot 下最新的那个 index 的序号,index 中存储了以后 slot 下、以后 index 的前一个 index 序号,这就把 slot 下的所有 index 链起来了

//slot 的数据寄存地位 40 + keyHash %(500W)* 4
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
//Slot Table 4 字节
// 记录该 slot 以后 index,如果 hash 抵触(即 absSlotPos 统一)作为下一次该 slot 新增的前置 index
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

3)音讯的索引内容

//Index Linked list
//topic+message key 的 hash 值
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
// 音讯在 CommitLog 的物理文件地址, 能够间接查问到该音讯(索引的外围机制)
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
// 音讯的落盘工夫与 header 里的 beginTimestamp 的差值(为了节俭存储空间,如果间接存 message 的落盘工夫就得 8bytes)
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
//9、记录该 slot 上一个 index
//hash 抵触解决的要害之处, 雷同 hash 值上一个音讯索引的 index(如果以后音讯索引是该 hash 值的第一个索引,则 prevIndex=0, 也是音讯索引查找时的进行条件),每个 slot 地位的第一个音讯的 prevIndex 就是 0 的
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

四,查问流程

因为 indexHeader,slot,index 都是固定大小,所以:

  • 公式 1:第 n 个 slot 在 indexFile 中的起始地位是这样:40+(n-1)*4
  • 公式 2:第 s 个 index 在 indexFile 中的起始地位是这样:40+50000004+(s-1)20
  • 查问的传入值除了 key 外,还蕴含一个工夫起始值以及截止值,为啥还要传工夫范畴呢?

一个 indexFile 写完一个会持续写下一个,仅仅一个 key 无奈定位到具体的 indexFile,工夫范畴就为了更准确的定位到具体的 indexFile,放大查找的范畴,indexFile 文件名是一个工夫戳,依据这个日期就能够定位到传入的日期范畴对应在哪个或者哪些 indexFile 中。

所以整体的查问流程如下:

key–> 计算 hash 值 –>hash 值 %500w, 算出对应的 slot 序号 –> 依据 40+(n-1)4(公式 1)算出该 slot 在文件中的地位 –> 读取 slot 值,也就是 index 序号 –> 依据 40+50000004+(s-1)*20(公式 2)算出该 index 在文件中的地位 –> 读取该 index–> 将 key 的 hash 值以及传入的工夫范畴与 index 的 keyHash 值以及 timeDiff 值进行比对。不满足则依据 index 中的 preIndexNo 找到上一个 index,持续上一步;满足则依据 index 中的 phyOffset 拿到 commitLog 中的音讯

为啥比对时还要带上工夫范畴呢?只比 key 不行吗?答案是不行,因为 key 可能会反复,producer 在音讯生产时能够指定音讯的 key,这个 key 显然无奈保障唯一性,那主动生成的 msgId 呢?也不能保障惟一。

五,构建两个索引文件过程
IndexFile 两个索引文件的构建是放在同一个后台任务中 ReputMessageService 中的,具体流程如下:
indexFile 的索引构建流程如下:

  1. 拿到音讯的 msgId–>hash 值计算 –> 对 500 万取余计算对应的 slot 序号 –> 依据 40+(n-1)* 4 算出该 slot 的文件地位 –> 读取 slot 值,也就是 index 序号
  2. 追加写入一条 Index 数据,keyHash、phyOffset、timeDiff 不必多说,preIndexNo 咱们说了是前一个 index 的序号,也就是 slot 的以后值,如果没有值,阐明该 slot 下没有 index
  3. 更新 slot 值为插入的 index 的序号(更新前存的是上一个 Index 的序号或者空,更新后存的是新插入的 Index 的序号)
  4. 更新 IndexHeader 中的 endTimestamp、endPhyOffset、indexCount、hashSlotCount(这一项可能不会更新)

如果音讯设置了一个或多个 key 属性,则反复下面的过程,构建索引。

1)构建 index 索引代码如下:

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        //1. 判断该索引文件的索引数小于最大的索引数,如果 >= 最大索引数,IndexService 就会尝试新建一个索引文件
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            //2. 计算该 message key 的 hash 值
            int keyHash = indexKeyHashMethod(key);
            //3. 依据 message key 的 hash 值散列到某个 hash slot 里
            int slotPos = keyHash % this.hashSlotNum;
            //4. 计算失去该 hash slot 的理论文件地位 Position
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;


            try {
                //5. 依据该 hash slot 的理论文件地位 absSlotPos 失去 slot 里的值
                // 这里有两种状况://1). slot=0, 以后 message 的 key 是该 hash 值第一个音讯索引
                //2). slot>0, 该 key hash 值上一个音讯索引的地位
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);


                //6. 数据校验及修改
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {slotValue = invalidIndex;}


                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();


                timeDiff = timeDiff / 1000;


                if (this.indexHeader.getBeginTimestamp() <= 0) {timeDiff = 0;} else if (timeDiff > Integer.MAX_VALUE) {timeDiff = Integer.MAX_VALUE;} else if (timeDiff < 0) {timeDiff = 0;}


                //7. 计算以后音讯索引具体的存储地位(Append 模式)
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                //8. 存入该音讯索引
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);


                //9. 要害之处:在该 key hash slot 处存入以后音讯索引的地位,下次通过该 key 进行搜寻时
                // 会找到该 key hash slot -> slot value -> curIndex -> 
                //if(curIndex.prevIndex>0) pre index (始终循环 直至该 curIndex.prevIndex== 0 就进行)
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());


                if (this.indexHeader.getIndexCount() <= 1) {this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }


                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);


                return true;
            } catch (Exception e) {log.error("putKey exception, Key:" + key + "KeyHashCode:" + key.hashCode(), e);
            } 
        } else {log.warn("Over index file capacity: index count =" + this.indexHeader.getIndexCount()
                + "; index max num =" + this.indexNum);
        }


        return false;
 }

2)indexfile 的索引搜寻代码如下
搜寻的是音讯的物理偏移量

public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {if (this.mappedFile.hold()) {
            //1. 计算该 key 的 hash
            int keyHash = indexKeyHashMethod(key);
            //2. 计算该 hash value 对应的 hash slot 地位
            int slotPos = keyHash % this.hashSlotNum;
            //3. 计算该 hash value 对应的 hash slot 物理文件地位
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }
                //4. 取出该 hash slot 的值
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {// fileLock.release();
                // fileLock = null;
                // }
                
                //5. 该 slot value <= 0 就代表没有该 key 对应的音讯索引, 间接完结搜寻
                // 该 slot value > maxIndexCount 就代表该 key 对应的音讯索引超过最大
                // 限度,数据有误, 间接完结搜寻
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) { } else {
                    6. 从以后 slot value 开始搜寻
                    for (int nextIndexToRead = slotValue; ;) {if (phyOffsets.size() >= maxNum) {break;}
                    
                        7. 找到以后 slot value(也就是 index count)物理文件地位
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;
                        8. 读取音讯索引数据
                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        //9. 获取该音讯索引的上一个音讯索引 index(能够看成链表的 prev 指向上一个链节点的援用)
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
                        10. 数据校验
                        if (timeDiff < 0) {break;}

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
                        //10. 数据校验比对 hash 值和落盘工夫
                        if (keyHash == keyHashRead && timeMatched) {phyOffsets.add(phyOffsetRead);
                        }
                        // 当 prevIndex <= 0 或 prevIndex > maxIndexCount 或 prevIndexRead == nextIndexToRead 或 timeRead < begin 进行搜寻
                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {break;}

                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {log.error("selectPhyOffset exception", e);
            } finally {if (fileLock != null) {
                    try {fileLock.release();
                    } catch (IOException e) {log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();}
        }
    }

参考文章:通过这三个文件彻底搞懂 rocketmq 的存储原理
rocketMq 存储模型_indexFile

退出移动版