关于kafka:Kafka-和-Pulsar-的-Log-Compaction-实现

44次阅读

共计 10251 个字符,预计需要花费 26 分钟才能阅读完成。

原文地址:http://fxbing.github.io/2022/…

本文次要对 Kafka 和 Pulsar 的 Log Compaction 原理进行介绍,并以我的了解进行简略的比照阐明。

在 Kafka 和 Pulsar 中,都具备 Log Campaction(日志挤压)的能力,Compaction 不同于 Log Compression(日志压缩),Compaction 是指将 Topic 历史日志中雷同 Key 的音讯只保留最新的一条,而 Compression 是指音讯维度利用各种压缩算法(如:gzip、lz4、zstd 等)减小音讯大小但不扭转音讯内容。Compaction 的应用场景的特点:一是音讯有 key,二是雷同 key 的音讯只关怀最新的内容,例如:记录每支股票价格变动的 Topic,股票名称设置为 key,股票价格设置为 value,个别只关怀股票的最新价格。在这种场景下,配置 Compaction 能够让 Topic 存储的数据更少,从而在须要全量读取 Topic 内容时速度更快。

Kafka Log Compaction

本文介绍基于 Kafka 2.8,并且疏忽了幂等音讯、事务音讯的相干解决逻辑,如有趣味,能够自行浏览源码理解。

在 Kafka 中,Topic 配置 cleanup.policy用来管制 Topic 的数据清理策略,该配置的可选值有两个:一个是 delete,示意 Topic 中数据超过保留工夫或保留大小限度时,间接删除最旧的数据;另一个是compact,示意对于 Topic 中的旧数据(非 Active Segment 中的数据)执行挤压,肯定范畴内,对于 key 雷同(没有 key 的音讯会被删除)的音讯,只保留最新的一条。对于每个 Topic 能够抉择一种清理策略进行配置,也能够同时配置两种策略。本文介绍的是 compact 这个策略,在 Kafka 应用过程中,用来寄存 commit offset 信息的的外部 Topic:__consumer_offsets 会被配置为该策略,一般 Topic 个别很少应用。

实现原理

对于一台 Kafka Broker,Log Compaction 的次要流程如下:

  1. 创立 LogCleaner,启动配置指定数量的 CleanerThread,负责该 Broker 的 Log Compaction。
  2. 每个 CleanerThread 循环执行日志清理工作,循环过程如下:

    1. 寻找一个待清理的 TopicPartition。
    2. 遍历该 TopicPartition 中所有待清理的 Segment,结构 OffsetMap 记录每个 key 最新的 offset 及对应的音讯工夫。
    3. 对该 TopicPartition 中所有待清理的 Segment 进行分组,保障每组 Segment 的 Log 文件总大小和 Index 文件总大小不会超过 LogConfig 容许的范畴并且每组 Segment 的 offset 极差不会超过Int.MaxValue(Kafka 中 Segment 内的绝对 offset 为整型,这个查看是为了防止绝对 offset 溢出)。
    4. 将 Segment 依照分好的组进行清理,每一组 Segment 聚合为一个新的 Segment。每组 Segment 的清理过程为:创立一个新的 Segment,而后依据 OffsetMap 中记录的信息抉择须要保留的音讯,存入新 Segment,最初应用新 Segment 笼罩这一组旧 Segment。
    5. 对于所有曾经执行实现 Compaction 流程并且 cleanup.policy 配置蕴含 compact 策略的 Log 进行删除,本次删除不是后面用一个新 Segment 替换一组旧 Segment 中的删除,而是调用 Log.deleteOldSegments。该办法会删除LogStartOffset 之前的所有 Segment,如果 cleanup.policy 配置同时还蕴含 delete 策略,也会删除超过保留工夫或保留大小限度的 Segment。

具体阐明

上面以 Q &A 的形式介绍日志清理的细节和逻辑:

多个 CleanerThread 是如何保障 Compaction 过程线程平安的?

Kafka 中 Log Compaction 的实现次要是 LogCleanerLogCleanerManager两个类,LogCleaner是 Compaction 工作的主类,负责整体的工作流程,LogCleanerManager负责 Compaction 状态机的治理。所有状态如下:

  1. None:TopicPartition 未清理状态。
  2. LogCleaningInProgress:清理正在进行中,当 TopicPartition 被选中为待清理 Log 时会变为该状态。
  3. LogCleaningAborted:清理停止,这是一个从 LogCleaningInProgressLogCleaningPaused(1)的中间状态,在内部产生 truncate、坏盘等状况时须要放弃当初正在的清理操作,终止后该 TopicPartition 会标记为 LogCleaningAborted
  4. LogCleaningPaused(i):清理暂停,i 的初始值为 1,这是一个重入的状态,即:如果以后状态是 LogCleaningPaused(i),再次暂停该 TopicPartition 的话,状态会变为LogCleaningPaused(i+1),从暂停状态复原的话,状态会变为LogCleaningPaused(i-1)(i-1=0 时间接变为None 状态)。会触发状态变为 LogCleaningPaused(i) 的状况如下(上面的“暂停”和“暂停复原”别离代表 i + 1 和 i – 1):

    1. Topic 配置 cleanup.policy 不蕴含compact:暂停
    2. 内部产生 truncate、坏盘等状况时该 TopicPartition 状态为 NoneLogCleaningPaused(i):暂停
    3. 本轮清理实现且该 TopicPartition 状态为LogCleaningAborted:暂停
    4. 触发暂停的操作实现:暂停复原

我了解停止状态和暂停状态的区别是:是否能够在本轮清理中复原。在内部产生 truncate、坏盘等状况时,如果一个 TopicPartition 没有处于清理过程中,能够标记为暂停,在触发暂停的状况完结后,复原清理流程就会从新执行清理流程。然而如果该 TopicPartition 处于清理过程中,则必须标记为终止,在触发暂停的状况完结后,即便本轮清理没完结,也必须要先标记为暂停,在下轮操作进行清理。这是因为如果曾经在清理过程中了,本轮清理会有一些两头态的信息,不容易从两头态进行复原。

对于每个 CleanerThread,每次都会通过 LogCleanerManager.grabFilthiestCompactedLog 办法来搜寻状态为 None 并且须要被清理的 TopicPartition 进行清理。LogCleanerManager中的所有状态变更都会加锁,保障状态机是线程平安的,多个 CleanerThread 通过该状态机保障了所有 TopicPartition 的 Compaction 过程是线程平安的。

如何决定哪些 TopicPartition 应该被清理(LogCleanerManager.grabFilthiestCompactedLog的具体流程)?

能够清理的 TopicPartition 限度条件有以下几个:

  • Topic 配置 cleanup.policy 蕴含compact
  • TopicPartition 的状态为空:阐明没有其余 CleanerThread 操作该 TopicPartition。
  • 该 TopicPartition 是能够清理的:因为坏盘等问题某些 TopicPartition 会被标记为不可清理,须要跳过。
  • 不须要提早清理:音讯在日志中必须存在 "max.compaction.lag.ms" 指定的工夫后能力被删除(详情可参考 KIP-354),只有第一个未压缩 Segment 的预计最早音讯工夫戳早于 "max.compaction.lag.ms" 才会能够进行 Compaction。
  • 须要被清理的音讯总大小大于 0:从 LogStartOffset 或 CleanCheckPoint 开始到满足 "max.compaction.lag.ms" 配置要求的 offset 之间的音讯总大小
  • 满足清理频率满足要求:无关清理频率的配置能够间接看上面源码 Doc

        public static final String MIN_COMPACTION_LAG_MS_CONFIG = "min.compaction.lag.ms";
        public static final String MIN_COMPACTION_LAG_MS_DOC = "The minimum time a message will remain" +
            "uncompacted in the log. Only applicable for logs that are being compacted.";
    
        public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms";
        public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain" +
            "ineligible for compaction in the log. Only applicable for logs that are being compacted.";
    
        public static final String MIN_CLEANABLE_DIRTY_RATIO_CONFIG = "min.cleanable.dirty.ratio";
        public static final String MIN_CLEANABLE_DIRTY_RATIO_DOC = "This configuration controls how frequently" +
            "the log compactor will attempt to clean the log (assuming <a href=\"#compaction\">log" +
            "compaction</a> is enabled). By default we will avoid cleaning a log where more than" +
            "50% of the log has been compacted. This ratio bounds the maximum space wasted in" +
            "the log by duplicates (at 50% at most 50% of the log could be duplicates). A" +
            "higher ratio will mean fewer, more efficient cleanings but will mean more wasted" +
            "space in the log. If the" + MAX_COMPACTION_LAG_MS_CONFIG + "or the" + MIN_COMPACTION_LAG_MS_CONFIG +
            "configurations are also specified, then the log compactor considers the log to be eligible for compaction" +
            "as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted)" +
            "records for at least the" + MIN_COMPACTION_LAG_MS_CONFIG + "duration, or (ii) if the log has had" +
            "dirty (uncompacted) records for at most the" + MAX_COMPACTION_LAG_MS_CONFIG + "period.";

OffsetMap 的实现原理

OffsetMap 的实现类是SkimpyOffsetsMap,用来存储 message key 和 offset 的映射关系,用来保留雷同 key 下最新消息的 Offset。此处不介绍该类的具体实现(感兴趣自行浏览源码),只列举其特点:

  • 创立时须要指定两个参数,一个是 memory,用来指定存储 offset 的 ByteBuffer 大小,另一个是 hashAlgorithm,用来确定计算 key hash 时应用的哈希算法。
  • 只容许减少,不容许删除。
  • 只在 ByteBuffer 中存储须要记录的 offset,每次 put/get 都是先对 key hash 确定 position,而后间接批改 / 读取 ByteBuffer 中的内容。

我了解该实现的次要长处是防止存储 message key,缩小内存耗费。

如何从一组旧的 Segment 中过滤出须要保留的音讯以及过滤策略是怎么的?

过滤音讯的流程是,先将旧 Segment 中的音讯读入 MemoryRecords,而后应用 MemoryRecords.filterTo 办法进行过滤,该办法反对应用自定义实现的 RecordFilter 过滤音讯。

音讯过滤的策略(理论会有事务音讯的解决)是:过滤掉雷同 key 中非最新 offset 的音讯以及满足删除条件的 墓碑音讯 (key 不为 null 然而 value 为 null)。墓碑音讯的删除条件是指,该墓碑音讯所在的 Segment 最初批改工夫间隔最新 Segment 的最初批改工夫超过delete.retention.ms 配置的工夫。

如果 Compaction 过程中 Broker 解体,重启后如何复原?

宕机复原的思考只产生在用新 Segment(文件名后缀是.cleaned)替换旧 Segment 的过程中,其余阶段产生宕机的话,复原后从新执行 Compaction 流程即可。

Segment 替换操作应用 replaceSegments 办法(源码如下)实现,替换流程是:

  1. 将新 Segment 的文件名后缀从 .cleaned 改为.swap
  2. 删除旧 Segment:删除过程是先将同步将文件后缀改为.deleted,而后进行异步删除
  3. 去掉新 Segment 的文件名后缀,流程完结

上面是对于所有可能的阶段 Broker 解体后的复原逻辑:

  • 步骤 1 之前:如果此时 broker 解体,则清理和替换操作将停止,并且在 loadSegments() 中复原时删除 .cleaned 文件。
  • 步骤 1 执行过程中解体:新 Segment 重命名为 .swap。如果代理在所有 Segment 重命名为 .swap 之前解体,则清理和替换操作将停止 .cleaned 以及 .swap 文件在 loadSegments() 复原时被删除。.cleaned 重命名为 .swap 是依照文件按偏移量的降序进行的,复原时,所有偏移量大于最小偏移量 .clean文件的.swap 文件都将被删除。
  • 步骤 1 实现后解体:如果在所有新 Segment 重命名为 .swap 后代理解体,则操作实现,残余操作流程会在 Broker 复原时继续执行。
  • 步骤 2 执行过程中解体:旧 Segment 文件被重命名为 .deleted 并安顿了异步删除。如果 Broker 解体,任何留下的 .deleted 文件都会在 loadSegments() 复原时被删除,而后调用 replaceSegments() 以实现替换,其中新 Segment 从 .swap 文件从新创立,旧 Segment 蕴含在解体前未重命名的 Segment。
  • 步骤 3 实现后解体:此时可能存在未被异步删除实现的旧 Segment,任何可能留下的 .deleted 文件都会在 loadSegments() 复原时被删除
private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
    lock synchronized {val sortedNewSegments = newSegments.sortBy(_.baseOffset)
      // Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
      // but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()
      // multiple times for the same segment.
      val sortedOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset)

      checkIfMemoryMappedBufferClosed()
      // need to do this in two phases to be crash safe AND do the delete asynchronously
      // if we crash in the middle of this we complete the swap in loadSegments()
      if (!isRecoveredSwapFile)
        sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix))
      sortedNewSegments.reverse.foreach(addSegment(_))
      val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet

      // delete the old files
      sortedOldSegments.foreach { seg =>
        // remove the index entry
        if (seg.baseOffset != sortedNewSegments.head.baseOffset)
          segments.remove(seg.baseOffset)
        // delete segment files, but do not delete producer state for segment objects which are being replaced.
        deleteSegmentFiles(List(seg), asyncDelete = true, deleteProducerStateSnapshots = !newSegmentBaseOffsets.contains(seg.baseOffset))
      }
      // okay we are safe now, remove the swap suffix
      sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
    }
  }

Pulsar Compaction

Pulsar 官网文档中对于 Compaction 的介绍也比拟具体,具体能够参考:

  • https://pulsar.apache.org/doc…
  • https://pulsar.apache.org/doc…
  • https://github.com/ivankelly/…

在 Pulsar 中,Topic Compaction 与 NonCompaction 两个状态不是互相对抗的,Compaction 是通过相似于创立一个 Subscription 来生产现有 Topic 的音讯,通过 Compaction 解决后,写入新的 Ledger 存储,在 Consumer 生产时,能够通过配置抉择读取 Compacted 数据还是 NonCompacted 数据。留神:Compact 只能对 persistent topic 执行。

触发条件

Pulsar 中触发 Topic Compaction 的形式有两种:

  1. 配置 CompactionThreshold:如后面所说,Compaction 过程相当于建设一个 Subscription 来生产原来 Topic 中的数据写入新的 Ledger,Broker 会周期性查看所有 Persistent Topic,如果CompactionThreshold 配置不为 0 并且这个 Subscription 的音讯积压超过了配置的阈值,就会主动触发 Compaction。这个配置的粒度能够是 topic、broker、namespace,最终依据优先级(topic > broker > namespace)确定最终配置值。
  2. 内部触发 Topic Compaction:除主动触发 Compaction 外,也能够通过 CLI 工具触发,一种是 AdminCli,须要调用 broker 提供的 RestAPI,另一种是应用专用工具类,不通过 RestAPI 间接指定。
$ bin/pulsar-admin topics compact persistent://my-tenant/my-namespace/my-topic # AdminCli
$ bin/pulsar compact-topic --topic persistent://my-tenant-namespace/my-topic # 专用工具

实现原理

实现原理总体能够分为 Compaction 的处理过程和 Consumer 读取 Compacted 数据两个局部。

Compaction 解决

触发 Compaction 的对立入口是 Compactor.compact(String topic) 办法,Compactor是一个抽象类,目前的实现只有 TwoPhaseCompactor 一种,上面介绍的是 TwoPhaseCompactor 的实现逻辑。顾名思义,Compaction 过程分为两个阶段,遍历两次 Topic 内容,第一次遍历用来获取每个 key 中最新的 MessageId(相当于 Kafka 中的 offset),第二次遍历依据第一次遍历获得的后果,只将每个 key 的最新消息写入新的 Ledger 中。

读取 Compacted 数据

如果 Consumer 心愿读取 Compacted 数据,须要在初始化时制订相干配置。

Consumer<byte[]> compactedTopicConsumer = client.newConsumer()
        .topic("some-compacted-topic")
        .readCompacted(true)
        .subscribe();

Broker 在收到 Consumer 的申请后,会先获取到 Topic 对应的 cursor 信息,而后从 cursor 信息中找到 Compacted 数据对应的 LedgerId,而后进行对应数据的读取。

Compacted Leger 信息如何传递给 Consumer?

如前文所说,在 TwoPhaseCompactor 处理过程中,理论是创立了一个 Subscription 来读取原来的数据。在读取数据实现进行 Ack 时,应用的接口是 RawReader.acknowledgeCumulativeAsync(MessageId messageId, Map<String, Long> properties),该接口能够在 Ack 的同时给 Broker 返回一些该 Subscription 的元信息,Broker 会将收到的元信息记录在 cursor 信息中。所以,TwoPhaseCompactor 通过这一能力,将创立的用来存储 Compacted 数据的 LedgerId 记录在了 cursor 信息中,不便 Consumer 读取时应用。

总结思考

Kafka 和 Pulsar 中实现 Compaction 的目标是为了在特定的场景下缩小 Topic 的数据量,放慢获取 Topic 中全副数据的速度,而不是心愿实现相似 KV 存储的能力。之所以这样说,是因为:

  1. Kafka 和 Pulsar 都没有保障最新数据的 Compaction:对于 Kafka 来说,Log Compaction 只会操作 ActiveSegment 之前的数据;对于 Pulsar 来说,Compaction 是一个周期性执行的工作,每次 Compaction 开始之前,都会先读取以后 Topic 中最初一个 MessageId 作为本轮 Compaction 的起点,因而只有有 Producer 在向 Topic 生产数据。就必定不能保障所有数据都被 Compacted。
  2. Kafka 和 Pulsar 的 Compaction 都是肯定范畴内的,不是全局的:Kafka 和 Pulsar 的 Compaction 都是以一种相似于滑动窗口的过程进行,“key 雷同的状况下只保留最新的音讯”针对的是一轮 Compaction 内,如果两轮 Compaction 中有雷同 key 的音讯,是没有方法合并的。

比照 Kafka 和 Pulsar 两者的实现来看,最次要的区别就是是否保留 Compaction 前的数据。相较于 Kafka 的实现,Pulsar 这种模式一方面整体逻辑更为简略,不须要思考各种文件替换过程中的解体复原逻辑,另一方面也能够给 Consumer 更多抉择的空间,然而这样也会带来肯定的存储老本。两者的实现架构是不同的,如果想要 Kafka 也像 Pulsar 一样保留两份数据,因为 Kafka 的存储正本机制是本人的治理的,可能须要比当初更为简单的实现才可能搞定,而不能像 Pulsar 一样间接通过切换 LedgerId 来抉择数据就能够了。

正文完
 0