go-event是一个在Docker我的项目中应用到的一个事件散发组件,实现了惯例的播送,队列等事件散发模型,代码简洁明了,也适宜初学者对Go语言的入门,对channel用来同步,通信也会加深了解。

外围数据结构

Event

type Event interface{}

Event被封装为一个空接口,承受任意类型。在go-events示意一个能够被散发的事件。

interface{}的底层相似于c语言中的void*,但比void*弱小很多,比方interface{}保留了指向对象的指针和类型,而c程序员应用void*时,必须本人去保障对象的类型是正确的)

Sink

type Sink interface {    Write(event Event) error    Close() error}

Sink是一个用来散发事件(Event)的构造。能够当作事件的解决者,应用接口的形式申明。只有对象实现了这两个办法,就能够被当作一个Sink。
核型办法

  • Write(event Event) error

    • 定义了事件如何被散发的策略。
  • Close() error

    • 当Sink被敞开的解决策略。

go-event外围就是围绕Sink做文章,docker官网给出了一个http的例子,就是当调用Write时,发动一次post申请。:

func (h *httpSink) Write(event Event) error {    p, err := json.Marshal(event)    if err != nil {        return err    }    body := bytes.NewReader(p)    resp, err := h.client.Post(h.url, "application/json", body)    if err != nil {        return err    }    defer resp.Body.Close()        if resp.Status != 200 {        return errors.New("unexpected status")    }    return nil}// implement (*httpSink).Close()

实现模型

到此为止,sink定义了事件散发的根本单位。在go-event中,封装了播送,音讯队列两种音讯散发的模型,具体来说,就是实现了Sink接口的两个构造体。

Boadcaster

type Broadcaster struct {    sinks   []Sink //所蕴含的Sink    events  chan Event// 同步Event的channel    adds    chan configureRequest //adds和remove必须保障thread-safe,所以采纳channel同步    removes chan configureRequest    shutdown chan struct{}    closed   chan struct{}    once     sync.Once}

Boardcaster由多个Sink组成,当Boardcaster接管到一个事件时,会调用本身蕴含的所有Sink的Write()办法
go-events设计之初就实现协程之间的音讯散发,须要保障thread-safe,所以对event的解决,增加,移除Sink都应用管道来通信。这也是Go的一个应用准则:

应用通信来共享内存,而不是通过共享内存来通信

在Broadcaster中所有的临界资源(sinks,event)都通过本身的run()函数对立治理,外界则通过相应的channel 同步给Broadcaster
例如Write()

func (b *Broadcaster) Write(event Event) error {    select {    case b.events <- event:    case <-b.closed:        return ErrSinkClosed    }    return nil}

能够看到增减sink都是通过向对应的channel写入数据进行的。

func (b *Broadcaster) Add(sink Sink) error {    return b.configure(b.adds, sink) //  will be block until ch can be writen}func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {    response := make(chan error, 1)    for {        select {        case ch <- configureRequest{            sink:     sink,            response: response}:            ch = nil // ?        case err := <-response:             return err        case <-b.closed:            return ErrSinkClosed        }    }}

外围run函数的实现,监听Boardcast管道上的相应事件,并作出解决。

func (b *Broadcaster) run() {    defer close(b.closed)    //将remove封装了一下,因为上面两处都会用到    remove := func(target Sink) {        for i, sink := range b.sinks {            if sink == target {                b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)                break            }        }    }    // 轮训解决channel上的事件    for {        select {        case event := <-b.events: //有事件到来,进行播送            for _, sink := range b.sinks {                if err := sink.Write(event); err != nil {                    if err == ErrSinkClosed {                        // remove closed sinks                        remove(sink)                        continue                    }                    logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).                        Errorf("broadcaster: dropping event")                }            }        case request := <-b.adds: //减少sink事件            // while we have to iterate for add/remove, common iteration for            // send is faster against slice.            var found bool            for _, sink := range b.sinks {                if request.sink == sink {                    found = true                    break                }            }            if !found {                b.sinks = append(b.sinks, request.sink)            }            // b.sinks[request.sink] = struct{}{}            request.response <- nil // 唤醒阻塞的configure()函数                    case request := <-b.removes://删除sink事件            remove(request.sink)            request.response <- nil        case <-b.shutdown:            // close all the underlying sinks            for _, sink := range b.sinks {                if err := sink.Close(); err != nil && err != ErrSinkClosed {                    logrus.WithField("events.sink", sink).WithError(err).                        Errorf("broadcaster: closing sink failed")                }            }            return        }    }}

queue

queue应用contaienr/list实现了典型的生产消费者模型

type Queue struct {    dst    Sink    events *list.List    cond   *sync.Cond     mu     sync.Mutex    closed bool}

外围函数run(),在队列中取出下一个event,交给本身的sink解决,在没有事件队列的状况下,eq.next()总是阻塞的(应用条件变量进行同步)

func (eq *Queue) run() {    for {        event := eq.next()        if event == nil {            return // nil block means event queue is closed.        }        if err := eq.dst.Write(event); err != nil {            logrus.WithFields(logrus.Fields{                "event": event,                "sink":  eq.dst,            }).WithError(err).Debug("eventqueue: dropped event")        }    }}

生产者:q.next()
消费者:write()

func (eq *Queue) Write(event Event) error {    eq.mu.Lock()    defer eq.mu.Unlock()    if eq.closed {        return ErrSinkClosed    }    eq.events.PushBack(event)    eq.cond.Signal() // signal waiters    return nil}func (eq *Queue) next() Event {    eq.mu.Lock()    defer eq.mu.Unlock()    for eq.events.Len() < 1 {        if eq.closed {            eq.cond.Broadcast()            return nil        }        eq.cond.Wait()    }    front := eq.events.Front()    block := front.Value.(Event)    eq.events.Remove(front)    return block}