原文地址:http://fxbing.github.io/2022/...

本文次要对 Kafka 和 Pulsar 的 Log Compaction 原理进行介绍,并以我的了解进行简略的比照阐明。

在 Kafka 和 Pulsar 中,都具备 Log Campaction(日志挤压)的能力,Compaction 不同于 Log Compression(日志压缩),Compaction 是指将 Topic 历史日志中雷同 Key 的音讯只保留最新的一条,而 Compression 是指音讯维度利用各种压缩算法(如:gzip、lz4、zstd等)减小音讯大小但不扭转音讯内容。Compaction 的应用场景的特点:一是音讯有 key,二是雷同 key 的音讯只关怀最新的内容,例如:记录每支股票价格变动的 Topic,股票名称设置为 key,股票价格设置为 value,个别只关怀股票的最新价格。在这种场景下,配置 Compaction 能够让 Topic 存储的数据更少,从而在须要全量读取 Topic 内容时速度更快。

Kafka Log Compaction

本文介绍基于 Kafka 2.8,并且疏忽了幂等音讯、事务音讯的相干解决逻辑,如有趣味,能够自行浏览源码理解。

在 Kafka 中,Topic 配置 cleanup.policy用来管制 Topic 的数据清理策略,该配置的可选值有两个:一个是delete,示意 Topic 中数据超过保留工夫或保留大小限度时,间接删除最旧的数据;另一个是compact,示意对于 Topic 中的旧数据(非 Active Segment 中的数据)执行挤压,肯定范畴内,对于 key 雷同(没有 key 的音讯会被删除)的音讯,只保留最新的一条。对于每个 Topic 能够抉择一种清理策略进行配置,也能够同时配置两种策略。本文介绍的是 compact这个策略,在 Kafka 应用过程中,用来寄存 commit offset 信息的的外部 Topic:__consumer_offsets 会被配置为该策略,一般 Topic 个别很少应用。

实现原理

对于一台 Kafka Broker, Log Compaction 的次要流程如下:

  1. 创立 LogCleaner,启动配置指定数量的 CleanerThread,负责该 Broker 的 Log Compaction。
  2. 每个 CleanerThread 循环执行日志清理工作,循环过程如下:

    1. 寻找一个待清理的 TopicPartition。
    2. 遍历该TopicPartition 中所有待清理的 Segment,结构 OffsetMap 记录每个 key 最新的 offset 及对应的音讯工夫。
    3. 对该TopicPartition 中所有待清理的 Segment 进行分组,保障每组 Segment 的 Log 文件总大小和 Index 文件总大小不会超过 LogConfig 容许的范畴并且每组 Segment 的 offset 极差不会超过Int.MaxValue(Kafka 中 Segment 内的绝对 offset 为整型,这个查看是为了防止绝对 offset 溢出)。
    4. 将 Segment 依照分好的组进行清理,每一组 Segment 聚合为一个新的 Segment。每组 Segment 的清理过程为:创立一个新的 Segment,而后依据 OffsetMap 中记录的信息抉择须要保留的音讯,存入新 Segment,最初应用新 Segment 笼罩这一组旧 Segment。
    5. 对于所有曾经执行实现 Compaction 流程并且cleanup.policy配置蕴含compact策略的 Log 进行删除,本次删除不是后面用一个新 Segment 替换一组旧 Segment 中的删除,而是调用Log.deleteOldSegments。该办法会删除LogStartOffset之前的所有 Segment,如果cleanup.policy配置同时还蕴含delete策略,也会删除超过保留工夫或保留大小限度的 Segment。

具体阐明

上面以Q&A的形式介绍日志清理的细节和逻辑:

多个 CleanerThread 是如何保障 Compaction 过程线程平安的?

Kafka 中 Log Compaction 的实现次要是LogCleanerLogCleanerManager两个类,LogCleaner是 Compaction 工作的主类,负责整体的工作流程,LogCleanerManager负责 Compaction 状态机的治理。所有状态如下:

  1. None :TopicPartition 未清理状态。
  2. LogCleaningInProgress :清理正在进行中,当 TopicPartition 被选中为待清理 Log 时会变为该状态。
  3. LogCleaningAborted :清理停止,这是一个从LogCleaningInProgressLogCleaningPaused(1)的中间状态,在内部产生 truncate、坏盘等状况时须要放弃当初正在的清理操作, 终止后该 TopicPartition 会标记为 LogCleaningAborted
  4. LogCleaningPaused(i):清理暂停,i 的初始值为1,这是一个重入的状态,即:如果以后状态是LogCleaningPaused(i),再次暂停该 TopicPartition 的话,状态会变为LogCleaningPaused(i+1),从暂停状态复原的话,状态会变为LogCleaningPaused(i-1)(i-1=0 时间接变为None状态)。会触发状态变为LogCleaningPaused(i)的状况如下(上面的“暂停”和“暂停复原”别离代表 i + 1 和 i - 1):

    1. Topic 配置cleanup.policy不蕴含compact:暂停
    2. 内部产生 truncate、坏盘等状况时该 TopicPartition 状态为NoneLogCleaningPaused(i):暂停
    3. 本轮清理实现且该 TopicPartition 状态为LogCleaningAborted:暂停
    4. 触发暂停的操作实现:暂停复原
我了解停止状态和暂停状态的区别是:是否能够在本轮清理中复原。在内部产生 truncate、坏盘等状况时,如果一个 TopicPartition 没有处于清理过程中,能够标记为暂停,在触发暂停的状况完结后,复原清理流程就会从新执行清理流程。然而如果该 TopicPartition 处于清理过程中,则必须标记为终止,在触发暂停的状况完结后,即便本轮清理没完结,也必须要先标记为暂停,在下轮操作进行清理。这是因为如果曾经在清理过程中了,本轮清理会有一些两头态的信息,不容易从两头态进行复原。

对于每个 CleanerThread,每次都会通过LogCleanerManager.grabFilthiestCompactedLog办法来搜寻状态为None并且须要被清理的 TopicPartition 进行清理。LogCleanerManager中的所有状态变更都会加锁,保障状态机是线程平安的,多个 CleanerThread 通过该状态机保障了所有 TopicPartition 的 Compaction 过程是线程平安的。

如何决定哪些 TopicPartition 应该被清理(LogCleanerManager.grabFilthiestCompactedLog的具体流程)?

能够清理的 TopicPartition 限度条件有以下几个:

  • Topic 配置cleanup.policy蕴含compact
  • TopicPartition 的状态为空:阐明没有其余 CleanerThread 操作该 TopicPartition。
  • 该 TopicPartition 是能够清理的:因为坏盘等问题某些 TopicPartition 会被标记为不可清理,须要跳过。
  • 不须要提早清理:音讯在日志中必须存在"max.compaction.lag.ms"指定的工夫后能力被删除(详情可参考KIP-354),只有第一个未压缩 Segment 的预计最早音讯工夫戳早于"max.compaction.lag.ms"才会能够进行 Compaction。
  • 须要被清理的音讯总大小大于0:从 LogStartOffset 或 CleanCheckPoint 开始到满足"max.compaction.lag.ms"配置要求的 offset 之间的音讯总大小
  • 满足清理频率满足要求:无关清理频率的配置能够间接看上面源码 Doc

        public static final String MIN_COMPACTION_LAG_MS_CONFIG = "min.compaction.lag.ms";    public static final String MIN_COMPACTION_LAG_MS_DOC = "The minimum time a message will remain " +        "uncompacted in the log. Only applicable for logs that are being compacted.";    public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms";    public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +        "ineligible for compaction in the log. Only applicable for logs that are being compacted.";    public static final String MIN_CLEANABLE_DIRTY_RATIO_CONFIG = "min.cleanable.dirty.ratio";    public static final String MIN_CLEANABLE_DIRTY_RATIO_DOC = "This configuration controls how frequently " +        "the log compactor will attempt to clean the log (assuming <a href=\"#compaction\">log " +        "compaction</a> is enabled). By default we will avoid cleaning a log where more than " +        "50% of the log has been compacted. This ratio bounds the maximum space wasted in " +        "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " +        "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +        "space in the log. If the " + MAX_COMPACTION_LAG_MS_CONFIG + " or the " + MIN_COMPACTION_LAG_MS_CONFIG +        " configurations are also specified, then the log compactor considers the log to be eligible for compaction " +        "as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) " +        "records for at least the " + MIN_COMPACTION_LAG_MS_CONFIG + " duration, or (ii) if the log has had " +        "dirty (uncompacted) records for at most the " + MAX_COMPACTION_LAG_MS_CONFIG + " period.";

OffsetMap 的实现原理

OffsetMap 的实现类是SkimpyOffsetsMap,用来存储 message key 和 offset 的映射关系,用来保留雷同 key 下最新消息的 Offset。此处不介绍该类的具体实现(感兴趣自行浏览源码),只列举其特点:

  • 创立时须要指定两个参数,一个是 memory,用来指定存储 offset 的 ByteBuffer 大小,另一个是 hashAlgorithm,用来确定计算 key hash 时应用的哈希算法。
  • 只容许减少,不容许删除。
  • 只在 ByteBuffer 中存储须要记录的 offset,每次 put/get 都是先对 key hash 确定 position,而后间接批改/读取 ByteBuffer 中的内容。
我了解该实现的次要长处是防止存储 message key,缩小内存耗费。

如何从一组旧的 Segment 中过滤出须要保留的音讯以及过滤策略是怎么的?

过滤音讯的流程是,先将旧 Segment 中的音讯读入MemoryRecords,而后应用MemoryRecords.filterTo办法进行过滤,该办法反对应用自定义实现的 RecordFilter 过滤音讯。

音讯过滤的策略(理论会有事务音讯的解决)是:过滤掉雷同 key 中非最新 offset 的音讯以及满足删除条件的墓碑音讯(key 不为 null 然而 value 为 null)。墓碑音讯的删除条件是指,该墓碑音讯所在的 Segment 最初批改工夫间隔最新 Segment 的最初批改工夫超过delete.retention.ms配置的工夫。

如果 Compaction 过程中 Broker 解体,重启后如何复原?

宕机复原的思考只产生在用新 Segment(文件名后缀是.cleaned) 替换旧 Segment 的过程中,其余阶段产生宕机的话,复原后从新执行 Compaction 流程即可。

Segment 替换操作应用replaceSegments办法(源码如下)实现,替换流程是:

  1. 将新 Segment 的文件名后缀从.cleaned改为.swap
  2. 删除旧 Segment:删除过程是先将同步将文件后缀改为.deleted,而后进行异步删除
  3. 去掉新 Segment 的文件名后缀,流程完结

上面是对于所有可能的阶段 Broker 解体后的复原逻辑:

  • 步骤1之前:如果此时 broker 解体,则清理和替换操作将停止,并且在 loadSegments() 中复原时删除 .cleaned 文件。
  • 步骤1执行过程中解体:新 Segment 重命名为 .swap。如果代理在所有 Segment 重命名为 .swap 之前解体,则清理和替换操作将停止 .cleaned 以及 .swap 文件在 loadSegments() 复原时被删除。 .cleaned 重命名为 .swap 是依照文件按偏移量的降序进行的,复原时,所有偏移量大于最小偏移量 .clean文件的.swap 文件都将被删除。
  • 步骤1实现后解体:如果在所有新 Segment 重命名为 .swap 后代理解体,则操作实现,残余操作流程会在 Broker 复原时继续执行。
  • 步骤2执行过程中解体:旧 Segment 文件被重命名为 .deleted 并安顿了异步删除。如果 Broker 解体,任何留下的 .deleted 文件都会在 loadSegments() 复原时被删除,而后调用 replaceSegments() 以实现替换,其中新 Segment 从 .swap 文件从新创立,旧 Segment 蕴含在解体前未重命名的 Segment。
  • 步骤3实现后解体:此时可能存在未被异步删除实现的旧 Segment,任何可能留下的 .deleted 文件都会在 loadSegments() 复原时被删除
private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {    lock synchronized {      val sortedNewSegments = newSegments.sortBy(_.baseOffset)      // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments      // but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()      // multiple times for the same segment.      val sortedOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset)      checkIfMemoryMappedBufferClosed()      // need to do this in two phases to be crash safe AND do the delete asynchronously      // if we crash in the middle of this we complete the swap in loadSegments()      if (!isRecoveredSwapFile)        sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix))      sortedNewSegments.reverse.foreach(addSegment(_))      val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet      // delete the old files      sortedOldSegments.foreach { seg =>        // remove the index entry        if (seg.baseOffset != sortedNewSegments.head.baseOffset)          segments.remove(seg.baseOffset)        // delete segment files, but do not delete producer state for segment objects which are being replaced.        deleteSegmentFiles(List(seg), asyncDelete = true, deleteProducerStateSnapshots = !newSegmentBaseOffsets.contains(seg.baseOffset))      }      // okay we are safe now, remove the swap suffix      sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))    }  }

Pulsar Compaction

Pulsar 官网文档中对于 Compaction 的介绍也比拟具体,具体能够参考:

  • https://pulsar.apache.org/doc...
  • https://pulsar.apache.org/doc...
  • https://github.com/ivankelly/...

在 Pulsar 中,Topic Compaction 与 NonCompaction 两个状态不是互相对抗的,Compaction 是通过相似于创立一个Subscription 来生产现有 Topic 的音讯,通过 Compaction 解决后,写入新的 Ledger 存储,在 Consumer 生产时,能够通过配置抉择读取 Compacted 数据还是 NonCompacted 数据。留神:Compact 只能对 persistent topic 执行。

触发条件

Pulsar 中触发 Topic Compaction 的形式有两种:

  1. 配置CompactionThreshold:如后面所说,Compaction 过程相当于建设一个 Subscription 来生产原来 Topic 中的数据写入新的 Ledger,Broker 会周期性查看所有 Persistent Topic,如果CompactionThreshold配置不为0并且这个 Subscription 的音讯积压超过了配置的阈值,就会主动触发 Compaction。这个配置的粒度能够是 topic、broker、namespace,最终依据优先级(topic > broker > namespace)确定最终配置值。
  2. 内部触发 Topic Compaction:除主动触发 Compaction 外,也能够通过 CLI 工具触发,一种是 AdminCli,须要调用 broker 提供的 RestAPI,另一种是应用专用工具类,不通过 RestAPI 间接指定。
$ bin/pulsar-admin topics compact persistent://my-tenant/my-namespace/my-topic # AdminCli$ bin/pulsar compact-topic --topic persistent://my-tenant-namespace/my-topic # 专用工具

实现原理

实现原理总体能够分为 Compaction 的处理过程和 Consumer 读取 Compacted 数据两个局部。

Compaction 解决

触发 Compaction 的对立入口是Compactor.compact(String topic)办法,Compactor是一个抽象类,目前的实现只有TwoPhaseCompactor一种,上面介绍的是TwoPhaseCompactor的实现逻辑。顾名思义,Compaction 过程分为两个阶段,遍历两次 Topic 内容,第一次遍历用来获取每个 key 中最新的 MessageId(相当于 Kafka 中的 offset),第二次遍历依据第一次遍历获得的后果,只将每个 key 的最新消息写入新的 Ledger 中。

读取 Compacted 数据

如果 Consumer 心愿读取 Compacted 数据,须要在初始化时制订相干配置。

Consumer<byte[]> compactedTopicConsumer = client.newConsumer()        .topic("some-compacted-topic")        .readCompacted(true)        .subscribe();

Broker 在收到 Consumer 的申请后,会先获取到 Topic 对应的 cursor 信息,而后从 cursor 信息中找到 Compacted 数据对应的 LedgerId,而后进行对应数据的读取。

Compacted Leger 信息如何传递给 Consumer?

如前文所说,在TwoPhaseCompactor处理过程中,理论是创立了一个 Subscription 来读取原来的数据。在读取数据实现进行 Ack 时,应用的接口是RawReader.acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties),该接口能够在 Ack 的同时给 Broker 返回一些该 Subscription 的元信息,Broker 会将收到的元信息记录在 cursor 信息中。所以,TwoPhaseCompactor通过这一能力,将创立的用来存储 Compacted 数据的 LedgerId 记录在了 cursor 信息中,不便 Consumer 读取时应用。

总结思考

Kafka 和 Pulsar 中实现 Compaction 的目标是为了在特定的场景下缩小 Topic 的数据量,放慢获取 Topic 中全副数据的速度,而不是心愿实现相似 KV 存储的能力。之所以这样说,是因为:

  1. Kafka 和 Pulsar 都没有保障最新数据的 Compaction:对于 Kafka 来说,Log Compaction 只会操作 ActiveSegment 之前的数据;对于 Pulsar 来说,Compaction 是一个周期性执行的工作,每次 Compaction 开始之前,都会先读取以后 Topic 中最初一个 MessageId 作为本轮 Compaction 的起点,因而只有有 Producer 在向 Topic 生产数据。就必定不能保障所有数据都被 Compacted。
  2. Kafka 和 Pulsar 的 Compaction 都是肯定范畴内的,不是全局的:Kafka 和 Pulsar 的 Compaction 都是以一种相似于滑动窗口的过程进行,“key 雷同的状况下只保留最新的音讯”针对的是一轮 Compaction 内,如果两轮 Compaction 中有雷同 key 的音讯,是没有方法合并的。

比照 Kafka 和 Pulsar 两者的实现来看,最次要的区别就是是否保留 Compaction 前的数据。相较于 Kafka 的实现,Pulsar 这种模式一方面整体逻辑更为简略,不须要思考各种文件替换过程中的解体复原逻辑,另一方面也能够给 Consumer 更多抉择的空间,然而这样也会带来肯定的存储老本。两者的实现架构是不同的,如果想要 Kafka 也像 Pulsar 一样保留两份数据,因为 Kafka 的存储正本机制是本人的治理的,可能须要比当初更为简单的实现才可能搞定,而不能像 Pulsar 一样间接通过切换 LedgerId 来抉择数据就能够了。