概述

在第一篇概述文章(Go Micro 总体设计)中,已经提到了 Broker 在 Micro 中的作用。我们也知道 Micro 是一个可插拔的分布式框架,Broker 可以使用 kafka、rabbitmq、cache、redis、nats 等各种实现,具体实现可以在 git 上的插件库 go-plugins 中找到。
我们再来看一下接口:

// Broker is the pluggable publish/subscribe abstraction: an implementation
// is connected, used to publish and subscribe on named topics, and then
// disconnected.
type Broker interface {
	// Init takes a list of options and reports an error.
	// NOTE(review): presumably (re)configures the broker — confirm against the implementation.
	Init(...Option) error
	// Options returns an Options value.
	// NOTE(review): presumably the broker's current configuration — confirm.
	Options() Options
	// Address returns a string.
	// NOTE(review): presumably the address the broker is bound to — confirm.
	Address() string
	// Connect starts the broker.
	Connect() error
	// Disconnect shuts the broker down.
	Disconnect() error
	// Publish publishes the message m on the given topic.
	Publish(topic string, m *Message, opts ...PublishOption) error
	// Subscribe registers the handler h as a listener for the given topic
	// and returns the resulting Subscriber.
	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	// String returns a string.
	// NOTE(review): presumably the name of the implementation — confirm.
	String() string
}

Connect 开启broker
Subscribe 注册对某个topic的监听
Publish 发布某个topic的消息
Disconnect 关闭broker

简单使用

本篇文章以默认的broker实现httpbroker为例进行分析,这是一种比较简单、容易理解的实现,代码地址。

PubTopic函数为发布函数
SubTopic函数为订阅函数
下面的例子其实不需要很多注释,就是简单的使用一个发布一个订阅函数来实现。
// Command main is a minimal publish/subscribe demo on top of the default
// go-micro broker: one goroutine publishes a message every second, another
// subscribes to the same topic and prints whatever arrives.
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/micro/go-micro/broker"
)

// main connects the default broker, starts one publisher and one subscriber
// on the same topic, and lets them run for ten seconds before exiting.
func main() {
	// Without a connected broker neither publishing nor subscribing can
	// work, so abort instead of carrying on. log.Fatalf also formats the
	// error properly; Println would have printed the "%v" verb literally.
	if err := broker.Connect(); err != nil {
		log.Fatalf("Broker Connect error: %v", err)
	}

	go PubTopic("SubServerName")
	go SubTopic("SubServerName")

	// Keep the process alive long enough to observe a few messages.
	time.Sleep(time.Second * 10)
}

// PubTopic publishes one message per second on topic. Each message carries
// a running counter in its "id" header and "<counter>: <current time>" as
// its body. It loops for as long as the ticker delivers ticks.
func PubTopic(topic string) {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	i := 0
	for range tick.C {
		msg := &broker.Message{
			Header: map[string]string{
				"id": fmt.Sprintf("%d", i),
			},
			Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
		}
		if err := broker.Publish(topic, msg); err != nil {
			log.Printf("[pub] failed: %v", err)
		} else {
			fmt.Println("[pub] pubbed message:", string(msg.Body))
		}
		i++
	}
}

// SubTopic subscribes to topic and prints the body and header of every
// event it receives. A subscription error is printed and otherwise ignored.
func SubTopic(topic string) {
	_, err := broker.Subscribe(topic, func(p broker.Event) error {
		fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
		return nil
	})
	if err != nil {
		fmt.Println(err)
	}
}

结果

[pub] pubbed message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860[sub] received message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860 header map[id:0][pub] pubbed message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632[sub] received message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632 header map[id:1][pub] pubbed message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892[sub] received message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892 header map[id:2][pub] pubbed message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327[sub] received message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327 header map[id:3][pub] pubbed message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526[sub] received message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526 header map[id:4]

源码分析

Connect

  1. 开启tcp监听
  2. 启一个goroutine,按registerInterval间隔对subscriber进行注册,类似心跳
  3. 设置服务发现注册服务
  4. 设置缓存对象
  5. 设置running = true
func (h *httpBroker) Connect() error {    h.RLock()    if h.running {        h.RUnlock()        return nil    }    h.RUnlock()    h.Lock()    defer h.Unlock()    var l net.Listener    var err error    // 创建监听函数 判断是否有配置    if h.opts.Secure || h.opts.TLSConfig != nil {        config := h.opts.TLSConfig        fn := func(addr string) (net.Listener, error) {            if config == nil {                hosts := []string{addr}                // check if its a valid host:port                if host, _, err := net.SplitHostPort(addr); err == nil {                    if len(host) == 0 {                        hosts = maddr.IPs()                    } else {                        hosts = []string{host}                    }                }                // generate a certificate                cert, err := mls.Certificate(hosts...)                if err != nil {                    return nil, err                }                config = &tls.Config{Certificates: []tls.Certificate{cert}}            }            return tls.Listen("tcp", addr, config)        }        l, err = mnet.Listen(h.address, fn)    } else {        fn := func(addr string) (net.Listener, error) {            return net.Listen("tcp", addr)        }        l, err = mnet.Listen(h.address, fn)    }    if err != nil {        return err    }    addr := h.address    h.address = l.Addr().String()    go http.Serve(l, h.mux)    go func() {        // 根据设置的registerInterval 心跳时间,检测服务是否存活           h.run(l)        h.Lock()        h.opts.Addrs = []string{addr}        h.address = addr        h.Unlock()    }()    // get registry    reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)    if !ok {        reg = registry.DefaultRegistry    }    // set cache    h.r = cache.New(reg)    // set running    h.running = true    return nil}

Subscribe

  1. 解析address
  2. 创建唯一id
  3. 拼装服务信息 最后的服务信息如下图
  4. 调用Register(默认的是mdns)注册服务
  5. 把service放到subscribers map[string][]*httpSubscriber中

func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {    options := NewSubscribeOptions(opts...)    // parse address for host, port    parts := strings.Split(h.Address(), ":")    host := strings.Join(parts[:len(parts)-1], ":")    port, _ := strconv.Atoi(parts[len(parts)-1])    addr, err := maddr.Extract(host)    if err != nil {        return nil, err    }    // create unique id    id := h.id + "." + uuid.New().String()    var secure bool    if h.opts.Secure || h.opts.TLSConfig != nil {        secure = true    }    // register service    node := &registry.Node{        Id:      id,        Address: fmt.Sprintf("%s:%d", addr, port),        Metadata: map[string]string{            "secure": fmt.Sprintf("%t", secure),        },    }    // check for queue group or broadcast queue    version := options.Queue    if len(version) == 0 {        version = broadcastVersion    }    service := &registry.Service{        Name:    "topic:" + topic,        Version: version,        Nodes:   []*registry.Node{node},    }    // 组装sub    subscriber := &httpSubscriber{        opts:  options,        hb:    h,        id:    id,        topic: topic,        fn:    handler,        svc:   service,    }    // 调用subscribe函数 内部调用register 注册服务    if err := h.subscribe(subscriber); err != nil {        return nil, err    }    // return the subscriber    return subscriber, nil}

Publish

  1. 从registry获取topic的消费者节点
  2. 对消息进行编码
  3. 依次把编码后的消息异步publish到节点(http post)
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {    // create the message first    m := &Message{        Header: make(map[string]string),        Body:   msg.Body,    }    for k, v := range msg.Header {        m.Header[k] = v    }    m.Header[":topic"] = topic    // encode the message    b, err := h.opts.Codec.Marshal(m)    if err != nil {        return err    }    // save the message    h.saveMessage(topic, b)    // now attempt to get the service    h.RLock()    s, err := h.r.GetService("topic:" + topic)    if err != nil {        h.RUnlock()        // ignore error        return nil    }    h.RUnlock()    pub := func(node *registry.Node, t string, b []byte) error {        scheme := "http"        // check if secure is added in metadata        if node.Metadata["secure"] == "true" {            scheme = "https"        }        vals := url.Values{}        vals.Add("id", node.Id)        uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode())        r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))        if err != nil {            return err        }        // discard response body        io.Copy(ioutil.Discard, r.Body)        r.Body.Close()        return nil    }    srv := func(s []*registry.Service, b []byte) {        for _, service := range s {            // only process if we have nodes            if len(service.Nodes) == 0 {                continue            }            switch service.Version {            // broadcast version means broadcast to all nodes            case broadcastVersion:                var success bool                // publish to all nodes                for _, node := range service.Nodes {                    // publish async                    if err := pub(node, topic, b); err == nil {                        success = true                    }                }                // save if it failed to publish at least once                if !success {               
     h.saveMessage(topic, b)                }            default:                // select node to publish to                node := service.Nodes[rand.Int()%len(service.Nodes)]                // publish async to one node                if err := pub(node, topic, b); err != nil {                    // if failed save it                    h.saveMessage(topic, b)                }            }        }    }    // do the rest async    go func() {        // get a third of the backlog        messages := h.getMessage(topic, 8)        delay := (len(messages) > 1)        // publish all the messages        for _, msg := range messages {            // serialize here            srv(s, msg)            // sending a backlog of messages            if delay {                time.Sleep(time.Millisecond * 100)            }        }    }()    return nil}

总结

默认的broker实现是用http,不过一般情况下不会使用http来实现发布和订阅的。我们可以使用kafka,redis等来实现发布和订阅。