本文梳理次要梳理 Kafka 日志加载与复原的源码。(版本:2.8)
原文链接:http://fxbing.github.io/2022/…
日志治理:LogManager
LogManager 是 kafka 日志管理子系统的入口点。负责日志的创立、检索和清理。所有读取和写入操作都委托给各个日志实例。LogManager 在一个或多个目录中保护日志。在日志起码的数据目录中创立新日志。预先不会尝试挪动分区或依据大小或 I/O 速率进行均衡。后盾线程通过定期截断多余的日志段来解决日志保留。
LogManger 的启动次要包含三个局部:
- 日志加载与复原,即:loadLogs
- 各个定时工作启动,次要包含:
a. cleanupLogs:依据保留工夫和保留大小进行历史 segment 的清理
b. flushDirtyLogs:定时刷新还没有写到磁盘上日志
c. checkpointLogRecoveryOffsets:定时将所有数据目录所有日志的检查点写到检查点文件中
d. checkpointLogStartOffsets:将所有日志的以后日志开始偏移量写到日志目录中的文本文件中,以防止裸露已被 DeleteRecordsRequest 删除的数据
e. deleteLogs:定时删除标记为 delete 的日志文件 - 启动 LogCleaner,负责进行日志 compaction
本文次要对第一局部日志加载与复原进行梳理。
// visible for testing
private[log] def startupWithConfigOverrides(topicConfigOverrides: Map[String, LogConfig]): Unit = {loadLogs(topicConfigOverrides) // this could take a while if shutdown was not clean
/* Schedule the cleanup task to delete old logs */
if (scheduler != null) {info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs _,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs _,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointLogRecoveryOffsets _,
delay = InitialTaskDelayMs,
period = flushRecoveryOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-log-start-offset-checkpoint",
checkpointLogStartOffsets _,
delay = InitialTaskDelayMs,
period = flushStartOffsetCheckpointMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
deleteLogs _,
delay = InitialTaskDelayMs,
unit = TimeUnit.MILLISECONDS)
}
if (cleanerConfig.enableCleaner) {_cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
_cleaner.startup()}
}
全副日志加载与复原:loadLogs
所有日志的加载与复原的流程次要蕴含以下几步:
- 加载并记录日志文件夹中标记状态信息的文件(kafka_cleanshutdown、recovery-point-offset-checkpoint、recovery-point-offset-checkpoint)
- 并发对每个 tp 的日志进行加载与复原(下一大节详解)
-
记录并异步解决有问题的日志文件夹
/** * Recover and load all logs in the given data directories */ private[log] def loadLogs(topicConfigOverrides: Map[String, LogConfig]): Unit = { // 对所有可用的日志目录(liveLogDirs)进行加载,kafka server 启动时可能配置多个磁盘目录用来存储日志文件,然而不肯定所有的磁盘都是可用的 info(s"Loading logs from log dirs $liveLogDirs") val startMs = time.hiResClockMs() val threadPools = ArrayBuffer.empty[ExecutorService] val offlineDirs = mutable.Set.empty[(String, IOException)] val jobs = ArrayBuffer.empty[Seq[Future[_]]] var numTotalLogs = 0 // 遍历所有的磁盘,进行日志加载与复原,如果呈现 IOException,则将该目录记录到 offlineDirs 中进行后续解决 for (dir <- liveLogDirs) { val logDirAbsolutePath = dir.getAbsolutePath var hadCleanShutdown: Boolean = false try {val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) threadPools.append(pool) // 如果 .kafka_cleanshutdown 文件存在,则将该文件删除并记录 hadCleanShutdown 状态,后续不须要进行日志复原的流程。val cleanShutdownFile = new File(dir, Log.CleanShutdownFile) if (cleanShutdownFile.exists) {info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found") // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471 Files.deleteIfExists(cleanShutdownFile.toPath) hadCleanShutdown = true } else { // log recovery itself is being performed by `Log` class during initialization info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found") } // 从 recovery-point-offset-checkpoint 文件读取所有 tp 目录的 recoveryPoint var recoveryPoints = Map[TopicPartition, Long]() try {recoveryPoints = this.recoveryPointCheckpoints(dir).read()} catch { case e: Exception => warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory" + s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e) } // 从 log-start-offset-checkpoint 文件读取所有 tp 目录的 logStartOffset var logStartOffsets = Map[TopicPartition, Long]() try {logStartOffsets = this.logStartOffsetCheckpoints(dir).read()} catch { case e: Exception => warn(s"Error occurred while reading log-start-offset-checkpoint file of directory" + s"$logDirAbsolutePath, resetting to the base offset of the first segment", e) } // 日志的加载与复原主流程,并发对所有 tp 的日志执行 loadLog val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir => logDir.isDirectory && Log.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic) val numLogsLoaded = new AtomicInteger(0) numTotalLogs += logsToLoad.length val jobsForDir = logsToLoad.map { logDir => val runnable: Runnable = () => { try {debug(s"Loading log $logDir") val logLoadStartMs = time.hiResClockMs() val log = loadLog(logDir, hadCleanShutdown, recoveryPoints, logStartOffsets, topicConfigOverrides) val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs val currentNumLoaded = numLogsLoaded.incrementAndGet() info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms" + s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)") } catch { case e: IOException => offlineDirs.add((logDirAbsolutePath, e)) error(s"Error while loading log dir $logDirAbsolutePath", e) } } runnable } jobs += jobsForDir.map(pool.submit) } catch { case e: IOException => offlineDirs.add((logDirAbsolutePath, e)) error(s"Error while loading log dir $logDirAbsolutePath", e) } } try { // 期待所有并发执行的日志加载流程执行实现 for (dirJobs <- jobs) {dirJobs.foreach(_.get) } // 记录所有有问题的的目录,后续该目录会被 ReplicaManager 执行下线操作 offlineDirs.foreach {case (dir, e) => logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while loading log dir $dir", e) } } catch { case e: ExecutionException => error(s"There was an error in one of the threads during logs loading: ${e.getCause}") throw e.getCause } finally {threadPools.foreach(_.shutdown()) } info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.") }
单 tp 日志加载与复原
单个 tp 的日志加载与复原是在 Log 类的动态代码块中进行的。如果该 tp 的文件夹的后缀为 -delete,则认为该 tp 为待删除的,退出到 logsToBeDeleted 汇合中期待定时工作对其进行清理。
Log 类的动态代码块中通过 loadSegments 加载日志
private def loadSegments(): Long = {
// 清理临时文件(.delete 和 .clean 后缀)并保留可用的 swap 文件
val swapFiles = removeTempFilesAndCollectSwapFiles()
// retryOnOffsetOverflow 兜住可能产生的 LogSegmentOffsetOverflowException 异样,并进行日志切分解决。retryOnOffsetOverflow {
// 加载文件的中的所有文件并进行必要的完整性检查
logSegments.foreach(_.close())
segments.clear()
loadSegmentFiles()}
// 依据 swap 文件复原实现所有被中断的操作
completeSwapOperations(swapFiles)
// 如果不是待删除的 tp 日志,执行 recover 流程
if (!dir.getAbsolutePath.endsWith(Log.DeleteDirSuffix)) {
val nextOffset = retryOnOffsetOverflow {recoverLog()
}
// reset the index size of the currently active log segment to allow more entries
activeSegment.resizeIndexes(config.maxIndexSize)
nextOffset
} else {if (logSegments.isEmpty) {
addSegment(LogSegment.open(dir = dir,
baseOffset = 0,
config,
time = time,
initFileSize = this.initFileSize))
}
0
}
}
recoverLog 的外围代码如下:
// if we have the clean shutdown marker, skip recovery
// 只有未进行 cleanshutdown 的状况下才须要 recovery
if (!hadCleanShutdown) {
// 取出 recoveryPoint 之后的所有 segment(失常状况下只有一个)val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
var truncated = false
while (unflushed.hasNext && !truncated) {val segment = unflushed.next()
info(s"Recovering unflushed segment ${segment.baseOffset}")
val truncatedBytes =
try {
// 清空 segment 对应的 index,一一 batch 读取校验数据,并从新结构 index
recoverSegment(segment, leaderEpochCache)
} catch {
case _: InvalidOffsetException =>
val startOffset = segment.baseOffset
warn("Found invalid offset during recovery. Deleting the corrupt segment and" +
s"creating an empty one with starting offset $startOffset")
segment.truncateTo(startOffset)
}
if (truncatedBytes > 0) {
// 如果前一个 segment 执行了 truncate,则之后的所有 segment 间接删除
// unflushed 为迭代器,所以 unflushed.toList 代表的是所有未遍历到的 segment,而不是全副 segment
warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
removeAndDeleteSegments(unflushed.toList,
asyncDelete = true,
reason = LogRecovery)
truncated = true
}
}
}