乐趣区

关于micro:go-micro-broker

micro.newService()中 newOptions

func newOptions(opts ...Option) Options {
    opt := Options{
        Auth:      auth.DefaultAuth,
        Broker:    broker.DefaultBroker,
        Cmd:       cmd.DefaultCmd,
        Config:    config.DefaultConfig,
        Client:    client.DefaultClient,
        Server:    server.DefaultServer,
        Store:     store.DefaultStore,
        Registry:  registry.DefaultRegistry,
        Router:    router.DefaultRouter,
        Runtime:   runtime.DefaultRuntime,
        Transport: transport.DefaultTransport,
        Context:   context.Background(),
        Signal:    true,
    }

    for _, o := range opts {o(&opt)
    }

    return opt
}

初始化了一堆根底设置,先来看看 Broker broker.DefaultBroker,
在 broker/broker.go 中 DefaultBroker Broker = NewBroker()

// NewBroker returns a new http broker
func NewBroker(opts ...Option) Broker {return newHttpBroker(opts...)
}

func newHttpBroker(opts ...Option) Broker {
    options := Options{Codec:    json.Marshaler{},
        Context:  context.TODO(),
        Registry: registry.DefaultRegistry,
    }

    for _, o := range opts {o(&options)
    }

    // set address
    addr := DefaultAddress

    if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {addr = options.Addrs[0]
    }

    h := &httpBroker{id:          uuid.New().String(),
        address:     addr,
        opts:        options,
        r:           options.Registry,
        c:           &http.Client{Transport: newTransport(options.TLSConfig)},
        subscribers: make(map[string][]*httpSubscriber),
        exit:        make(chan chan error),
        mux:         http.NewServeMux(),
        inbox:       make(map[string][][]byte),
    }

    // specify the message handler
    h.mux.Handle(DefaultPath, h)

    // get optional handlers
    if h.opts.Context != nil {handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)
        if ok {
            for pattern, handler := range handlers {h.mux.Handle(pattern, handler)
            }
        }
    }

    return h
}

这里做了几件事

  1. 初始化 options,设置 Codec 为 json,设置 ctx,Registry
  2. 初始化 httpBroker,设置 http.Client 时调用 newTransport()设置代理,同时启用 http2,最初指定 message handler

h.mux.Handle(DefaultPath, h) h 就是 httpBroker,在 httpBroker 中实现了 ServeHTTP(),则所有申请都通过他来解决,即所有订阅的音讯解决都是通过 httpBroker.ServeHTTP()来解决的

  1. 如果 ctx 不为空,就取 http_handlers 数组,顺次注册 http handle

上面看看 example/broker, 一个 broker 的示例

var (topic = "go.micro.topic.foo")

func pub() {tick := time.NewTicker(time.Second)
    i := 0
    for _ = range tick.C {
        msg := &broker.Message{Header: map[string]string{"id": fmt.Sprintf("%d", i),
            },
            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
        }
        if err := broker.Publish(topic, msg); err != nil {log.Printf("[pub] failed: %v", err)
        } else {fmt.Println("[pub] pubbed message:", string(msg.Body))
        }
        i++
    }
}

func sub() {_, err := broker.Subscribe(topic, func(p broker.Event) error {fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
        return nil
    })
    if err != nil {fmt.Println(err)
    }
}

func main() {cmd.Init()

    if err := broker.Init(); err != nil {log.Fatalf("Broker Init error: %v", err)
    }
    if err := broker.Connect(); err != nil {log.Fatalf("Broker Connect error: %v", err)
    }

    go pub()
    go sub()

    <-time.After(time.Second * 10)
}

cmd.init()请见 micro cmd 篇,有具体介绍
cmd.opts.Broker 默认应用的是上文剖析的 httpBroker
先看broker.Init()

func (h *httpBroker) Init(opts ...Option) error {h.RLock()
    if h.running {h.RUnlock()
        return errors.New("cannot init while connected")
    }
    h.RUnlock()

    h.Lock()
    defer h.Unlock()

    for _, o := range opts {o(&h.opts)
    }

    if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {h.address = h.opts.Addrs[0]
    }

    if len(h.id) == 0 {h.id = "go.micro.http.broker-" + uuid.New().String()}

    // get registry
    reg := h.opts.Registry
    if reg == nil {reg = registry.DefaultRegistry}

    // get cache
    if rc, ok := h.r.(cache.Cache); ok {rc.Stop()
    }

    // set registry
    h.r = cache.New(reg)

    // reconfigure tls config
    if c := h.opts.TLSConfig; c != nil {
        h.c = &http.Client{Transport: newTransport(c),
        }
    }

    return nil
}

这里做了以下几件事件

  1. 上读锁,查看是否正在运行
  2. 上读写锁,在进行前面操作

    1. 设置 opt
    2. 设置 address,id
    3. 获取 Registry,cache,设置 registry
    4. 设置 http.Client 中 Transport 的 tls

再看broker.Connect()

func (h *httpBroker) Connect() error {h.RLock()
    if h.running {h.RUnlock()
        return nil
    }
    h.RUnlock()

    h.Lock()
    defer h.Unlock()

    var l net.Listener
    var err error

    if h.opts.Secure || h.opts.TLSConfig != nil {
        config := h.opts.TLSConfig

        fn := func(addr string) (net.Listener, error) {
            if config == nil {hosts := []string{addr}

                // check if its a valid host:port
                if host, _, err := net.SplitHostPort(addr); err == nil {if len(host) == 0 {hosts = maddr.IPs()
                    } else {hosts = []string{host}
                    }
                }

                // generate a certificate
                cert, err := mls.Certificate(hosts...)
                if err != nil {return nil, err}
                config = &tls.Config{Certificates: []tls.Certificate{cert}}
            }
            return tls.Listen("tcp", addr, config)
        }

        l, err = mnet.Listen(h.address, fn)
    } else {fn := func(addr string) (net.Listener, error) {return net.Listen("tcp", addr)
        }

        l, err = mnet.Listen(h.address, fn)
    }

    if err != nil {return err}

    addr := h.address
    h.address = l.Addr().String()

    go http.Serve(l, h.mux)
    go func() {h.run(l)
        h.Lock()
        h.opts.Addrs = []string{addr}
        h.address = addr
        h.Unlock()}()

    // get registry
    reg := h.opts.Registry
    if reg == nil {reg = registry.DefaultRegistry}
    // set cache
    h.r = cache.New(reg)

    // set running
    h.running = true
    return nil
}

这里做了以下几件事件

  1. 上读锁,查看是否正在运行
  2. 上读写锁,在进行前面操作

    1. 如果有 Secure 和 TLSConfig,做一些 tls 的设置,没有则间接返回默认 net.Listener
    2. 开一个协程运行 http.Serve,解决申请是 newHttpBroker() 中指定的 handle 函数ServeHTTP()(规范库 http 提供了 Handler 接口,用于开发者实现本人的 handler。只有实现接口的 ServeHTTP 办法即可。)
    3. 开一个协程运行 run(), 前面看, 设置地址
    4. 获取 Registry,设置 cache
    5. 标记 httpBroker 正在运行

看看方才的 run() 做了什么

func (h *httpBroker) run(l net.Listener) {t := time.NewTicker(registerInterval)
    defer t.Stop()

    for {
        select {
        // heartbeat for each subscriber
        case <-t.C:
            h.RLock()
            for _, subs := range h.subscribers {
                for _, sub := range subs {_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
                }
            }
            h.RUnlock()
        // received exit signal
        case ch := <-h.exit:
            ch <- l.Close()
            h.RLock()
            for _, subs := range h.subscribers {
                for _, sub := range subs {_ = h.r.Deregister(sub.svc)
                }
            }
            h.RUnlock()
            return
        }
    }
}

这里做了以下几件事件

  1. 设置默认 30 秒一次的工夫触发器,每个周期都在服务发现核心顺次注册 subscribers 中的订阅
  2. 承受退出音讯,顺次调用 Deregister() 登记 subscribers 中的订阅

接下来看看怎么订阅音讯broker.Subscribe()

func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
    var err error
    var host, port string
    options := NewSubscribeOptions(opts...)

    // parse address for host, port
    host, port, err = net.SplitHostPort(h.Address())
    if err != nil {return nil, err}

    addr, err := maddr.Extract(host)
    if err != nil {return nil, err}

    var secure bool

    if h.opts.Secure || h.opts.TLSConfig != nil {secure = true}

    // register service
    node := &registry.Node{
        Id:      topic + "-" + h.id,
        Address: mnet.HostPort(addr, port),
        Metadata: map[string]string{"secure": fmt.Sprintf("%t", secure),
            "broker": "http",
            "topic":  topic,
        },
    }

    // check for queue group or broadcast queue
    version := options.Queue
    if len(version) == 0 {version = broadcastVersion}

    service := &registry.Service{
        Name:    serviceName,
        Version: version,
        Nodes:   []*registry.Node{node},
    }

    // generate subscriber
    subscriber := &httpSubscriber{
        opts:  options,
        hb:    h,
        id:    node.Id,
        topic: topic,
        fn:    handler,
        svc:   service,
    }

    // subscribe now
    if err := h.subscribe(subscriber); err != nil {return nil, err}

    // return the subscriber
    return subscriber, nil
}

func (h *httpBroker) subscribe(s *httpSubscriber) error {h.Lock()
    defer h.Unlock()

    if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {return err}

    h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
    return nil
}

这里做了以下几件事件

  1. 初始化 SubscribeOptions
  2. 解析 Address 中 host, port,并验证 ip
  3. 初始化 registry.Node{}
  4. 查看 options.Queue,设置 registry.Service{},
  5. 生成订阅信息结构体 httpSubscriber{}
  6. 调用 subscribe(), 返回 subscriber

    1. 开读写锁,把订阅服务 (Connect() 中开了 http.Serve())注册到服务发现(默认 mdns),发消息的时候通过服务发现找 node,就是往这些注册的服务中发了
    2. 订阅频道记录到 h.subscribers 中

获取音讯通过调用 Subscribe 时传递的处理函数,示例如下

broker.Subscribe(topic, func(p broker.Event) error {fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
        return nil
    })

Event 及 httpEvent 定义

// Event is given to a subscription handler for processing
type Event interface {Topic() string
    Message() *Message
    Ack() error
    Error() error}

type httpEvent struct {
    m   *Message
    t   string
    err error
}

func (h *httpEvent) Ack() error {return nil}

func (h *httpEvent) Error() error {return h.err}

func (h *httpEvent) Message() *Message {return h.m}

func (h *httpEvent) Topic() string {return h.t}

能够看到 httpEvent 实现了 Event,这样 p.Message()就能够失去音讯了
获取音讯在 ServeHTTP()中,收到音讯,调用传入的 fn 解决即可

上面再看公布音讯,只须要定义 broker.Message,再调用broker.Publish() 即可,示例如下

msg := &broker.Message{Header: map[string]string{"id": fmt.Sprintf("%d", i),
            },
            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
        }
        if err := broker.Publish(topic, msg); err != nil {log.Printf("[pub] failed: %v", err)
        } else {fmt.Println("[pub] pubbed message:", string(msg.Body))
        }

看看broker.Publish()

func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
    // create the message first
    m := &Message{Header: make(map[string]string),
        Body:   msg.Body,
    }

    for k, v := range msg.Header {m.Header[k] = v
    }

    m.Header["Micro-Topic"] = topic

    // encode the message
    b, err := h.opts.Codec.Marshal(m)
    if err != nil {return err}

    // save the message
    h.saveMessage(topic, b)

    // now attempt to get the service
    h.RLock()
    s, err := h.r.GetService(serviceName)
    if err != nil {h.RUnlock()
        return err
    }
    h.RUnlock()

    pub := func(node *registry.Node, t string, b []byte) error {
        scheme := "http"

        // check if secure is added in metadata
        if node.Metadata["secure"] == "true" {scheme = "https"}

        vals := url.Values{}
        vals.Add("id", node.Id)

        uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode())
        r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
        if err != nil {return err}

        // discard response body
        io.Copy(ioutil.Discard, r.Body)
        r.Body.Close()
        return nil
    }

    srv := func(s []*registry.Service, b []byte) {
        for _, service := range s {var nodes []*registry.Node

            for _, node := range service.Nodes {
                // only use nodes tagged with broker http
                if node.Metadata["broker"] != "http" {continue}

                // look for nodes for the topic
                if node.Metadata["topic"] != topic {continue}

                nodes = append(nodes, node)
            }

            // only process if we have nodes
            if len(nodes) == 0 {continue}

            switch service.Version {
            // broadcast version means broadcast to all nodes
            case broadcastVersion:
                var success bool

                // publish to all nodes
                for _, node := range nodes {
                    // publish async
                    if err := pub(node, topic, b); err == nil {success = true}
                }

                // save if it failed to publish at least once
                if !success {h.saveMessage(topic, b)
                }
            default:
                // select node to publish to
                node := nodes[rand.Int()%len(nodes)]

                // publish async to one node
                if err := pub(node, topic, b); err != nil {
                    // if failed save it
                    h.saveMessage(topic, b)
                }
            }
        }
    }

    // do the rest async
    go func() {
        // get a third of the backlog
        messages := h.getMessage(topic, 8)
        delay := (len(messages) > 1)

        // publish all the messages
        for _, msg := range messages {
            // serialize here
            srv(s, msg)

            // sending a backlog of messages
            if delay {time.Sleep(time.Millisecond * 100)
            }
        }
    }()

    return nil
}

这里做了以下几件事件

  1. 创立音讯,header 中增加一个Micro-Topic,值是 topic
  2. 编码音讯,默认 json
  3. 调用 saveMessage()保留音讯

    1. httpBroker.mtx 加锁
    2. 在 httpBroker.inbox 中增加这条音讯,如果 inbox
数组大于 64 条则只取前 64 条重赋值给 inbox,【` 震惊,前面间接丢了!!!`】
  1. 调用 httpBroker.r.GetService(serviceName) 获取 service,serviceName 默认 ”micro.http.broker”
  2. 创立处理函数pub()

    1. 确定 scheme[http 或 https]
    2. url 参数中减少 id,值为 node.id
    3. 拼接 uri,scheme://(node.Address)(DefaultPath)(vals.Encode())
    4. post 发动申请,返回后果放入 ioutil.Discard(就是丢掉了),敞开返回 body
  3. 创立处理函数srv()

    1. 收集所有可用的 registry.Node
    2. 判断音讯播送还是指定了 Queue(指定了 Queue 随机选一个 node),依据状况异步调用 pub() 发送,失败则从新调用saveMessage()
  4. 创立协程

    1. 调用 getMessage()

      1. 开读写锁 h.mtx.Lock()
      2. 从 inbox 取出指定条数的音讯
    2. 顺次用第 6 步的 srv() 解决每条音讯,如果方才取出的音讯大于 1 条,每次发送距离 Millisecond*100

至此,整个 broker 的流程比拟清晰了。

总结:

  1. 默认的 http broker,订阅就是开了一个 http 服务手音讯,公布就是从服务发现拿到节点信息,往节点 post 数据。
  2. 理论应用中通常能够指定 etcd,consul 这样的服务发现。
  3. 音讯 inbox 最多只放 64 条,尽量避免音讯沉积,音讯最好写日志

写到这里又要举荐大家看 micro in action 了
Micro In Action(四):Pub/Sub
Micro In Action(五):Message Broker

退出移动版