乐趣区

关于apache:Apache-Pulsar系列-深入理解Bookie-GC-回收机制

背景

Apache Bookkeeper 是基于日志的一个长久化零碎,所有的数据会以日志的模式存储到 Ledger 磁盘的 Entry Log 文件中,之后通过后盾异步回收的模式来将 EntryLog 文件回收掉。然而在咱们理论的应用场景中,发现很久之前的 EntryLog 文件无奈被删除掉,对 Entry Log 文件存在的工夫进行监控,具体如下:

咱们能够看到,假如从 Broker 侧设置的 Retention 策略最大为 5 天,即很久之前的 EntryLog 文件仍然存在于对应的 Ledger 数据盘中,导致磁盘的占用率较高。尽管 Bookie 的 GC 回收机制是后盾异步回收的,当 Broker 侧认为某条音讯能够删除时,Bookie 并不会立刻从磁盘中将该数据删除掉,而是利用 Bookie 的 GC 线程周期性的触发回收的逻辑。然而数据的删除操作居然滞后了半年多,于是萌发了搞懂 Bookie GC 回收机制的想法,到底是什么起因导致了该景象的产生。

Bookie GC 介绍

在 Apache Bookkeeper 中,数据的写入,读取以及回收(压缩)操作是互相隔离的。为了防止过多碎片文件的产生,在 Bookies 中不同 Ledgers 中的 Entrys 会聚合存储到一个 EntryLog 文件中。Bookie 能够通过运行 GC 线程(GarbageCollectorThread)来删除未关联的 Entry 条目来达到回收磁盘空间的目标。在以后的 EntryLog 文件中,如果某一个 Ledger 中蕴含无奈删除的 Entry,那么这个 EntryLog 文件将始终保留在数据盘(Ledger 盘)中无奈被删除。因为业务场景的限定,咱们没方法要求一个 EntryLog 文件中所有 Ledgers 的 Entries 都能在近乎雷同的工夫内满足可删除的条件。为了防止该景象,Bookie 引入了数据压缩的概念,即通过扫描 EntryLog 文件断定哪些 Entry 是能够删除的,能够删除的 Entry 持续保留在原始的 EntryLog 文件中,不可删除的 Entry 写入新的 EntryLog 文件中,扫描实现之后将原始的 EntryLog 文件删除掉。

Bookie 压缩类型

Bookie 的 GC 回收线程并不是始终执行的,而是基于特定的阈值,Bookie 依照一个 EntryLog 文件中有用数据的占比以及数据压缩被触发的工夫将数据压缩的操作分为如下两种类型:

Minor GC:

默认触发的工夫为每 1 小时触发一次,能够通过 minorCompactionInterval 来自定义每一次 minor GC 触发的工夫距离。当达到 Minor GC 触发的工夫阈值之后,会持续查看以后 EntryLog 中有用数据的占比是否超过默认配置的 20%。如果没有超过,则 Minor GC 失效,开始回收并压缩 EntryLog 中的数据。如果超过阈值,那么 Minor GC 不会被触发。能够通过 minorCompactionThreshold 来自定义 Minor GC 中有用数据的占比达到多少之后不会持续触发 Minor GC。为了防止 Minor GC 执行占用太多的工夫,也能够通过 minorCompactionMaxTimeMillis 的参数来管制以后 Minor GC 最大容许执行的工夫是多少。当 minorCompactionMaxTimeMillis <= 0 时,垃圾回收线程会始终执行直到扫描实现以后 Ledger 目录下所有的 Entry Log 文件。

Major GC:

默认触发的工夫为每 24 小时触发一次,能够通过 majorCompactionInterval 来自定义每一次 major GC 触发的工夫距离。当达到 Major GC 触发的工夫阈值之后,会持续查看以后 EntryLog 中有用数据的占比是否超过默认配置的 80%。如果没有超过,则 Major GC 失效,开始回收并压缩 EntryLog 中的数据。如果超过阈值,那么 Major GC 不会被触发。能够通过 majorCompactionThreshold 来自定义 Major GC 中有用数据的占比达到多少之后不会持续触发 Major GC。为了防止 Minor GC 执行占用太多的工夫,也能够通过 majorCompactionMaxTimeMillis 的参数来管制以后 Major GC 最大容许执行的工夫是多少。当 majorCompactionMaxTimeMillis <= 0 时,垃圾回收线程会始终执行直到扫描实现以后 Ledger 目录下所有的 Entry Log 文件。

留神: minorCompactionThreshold 和 majorCompactionThreshold 的最大值不能够超过 100%,当 minorGC 和 majorGC 同时配置时,MinorGC 的 minorCompactionInterval 和 minorCompactionThreshold 要求必须小于 MajorGC 中指定的阈值。

为什么须要引入压缩有用占比阈值?

当做数据压缩回收时,咱们默认别离为 Minor GC 和 Major GC 引入了数据有用占比的阈值,这样做的目标是为了防止每次垃圾回收线程运行时,都会去频繁的扫描所有的 EntryLog 文件。当一个 EntryLog 文件中有用数据的占比超过 Major GC 指定的阈值,那么能够认为以后 EntryLog 中绝大部分数据依然为无效的数据。这种状况下咱们无需持续为了回收剩下的那一点有效数据,而后将该 EntryLog 中的数据从原始的 EntryLog 文件中再写入新的 EntryLog 文件中,这样能够大幅度的节俭磁盘 I/O。

Bookie 压缩形式

以后,Bookie 提供了如下两种数据压缩的形式:

依照 Entries 的数量

默认状况下,Bookie 是通过 Entries 的数量进行压缩,默认值为 1000,即每次最大压缩 1000 条 Entry。能够通过 compactionRateByEntries 自定义每次压缩 Entries 的数量。

依照 Entries 大小

Bookie 依照 Entries 的大小进行压缩,能够通过 compactionRateByBytes 自定义每次回收最大容许被回收 Entries 的大小。当想要应用该压缩形式时,须要在 Bookie 的配置文件中同时关上如下配置:isThrottleByBytes=true。

留神:生产环境中倡议应用依照 Entries 大小压缩的形式,这个取决于 Entry 被打包的形式。对于 Pulsar 来说,一般音讯和 Batch 音讯都会被当作一条 Entry 来对待,这就可能会导致每一条 Entry 的大小都不一样。如果依照 Entries 的数量来回收,即每次回收的数据大小是不统一的,如果单个 Entry 过大,有可能导致回收期间占用较大的磁盘 IO,影响失常数据的读写 IO,造成抖动的景象产生。

Bookie GC 触发的形式

以后 Bookie 的 GC 操作反对如下两种触发形式:

主动触发

Bookie 的 GC 回收线程依照 Bookie 压缩类型大节中介绍的形式,依照特定的工夫距离及阈值周期性的执行数据压缩回收的操作。

手动触发

Bookie 反对了 REST API 的 HTTP 服务,容许用户通过手动的形式触发 GC,应用形式如下:

curl -X PUT http://127.0.0.1:8000/api/v1/…

  • IP: 即为以后 Bookie 的 IP 地址
  • Port:示例中的 8000 端口为 Bookie 配置文件中 httpServerPort 指定的端口,默认为 8000。

执行实现之后,也能够通过如下申请查看 GC 的状态等信息:

curl http://127.0.0.1:8000/api/v1/…

Output:

[ { 
   "forceCompacting" : false,
   "majorCompacting" : false,
   "minorCompacting" : false,
   "lastMajorCompactionTime" : 1662436000016,
   "lastMinorCompactionTime" : 1662456700007,
   "majorCompactionCounter" : 11,
   "minorCompactionCounter" : 99 
}]

Bookie GC 回收代码剖析

Bookie 回收的代码逻辑次要在 org.apache.bookkeeper.bookie.GarbageCollectorThread 类中的 runWithFlags() 办法, 次要的回收逻辑蕴含如下三个函数:

  • doGcLedgers()
  • doGcEntryLogs()
  • doCompactEntryLogs()

在了解 Bookie GC 的回收逻辑中,咱们首先须要介绍几个要害的汇合:

  • LedgerRangeIterator: 该接口为一个 LedgerRange 的迭代器,用来存储从 Meta Store(zookeeper)中存储的所有 Ledgers 的信息。
  • ledgerIndex:是 LedgerStorage(RocksDB)中所有 Ledger 扫描进去的一个汇合。
  • ledgersMap:每一个 EntryLog 对应一个 ledgersMap,示意以后 EntryLog 中存储的所有 Ledgers 的汇合。
  • entryLogMetaMap:每一个 Ledger 盘领有一个 entryLogMetaMap 对象,是以后 Ledger 数据盘下所有 EntryLogID -> EntryLogMeta 的一个缓存。

以后,Bookie 的索引存储反对了多种形式,默认应用的是 SortedLedgerStorage,能够在 Bookie 的配置文件中通过 ledgerStorageClass 来指定具体须要应用的索引存储形式,个别举荐应用的配置如下:


ledgerStorageClass=org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage

所以在上面的代码详解中,咱们以 DbLedgerStorage 为例。

doGcLedgers()

在 doGcLedgers() 中,代码逻辑次要如下:

  1. 首先从 RocksDB 中获取以后数据盘目录下所有的 Ledgers 数据,并应用 NavigableSet 汇合暂存以后沉闷的 Ledgers 列表。

NavigableSet bkActiveLedgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));
  1. 通过 ledgerManager 对象,获取 meta store(默认:zookeeper)中所有 Ledgers 的 Range,暂存在 LedgerRangeIterator 迭代器中

LedgerRangeIterator ledgerRangeIterator = ledgerManager

.getLedgerRanges(zkOpTimeoutMs);
  1. 定义一个 Set 汇合,来缓存从 zookeeper 中获取的 LedgerRangeIterator

if (ledgerRangeIterator.hasNext()) {LedgerRange lRange = ledgerRangeIterator.next();

ledgersInMetadata = lRange.getLedgers();

// 当第一次进来当前,就能够获取到以后批次中最大的那个 Ledgers 的索引是多少

end = lRange.end();} else {

// 如果从 zk 中获取到的 Ledgers 迭代器是空的或者曾经迭代完所有的 Ledgers,则重置 done 标记,退出循环。ledgersInMetadata = new TreeSet<>();

end = Long.MAX_VALUE;

done = true;

}

这里将 LedgerRangeIterator 的迭代器转化为 ledgersInMetadata 的 Set 汇合次要是为了第四步能够做 subSet 的操作。

  1. 以 RocksDB 中获取到的 Ledgers 汇合为规范,对从 zookeeper 中获取的 Ledgers 列表做 subSet 的操作

Iterable subBkActiveLedgers = bkActiveLedgers.subSet(start, true, end, true);

其中 start 地位为 0,end 的地位为 LedgerRangeIterator 迭代器最初的一个地位。因为上述两个 Set 在事后都是做过排序操作的,所以在这里能够间接进行 subSet 的操作。

  1. 拿第四步获取到的 subBkActiveLedgers 与 zookeeper 中的 ledgersInMetadata 汇合比拟,判断 zookeeper 中是否还蕴含以后 LedgerID,如果不蕴含代表能够从 Bookie 的 RocksDB 索引中删除以后 LedgerID 的信息。

// 迭代 subBkActiveLedgers 的汇合

for (Long bkLid : subBkActiveLedgers) {

// 以 zk 为规范

if (!ledgersInMetadata.contains(bkLid)) {

....

// 清理指定的 Ledger ID

// 这个 Ledger 在 Bookie 中有,在 zk 上没有,则删除。garbageCleaner.clean(bkLid);

}

}

这里以 zookeeper 中获取到的 ledgersInMetadata 为基准是因为,在 Pulsar 中当数据写入的时候是先去 zookeeper 节点注册一个长期的 zk-node 来存储以后 LedgerID 的相干元数据信息,而后再去 RocksDB 中写入 LedgerID 的存储索引信息,而后将 LedgerID 的 Entry 数据写入到 EntryLog 中。删除操作也是同样的情理,当用户在 Pulsar 中应用的 Topic 中,有 Ledger 合乎删除条件时,会去调用 ManagedLedger 的接口去 zookeeper 中删除 LedgerID 的 zk-node。能够看到,无论是读写,对于 Bookie 的 Client 来说,都是优先操作 Bookie-Zk 中的 Ledgers 信息。所以对于删除操作而言咱们也是以 zookeeper 中的 Ledgers Set 汇合为基准,来查看 RocksDB 的索引存储中有哪些 LedgerID 是能够删除的。

  1. 调用 GarbageCleaner 的接口去 RocksDB 的 ledgerIndex 中删除指定的 LedgerID

初始化 garbageCleaner 接口并实现 clean 办法,在 clean 办法中调用 DbLedgerStorage 的 deleteLedger 接口


this.garbageCleaner = ledgerId -> {

try {if (LOG.isDebugEnabled()) {LOG.debug("delete ledger :" + ledgerId);

}

gcStats.getDeletedLedgerCounter().inc();

// 调用 DbLedgerStorage 去删除接口

ledgerStorage.deleteLedger(ledgerId);

} catch (IOException e) {LOG.error("Exception when deleting the ledger index file on the Bookie:", e);

}

};

去 ledgerIndex 的缓存中删除以后的 LedgerID


@Override

public void deleteLedger(long ledgerId) throws IOException {

...

entryLocationIndex.delete(ledgerId);

ledgerIndex.delete(ledgerId);

....

}

能够看到 doGcLedgers() 函数次要是以 zookeeper 的 Ledgers 汇合为基准,去比照 RocksDB 的 ledgerIndex 索引存储中删除待删除的 Ledgers。

doGcEntryLogs()

在 doGcEntryLogs() 中,代码逻辑次要如下:

  1. 迭代 entryLogMetaMap 获取以后数据盘目录下所有的 EntryLog 信息

entryLogMetaMap.forEach((entryLogId, meta) -> {...});
  1. 以 RocksDB 中的 ledgerIndex 缓存为基准,判断以后 EntryLog 中是否有能够删除的 Ledger

removeIfLedgerNotExists(meta);

EntryLogMetadata 中的 removeLedgerIf() 办法的参数为 LongPredicate,实质是通过 ledgerIndex 中是否存在以后 LedgerID,如果不存在则 LongPredicate 的 test() 办法为 true,该 Ledger 能够删除。


private void removeIfLedgerNotExists(EntryLogMetadata meta) {

// 这个 ledger 是否能够删除,取决于以后这个 Ledger 是否在 ledgerIndex 的汇合中存在

meta.removeLedgerIf((entryLogLedger) -> {

// Remove the entry log ledger from the set if it isn't active.

try {

// ledgerStorage 为专门为压缩定制的 CompactableLedgerStorage,继承了 LedgerStorage 接口

return !ledgerStorage.ledgerExists(entryLogLedger);

} catch (IOException e) {LOG.error("Error reading from ledger storage", e);

return false;

}

});

}

而 removeLedgerIf() 办法自身操作的是 EntryLogMeta 中 ledgersMap 的这个汇合,删除操作也是基于 ledgerIndex 判断是否能够从 ledgersMap 中删除 LedgerID。


public void removeLedgerIf(LongPredicate predicate) {ledgersMap.removeIf((ledgerId, size) -> {boolean shouldRemove = predicate.test(ledgerId);

if (shouldRemove) {remainingSize -= size;}

return shouldRemove;

});

}
  1. 通过第二步的删除操作,在这里去判断以后 EntryLog 中是否所有的 LedgerID 都曾经被删除,如果都删除了,则咱们能够间接将这个 EntryLog 从数据盘中删除。

// 判断 EntryLog Meta 中的 ledgersMap 对象是否还有元素。if (meta.isEmpty()) {

// This means the entry log is not associated with any active ledgers anymore.

// We can remove this entry log file now.

LOG.info("Deleting entryLogId" + entryLogId + "as it has no active ledgers!");

// 当以后的 EntryLog 中没有任何 Ledgers 对象时,间接调用删除 EntryLog 的接口进行删除操作。removeEntryLog(entryLogId);

gcStats.getReclaimedSpaceViaDeletes().add(meta.getTotalSize());

}

能够看到,在 doGcEntryLogs 函数中,次要是以 ledgerIndex 为基准,操作每一个 EntryLog 中的 ledgesMap 对象,判断 Ledger 是否能够删除。如果以后 EntryLog 的所有 Ledger 都能够删除,则间接删除 EntryLog 文件。如果有一部分 Ledger 能够删除,一部分 Ledger 无奈删除,则进入 doCompactEntryLogs() 函数的解决逻辑中。

doCompactEntryLogs

在 doCompactEntryLogs() 中,代码次要逻辑如下:

  1. 结构 entryLogMetaMap 的长期对象 logsToCompact,并依照使用率对其排序:

List logsToCompact = new ArrayList();

// 开始之前首先把本地缓存的 entryLogMetaMap 都增加进来

logsToCompact.addAll(entryLogMetaMap.values());

// 依照使用率做一个排序

logsToCompact.sort(Comparator.comparing(EntryLogMetadata::getUsage));
  1. 迭代 entryLogMetaMap 的长期对象:logsToCompact

for (EntryLogMetadata meta : logsToCompact) {

...

// 真正触发回收的外围逻辑

compactEntryLog(meta);

}
  1. 调用 scanEntryLog() 办法,开始扫描 EntryLog 文件

// 扫描指定的 EntryLog 文件

entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(), scannerFactory.newScanner(entryLogMeta));

扫描 EntryLog 文件中能够简略梳理为如下三个逻辑:

3.1 如何扫描 EntryLog 文件

要了解 EntryLog 文件是如何被扫描进去的,咱们首先要去看 Entry 是如何被写入 EntryLog 文件中的。首先每一个 EntryLog 都有 1024 个字节 EntryLog Header 信息,次要蕴含如下内容:

  • Fingerprint(指纹信息): 4 bytes “BKLO”

在预调配 EntryLog 的时候,就固定的将 4 字节的签名信息写入

  • Log file HeaderVersion: 4 bytes

有两个版本:HEADER_V0 和 HEADER_V1,以后 EntryLog 的版本为 HEADER_V1。

  • Ledger map offset:8 bytes
  • Ledgers Count:4 bytes

所以在扫描 EntryLog 文件时,咱们首先跳过以后 EntryLog 的 Header 信息:


// Start the read position in the current entry log file to be after

// the header where all of the ledger entries are.

long pos = LOGFILE_HEADER_SIZE;

之后会持续写入 4 字节的 entrySize 以及 8 字节的 LedgerID,所以扫描的时候也须要依照这种格局将 entrySize 和 LedgerID 别离读取进去,而后根据 entrySize 的大小,持续向后读取出 Entry 真正的内容。


// Buffer where to read the entrySize (4 bytes) and the ledgerId (8 bytes)

ByteBuf headerBuffer = Unpooled.buffer(4 + 8);

while (true) {

...

long offset = pos;

pos += 4;

int entrySize = headerBuffer.readInt();

long ledgerId = headerBuffer.readLong();

headerBuffer.clear();

// 调用 scanner 的 accept() 办法

if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) {

// skip this entry

pos += entrySize;

continue;

}

// read the entry

data.clear();

if (entrySize <= 0) {LOG.warn("bad read for ledger entry from entryLog {}@{} (entry size {})",

entryLogId, pos, entrySize);

return;

}

data.capacity(entrySize);

// process the entry

// 调用 scanner 的 process() 办法,将 entry 写入新的 EntryLog 中

scanner.process(ledgerId, offset, data);

// Advance position to the next entry

pos += entrySize;

}

如此往返,一直的将 EntryLog 中的每一条 Entry 顺次读取进去。

3.2 accept 接口

accept 接口次要用来判断以后的 LedgerID 是否还在 EntryLog 文件中,即是否还在 ledgersMap 中存在。


@Override

public boolean accept(long ledgerId) {return meta.containsLedger(ledgerId);

}

3.3 process 接口

process 接口次要用来将无奈删除的 Entry 写入到新的 EntryLog 文件中,并记录 Entry 对应的 offset 信息。


@Override

public void process(final long ledgerId, long offset, ByteBuf entry) throws IOException {

...

long newoffset = entryLogger.addEntry(ledgerId, entry);

offsets.add(new EntryLocation(ledgerId, entryId, newoffset));

}
  1. 调用 flush 办法,更新新 EntryLog 文件的索引信息

// 强制把写入的数据刷新到磁盘下来,刷新的时候会同时更新索引信息,以便 broker 下次读取音讯的时候,能够去新的 EntryLog 中去读取。scannerFactory.flush();

flush() 办法次要是将上述无奈删除的 Entry 写入新 EntryLog 中的位点信息调用 DbLedgerStorage 的接口更新到 RocksDB 中去。


void flush() throws IOException {if (offsets.isEmpty()) {if (LOG.isDebugEnabled()) {LOG.debug("Skipping entry log flushing, as there are no offset!");

}

return;

}

// Before updating the index, we want to wait until all the compacted entries are flushed into the

// entryLog

try {entryLogger.flush();

// 更新新 EntryLog 文件中 offsets 的信息。ledgerStorage.updateEntriesLocations(offsets);

ledgerStorage.flushEntriesLocationsIndex();} finally {offsets.clear();

}

}
  1. 删除原始的 EntryLog 文件

// 移除掉原先旧的 EntryLog 文件

logRemovalListener.removeEntryLog(entryLogMeta.getEntryLogId());

上述的代码逻辑形容了对于单个数据盘目录下 EntryLog 残缺的回收逻辑。对于多个数据盘目录的场景,每一个数据盘目录都会创立一个独自的 GarbageCollectorThread 的线程来运行上述的逻辑。

EntryLog 文件的大小如何管制

在 Ledger 的数据盘目录中能够看到,每一个 EntryLog 文件的大小都固定为 1GB 左右,当达到这个大小时,EntryLog 文件就会滚动创立新的 EntryLog 文件来写入。这是因为默认设置的 EntryLog 大小为 1GB,具体如下:


/**

* Set the max log size limit to 1GB. It makes extra room for entry log file before

* hitting hard limit '2GB'. So we don't need to force roll entry log file when flushing

* memtable (for performance consideration)

*/

public static final long MAX_LOG_SIZE_LIMIT = 1 * 1024 * 1024 * 1024;

reachEntryLogLimit() 办法用来查看是否 EntryLog 文件达到指定的大小:


boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {if (logChannel == null) {return false;}

return logChannel.position() + size > logSizeLimit;}

用户也能够通过如下参数自定义 EntryLog 文件的大小:


logSizeLimit

如何计算 EntryLog 文件的使用率

在 doCompactEntryLogs 章节中能够看到,在迭代 entryLogMetaMap 时,根据 EntryLog 的使用率对 EntryLog 进行了排序。EntryLog 的使用率次要通过 EntryLog Metadata 中的如下两个字段进行计算的:

  • private long totalSize; // 总大小
  • private long remainingSize; // 残余大小

在数据写入 EntryLog (ledgersMap) 的过程中会同时减少 totalSize 和 remainingSize 这两个字段:


// 往 ledgersMap 中新增元素

public void addLedgerSize(long ledgerId, long size) {

totalSize += size;

remainingSize += size;

ledgersMap.addAndGet(ledgerId, size);

}

当在做数据压缩时,如果判断某一个 LedgerID 能够从 ledgersMap 中删除时,会从 remainingSize 中减去以后 Ledger 的 size:


public void removeLedgerIf(LongPredicate predicate) {ledgersMap.removeIf((ledgerId, size) -> {boolean shouldRemove = predicate.test(ledgerId);

if (shouldRemove) {remainingSize -= size; // 减去以后 ledger 的大小}

return shouldRemove;

});

}

所以在计算 EntryLog 的使用率时,拿以后 remainingSize/totalSize 即可计算出 EntryLog 文件中以后残余的无效数据的比率是多少:


public double getUsage() {if (totalSize == 0L) {return 0.0f;}

return (double) remainingSize / totalSize;

}

minor GC 与 major GC 执行数据回收的逻辑是完全一致的,EntryLog 中无效数据的使用率也是用来辨别是否为 minor GC 或者 major GC 的关键点。

退出移动版