乐趣区

关于kafka:kafka之二kafka日志管理

本文次要内容有:

  1. 日志治理相干实现类
  2. 日志刷盘
  3. 日志清理

日志治理相干实现类


kafka 的日志治理是通过 LogManager 类实现的,它的次要作用是日志的创立、检索、清理。

kafkaServer 启动 LogManager 线程

LogManager 线程是在 kafka 节点服务启动的时候启动的,代码如下:

//kafka.server.KafkaServer
def startup() {
  try {info("starting")
    /* start log manager */
    // 创立 LogManager 对象
    logManager = createLogManager(zkUtils.zkClient, brokerState)
    // 启动 LogManager 线程
    logManager.startup()}
  catch {
    case e: Throwable =>
    fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
    isStartingUp.set(false)
    shutdown()
    throw e
  }
}

阐明,初始化 LogManager 的过程中会加载所有日志分区,对应办法为 loadLogs(), 在加载日志分区过程中会调用 loadSegments()加载该分区所有的 Segment 文件,应用是的线程池进行加载的。

LogManager 启动

在所有分区日志都加载实现后,KafkaServer 调用 startup()办法启动 LogManager 线程,在这个过程中会启动四个定时工作。

def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    //note: 定时清理过期的日志 segment, 并保护日志的大小
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    //note: 定时刷新还没有写到磁盘上日志
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    //note: 定时将所有数据目录所有日志的检查点写到检查点文件中
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
    //note: 定时删除标记为 delete 的日志文件
    scheduler.schedule("kafka-delete-logs",
                       deleteLogs,
                       delay = InitialTaskDelayMs,
                       period = defaultConfig.fileDeleteDelayMs,
                       TimeUnit.MILLISECONDS)
  }
  //note: 如果设置为 true,主动清理 compaction 类型的 topic
  if(cleanerConfig.enableCleaner)
    cleaner.startup()}

四个定时工作:

  1. cleanupLogs,定时清理过期日志 segment,并保护日志大小,默认 5min 执行一次
  2. flushDirtyLogs,定时刷新还没写到磁盘上数据
  3. checkpointRecoveryPointOffsets,定时将所有日志的 checkpoint 写到 checkpoint 文件中,默认 60s 执行一次
  4. deleteLogs,定时删除标记为 delete 的日志文件,默认 30s 执行一次。

checkpoint 文件
在 LogManager 中有一个十分重要的文件 —checkpoint 文件:

  1. 创立 LogManager 时会读取 checkpoint 文件,并将每个分区对应的 checkpoint 作为日志的复原点(recoveryPoint),最初创立分区对应的日志实例
  2. 在将日志刷盘时,将最新的偏移量作为日志的 checkpoint 进行更新
  3. LogManager 启动一个定时工作,定时读取所有日志的检查点,并写入全局的检查点文件

日志刷盘


在 linux 零碎中,当数据写入到文件系统后,数据其实在操作系统的 page cache 里,只有执行了刷盘后数据才会写到磁盘里。
在下面提到的定时工作 flushDirtyLogs 里,会定时将页面缓存中的数据刷新到磁盘中,kafka 的刷盘策略有两种:

  1. 工夫策略,通过 log.flush.interval.ms 进行配置,默认为无限大。
  2. 大小策略,通过 log.flush.interval.messages 进行配置,当数据超过这个值时进行刷盘。

须要提一下的是,定时工作里只会依据工夫策略进行判断是否刷盘,依据大小判断是在 append 追加日志时进行的判断:

def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
  // now append to the log
  segment.append(firstOffset = appendInfo.firstOffset,
    largestOffset = appendInfo.lastOffset,
    largestTimestamp = appendInfo.maxTimestamp,
    shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
    records = validRecords)

  // increment the log end offset
  updateLogEndOffset(appendInfo.lastOffset + 1)

  trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
    .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))

  if (unflushedMessages >= config.flushInterval)
    flush()}

日志清理


为了保障分区总大小不超过阈值(log.retention.bytes),LogManager 会定时清理旧数据。不过个别状况下是通过配置 log.retention.hours 来配置 segment 的保留工夫的。

清理旧日志次要有两种:

  1. 删除,超过工夫或大小阈值的旧 segment 间接进行删除
  2. 压缩,不是删除在,是是采纳合并压缩的形式进行
退出移动版