本文主要研究一下zerolog的diode.Writer

diode.Writer

github.com/rs/zerolog@v1.20.0/diode/diode.go

// Writer is a io.Writer wrapper that uses a diode to make Write lock-free,// non-blocking and thread safe.type Writer struct {    w    io.Writer    d    diodeFetcher    c    context.CancelFunc    done chan struct{}}func NewWriter(w io.Writer, size int, pollInterval time.Duration, f Alerter) Writer {    ctx, cancel := context.WithCancel(context.Background())    dw := Writer{        w:    w,        c:    cancel,        done: make(chan struct{}),    }    if f == nil {        f = func(int) {}    }    d := diodes.NewManyToOne(size, diodes.AlertFunc(f))    if pollInterval > 0 {        dw.d = diodes.NewPoller(d,            diodes.WithPollingInterval(pollInterval),            diodes.WithPollingContext(ctx))    } else {        dw.d = diodes.NewWaiter(d,            diodes.WithWaiterContext(ctx))    }    go dw.poll()    return dw}
diode.Writer是一个lock-free、non-blocking及thread safe的Writer;它借助diodes来实现;NewWriter会创建diode.Writer,并在一个新的goroutine中启动dw.poll()

poll

github.com/rs/zerolog@v1.20.0/diode/diode.go

func (dw Writer) poll() {    defer close(dw.done)    for {        d := dw.d.Next()        if d == nil {            return        }        p := *(*[]byte)(d)        dw.w.Write(p)        // Proper usage of a sync.Pool requires each entry to have approximately        // the same memory cost. To obtain this property when the stored type        // contains a variably-sized buffer, we add a hard limit on the maximum buffer        // to place back in the pool.        //        // See https://golang.org/issue/23199        const maxSize = 1 << 16 // 64KiB        if cap(p) <= maxSize {            bufPool.Put(p[:0])        }    }}
poll方法使用for循环执行dw.d.Next()及dw.w.Write(p);当Next返回nil时退出循环并关闭done;写完之后,容量不超过64KiB的buffer会被放回bufPool

diodeFetcher

github.com/rs/zerolog@v1.20.0/diode/diode.go

// diodeFetcher is a diodes.Diode that additionally exposes Next, which
// Writer.poll uses to obtain entries.
type diodeFetcher interface {
	diodes.Diode
	// Next returns the next entry. Poller's implementation retries until
	// data is available and returns nil once its context is done.
	Next() diodes.GenericDataType
}

// Diode is any implementation of a diode.
//
// NOTE(review): the unqualified GenericDataType suggests this declaration is
// quoted from the diodes package itself (it is referenced above as
// diodes.Diode) — confirm against the upstream file.
type Diode interface {
	// Set stores a value in the diode.
	Set(GenericDataType)
	// TryNext attempts a single read; ManyToOne's implementation reports
	// (nil, false) when no data is available.
	TryNext() (GenericDataType, bool)
}
diodeFetcher接口内嵌了Diode接口,并定义了Next方法

Next

github.com/rs/zerolog@v1.20.0/diode/internal/diodes/poller.go

// Next polls the diode until data is available or until the context is done.// If the context is done, then nil will be returned.func (p *Poller) Next() GenericDataType {    for {        data, ok := p.Diode.TryNext()        if !ok {            if p.isDone() {                return nil            }            time.Sleep(p.interval)            continue        }        return data    }}
Poller实现了diodeFetcher接口的Next方法,它使用for循环,不断通过p.Diode.TryNext()来获取data;取不到数据时,如果isDone()为true则返回nil,否则sleep一个interval后重试

ManyToOne

github.com/rs/zerolog@v1.20.0/diode/internal/diodes/many_to_one.go

// ManyToOne diode is optimal for many writers (go-routines B-n) and a single
// reader (go-routine A). It is not thread safe for multiple readers.
type ManyToOne struct {
	// writeIndex is advanced with atomic.AddUint64 by every Set call.
	writeIndex uint64
	// readIndex is read and written by TryNext without atomics.
	readIndex uint64
	// buffer is the ring of slots; each slot is accessed atomically and is
	// interpreted as a *bucket.
	buffer []unsafe.Pointer
	// alerter is told how many entries were skipped when TryNext fast
	// forwards the read index.
	alerter Alerter
}

// Set sets the data in the next slot of the ring buffer.
//
// Each attempt claims a fresh write index. If the attempt is abandoned
// (see below) a message is logged and the loop retries with a new index.
func (d *ManyToOne) Set(data GenericDataType) {
	for {
		// Claim the next sequence number and locate its slot.
		writeIndex := atomic.AddUint64(&d.writeIndex, 1)
		idx := writeIndex % uint64(len(d.buffer))
		old := atomic.LoadPointer(&d.buffer[idx])
		// The slot already holds a bucket whose seq is greater than this
		// write index minus the buffer length: treat it as a collision
		// and retry. The subtraction is on uint64 values.
		if old != nil &&
			(*bucket)(old) != nil &&
			(*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) {
			log.Println("Diode set collision: consider using a larger diode")
			continue
		}
		newBucket := &bucket{
			data: data,
			seq:  writeIndex,
		}
		// Publish only if the slot still holds the pointer observed above;
		// otherwise the slot changed in the meantime, so retry.
		if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) {
			log.Println("Diode set collision: consider using a larger diode")
			continue
		}
		return
	}
}

// TryNext will attempt to read from the next slot of the ring buffer.
// If there is not data available, it will return (nil, false).
func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
	// Read a value from the ring buffer based on the readIndex. The slot is
	// swapped to nil as part of the read.
	idx := d.readIndex % uint64(len(d.buffer))
	result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))
	// When the result is nil that means the writer has not had the
	// opportunity to write a value into the diode. This value must be ignored
	// and the read head must not increment.
	if result == nil {
		return nil, false
	}
	// When the seq value is less than the current read index that means a
	// value was read from idx that was previously written but has since
	// been dropped. This value must be ignored and the read head must not
	// increment.
	//
	// The simulation for this scenario assumes the fast forward occurred as
	// detailed below.
	//
	// 5. The reader reads again getting seq 5. It then reads again expecting
	//    seq 6 but gets seq 2. This is a read of a stale value that was
	//    effectively "dropped" so the read fails and the read head stays put.
	//    `| 4 | 5 | 2 | 3 |` r: 7, w: 6
	//
	if result.seq < d.readIndex {
		return nil, false
	}
	// When the seq value is greater than the current read index that means a
	// value was read from idx that overwrote the value that was expected to
	// be at this idx. This happens when the writer has lapped the reader. The
	// reader needs to catch up to the writer so it moves its read head to
	// the new seq, effectively dropping the messages that were not read in
	// between the two values. The number of skipped entries is reported to
	// the alerter.
	//
	// Here is a simulation of this scenario:
	//
	// 1. Both the read and write heads start at 0.
	//    `| nil | nil | nil | nil |` r: 0, w: 0
	// 2. The writer fills the buffer.
	//    `| 0 | 1 | 2 | 3 |` r: 0, w: 4
	// 3. The writer laps the read head.
	//    `| 4 | 5 | 2 | 3 |` r: 0, w: 6
	// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
	//    this forces the reader to fast forward to 5.
	//    `| 4 | 5 | 2 | 3 |` r: 5, w: 6
	//
	if result.seq > d.readIndex {
		dropped := result.seq - d.readIndex
		d.readIndex = result.seq
		d.alerter.Alert(int(dropped))
	}
	// Only increment read index if a regular read occurred (where seq was
	// equal to readIndex) or a value was read that caused a fast forward
	// (where seq was greater than readIndex).
	//
	d.readIndex++
	return result.data, true
}
ManyToOne实现了Diode接口的Set和TryNext方法

实例

func diodeDemo() {    wr := diode.NewWriter(os.Stdout, 1000, 10*time.Millisecond, func(missed int) {        fmt.Printf("Logger Dropped %d messages", missed)    })    log := zerolog.New(wr)    log.Print("test")    time.Sleep(1 * time.Second)}

输出

{"level":"debug","message":"test"}

小结

zerolog借助diodes提供了一个lock-free,non-blocking及thread safe的diode.Writer

doc

  • zerolog