This article walks through the Kafka source code for log loading and recovery. (Version: 2.8)

Original post: http://fxbing.github.io/2022/...

Log management: LogManager

LogManager is the entry point of Kafka's log management subsystem. It is responsible for creating, retrieving, and cleaning up logs; all read and write operations are delegated to the individual Log instances. LogManager maintains logs in one or more directories: a new log is created in the data directory that currently holds the fewest logs, and no attempt is made afterwards to move partitions or rebalance them by size or I/O rate. A background thread handles log retention by periodically truncating excess log segments.
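As a toy illustration of the "fewest logs" placement rule described above (the directory names and counts are made up; this is not the actual LogManager code):

import java.io.File

object LeastLoadedDirExample {
  // Toy placement policy: a new log goes to the data directory that currently holds the fewest logs.
  def pickDirForNewLog(logCountPerDir: Map[File, Int]): File =
    logCountPerDir.minBy(_._2)._1

  def main(args: Array[String]): Unit = {
    val counts = Map(
      new File("/data/kafka-logs-1") -> 12,
      new File("/data/kafka-logs-2") -> 7,
      new File("/data/kafka-logs-3") -> 9)
    println(s"A new log would be placed in: ${pickDirForNewLog(counts)}")
  }
}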

LogManager startup mainly consists of three parts:

  1. Log loading and recovery, i.e. loadLogs
  2. Starting the periodic tasks, mainly:
    a. cleanupLogs: deletes old segments according to the retention time and retention size
    b. flushDirtyLogs: periodically flushes logs that have not yet been written to disk
    c. checkpointLogRecoveryOffsets: periodically writes the recovery points of all logs in all data directories to a checkpoint file
    d. checkpointLogStartOffsets: writes the current log start offset of every log to a text file in its log directory, so that data already deleted by a DeleteRecordsRequest is not exposed
    e. deleteLogs: periodically deletes logs marked for deletion (the -delete suffix)
  3. Starting the LogCleaner, which performs log compaction

This article focuses on the first part: log loading and recovery.

// visible for testing
private[log] def startupWithConfigOverrides(topicConfigOverrides: Map[String, LogConfig]): Unit = {
  loadLogs(topicConfigOverrides) // this could take a while if shutdown was not clean

  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs _,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointLogRecoveryOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushRecoveryOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint",
                       checkpointLogStartOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushStartOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       unit = TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner) {
    _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
    _cleaner.startup()
  }
}
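The scheduler used above is Kafka's Scheduler (normally a KafkaScheduler backed by a ScheduledThreadPoolExecutor). As a minimal standalone sketch of the same periodic-scheduling pattern, with a placeholder task body standing in for cleanupLogs and made-up delay/period values:

import java.util.concurrent.{Executors, TimeUnit}

object PeriodicTaskExample {
  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newScheduledThreadPool(1)

    // Analogous to scheduler.schedule("kafka-log-retention", cleanupLogs _, delay, period, unit):
    // run a task after an initial delay and then repeatedly at a fixed period.
    scheduler.scheduleAtFixedRate(
      () => println("running log-retention check"), // placeholder for cleanupLogs
      30,                                           // initial delay, like InitialTaskDelayMs
      300,                                          // period, like retention.check.interval.ms
      TimeUnit.MILLISECONDS)

    Thread.sleep(1000) // let a few runs happen
    scheduler.shutdown()
  }
}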

Loading and recovering all logs: loadLogs

The loading and recovery of all logs consists of the following main steps:

  1. Load and record the files in each log directory that mark its state (.kafka_cleanshutdown, recovery-point-offset-checkpoint, log-start-offset-checkpoint); their on-disk format is sketched after this list
  2. Load and recover each topic-partition's log concurrently (covered in detail in the next section)
  3. Record the log directories that ran into problems and handle them asynchronously
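Both recovery-point-offset-checkpoint and log-start-offset-checkpoint are plain-text files in the checkpoint-file layout: a version line, an entry-count line, then one "topic partition offset" entry per line. A minimal standalone reader for that layout (a sketch, not the actual Kafka CheckpointFile implementation; the file path in main is hypothetical):

import scala.io.Source

object CheckpointReadExample {
  // Reads a checkpoint file of the form:
  //   0                 <- version
  //   2                 <- number of entries
  //   my-topic 0 42     <- topic, partition, offset
  //   my-topic 1 17
  def readCheckpoint(path: String): Map[(String, Int), Long] = {
    val source = Source.fromFile(path)
    try {
      val lines = source.getLines().toList
      val version = lines.head.trim.toInt
      require(version == 0, s"unsupported checkpoint version $version")
      val expected = lines(1).trim.toInt
      val entries = lines.drop(2).map { line =>
        val Array(topic, partition, offset) = line.trim.split("\\s+")
        (topic, partition.toInt) -> offset.toLong
      }
      require(entries.size == expected, "entry count does not match header")
      entries.toMap
    } finally source.close()
  }

  def main(args: Array[String]): Unit =
    readCheckpoint("/tmp/kafka-logs/recovery-point-offset-checkpoint") // hypothetical path
      .foreach { case ((topic, p), off) => println(s"$topic-$p -> $off") }
}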

/**
 * Recover and load all logs in the given data directories
 */
private[log] def loadLogs(topicConfigOverrides: Map[String, LogConfig]): Unit = {
  // Load all live log directories (liveLogDirs). A Kafka server may be configured with several
  // disk directories for log storage, and not all of them are necessarily available.
  info(s"Loading logs from log dirs $liveLogDirs")
  val startMs = time.hiResClockMs()
  val threadPools = ArrayBuffer.empty[ExecutorService]
  val offlineDirs = mutable.Set.empty[(String, IOException)]
  val jobs = ArrayBuffer.empty[Seq[Future[_]]]
  var numTotalLogs = 0

  // Iterate over all disks and load/recover their logs. If an IOException occurs,
  // record the directory in offlineDirs for later handling.
  for (dir <- liveLogDirs) {
    val logDirAbsolutePath = dir.getAbsolutePath
    var hadCleanShutdown: Boolean = false
    try {
      val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
      threadPools.append(pool)

      // If the .kafka_cleanshutdown file exists, delete it and remember the hadCleanShutdown state;
      // the recovery flow can then be skipped for the logs in this directory.
      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
      if (cleanShutdownFile.exists) {
        info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
        // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
        // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
        Files.deleteIfExists(cleanShutdownFile.toPath)
        hadCleanShutdown = true
      } else {
        // log recovery itself is being performed by `Log` class during initialization
        info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
      }

      // Read the recoveryPoint of every topic-partition directory from the recovery-point-offset-checkpoint file
      var recoveryPoints = Map[TopicPartition, Long]()
      try {
        recoveryPoints = this.recoveryPointCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
      }

      // Read the logStartOffset of every topic-partition directory from the log-start-offset-checkpoint file
      var logStartOffsets = Map[TopicPartition, Long]()
      try {
        logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
      }

      // Main loading and recovery flow: run loadLog concurrently for every topic-partition
      val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
        logDir.isDirectory && Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
      val numLogsLoaded = new AtomicInteger(0)
      numTotalLogs += logsToLoad.length

      val jobsForDir = logsToLoad.map { logDir =>
        val runnable: Runnable = () => {
          try {
            debug(s"Loading log $logDir")

            val logLoadStartMs = time.hiResClockMs()
            val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, topicConfigOverrides)
            val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
            val currentNumLoaded = numLogsLoaded.incrementAndGet()

            info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
              s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
          } catch {
            case e: IOException =>
              offlineDirs.add((logDirAbsolutePath, e))
              error(s"Error while loading log dir $logDirAbsolutePath", e)
          }
        }
        runnable
      }

      jobs += jobsForDir.map(pool.submit)
    } catch {
      case e: IOException =>
        offlineDirs.add((logDirAbsolutePath, e))
        error(s"Error while loading log dir $logDirAbsolutePath", e)
    }
  }

  try {
    // Wait for all concurrent log-loading jobs to complete
    for (dirJobs <- jobs) {
      dirJobs.foreach(_.get)
    }

    // Record all problematic directories; ReplicaManager will later take them offline
    offlineDirs.foreach { case (dir, e) =>
      logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while loading log dir $dir", e)
    }
  } catch {
    case e: ExecutionException =>
      error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
      throw e.getCause
  } finally {
    threadPools.foreach(_.shutdown())
  }

  info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
}
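Structurally, loadLogs is a fan-out/fan-in pattern: one fixed thread pool per data directory (sized by num.recovery.threads.per.data.dir), one job submitted per partition directory, then a blocking get() on every Future before the offline directories are reported. A stripped-down standalone sketch of that pattern (directory and partition names are made up, and the println stands in for loadLog):

import java.util.concurrent.{ExecutorService, Executors, Future}
import scala.collection.mutable.ArrayBuffer

object FanOutFanInExample {
  def main(args: Array[String]): Unit = {
    val dirs = Seq("dir-A", "dir-B") // stand-ins for the data directories
    val partitionsPerDir = Map("dir-A" -> Seq("t-0", "t-1"), "dir-B" -> Seq("t-2"))

    val pools = ArrayBuffer.empty[ExecutorService]
    val jobs = ArrayBuffer.empty[Seq[Future[_]]]

    for (dir <- dirs) {
      val pool = Executors.newFixedThreadPool(2) // like numRecoveryThreadsPerDataDir
      pools += pool
      val runnables = partitionsPerDir(dir).map { tp =>
        val r: Runnable = () => println(s"loading $tp from $dir") // placeholder for loadLog
        r
      }
      jobs += runnables.map(pool.submit) // fan out: one Future per partition directory
    }

    try jobs.foreach(_.foreach(_.get())) // fan in: wait for every load to finish
    finally pools.foreach(_.shutdown())
  }
}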

Loading and recovering a single topic-partition's log

The loading and recovery of a single topic-partition's log takes place in the initialization block (constructor body) of the Log class. If the partition's directory name carries the -delete suffix, the partition is treated as pending deletion and is added to the logsToBeDeleted collection, where the periodic deleteLogs task will eventually clean it up.
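A simplified standalone illustration of classifying partition directory names by this suffix (a sketch only; the real Log.parseTopicPartitionName is stricter and also understands the -future suffix, and the directory names below are made up):

import java.io.File

object LogDirNameExample {
  sealed trait LogDirKind
  case class LiveLog(topic: String, partition: Int) extends LogDirKind
  case class PendingDeletion(rawName: String) extends LogDirKind

  // Toy classifier: a "-delete" suffix marks a partition directory scheduled for deletion;
  // otherwise the name is "<topic>-<partition>" with the partition after the last '-'.
  def classify(dir: File): LogDirKind = {
    val name = dir.getName
    if (name.endsWith("-delete")) PendingDeletion(name)
    else {
      val idx = name.lastIndexOf('-')
      LiveLog(topic = name.substring(0, idx), partition = name.substring(idx + 1).toInt)
    }
  }

  def main(args: Array[String]): Unit =
    Seq("payments-3", "payments-3-delete").map(new File(_)).foreach { d =>
      println(s"${d.getName} -> ${classify(d)}")
    }
}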
The Log class's initialization block loads the segments via loadSegments:

private def loadSegments(): Long = {
  // Remove temporary files (.deleted and .cleaned suffixes) and collect the usable .swap files
  val swapFiles = removeTempFilesAndCollectSwapFiles()

  // retryOnOffsetOverflow catches any LogSegmentOffsetOverflowException and splits the offending segment
  retryOnOffsetOverflow {
    // Load all files in the directory and perform the necessary sanity checks
    logSegments.foreach(_.close())
    segments.clear()
    loadSegmentFiles()
  }

  // Finish any interrupted operations using the swap files
  completeSwapOperations(swapFiles)

  // If this partition log is not pending deletion, run the recovery flow
  if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
    val nextOffset = retryOnOffsetOverflow {
      recoverLog()
    }

    // reset the index size of the currently active log segment to allow more entries
    activeSegment.resizeIndexes(config.maxIndexSize)
    nextOffset
  } else {
     if (logSegments.isEmpty) {
        addSegment(LogSegment.open(dir = dir,
          baseOffset = 0,
          config,
          time = time,
          initFileSize = this.initFileSize))
     }
    0
  }
}
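Within each partition directory, loadSegmentFiles works over files whose suffix indicates their role: .log for record data, .index for offset indexes, .timeindex for time indexes, .snapshot for producer-state snapshots, .txnindex for transaction indexes, plus the temporary .deleted, .cleaned, and .swap files cleaned up above. A standalone sketch that simply groups a directory's files by these suffixes (an illustration, not the actual loadSegmentFiles logic; the path in main is hypothetical):

import java.io.File

object SegmentFileSuffixExample {
  // File suffixes used inside a Kafka partition directory.
  val KnownSuffixes = Seq(".log", ".index", ".timeindex", ".snapshot", ".txnindex",
                          ".deleted", ".cleaned", ".swap")

  def groupBySuffix(partitionDir: File): Map[String, Seq[File]] = {
    val files = Option(partitionDir.listFiles).getOrElse(Array.empty[File]).toSeq.filter(_.isFile)
    files.groupBy(f => KnownSuffixes.find(s => f.getName.endsWith(s)).getOrElse("other"))
  }

  def main(args: Array[String]): Unit =
    groupBySuffix(new File("/tmp/kafka-logs/payments-3")) // hypothetical partition directory
      .foreach { case (suffix, fs) => println(s"$suffix -> ${fs.map(_.getName).mkString(", ")}") }
}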

The core code of recoverLog is as follows:

// if we have the clean shutdown marker, skip recovery
// recovery is only needed if there was no clean shutdown
if (!hadCleanShutdown) {
  // Take all segments after recoveryPoint (normally there is only one)
  val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
  var truncated = false

  while (unflushed.hasNext && !truncated) {
    val segment = unflushed.next()
    info(s"Recovering unflushed segment ${segment.baseOffset}")
    val truncatedBytes =
      try {
        // Clear the segment's indexes, read and validate the data batch by batch, and rebuild the indexes
        recoverSegment(segment, leaderEpochCache)
      } catch {
        case _: InvalidOffsetException =>
          val startOffset = segment.baseOffset
          warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
            s"creating an empty one with starting offset $startOffset")
          segment.truncateTo(startOffset)
      }
    if (truncatedBytes > 0) {
      // If this segment was truncated, all segments after it are simply deleted.
      // unflushed is an iterator, so unflushed.toList is the segments not yet visited, not all segments.
      warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
      removeAndDeleteSegments(unflushed.toList,
        asyncDelete = true,
        reason = LogRecovery)
      truncated = true
    }
  }
}
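The invariant of this loop is worth spelling out: segments are scanned in offset order starting from the recovery point; the first segment that fails validation is truncated (to its last valid batch, or to its base offset on an InvalidOffsetException), and every segment after it is deleted outright. A toy standalone model of that invariant, using in-memory "segments" with per-batch validity flags instead of real record batches:

object RecoveryTruncationExample {
  // A toy segment: base offset plus per-"batch" validity flags (true = passes CRC/offset checks).
  case class ToySegment(baseOffset: Long, batchValid: Seq[Boolean])

  // Mirrors the shape of recoverLog: keep fully valid segments, cut the first broken one
  // at its last valid batch, and drop every segment that follows it.
  def recover(unflushed: Seq[ToySegment]): Seq[ToySegment] = {
    val (clean, rest) = unflushed.span(_.batchValid.forall(identity))
    rest match {
      case Seq() => clean
      case broken +: _ =>
        val kept = broken.batchValid.takeWhile(identity)
        clean :+ broken.copy(batchValid = kept) // later segments are discarded
    }
  }

  def main(args: Array[String]): Unit = {
    val segments = Seq(
      ToySegment(0,   Seq(true, true)),
      ToySegment(100, Seq(true, false, true)), // corruption in the middle
      ToySegment(200, Seq(true)))              // dropped entirely, like unflushed.toList
    recover(segments).foreach(s => println(s"kept segment ${s.baseOffset} with ${s.batchValid.size} batches"))
  }
}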