This article walks through the Kafka source code for log loading and recovery (version 2.8).
Original link: http://fxbing.github.io/2022/...
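Log management: LogManager
LogManager is the entry point to the Kafka log management subsystem. It is responsible for log creation, retrieval, and cleaning. All read and write operations are delegated to the individual log instances. LogManager maintains logs in one or more directories. New logs are created in the data directory with the fewest logs. No attempt is made to move partitions afterwards or to balance based on size or I/O rate. A background thread handles log retention by periodically truncating excess log segments.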
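LogManager startup consists of three main parts:
- Log loading and recovery, i.e. loadLogs
- Starting the scheduled tasks, mainly:
a. cleanupLogs: cleans up old segments according to retention time and retention size
b. flushDirtyLogs: periodically flushes log data that has not yet been written to disk
c. checkpointLogRecoveryOffsets: periodically writes the recovery points of all logs in all data directories to the checkpoint file
d. checkpointLogStartOffsets: writes the current log start offset of every log to a text file in the log directory, to avoid exposing data that has already been deleted by a DeleteRecordsRequest
e. deleteLogs: periodically deletes log files marked for deletion
- Starting LogCleaner, which is responsible for log compaction
This article focuses on the first part, log loading and recovery.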
// visible for testing
private[log] def startupWithConfigOverrides(topicConfigOverrides: Map[String, LogConfig]): Unit = {
  loadLogs(topicConfigOverrides) // this could take a while if shutdown was not clean

  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs _,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointLogRecoveryOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushRecoveryOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint",
                       checkpointLogStartOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushStartOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       unit = TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner) {
    _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
    _cleaner.startup()
  }
}
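To make the "initial delay plus fixed period" scheduling above concrete, here is a minimal sketch that uses the JDK's ScheduledExecutorService directly. It is not Kafka's own scheduler: the object name PeriodicTaskSketch, the helper runPeriodically, and the 5-second wait limit are all assumptions made for illustration.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PeriodicTaskSketch {
  // Hypothetical stand-in for a task such as cleanupLogs: it only counts its runs.
  val runs = new AtomicInteger(0)

  /** Runs `task` after `delayMs`, then every `periodMs`, and returns true
    * once it has run `times` times (or false if 5 seconds pass first). */
  def runPeriodically(times: Int, delayMs: Long, periodMs: Long)(task: () => Unit): Boolean = {
    val scheduler = Executors.newScheduledThreadPool(1)
    val done = new CountDownLatch(times)
    try {
      val job: Runnable = () => { task(); done.countDown() }
      scheduler.scheduleAtFixedRate(job, delayMs, periodMs, TimeUnit.MILLISECONDS)
      done.await(5, TimeUnit.SECONDS)
    } finally scheduler.shutdownNow() // stop the periodic task once we are done
  }
}
```

A single scheduler thread is enough here because the tasks are short; a long-running task would delay the next run of every task sharing that thread.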
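Loading and recovering all logs: loadLogs
The load-and-recovery flow for all logs consists of the following steps:
- Load and record the files in each log directory that carry state information (.kafka_cleanshutdown, recovery-point-offset-checkpoint, log-start-offset-checkpoint)
- Load and recover the log of each tp (topic partition) concurrently (detailed in the next section)
- Record the problematic log directories and handle them asynchronously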
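The first step depends on a "consume the marker" idiom: the clean-shutdown marker is deleted as soon as it is observed, so a crash during loading is treated as an unclean shutdown on the next start. Below is a minimal sketch of that idiom. The object name CleanShutdownSketch and the helper hadCleanShutdown are hypothetical; only the marker file name comes from the article.

```scala
import java.io.File
import java.nio.file.Files

object CleanShutdownSketch {
  // Marker file name as given in the article.
  val MarkerName = ".kafka_cleanshutdown"

  /** Returns true if the marker was present in `logDir`.
    * The marker is removed as a side effect, so a second call returns false. */
  def hadCleanShutdown(logDir: File): Boolean = {
    val marker = new File(logDir, MarkerName)
    // deleteIfExists returns true only when a file was actually deleted.
    Files.deleteIfExists(marker.toPath)
  }
}
```

Using the return value of Files.deleteIfExists combines the existence check and the delete into one call, so there is no window between the two.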
/**
 * Recover and load all logs in the given data directories
 */
private[log] def loadLogs(topicConfigOverrides: Map[String, LogConfig]): Unit = {
  // Load every available log directory (liveLogDirs). A Kafka server may be configured with several
  // disk directories for storing log files, but not all of them are necessarily usable.
  info(s"Loading logs from log dirs $liveLogDirs")
  val startMs = time.hiResClockMs()
  val threadPools = ArrayBuffer.empty[ExecutorService]
  val offlineDirs = mutable.Set.empty[(String, IOException)]
  val jobs = ArrayBuffer.empty[Seq[Future[_]]]
  var numTotalLogs = 0

  // Iterate over all directories and load/recover their logs. If an IOException occurs,
  // record the directory in offlineDirs for later handling.
  for (dir <- liveLogDirs) {
    val logDirAbsolutePath = dir.getAbsolutePath
    var hadCleanShutdown: Boolean = false
    try {
      val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
      threadPools.append(pool)

      // If the .kafka_cleanshutdown file exists, delete it and record the hadCleanShutdown state;
      // the log recovery flow can then be skipped later on.
      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
      if (cleanShutdownFile.exists) {
        info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
        // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
        // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
        Files.deleteIfExists(cleanShutdownFile.toPath)
        hadCleanShutdown = true
      } else {
        // log recovery itself is being performed by `Log` class during initialization
        info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
      }

      // Read the recoveryPoint of every tp directory from the recovery-point-offset-checkpoint file
      var recoveryPoints = Map[TopicPartition, Long]()
      try {
        recoveryPoints = this.recoveryPointCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
      }

      // Read the logStartOffset of every tp directory from the log-start-offset-checkpoint file
      var logStartOffsets = Map[TopicPartition, Long]()
      try {
        logStartOffsets = this.logStartOffsetCheckpoints(dir).read()
      } catch {
        case e: Exception =>
          warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
            s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
      }

      // Main load-and-recovery flow: run loadLog concurrently for the logs of all tps
      val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
        logDir.isDirectory && Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
      val numLogsLoaded = new AtomicInteger(0)
      numTotalLogs += logsToLoad.length

      val jobsForDir = logsToLoad.map { logDir =>
        val runnable: Runnable = () => {
          try {
            debug(s"Loading log $logDir")
            val logLoadStartMs = time.hiResClockMs()
            val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, topicConfigOverrides)
            val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
            val currentNumLoaded = numLogsLoaded.incrementAndGet()
            info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
              s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
          } catch {
            case e: IOException =>
              offlineDirs.add((logDirAbsolutePath, e))
              error(s"Error while loading log dir $logDirAbsolutePath", e)
          }
        }
        runnable
      }

      jobs += jobsForDir.map(pool.submit)
    } catch {
      case e: IOException =>
        offlineDirs.add((logDirAbsolutePath, e))
        error(s"Error while loading log dir $logDirAbsolutePath", e)
    }
  }

  try {
    // Wait for all concurrently running log loading jobs to finish
    for (dirJobs <- jobs) {
      dirJobs.foreach(_.get)
    }

    // Record every problematic directory; ReplicaManager will later take these directories offline
    offlineDirs.foreach { case (dir, e) =>
      logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while loading log dir $dir", e)
    }
  } catch {
    case e: ExecutionException =>
      error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
      throw e.getCause
  } finally {
    threadPools.foreach(_.shutdown())
  }

  info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
}
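The concurrency pattern in loadLogs (submit one job per log to a fixed-size pool, collect IO failures instead of aborting, wait on every future, always shut the pool down) can be isolated in a small sketch. The names ParallelLoadSketch and loadAll and the use of a concurrent set for failures are assumptions made for illustration; this is not Kafka's code.

```scala
import java.io.IOException
import java.util.concurrent.{ConcurrentHashMap, ExecutionException, Executors, Future}

object ParallelLoadSketch {
  /** Runs `load` for every name on a fixed thread pool and returns the
    * names whose load threw an IOException. Other exceptions are rethrown. */
  def loadAll(names: Seq[String], threads: Int)(load: String => Unit): java.util.Set[String] = {
    val pool = Executors.newFixedThreadPool(threads)
    // Thread-safe set, since several worker threads may record failures at once.
    val failed = ConcurrentHashMap.newKeySet[String]()
    try {
      val jobs: Seq[Future[_]] = names.map { name =>
        val job: Runnable = () => {
          try load(name)
          catch { case _: IOException => failed.add(name); () }
        }
        pool.submit(job)
      }
      // Block until every job has finished.
      jobs.foreach(_.get())
    } catch {
      // Unwrap failures raised inside a worker thread.
      case e: ExecutionException => throw e.getCause
    } finally pool.shutdown()
    failed
  }
}
```

A usage example, with made-up partition directory names: `ParallelLoadSketch.loadAll(Seq("a-0", "b-0"), 2)(name => println(s"loading $name"))` returns an empty set when no load throws.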
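Loading and recovering the log of a single tp
The log of a single tp is loaded and recovered in the initialization block of the Log class, i.e. the code that runs when a Log instance is constructed. If the tp's directory name ends with the suffix -delete, the tp is considered pending deletion and is added to the logsToBeDeleted collection, where it waits for the scheduled task to clean it up.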
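The suffix check described above amounts to partitioning directory names. The following is a minimal sketch, assuming hypothetical names (DeleteSuffixSketch, classify) and made-up directory names; only the -delete suffix comes from the article.

```scala
object DeleteSuffixSketch {
  // Suffix named in the article for directories that are pending deletion.
  val DeleteDirSuffix = "-delete"

  /** Splits directory names into (pending deletion, live). */
  def classify(dirNames: Seq[String]): (Seq[String], Seq[String]) =
    dirNames.partition(_.endsWith(DeleteDirSuffix))
}
```

For example, `DeleteSuffixSketch.classify(Seq("orders-0", "orders-1.abc123-delete"))` puts the second name in the pending-deletion group and the first in the live group.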
The initialization block of the Log class loads the log through loadSegments:
private def loadSegments(): Long = {
  // Remove temporary files (.deleted and .cleaned suffixes) and keep the usable swap files
  val swapFiles = removeTempFilesAndCollectSwapFiles()

  // retryOnOffsetOverflow catches any LogSegmentOffsetOverflowException that may be thrown
  // and handles it by splitting the offending segment.
  retryOnOffsetOverflow {
    // Load all files in the directory and run the necessary sanity checks
    logSegments.foreach(_.close())
    segments.clear()
    loadSegmentFiles()
  }

  // Use the swap files to complete any operations that were interrupted
  completeSwapOperations(swapFiles)

  // If this tp log is not pending deletion, run the recover flow
  if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
    val nextOffset = retryOnOffsetOverflow {
      recoverLog()
    }

    // reset the index size of the currently active log segment to allow more entries
    activeSegment.resizeIndexes(config.maxIndexSize)
    nextOffset
  } else {
    if (logSegments.isEmpty) {
      addSegment(LogSegment.open(dir = dir,
        baseOffset = 0,
        config,
        time = time,
        initFileSize = this.initFileSize))
    }
    0
  }
}
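The retry wrapper used above follows a "catch, repair, retry" pattern. Here is a minimal sketch of that pattern under stated assumptions: OverflowException stands in for LogSegmentOffsetOverflowException, the repair callback stands in for the segment split, and the maxRetries bound is added only to keep the sketch obviously terminating. None of these names are Kafka's.

```scala
object RetrySketch {
  // Hypothetical stand-in for LogSegmentOffsetOverflowException.
  final class OverflowException(val segmentId: Int) extends RuntimeException

  /** Evaluates `body`. If it throws OverflowException, runs `repair` on the
    * offending segment and evaluates `body` again, at most `maxRetries` times. */
  def retryOnOverflow[T](maxRetries: Int)(repair: Int => Unit)(body: => T): T =
    try body
    catch {
      case e: OverflowException if maxRetries > 0 =>
        repair(e.segmentId)                           // e.g. split the segment
        retryOnOverflow(maxRetries - 1)(repair)(body) // then start over
    }
}
```

Because `body` is a by-name parameter, it is re-evaluated from scratch on each attempt, which matches the way the wrapped block above closes and clears the segments before reloading them.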
The core code of recoverLog is as follows:
// if we have the clean shutdown marker, skip recovery
// Recovery is only needed when the shutdown was not clean
if (!hadCleanShutdown) {
  // Take all segments after the recoveryPoint (normally there is only one)
  val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
  var truncated = false

  while (unflushed.hasNext && !truncated) {
    val segment = unflushed.next()
    info(s"Recovering unflushed segment ${segment.baseOffset}")
    val truncatedBytes =
      try {
        // Clear the segment's index, read and validate the data batch by batch, and rebuild the index
        recoverSegment(segment, leaderEpochCache)
      } catch {
        case _: InvalidOffsetException =>
          val startOffset = segment.baseOffset
          warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
            s"creating an empty one with starting offset $startOffset")
          segment.truncateTo(startOffset)
      }
    if (truncatedBytes > 0) {
      // If this segment was truncated, every segment after it is deleted outright.
      // unflushed is an iterator, so unflushed.toList contains only the segments not yet visited, not all segments
      warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
      removeAndDeleteSegments(unflushed.toList, asyncDelete = true, reason = LogRecovery)
      truncated = true
    }
  }
}
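The point about unflushed.toList is easy to check in isolation: calling toList on a partially consumed Scala iterator returns only the elements that have not been visited yet. Here is a minimal sketch, where IteratorTailSketch and splitAtFirstCorrupt are hypothetical names and plain integers stand in for segments.

```scala
object IteratorTailSketch {
  /** Walks `items` until `isCorrupt` matches, then returns the matching item
    * together with the items that had not been visited yet. */
  def splitAtFirstCorrupt[A](items: Seq[A])(isCorrupt: A => Boolean): Option[(A, List[A])] = {
    val it = items.iterator
    var result: Option[(A, List[A])] = None
    while (it.hasNext && result.isEmpty) {
      val item = it.next()
      // toList drains only what is left in the iterator, not the whole sequence.
      if (isCorrupt(item)) result = Some((item, it.toList))
    }
    result
  }
}
```

With the made-up base offsets `Seq(0, 100, 200, 300)` and a corruption found at 100, the call `IteratorTailSketch.splitAtFirstCorrupt(Seq(0, 100, 200, 300))(_ == 100)` yields `Some((100, List(200, 300)))`: segment 0 was already visited, so it is not part of the list to delete.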