关于kafka:Kafka-源码学习日志加载与恢复

2次阅读

共计 7445 个字符,预计需要花费 19 分钟才能阅读完成。

本文梳理次要梳理 Kafka 日志加载与复原的源码。(版本:2.8)

原文链接:http://fxbing.github.io/2022/…

日志治理:LogManager

LogManager 是 kafka 日志管理子系统的入口点。负责日志的创立、检索和清理。所有读取和写入操作都委托给各个日志实例。LogManager 在一个或多个目录中保护日志。在日志起码的数据目录中创立新日志。预先不会尝试挪动分区或依据大小或 I/O 速率进行均衡。后盾线程通过定期截断多余的日志段来解决日志保留。

LogManger 的启动次要包含三个局部:

  1. 日志加载与复原,即:loadLogs
  2. 各个定时工作启动,次要包含:
    a. cleanupLogs:依据保留工夫和保留大小进行历史 segment 的清理
    b. flushDirtyLogs:定时刷新还没有写到磁盘上日志
    c. checkpointLogRecoveryOffsets:定时将所有数据目录所有日志的检查点写到检查点文件中
    d. checkpointLogStartOffsets:将所有日志的以后日志开始偏移量写到日志目录中的文本文件中,以防止裸露已被 DeleteRecordsRequest 删除的数据
    e. deleteLogs:定时删除标记为 delete 的日志文件
  3. 启动 LogCleaner,负责进行日志 compaction

本文次要对第一局部日志加载与复原进行梳理。

// visible for testing
private[log] def startupWithConfigOverrides(topicConfigOverrides: Map[String, LogConfig]): Unit = {loadLogs(topicConfigOverrides) // this could take a while if shutdown was not clean

  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs _,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointLogRecoveryOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushRecoveryOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint",
                       checkpointLogStartOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushStartOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       unit = TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner) {_cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
    _cleaner.startup()}
}

全副日志加载与复原:loadLogs

所有日志的加载与复原的流程次要蕴含以下几步:

  1. 加载并记录日志文件夹中标记状态信息的文件(kafka_cleanshutdown、recovery-point-offset-checkpoint、recovery-point-offset-checkpoint)
  2. 并发对每个 tp 的日志进行加载与复原(下一大节详解)
  3. 记录并异步解决有问题的日志文件夹

    /**
     * Recover and load all logs in the given data directories
     */
    private[log] def loadLogs(topicConfigOverrides: Map[String, LogConfig]): Unit = {
      // 对所有可用的日志目录(liveLogDirs)进行加载,kafka server 启动时可能配置多个磁盘目录用来存储日志文件,然而不肯定所有的磁盘都是可用的
      info(s"Loading logs from log dirs $liveLogDirs")
      val startMs = time.hiResClockMs()
      val threadPools = ArrayBuffer.empty[ExecutorService]
      val offlineDirs = mutable.Set.empty[(String, IOException)]
      val jobs = ArrayBuffer.empty[Seq[Future[_]]]
      var numTotalLogs = 0
    
      // 遍历所有的磁盘,进行日志加载与复原,如果呈现 IOException,则将该目录记录到 offlineDirs 中进行后续解决
      for (dir <- liveLogDirs) {
     val logDirAbsolutePath = dir.getAbsolutePath
     var hadCleanShutdown: Boolean = false
     try {val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
       threadPools.append(pool)
    
       // 如果 .kafka_cleanshutdown 文件存在,则将该文件删除并记录 hadCleanShutdown 状态,后续不须要进行日志复原的流程。val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
       if (cleanShutdownFile.exists) {info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
         // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
         // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
         Files.deleteIfExists(cleanShutdownFile.toPath)
         hadCleanShutdown = true
       } else {
         // log recovery itself is being performed by `Log` class during initialization
         info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
       }
    
       // 从 recovery-point-offset-checkpoint 文件读取所有 tp 目录的 recoveryPoint
       var recoveryPoints = Map[TopicPartition, Long]()
       try {recoveryPoints = this.recoveryPointCheckpoints(dir).read()} catch {
         case e: Exception =>
           warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory" +
             s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
       }
    
       // 从 log-start-offset-checkpoint 文件读取所有 tp 目录的 logStartOffset
       var logStartOffsets = Map[TopicPartition, Long]()
       try {logStartOffsets = this.logStartOffsetCheckpoints(dir).read()} catch {
         case e: Exception =>
           warn(s"Error occurred while reading log-start-offset-checkpoint file of directory" +
             s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
       }
    
       // 日志的加载与复原主流程,并发对所有 tp 的日志执行 loadLog
       val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
         logDir.isDirectory && Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
       val numLogsLoaded = new AtomicInteger(0)
       numTotalLogs += logsToLoad.length
    
       val jobsForDir = logsToLoad.map { logDir =>
         val runnable: Runnable = () => {
           try {debug(s"Loading log $logDir")
    
             val logLoadStartMs = time.hiResClockMs()
             val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, topicConfigOverrides)
             val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
             val currentNumLoaded = numLogsLoaded.incrementAndGet()
    
             info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms" +
               s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
           } catch {
             case e: IOException =>
               offlineDirs.add((logDirAbsolutePath, e))
               error(s"Error while loading log dir $logDirAbsolutePath", e)
           }
         }
         runnable
       }
    
       jobs += jobsForDir.map(pool.submit)
     } catch {
       case e: IOException =>
         offlineDirs.add((logDirAbsolutePath, e))
         error(s"Error while loading log dir $logDirAbsolutePath", e)
     }
      }
    
      try {
     // 期待所有并发执行的日志加载流程执行实现
     for (dirJobs <- jobs) {dirJobs.foreach(_.get)
     }
     // 记录所有有问题的的目录,后续该目录会被 ReplicaManager 执行下线操作
     offlineDirs.foreach {case (dir, e) =>
       logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while loading log dir $dir", e)
     }
      } catch {
     case e: ExecutionException =>
       error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
       throw e.getCause
      } finally {threadPools.foreach(_.shutdown())
      }
    
      info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
    }

单 tp 日志加载与复原

单个 tp 的日志加载与复原是在 Log 类的动态代码块中进行的。如果该 tp 的文件夹的后缀为 -delete,则认为该 tp 为待删除的,退出到 logsToBeDeleted 汇合中期待定时工作对其进行清理。
Log 类的动态代码块中通过 loadSegments 加载日志

private def loadSegments(): Long = {
  // 清理临时文件(.delete 和 .clean 后缀)并保留可用的 swap 文件
  val swapFiles = removeTempFilesAndCollectSwapFiles()

  // retryOnOffsetOverflow 兜住可能产生的 LogSegmentOffsetOverflowException 异样,并进行日志切分解决。retryOnOffsetOverflow {
    // 加载文件的中的所有文件并进行必要的完整性检查
    logSegments.foreach(_.close())
    segments.clear()
    loadSegmentFiles()}

  // 依据 swap 文件复原实现所有被中断的操作
  completeSwapOperations(swapFiles)

  // 如果不是待删除的 tp 日志,执行 recover 流程
  if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
    val nextOffset = retryOnOffsetOverflow {recoverLog()
    }

    // reset the index size of the currently active log segment to allow more entries
    activeSegment.resizeIndexes(config.maxIndexSize)
    nextOffset
  } else {if (logSegments.isEmpty) {
        addSegment(LogSegment.open(dir = dir,
          baseOffset = 0,
          config,
          time = time,
          initFileSize = this.initFileSize))
     }
    0
  }
}

recoverLog 的外围代码如下:

// if we have the clean shutdown marker, skip recovery
// 只有未进行 cleanshutdown 的状况下才须要 recovery
if (!hadCleanShutdown) {
  // 取出 recoveryPoint 之后的所有 segment(失常状况下只有一个)val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
  var truncated = false

  while (unflushed.hasNext && !truncated) {val segment = unflushed.next()
    info(s"Recovering unflushed segment ${segment.baseOffset}")
    val truncatedBytes =
      try {
        // 清空 segment 对应的 index,一一 batch 读取校验数据,并从新结构 index
        recoverSegment(segment, leaderEpochCache)
      } catch {
        case _: InvalidOffsetException =>
          val startOffset = segment.baseOffset
          warn("Found invalid offset during recovery. Deleting the corrupt segment and" +
            s"creating an empty one with starting offset $startOffset")
          segment.truncateTo(startOffset)
      }
    if (truncatedBytes > 0) {
      // 如果前一个 segment 执行了 truncate,则之后的所有 segment 间接删除
      // unflushed 为迭代器,所以 unflushed.toList 代表的是所有未遍历到的 segment,而不是全副 segment
      warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
      removeAndDeleteSegments(unflushed.toList,
        asyncDelete = true,
        reason = LogRecovery)
      truncated = true
    }
  }
}
正文完
 0