micro.newService()中newOptions

func newOptions(opts ...Option) Options {    opt := Options{        Auth:      auth.DefaultAuth,        Broker:    broker.DefaultBroker,        Cmd:       cmd.DefaultCmd,        Config:    config.DefaultConfig,        Client:    client.DefaultClient,        Server:    server.DefaultServer,        Store:     store.DefaultStore,        Registry:  registry.DefaultRegistry,        Router:    router.DefaultRouter,        Runtime:   runtime.DefaultRuntime,        Transport: transport.DefaultTransport,        Context:   context.Background(),        Signal:    true,    }    for _, o := range opts {        o(&opt)    }    return opt}

初始化了一堆根底设置,先来看看Broker broker.DefaultBroker,
在broker/broker.go中 DefaultBroker Broker = NewBroker()

// NewBroker returns a new http brokerfunc NewBroker(opts ...Option) Broker {    return newHttpBroker(opts...)}func newHttpBroker(opts ...Option) Broker {    options := Options{        Codec:    json.Marshaler{},        Context:  context.TODO(),        Registry: registry.DefaultRegistry,    }    for _, o := range opts {        o(&options)    }    // set address    addr := DefaultAddress    if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {        addr = options.Addrs[0]    }    h := &httpBroker{        id:          uuid.New().String(),        address:     addr,        opts:        options,        r:           options.Registry,        c:           &http.Client{Transport: newTransport(options.TLSConfig)},        subscribers: make(map[string][]*httpSubscriber),        exit:        make(chan chan error),        mux:         http.NewServeMux(),        inbox:       make(map[string][][]byte),    }    // specify the message handler    h.mux.Handle(DefaultPath, h)    // get optional handlers    if h.opts.Context != nil {        handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)        if ok {            for pattern, handler := range handlers {                h.mux.Handle(pattern, handler)            }        }    }    return h}

这里做了几件事

  1. 初始化options,设置Codec为json,设置ctx,Registry
  2. 初始化httpBroker,设置http.Client时调用newTransport()设置代理,同时启用http2,最初指定message handler

h.mux.Handle(DefaultPath, h)h就是httpBroker,在httpBroker中实现了ServeHTTP(),则所有申请都通过他来解决,即所有订阅的音讯解决都是通过httpBroker.ServeHTTP()来解决的

  1. 如果ctx不为空,就取http_handlers数组,顺次注册http handle

上面看看example/broker, 一个broker的示例

var (    topic = "go.micro.topic.foo")func pub() {    tick := time.NewTicker(time.Second)    i := 0    for _ = range tick.C {        msg := &broker.Message{            Header: map[string]string{                "id": fmt.Sprintf("%d", i),            },            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),        }        if err := broker.Publish(topic, msg); err != nil {            log.Printf("[pub] failed: %v", err)        } else {            fmt.Println("[pub] pubbed message:", string(msg.Body))        }        i++    }}func sub() {    _, err := broker.Subscribe(topic, func(p broker.Event) error {        fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)        return nil    })    if err != nil {        fmt.Println(err)    }}func main() {    cmd.Init()    if err := broker.Init(); err != nil {        log.Fatalf("Broker Init error: %v", err)    }    if err := broker.Connect(); err != nil {        log.Fatalf("Broker Connect error: %v", err)    }    go pub()    go sub()    <-time.After(time.Second * 10)}

cmd.init()请见micro cmd篇,有具体介绍
cmd.opts.Broker默认应用的是上文剖析的httpBroker
先看broker.Init()

func (h *httpBroker) Init(opts ...Option) error {    h.RLock()    if h.running {        h.RUnlock()        return errors.New("cannot init while connected")    }    h.RUnlock()    h.Lock()    defer h.Unlock()    for _, o := range opts {        o(&h.opts)    }    if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {        h.address = h.opts.Addrs[0]    }    if len(h.id) == 0 {        h.id = "go.micro.http.broker-" + uuid.New().String()    }    // get registry    reg := h.opts.Registry    if reg == nil {        reg = registry.DefaultRegistry    }    // get cache    if rc, ok := h.r.(cache.Cache); ok {        rc.Stop()    }    // set registry    h.r = cache.New(reg)    // reconfigure tls config    if c := h.opts.TLSConfig; c != nil {        h.c = &http.Client{            Transport: newTransport(c),        }    }    return nil}

这里做了以下几件事件

  1. 上读锁,查看是否正在运行
  2. 上读写锁,在进行前面操作

    1. 设置opt
    2. 设置address,id
    3. 获取Registry,cache,设置registry
    4. 设置http.Client中Transport的tls

再看broker.Connect()

func (h *httpBroker) Connect() error {    h.RLock()    if h.running {        h.RUnlock()        return nil    }    h.RUnlock()    h.Lock()    defer h.Unlock()    var l net.Listener    var err error    if h.opts.Secure || h.opts.TLSConfig != nil {        config := h.opts.TLSConfig        fn := func(addr string) (net.Listener, error) {            if config == nil {                hosts := []string{addr}                // check if its a valid host:port                if host, _, err := net.SplitHostPort(addr); err == nil {                    if len(host) == 0 {                        hosts = maddr.IPs()                    } else {                        hosts = []string{host}                    }                }                // generate a certificate                cert, err := mls.Certificate(hosts...)                if err != nil {                    return nil, err                }                config = &tls.Config{Certificates: []tls.Certificate{cert}}            }            return tls.Listen("tcp", addr, config)        }        l, err = mnet.Listen(h.address, fn)    } else {        fn := func(addr string) (net.Listener, error) {            return net.Listen("tcp", addr)        }        l, err = mnet.Listen(h.address, fn)    }    if err != nil {        return err    }    addr := h.address    h.address = l.Addr().String()    go http.Serve(l, h.mux)    go func() {        h.run(l)        h.Lock()        h.opts.Addrs = []string{addr}        h.address = addr        h.Unlock()    }()    // get registry    reg := h.opts.Registry    if reg == nil {        reg = registry.DefaultRegistry    }    // set cache    h.r = cache.New(reg)    // set running    h.running = true    return nil}

这里做了以下几件事件

  1. 上读锁,查看是否正在运行
  2. 上读写锁,在进行前面操作

    1. 如果有Secure和TLSConfig,做一些tls的设置,没有则间接返回默认net.Listener
    2. 开一个协程运行http.Serve,解决申请是newHttpBroker()中指定的handle函数ServeHTTP()( 规范库http提供了Handler接口,用于开发者实现本人的handler。只有实现接口的ServeHTTP办法即可。)
    3. 开一个协程运行run(),前面看,设置地址
    4. 获取Registry,设置cache
    5. 标记httpBroker正在运行

看看方才的run()做了什么

func (h *httpBroker) run(l net.Listener) {    t := time.NewTicker(registerInterval)    defer t.Stop()    for {        select {        // heartbeat for each subscriber        case <-t.C:            h.RLock()            for _, subs := range h.subscribers {                for _, sub := range subs {                    _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))                }            }            h.RUnlock()        // received exit signal        case ch := <-h.exit:            ch <- l.Close()            h.RLock()            for _, subs := range h.subscribers {                for _, sub := range subs {                    _ = h.r.Deregister(sub.svc)                }            }            h.RUnlock()            return        }    }}

这里做了以下几件事件

  1. 设置默认30秒一次的工夫触发器,每个周期都在服务发现核心顺次注册subscribers中的订阅
  2. 承受退出音讯,顺次调用Deregister()登记subscribers中的订阅

接下来看看怎么订阅音讯broker.Subscribe()

func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {    var err error    var host, port string    options := NewSubscribeOptions(opts...)    // parse address for host, port    host, port, err = net.SplitHostPort(h.Address())    if err != nil {        return nil, err    }    addr, err := maddr.Extract(host)    if err != nil {        return nil, err    }    var secure bool    if h.opts.Secure || h.opts.TLSConfig != nil {        secure = true    }    // register service    node := &registry.Node{        Id:      topic + "-" + h.id,        Address: mnet.HostPort(addr, port),        Metadata: map[string]string{            "secure": fmt.Sprintf("%t", secure),            "broker": "http",            "topic":  topic,        },    }    // check for queue group or broadcast queue    version := options.Queue    if len(version) == 0 {        version = broadcastVersion    }    service := &registry.Service{        Name:    serviceName,        Version: version,        Nodes:   []*registry.Node{node},    }    // generate subscriber    subscriber := &httpSubscriber{        opts:  options,        hb:    h,        id:    node.Id,        topic: topic,        fn:    handler,        svc:   service,    }    // subscribe now    if err := h.subscribe(subscriber); err != nil {        return nil, err    }    // return the subscriber    return subscriber, nil}func (h *httpBroker) subscribe(s *httpSubscriber) error {    h.Lock()    defer h.Unlock()    if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {        return err    }    h.subscribers[s.topic] = append(h.subscribers[s.topic], s)    return nil}

这里做了以下几件事件

  1. 初始化SubscribeOptions
  2. 解析Address中host, port,并验证ip
  3. 初始化registry.Node{}
  4. 查看options.Queue,设置registry.Service{},
  5. 生成订阅信息结构体httpSubscriber{}
  6. 调用subscribe(),返回subscriber

    1. 开读写锁,把订阅服务(Connect()中开了http.Serve())注册到服务发现(默认mdns),发消息的时候通过服务发现找node,就是往这些注册的服务中发了
    2. 订阅频道记录到h.subscribers中

获取音讯通过调用Subscribe时传递的处理函数,示例如下

broker.Subscribe(topic, func(p broker.Event) error {        fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)        return nil    })

Event及httpEvent定义

// Event is given to a subscription handler for processingtype Event interface {    Topic() string    Message() *Message    Ack() error    Error() error}type httpEvent struct {    m   *Message    t   string    err error}func (h *httpEvent) Ack() error {    return nil}func (h *httpEvent) Error() error {    return h.err}func (h *httpEvent) Message() *Message {    return h.m}func (h *httpEvent) Topic() string {    return h.t}

能够看到httpEvent实现了Event,这样p.Message()就能够失去音讯了
获取音讯在ServeHTTP()中,收到音讯,调用传入的fn解决即可

上面再看公布音讯,只须要定义broker.Message,再调用broker.Publish()即可,示例如下

msg := &broker.Message{            Header: map[string]string{                "id": fmt.Sprintf("%d", i),            },            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),        }        if err := broker.Publish(topic, msg); err != nil {            log.Printf("[pub] failed: %v", err)        } else {            fmt.Println("[pub] pubbed message:", string(msg.Body))        }

看看broker.Publish()

func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {    // create the message first    m := &Message{        Header: make(map[string]string),        Body:   msg.Body,    }    for k, v := range msg.Header {        m.Header[k] = v    }    m.Header["Micro-Topic"] = topic    // encode the message    b, err := h.opts.Codec.Marshal(m)    if err != nil {        return err    }    // save the message    h.saveMessage(topic, b)    // now attempt to get the service    h.RLock()    s, err := h.r.GetService(serviceName)    if err != nil {        h.RUnlock()        return err    }    h.RUnlock()    pub := func(node *registry.Node, t string, b []byte) error {        scheme := "http"        // check if secure is added in metadata        if node.Metadata["secure"] == "true" {            scheme = "https"        }        vals := url.Values{}        vals.Add("id", node.Id)        uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode())        r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))        if err != nil {            return err        }        // discard response body        io.Copy(ioutil.Discard, r.Body)        r.Body.Close()        return nil    }    srv := func(s []*registry.Service, b []byte) {        for _, service := range s {            var nodes []*registry.Node            for _, node := range service.Nodes {                // only use nodes tagged with broker http                if node.Metadata["broker"] != "http" {                    continue                }                // look for nodes for the topic                if node.Metadata["topic"] != topic {                    continue                }                nodes = append(nodes, node)            }            // only process if we have nodes            if len(nodes) == 0 {                continue            }            switch service.Version {            // broadcast version means broadcast to all nodes            case broadcastVersion:                var success bool                // publish to all nodes                for _, node := range nodes {                    // publish async                    if err := pub(node, topic, b); err == nil {                        success = true                    }                }                // save if it failed to publish at least once                if !success {                    h.saveMessage(topic, b)                }            default:                // select node to publish to                node := nodes[rand.Int()%len(nodes)]                // publish async to one node                if err := pub(node, topic, b); err != nil {                    // if failed save it                    h.saveMessage(topic, b)                }            }        }    }    // do the rest async    go func() {        // get a third of the backlog        messages := h.getMessage(topic, 8)        delay := (len(messages) > 1)        // publish all the messages        for _, msg := range messages {            // serialize here            srv(s, msg)            // sending a backlog of messages            if delay {                time.Sleep(time.Millisecond * 100)            }        }    }()    return nil}

这里做了以下几件事件

  1. 创立音讯,header中增加一个Micro-Topic,值是topic
  2. 编码音讯,默认json
  3. 调用saveMessage()保留音讯

    1. httpBroker.mtx加锁
    2. 在httpBroker.inbox中增加这条音讯,如果inbox
数组大于64条则只取前64条重赋值给inbox,【`震惊,前面间接丢了!!!`】
  1. 调用httpBroker.r.GetService(serviceName)获取service,serviceName默认"micro.http.broker"
  2. 创立处理函数pub()

    1. 确定scheme[http或https]
    2. url参数中减少id,值为node.id
    3. 拼接uri,scheme://(node.Address)(DefaultPath)(vals.Encode())
    4. post发动申请,返回后果放入ioutil.Discard(就是丢掉了),敞开返回body
  3. 创立处理函数srv()

    1. 收集所有可用的registry.Node
    2. 判断音讯播送还是指定了Queue(指定了Queue随机选一个node),依据状况异步调用pub()发送,失败则从新调用saveMessage()
  4. 创立协程

    1. 调用getMessage()

      1. 开读写锁h.mtx.Lock()
      2. 从inbox取出指定条数的音讯
    2. 顺次用第6步的srv()解决每条音讯,如果方才取出的音讯大于1条,每次发送距离Millisecond*100

至此,整个broker的流程比拟清晰了。

总结:

  1. 默认的http broker,订阅就是开了一个http服务手音讯,公布就是从服务发现拿到节点信息,往节点post数据。
  2. 理论应用中通常能够指定etcd,consul这样的服务发现。
  3. 音讯inbox最多只放64条,尽量避免音讯沉积,音讯最好写日志

写到这里又要举荐大家看micro in action了
Micro In Action(四):Pub/Sub
Micro In Action(五):Message Broker