乐趣区

关于golang:Docker组件goevent-源码学习

go-event 是一个在 Docker 我的项目中应用到的一个事件散发组件,实现了惯例的播送,队列等事件散发模型,代码简洁明了,也适宜初学者对 Go 语言的入门,对 channel 用来同步,通信也会加深了解。

外围数据结构

Event

type Event interface{}

Event 被封装为一个空接口,承受任意类型。在 go-events 示意一个能够被散发的事件。

interface{}的底层相似于 c 语言中的 void*, 但比void* 弱小很多,比方 interface{}保留了指向对象的指针和类型,而 c 程序员应用 void* 时,必须本人去保障对象的类型是正确的)

Sink

type Sink interface {Write(event Event) error
    Close() error}

Sink 是一个用来散发事件(Event)的构造。能够当作事件的解决者,应用接口的形式申明。只有对象实现了这两个办法,就能够被当作一个 Sink。
核型办法

  • Write(event Event) error

    • 定义了事件如何被散发的策略。
  • Close() error

    • 当 Sink 被敞开的解决策略。

go-event 外围就是围绕 Sink 做文章,docker 官网给出了一个 http 的例子,就是当调用 Write 时,发动一次 post 申请。:

func (h *httpSink) Write(event Event) error {p, err := json.Marshal(event)
    if err != nil {return err}
    body := bytes.NewReader(p)
    resp, err := h.client.Post(h.url, "application/json", body)
    if err != nil {return err}
    defer resp.Body.Close()
    
    if resp.Status != 200 {return errors.New("unexpected status")
    }

    return nil
}

// implement (*httpSink).Close()

实现模型

到此为止,sink 定义了事件散发的根本单位。在 go-event 中,封装了播送,音讯队列两种音讯散发的模型,具体来说,就是实现了 Sink 接口的两个构造体。

Boadcaster

type Broadcaster struct {sinks   []Sink // 所蕴含的 Sink
    events  chan Event// 同步 Event 的 channel
    adds    chan configureRequest //adds 和 remove 必须保障 thread-safe,所以采纳 channel 同步
    removes chan configureRequest

    shutdown chan struct{}
    closed   chan struct{}
    once     sync.Once
}

Boardcaster 由多个 Sink 组成,当 Boardcaster 接管到一个事件时,会调用本身蕴含的所有 Sink 的 Write()办法
go-events 设计之初就实现协程之间的音讯散发,须要保障 thread-safe,所以对 event 的解决,增加,移除 Sink 都应用管道来通信。这也是 Go 的一个应用准则:

应用通信来共享内存, 而不是通过共享内存来通信

在 Broadcaster 中所有的临界资源 (sinks,event) 都通过本身的 run()函数对立治理,外界则通过相应的 channel 同步给 Broadcaster
例如 Write()

func (b *Broadcaster) Write(event Event) error {
    select {
    case b.events <- event:
    case <-b.closed:
        return ErrSinkClosed
    }
    return nil
}

能够看到增减 sink 都是通过向对应的 channel 写入数据进行的。

func (b *Broadcaster) Add(sink Sink) error {return b.configure(b.adds, sink) //  will be block until ch can be writen
}

func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {response := make(chan error, 1)

    for {
        select {
        case ch <- configureRequest{
            sink:     sink,
            response: response}:
            ch = nil //?case err := <-response: 
            return err
        case <-b.closed:
            return ErrSinkClosed
        }
    }
}

外围 run 函数的实现, 监听 Boardcast 管道上的相应事件,并作出解决。

func (b *Broadcaster) run() {defer close(b.closed)
    // 将 remove 封装了一下,因为上面两处都会用到
    remove := func(target Sink) {
        for i, sink := range b.sinks {
            if sink == target {b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
                break
            }
        }
    }
    // 轮训解决 channel 上的事件
    for {
        select {
        case event := <-b.events: // 有事件到来,进行播送
            for _, sink := range b.sinks {if err := sink.Write(event); err != nil {
                    if err == ErrSinkClosed {
                        // remove closed sinks
                        remove(sink)
                        continue
                    }
                    logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
                        Errorf("broadcaster: dropping event")
                }
            }
        case request := <-b.adds: // 减少 sink 事件
            // while we have to iterate for add/remove, common iteration for
            // send is faster against slice.

            var found bool
            for _, sink := range b.sinks {
                if request.sink == sink {
                    found = true
                    break
                }
            }

            if !found {b.sinks = append(b.sinks, request.sink)
            }
            // b.sinks[request.sink] = struct{}{}
            request.response <- nil // 唤醒阻塞的 configure()函数
            
        case request := <-b.removes:// 删除 sink 事件
            remove(request.sink)
            request.response <- nil
        case <-b.shutdown:
            // close all the underlying sinks
            for _, sink := range b.sinks {if err := sink.Close(); err != nil && err != ErrSinkClosed {logrus.WithField("events.sink", sink).WithError(err).
                        Errorf("broadcaster: closing sink failed")
                }
            }
            return
        }
    }
}

queue

queue 应用 contaienr/list 实现了典型的生产消费者模型

type Queue struct {
    dst    Sink
    events *list.List
    cond   *sync.Cond 
    mu     sync.Mutex
    closed bool
}

外围函数 run(), 在队列中取出下一个 event,交给本身的 sink 解决,在没有事件队列的状况下,eq.next()总是阻塞的(应用条件变量进行同步)

func (eq *Queue) run() {
    for {event := eq.next()

        if event == nil {return // nil block means event queue is closed.}

        if err := eq.dst.Write(event); err != nil {
            logrus.WithFields(logrus.Fields{
                "event": event,
                "sink":  eq.dst,
            }).WithError(err).Debug("eventqueue: dropped event")
        }
    }
}

生产者:q.next()
消费者:write()

func (eq *Queue) Write(event Event) error {eq.mu.Lock()
    defer eq.mu.Unlock()

    if eq.closed {return ErrSinkClosed}

    eq.events.PushBack(event)
    eq.cond.Signal() // signal waiters

    return nil
}

func (eq *Queue) next() Event {eq.mu.Lock()
    defer eq.mu.Unlock()

    for eq.events.Len() < 1 {
        if eq.closed {eq.cond.Broadcast()
            return nil
        }

        eq.cond.Wait()}

    front := eq.events.Front()
    block := front.Value.(Event)
    eq.events.Remove(front)

    return block
}
退出移动版