序
本文次要钻研一下 klog 的 Flush
Flush
k8s.io/klog/v2@v2.4.0/klog.go
// Flush flushes all pending log I/O.
func Flush() {logging.lockAndFlushAll()
}
Flush 办法执行的是 logging.lockAndFlushAll()
init
k8s.io/klog/v2@v2.4.0/klog.go
// init sets up the defaults and runs flushDaemon.
func init() {
logging.stderrThreshold = errorLog // Default stderrThreshold is ERROR.
logging.setVState(0, nil, false)
logging.logDir = ""logging.logFile =""
logging.logFileMaxSizeMB = 1800
logging.toStderr = true
logging.alsoToStderr = false
logging.skipHeaders = false
logging.addDirHeader = false
logging.skipLogHeaders = false
logging.oneOutput = false
go logging.flushDaemon()}
klog 的 init 办法异步协程执行 logging.flushDaemon()
logging.flushDaemon()
k8s.io/klog/v2@v2.4.0/klog.go
// flushDaemon periodically flushes the log file buffers.
func (l *loggingT) flushDaemon() {for range time.NewTicker(flushInterval).C {l.lockAndFlushAll()
}
}
flushDaemon 办法 range 新建 ticker 的 channel,而后执行 l.lockAndFlushAll()
lockAndFlushAll
k8s.io/klog/v2@v2.4.0/klog.go
// lockAndFlushAll is like flushAll but locks l.mu first.
func (l *loggingT) lockAndFlushAll() {l.mu.Lock()
l.flushAll()
l.mu.Unlock()}
lockAndFlushAll 应用 lock 执行 flushAll
flushAll
k8s.io/klog/v2@v2.4.0/klog.go
const (
infoLog severity = iota
warningLog
errorLog
fatalLog
numSeverity = 4
)
// flushAll flushes all the logs and attempts to "sync" their data to disk.
// l.mu is held.
func (l *loggingT) flushAll() {
// Flush from fatal down, in case there's trouble flushing.
for s := fatalLog; s >= infoLog; s-- {file := l.file[s]
if file != nil {file.Flush() // ignore error
file.Sync() // ignore error}
}
}
flushAll 办法从 fatalLog 开始递加到 infoLog 级别挨个执行 l.file[s] 的 Flush 及 Sync 办法
flushSyncWriter
k8s.io/klog/v2@v2.4.0/klog.go
// flushSyncWriter is the interface satisfied by logging destinations.
type flushSyncWriter interface {Flush() error
Sync() error
io.Writer
}
type Writer interface {Write(p []byte) (n int, err error)
}
flushSyncWriter 接口定义了 Flush、Sync 办法,内嵌了 io.Writer 接口
redirectBuffer
k8s.io/klog/v2@v2.4.0/klog.go
// redirectBuffer is used to set an alternate destination for the logs
type redirectBuffer struct {w io.Writer}
func (rb *redirectBuffer) Sync() error {return nil}
func (rb *redirectBuffer) Flush() error {return nil}
func (rb *redirectBuffer) Write(bytes []byte) (n int, err error) {return rb.w.Write(bytes)
}
redirectBuffer 内嵌了 io.Writer,其 Write 办法通过 io.Writer 来写;其 Sync 及 Flush 办法都为空操作
syncBuffer
k8s.io/klog/v2@v2.4.0/klog.go
// syncBuffer joins a bufio.Writer to its underlying file, providing access to the
// file's Sync method and providing a wrapper for the Write method that provides log
// file rotation. There are conflicting methods, so the file cannot be embedded.
// l.mu is held for all its methods.
type syncBuffer struct {
logger *loggingT
*bufio.Writer
file *os.File
sev severity
nbytes uint64 // The number of bytes written to this file
maxbytes uint64 // The max number of bytes this syncBuffer.file can hold before cleaning up.
}
func (sb *syncBuffer) Sync() error {return sb.file.Sync()
}
func (sb *syncBuffer) Write(p []byte) (n int, err error) {if sb.nbytes+uint64(len(p)) >= sb.maxbytes {if err := sb.rotateFile(time.Now(), false); err != nil {sb.logger.exit(err)
}
}
n, err = sb.Writer.Write(p)
sb.nbytes += uint64(n)
if err != nil {sb.logger.exit(err)
}
return
}
syncBuffer 定义了 logger、file、sev、nbytes、maxbytes 属性,内嵌了
*bufio.Writer
;其 Sync 办法执行的是*os.File
.Sync;其 Flush 办法执行的是*bufio.Writer
.Flush
Flush
/usr/local/go/src/bufio/bufio.go
type Writer struct {
err error
buf []byte
n int
wr io.Writer
}
// Flush writes any buffered data to the underlying io.Writer.
func (b *Writer) Flush() error {
if b.err != nil {return b.err}
if b.n == 0 {return nil}
n, err := b.wr.Write(b.buf[0:b.n])
if n < b.n && err == nil {err = io.ErrShortWrite}
if err != nil {
if n > 0 && n < b.n {copy(b.buf[0:b.n-n], b.buf[n:b.n])
}
b.n -= n
b.err = err
return err
}
b.n = 0
return nil
}
*bufio.Writer
.Flush 办法执行的是底层 io.Writer 的 Write 办法
syncBuffer.rotateFile
// rotateFile closes the syncBuffer's file and starts a new one.
// The startup argument indicates whether this is the initial startup of klog.
// If startup is true, existing files are opened for appending instead of truncated.
func (sb *syncBuffer) rotateFile(now time.Time, startup bool) error {
if sb.file != nil {sb.Flush()
sb.file.Close()}
var err error
sb.file, _, err = create(severityName[sb.sev], now, startup)
if err != nil {return err}
if startup {fileInfo, err := sb.file.Stat()
if err != nil {return fmt.Errorf("file stat could not get fileinfo: %v", err)
}
// init file size
sb.nbytes = uint64(fileInfo.Size())
} else {sb.nbytes = 0}
sb.Writer = bufio.NewWriterSize(sb.file, bufferSize)
if sb.logger.skipLogHeaders {return nil}
// Write header.
var buf bytes.Buffer
fmt.Fprintf(&buf, "Log file created at: %s\n", now.Format("2006/01/02 15:04:05"))
fmt.Fprintf(&buf, "Running on machine: %s\n", host)
fmt.Fprintf(&buf, "Binary: Built with %s %s for %s/%s\n", runtime.Compiler, runtime.Version(), runtime.GOOS, runtime.GOARCH)
fmt.Fprintf(&buf, "Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg\n")
n, err := sb.file.Write(buf.Bytes())
sb.nbytes += uint64(n)
return err
}
syncBuffer.rotateFile 办法会设置其 Writer 为 bufio.NewWriterSize(sb.file, bufferSize),底层 writer 为 syncBuffer 的 file
小结
klog 的 init 办法异步协程执行 logging.flushDaemon(),它外部执行的是 l.lockAndFlushAll();Flush 办法是执行 l.lockAndFlushAll();l.lockAndFlushAll() 办法应用 lock 执行 flushAll;flushAll 办法从 fatalLog 开始递加到 infoLog 级别挨个执行 l.file[s] 的 Flush 及 Sync 办法;对于 redirectBuffer,其 Flush 及 Sync 办法为空操作;对于 syncBuffer,其 Sync 办法执行的是 *os.File
.Sync;其 Flush 办法执行的是 *bufio.Writer
.Flush,*bufio.Writer
.Flush 办法执行的是底层 io.Writer 的 Write 办法,即 syncBuffer 的 file 的 Write 办法。
doc
- klog