本文次要钻研一下golang的zap的ZapKafkaWriter

ZapKafkaWriter

package loggerimport (    "errors"    "sync"    "sync/atomic"    "syscall")// ZapKafkaWriter is a zap WriteSyncer (io.Writer) that writes messages to Kafkatype ZapKafkaWriter struct {    kp        *KafkaProducer    ce        *CloudEvents    closed    int32          // Nonzero if closing, must access atomically    pendingWg sync.WaitGroup // WaitGroup for pending messages    closeMut  sync.Mutex}// newZapKafkaWriter returns a kafka io.writer instancefunc newZapKafkaWriter(    kpCfg ProducerConfiguration, cloudEvents *CloudEvents,    ceCfg CloudEventsConfiguration) (*ZapKafkaWriter, error) {    // create an async producer    kp, err := newKafkaProducer(kpCfg, cloudEvents, ceCfg)    if err != nil {        return nil, err    }    zw := &ZapKafkaWriter{        kp: kp,        ce: cloudEvents,    }    return zw, nil}
ZapKafkaWriter定义了KafkaProducer、CloudEvents、closed、pendingWg、closeMut属性,其newZapKafkaWriter办法依据ProducerConfiguration、cloudEvents、CloudEventsConfiguration来创立KafkaProducer,而后依据KafkaProducer来创立ZapKafkaWriter

zapcore.WriteSyncer

// Sync satisfies zapcore.WriteSyncer interface, zapcore.AddSync works as wellfunc (zw *ZapKafkaWriter) Sync() error {    return nil}// Write sends byte slices to Kafka ignoring error responses (Thread-safe)// Write might block if the Input() channel of the AsyncProducer is fullfunc (zw *ZapKafkaWriter) Write(msg []byte) (int, error) {    if zw.Closed() {        return 0, syscall.EINVAL    }    if zw.kp.producer == nil {        return 0, errors.New("No producer defined")    }    zw.pendingWg.Add(1)    defer zw.pendingWg.Done()    err := zw.kp.sendMessage(msg)    return len(msg), err}// Closed returns true if the writer is closed, false otherwise (Thread-safe)func (zw *ZapKafkaWriter) Closed() bool {    return atomic.LoadInt32(&zw.closed) != 0}// Close must be called when the writer is no longer needed (Thread-safe)func (zw *ZapKafkaWriter) Close() (err error) {    zw.closeMut.Lock()    defer zw.closeMut.Unlock()    if zw.Closed() {        return syscall.EINVAL    }    atomic.StoreInt32(&zw.closed, 1)    zw.pendingWg.Wait()    return nil}
ZapKafkaWriter实现了zapcore.WriteSyncer接口,其Write办法应用KafkaProducer发送音讯,其Sync办法目前不做任何操作,它还提供了Close办法,也就是也实现了Sink接口

小结

WriteSyncer内嵌了io.Writer接口,定义了Sync办法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;ZapKafkaWriter实现Sink接口及zapcore.WriteSyncer接口,其Write办法间接将data通过kafka发送进来。

doc

  • zap
  • zap_writer