本文次要内容有:
- 日志治理相干实现类
- 日志刷盘
- 日志清理
日志治理相干实现类
kafka的日志治理是通过LogManager类实现的,它的次要作用是日志的创立、检索、清理。
kafkaServer启动LogManager线程
LogManager线程是在kafka节点服务启动的时候启动的,代码如下:
//kafka.server.KafkaServer
def startup() {
try {
info("starting")
/* start log manager */
//创立LogManager对象
logManager = createLogManager(zkUtils.zkClient, brokerState)
//启动LogManager线程
logManager.startup()
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}
阐明,初始化LogManager的过程中会加载所有日志分区,对应办法为loadLogs(),在加载日志分区过程中会调用loadSegments()加载该分区所有的Segment文件,应用是的线程池进行加载的。
LogManager启动
在所有分区日志都加载实现后,KafkaServer调用startup()办法启动LogManager线程,在这个过程中会启动四个定时工作。
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
//note: 定时清理过期的日志 segment,并保护日志的大小
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
//note: 定时刷新还没有写到磁盘上日志
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
//note: 定时将所有数据目录所有日志的检查点写到检查点文件中
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
//note: 定时删除标记为 delete 的日志文件
scheduler.schedule("kafka-delete-logs",
deleteLogs,
delay = InitialTaskDelayMs,
period = defaultConfig.fileDeleteDelayMs,
TimeUnit.MILLISECONDS)
}
//note: 如果设置为 true, 主动清理 compaction 类型的 topic
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
四个定时工作:
- cleanupLogs,定时清理过期日志segment,并保护日志大小,默认5min执行一次
- flushDirtyLogs,定时刷新还没写到磁盘上数据
- checkpointRecoveryPointOffsets,定时将所有日志的checkpoint写到checkpoint文件中,默认60s执行一次
- deleteLogs,定时删除标记为delete的日志文件,默认30s执行一次。
checkpoint文件
在LogManager中有一个十分重要的文件—checkpoint文件:
- 创立LogManager时会读取checkpoint文件,并将每个分区对应的checkpoint作为日志的复原点(recoveryPoint),最初创立分区对应的日志实例
- 在将日志刷盘时,将最新的偏移量作为日志的checkpoint进行更新
- LogManager启动一个定时工作,定时读取所有日志的检查点,并写入全局的检查点文件
日志刷盘
在linux零碎中,当数据写入到文件系统后,数据其实在操作系统的page cache里,只有执行了刷盘后数据才会写到磁盘里。
在下面提到的定时工作flushDirtyLogs里,会定时将页面缓存中的数据刷新到磁盘中,kafka的刷盘策略有两种:
- 工夫策略,通过log.flush.interval.ms进行配置,默认为无限大。
- 大小策略,通过log.flush.interval.messages进行配置,当数据超过这个值时进行刷盘。
须要提一下的是,定时工作里只会依据工夫策略进行判断是否刷盘,依据大小判断是在append追加日志时进行的判断:
def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
// now append to the log
segment.append(firstOffset = appendInfo.firstOffset,
largestOffset = appendInfo.lastOffset,
largestTimestamp = appendInfo.maxTimestamp,
shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
records = validRecords)
// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)
trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
.format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))
if (unflushedMessages >= config.flushInterval)
flush()
}
日志清理
为了保障分区总大小不超过阈值(log.retention.bytes),LogManager会定时清理旧数据。不过个别状况下是通过配置log.retention.hours来配置segment的保留工夫的。
清理旧日志次要有两种:
- 删除,超过工夫或大小阈值的旧segment间接进行删除
- 压缩,不是删除在,是是采纳合并压缩的形式进行
发表回复