关于golang:Golang将日志以Json格式输出到Kafka

45次阅读

共计 6572 个字符,预计需要花费 17 分钟才能阅读完成。

在上一篇文章中我实现了一个反对 Debug、Info、Error 等多个级别的日志库,并将日志写到了磁盘文件中,代码比较简单,适宜练手。有趣味的能够通过这个链接返回:https://github.com/bosima/ylo…

工程实际中,咱们往往还须要对日志进行采集,将日志归集到一起,而后用于各种解决剖析,比方生产环境上的谬误剖析、异样告警等等。在日志音讯零碎畛域,Kafka 久负盛名,这篇文章就以将日志发送到 Kafka 来实现日志的采集;同时思考到日志剖析时对结构化数据的需要,这篇文章还会提供一种输入 Json 格局日志的办法。

这个升级版的日志库还要放弃向前兼容,即还可能应用一般文本格式,以及写日志到磁盘文件,这两个个性和要新增的两个性能别离属于同类解决,因而我这里对它们进行形象,造成两个接口:格式化接口、写日志接口。

格式化接口

所谓格式化,就是日志的格局解决。这个日志库目前要反对两种格局:一般文本和 Json。

为了在不同格局之上提供一个对立的形象,ylog 中定义 logEntry 来代表一条日志:

type logEntry struct {
    Ts    time.Time `json:"ts"`
    File  string    `json:"file"`
    Line  int       `json:"line"`
    Level LogLevel  `json:"level"`
    Msg   string    `json:"msg"`
}

格式化接口的能力就是将日志从 logEntry 格局转化为其它某种数据格式。ylog 中对它的定义是:

type LoggerFormatter interface {Format(*logEntry, *[]byte) error
}

第 1 个参数是一个 logEntry 实例,也就是要被格式化的日志,第 2 个参数是日志格式化之后要写入的容器。

一般文本格式化器

其实现是这样的:

type textFormatter struct {
}

func NewTextFormatter() *textFormatter {return &textFormatter{}
}

func (f *textFormatter) Format(entry *logEntry, buf *[]byte) error {formatTime(buf, entry.Ts)
    *buf = append(*buf, ' ')

    file := toShort(entry.File)
    *buf = append(*buf, file...)
    *buf = append(*buf, ':')
    itoa(buf, entry.Line, -1)
    *buf = append(*buf, ' ')

    *buf = append(*buf, levelNames[entry.Level]...)
    *buf = append(*buf, ' ')

    *buf = append(*buf, entry.Msg...)

    return nil
}

能够看到它的次要性能就是将 logEntry 中的各个字段依照某种程序平铺开来,两头用空格分隔。

其中的很多数据处理办法参考了 Golang 规范日志库中的数据格式化解决代码,有趣味的能够去 Github 中具体查看。

这里对日期工夫格式化为字符串做了特地的优化,在规范日志库中为了将年、月、日、时、分、秒、毫秒、微秒等格式化指定长度的字符串,应用了一个函数:

func itoa(buf *[]byte, i int, wid int) {
    // Assemble decimal in reverse order.
    var b [20]byte
    bp := len(b) - 1
    for i >= 10 || wid > 1 {
        wid--
        q := i / 10
        b[bp] = byte('0' + i - q*10)
        bp--
        i = q
    }
    // i < 10
    b[bp] = byte('0' + i)
    *buf = append(*buf, b[bp:]...)
}

其逻辑大略就是将数字中的每一位转换为字符并存入 byte 中,留神这里初始化 byte 数组的时候是 20 位,这是 int64 最大的数字位数。

其实工夫字符串中的每个局部位数都是固定的,比方年是 4 位、月日时分秒都是 2 位,基本不须要 20 位,所以这个空间能够节俭;还有这里用了循环,这对于 CPU 的分支预测可能有那么点影响,所以我这里别离对不同位数写了专门的格式化办法,以 2 位数为例:

func itoa2(buf *[]byte, i int) {
    q := i / 10
    s := byte('0' + i - q*10)
    f := byte('0' + q)
    *buf = append(*buf, f, s)
}

Json 文本格式化器

其实现是这样的:

type jsonFormatter struct {
}

func NewJsonFormatter() *jsonFormatter {return &jsonFormatter{}
}

func (f *jsonFormatter) Format(entry *logEntry, buf *[]byte) (err error) {entry.File = toShortFile(entry.File)
    jsonBuf, err := json.Marshal(entry)
    *buf = append(*buf, jsonBuf...)
    return
}

代码也很简略,应用规范库的 json 序列化办法将 logEntry 实例转化为 Json 格局的数据。

对于 Json 格局,后续思考反对用户自定义 Json 字段,这里临时先简略解决。

写日志接口

写日志就是将日志输入到别的指标,比方 ylog 要反对的输入到磁盘文件、输入到 Kafka 等。

前边格式化接口将格式化后的数据封装到了 []byte 中,写日志接口就是将格式化解决的输入 []byte 写到某种输入指标中。参考 Golang 中各种 Writer 的定义,ylog 中对它的定义是:

type LoggerWriter interface {Ensure(*logEntry) error
    Write([]byte) error
    Sync() error
    Close() error}

这里有 4 个办法:

  • Ensure 确保输入指标曾经筹备好接收数据,比方关上要写入的文件、创立 Kafka 连贯等等。
  • Write 向输入指标写数据。
  • Sync 要求输入指标将缓存长久化,比方写数据到磁盘时,操作系统会有缓存,通过这个办法要求缓存数据写入磁盘。
  • Close 写日志完结,敞开输入指标。

写日志到文件

这里定义一个名为 fileWriter 的类型,它须要实现 LoggerWriter 的接口。

先看类型的定义:

type fileWriter struct {
    file     *os.File
    lastHour int64
    Path     string
}

蕴含四个字段:

  • file 要输入的文件对象。
  • lastHour 依照小时创立文件的须要。
  • Path 日志文件的根门路。

再看其实现的接口:

func (w *fileWriter) Ensure(entry *logEntry) (err error) {
    if w.file == nil {f, err := w.createFile(w.Path, entry.Ts)
        if err != nil {return err}
        w.lastHour = w.getTimeHour(entry.Ts)
        w.file = f
        return nil
    }

    currentHour := w.getTimeHour(entry.Ts)
    if w.lastHour != currentHour {_ = w.file.Close()
        f, err := w.createFile(w.Path, entry.Ts)
        if err != nil {return err}
        w.lastHour = currentHour
        w.file = f
    }

    return
}

func (w *fileWriter) Write(buf []byte) (err error) {buf = append(buf, '\n')
    _, err = w.file.Write(buf)
    return
}

func (w *fileWriter) Sync() error {return w.file.Sync()
}

func (w *fileWriter) Close() error {return w.file.Close()
}

Ensure 中的次要逻辑是创立以后要写入的文件对象,如果小时数变了,先把之前的敞开,再创立一个新的文件。

Write 把数据写入到文件对象,这里加了一个换行符,也就是说对于文件日志,其每条日志最初都会有一个换行符,这样比拟不便浏览。

Sync 调用文件对象的 Sync 办法,将日志从操作系统缓存刷到磁盘。

Close 敞开以后文件对象。

写日志到 Kafka

这里定义一个名为 kafkaWriter 的类型,它也须要实现 LoggerWriter 的接口。

先看其构造体定义:

type kafkaWriter struct {
    Topic     string
    Address   string
    writer    *kafka.Writer
    batchSize int
}

这里蕴含四个字段:

Topic 写 Kafka 时须要一个主题,这里默认以后 Logger 中所有日志应用同一个主题。

Address Kafka 的拜访地址。

writer 向 Kafka 写数据时应用的 Writer,这里集成的是:github.com/segmentio/kafka-go,反对主动重试和重连。

batchSize Kafka 写日志的批次大小,批量写能够进步日志的写效率。

再看其实现的接口:

func (w *kafkaWriter) Ensure(curTime time.Time) (err error) {
    if w.writer == nil {
        w.writer = &kafka.Writer{Addr:      kafka.TCP(w.Address),
            Topic:     w.Topic,
            BatchSize: w.batchSize,
            Async:     true,
        }
    }

    return
}

func (w *kafkaWriter) Write(buf []byte) (err error) {
    // buf will be reused by ylog when this method return,
    // with aysnc write, we need copy data to a new slice
    kbuf := append([]byte(nil), buf...)
    err = w.writer.WriteMessages(context.Background(),
        kafka.Message{Value: kbuf},
    )
    return
}

func (w *kafkaWriter) Sync() error {return nil}

func (w *kafkaWriter) Close() error {return w.writer.Close()
}

这里采纳的是异步发送到 Kafka 的形式,WriteMessages 办法不会阻塞,因为传入的 buf 要被 ylog 重用,所以这里 copy 了一下。异步还会存在的一个问题就是不会返回谬误,可能失落数据,不过对于日志这种数据,没有那么严格的要求,也能够承受。

如果采纳同步发送,因为批量发送比拟有效率,这里能够攒几条再发,但日志比拟稠密时,可能短时间很难攒够,就会呈现长时间等不到日志的状况,所以还要有个超时机制,这有点麻烦,不过我也写了一个版本,有趣味的能够去看看:https://github.com/bosima/ylo…

接口的组装

有了格式化接口和写日志接口,下一步就是将它们组装起来,以实现相应的解决能力。

首先是创立它们,因为我这里也没有动静配置的需要,所以就放到创立 Logger 实例的时候了,这样比较简单。

func NewYesLogger(opts ...Option) (logger *YesLogger) {logger = &YesLogger{}
    ...
    logger.writer = NewFileWriter("logs")
    logger.formatter = NewTextFormatter()

    for _, opt := range opts {opt(logger)
    }
    ...
    return
}

能够看到默认的 formatter 是 textFormatter,默认的 writer 是 fileWriter。这个函数传入的 Option 其实是个函数,在下边的 opt(logger) 中会执行它们,所以应用其它的 Formatter 或者 Writer 能够这样做:

logger := ylog.NewYesLogger(
        ...
        ylog.Writer(ylog.NewKafkaWriter(address, topic, writeBatchSize)),
        ylog.Formatter(ylog.NewJsonFormatter()),
)

这里 ylog.Writer 和 ylog.Formatter 就是合乎 Option 类型的函数,调用它们能够设置不同的 Formatter 和 Writer。

而后怎么应用它们呢?

...
l.formatter.Format(entry, &buf)
l.writer.Ensure(entry)
err := l.writer.Write(buf)
...

当 logEntry 进入音讯解决环节后,首先调用 formatter 的 Format 办法格式化 logEntry;而后调用了 writer 的 Ensure 办法确保 writer 曾经筹备好,最初调用 writer 的 Write 办法将格式化之后的数据输入到对应的指标。

为什么不将 Ensure 办法放到 Write 中呢?这是因为目前写文本日志的时候须要依据 logEntry 中的日志工夫创立日志文件,这样就须要给 Writer 传递两个参数,有点顺当,所以这里将它们离开了。

如何进步日志解决的吞吐量

Kafka 的吞吐量是很高的,那么如果放到 ylog 本身来说,如何进步它的吞吐量呢?

首先想到的就是 Channel,能够应用有缓冲的 Channel 模仿一个队列,生产者不停的向 Channel 发送数据,如果 Writer 能够始终在缓冲被填满之前将数据取走,那么实践上说生产者就是非阻塞的,相比同步输入到某个 Writer,没有间接磁盘 IO、网络 IO,日志解决的吞吐量必将大幅晋升。

定义一个 Channel,其容量默认为以后机器逻辑处理器的数量:

logger.pipe = make(chan *logEntry, runtime.NumCPU())

发送数据的代码:

entry := &logEntry{
        Level: level,
        Msg:   s,
        File:  file,
        Line:  line,
        Ts:    now,
    }

    l.pipe <- entry

接收数据的代码:

    for {
        select {
        case entry := <-l.pipe:
            // reuse the slice memory
            buf = buf[:0]
            l.formatter.Format(entry, &buf)
            l.writer.Ensure(entry.Ts)
            err := l.writer.Write(buf)
        ...
        }
    }

实际效果怎么样呢?看下 Benchmark:

goos: darwin
goarch: amd64
pkg: github.com/bosima/ylog
cpu: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz
BenchmarkInfo-8        1332333           871.6 ns/op         328 B/op           4 allocs/op

这个后果能够和 zerolog、zap 等高性能日志库一较高下了,当然目前能够做的事件要比它们简略很多。

如果对 Java 有所理解的同学应该据说过 log4j,在 log4j2 中引入了一个名为 Disruptor 的组件,它让日志解决飞快了起来,受到很多 Java 开发者的追捧。Disruptor 之所以这么厉害,是因为它应用了无锁并发、环形队列、缓存行填充等多种高级技术。

相比之下,Golang 的 Channel 尽管也应用了环形缓冲,然而还是应用了锁,作为队列来说性能并不是最优的。

Golang 中有没有相似的货色呢?最近进去的 ZenQ 可能是一个不错的抉择,不过看似还不太稳固,过段时间再尝试下。有趣味的能够去看看:https://github.com/alphadose/…。


好了,以上就是本文的次要内容。对于 ylog 的介绍也告一段落了,后续会在 Github 上继续更新,减少更多有用的性能,并一直优化解决性能,欢送关注:https://github.com/bosima/ylog。

播种更多架构常识,请关注微信公众号 萤火架构。原创内容,转载请注明出处。

正文完
 0