关于golang:聊聊golang的zap的ZapKafkaWriter

6次阅读

共计 2033 个字符,预计需要花费 6 分钟才能阅读完成。

本文次要钻研一下 golang 的 zap 的 ZapKafkaWriter

ZapKafkaWriter

package logger

import (
    "errors"
    "sync"
    "sync/atomic"
    "syscall"
)

// ZapKafkaWriter is a zap WriteSyncer (io.Writer) that writes messages to Kafka
type ZapKafkaWriter struct {
    kp        *KafkaProducer
    ce        *CloudEvents
    closed    int32          // Nonzero if closing, must access atomically
    pendingWg sync.WaitGroup // WaitGroup for pending messages
    closeMut  sync.Mutex
}

// newZapKafkaWriter returns a kafka io.writer instance
func newZapKafkaWriter(
    kpCfg ProducerConfiguration, cloudEvents *CloudEvents,
    ceCfg CloudEventsConfiguration) (*ZapKafkaWriter, error) {

    // create an async producer
    kp, err := newKafkaProducer(kpCfg, cloudEvents, ceCfg)
    if err != nil {return nil, err}

    zw := &ZapKafkaWriter{
        kp: kp,
        ce: cloudEvents,
    }
    return zw, nil
}

ZapKafkaWriter 定义了 KafkaProducer、CloudEvents、closed、pendingWg、closeMut 属性,其 newZapKafkaWriter 办法依据 ProducerConfiguration、cloudEvents、CloudEventsConfiguration 来创立 KafkaProducer,而后依据 KafkaProducer 来创立 ZapKafkaWriter

zapcore.WriteSyncer

// Sync satisfies zapcore.WriteSyncer interface, zapcore.AddSync works as well
func (zw *ZapKafkaWriter) Sync() error {return nil}

// Write sends byte slices to Kafka ignoring error responses (Thread-safe)
// Write might block if the Input() channel of the AsyncProducer is full
func (zw *ZapKafkaWriter) Write(msg []byte) (int, error) {if zw.Closed() {return 0, syscall.EINVAL}

    if zw.kp.producer == nil {return 0, errors.New("No producer defined")
    }

    zw.pendingWg.Add(1)
    defer zw.pendingWg.Done()

    err := zw.kp.sendMessage(msg)
    return len(msg), err
}

// Closed returns true if the writer is closed, false otherwise (Thread-safe)
func (zw *ZapKafkaWriter) Closed() bool {return atomic.LoadInt32(&zw.closed) != 0
}

// Close must be called when the writer is no longer needed (Thread-safe)
func (zw *ZapKafkaWriter) Close() (err error) {zw.closeMut.Lock()
    defer zw.closeMut.Unlock()

    if zw.Closed() {return syscall.EINVAL}

    atomic.StoreInt32(&zw.closed, 1)

    zw.pendingWg.Wait()
    return nil
}

ZapKafkaWriter 实现了 zapcore.WriteSyncer 接口,其 Write 办法应用 KafkaProducer 发送音讯,其 Sync 办法目前不做任何操作,它还提供了 Close 办法,也就是也实现了 Sink 接口

小结

WriteSyncer 内嵌了 io.Writer 接口,定义了 Sync 办法;Sink 接口内嵌了 zapcore.WriteSyncer 及 io.Closer 接口;ZapKafkaWriter 实现 Sink 接口及 zapcore.WriteSyncer 接口,其 Write 办法间接将 data 通过 kafka 发送进来。

doc

  • zap
  • zap_writer
正文完
 0