Go-Micro-Broker-源码分析

52次阅读

共计 6363 个字符,预计需要花费 16 分钟才能阅读完成。

概述

在第一篇概述文章中已经提到了在 Micro 中 Broker 的作用,Go Micro 总体设计。我们也知道 Micro 是一个可插拔的分布式框架,我们可以使用 kafka,rabbitmq,cache,redis,nats 等各种实现具体可以在 git 上的插件库中找到 go-plugins
我们再来看一下接口:

type Broker interface {Init(...Option) error
    Options() Options
    Address() string
    Connect() error
    Disconnect() error
    Publish(topic string, m *Message, opts ...PublishOption) error
    Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
    String() string}

Connect 开启 broker
Subsribe 注册对某个 topic 的监听
Publish 发布某个 topic 的消息
Disconnect 关闭 broker

简单使用

本片文章使用的是默认 broker 实现 httpbroker 分析,是一个比较简单和容易理解的方式,代码地址。

PubTopic 函数为发布函数
SubTopic 函数为订阅函数
下面的例子其实不需要很多注释,就是简单的使用一个发布一个订阅函数来实现。

package main

import (
    "fmt"
    "github.com/micro/go-micro/broker"
    "log"
    "time"
)

func main() {if err := broker.Connect(); err != nil {fmt.Println("Broker Connect error: %v", err)
    }

    go PubTopic("SubServerName")
    go SubTopic("SubServerName")
    time.Sleep(time.Second * 10)
}

func PubTopic(topic string) {tick := time.NewTicker(time.Second)
    i := 0
    for _ = range tick.C {
        msg := &broker.Message{Header: map[string]string{"id": fmt.Sprintf("%d", i),
            },
            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
        }
        if err := broker.Publish(topic, msg); err != nil {log.Printf("[pub] failed: %v", err)
        } else {fmt.Println("[pub] pubbed message:", string(msg.Body))
        }
        i++
    }
}

func SubTopic(topic string) {_, err := broker.Subscribe(topic, func(p broker.Event) error {fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
        return nil
    })
    if err != nil {fmt.Println(err)
    }
}

结果

[pub] pubbed message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860
[sub] received message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860 header map[id:0]
[pub] pubbed message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632
[sub] received message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632 header map[id:1]
[pub] pubbed message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892
[sub] received message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892 header map[id:2]
[pub] pubbed message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327
[sub] received message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327 header map[id:3]
[pub] pubbed message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526
[sub] received message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526 header map[id:4]

源码分析

Connect

  1. 开启 tcp 监听
  2. 启一个 goroutine,在 registerInterval 间隔对 subscriber 就行注册,类似心跳
  3. 设置服务发现注册服务
  4. 设置缓存对象
  5. 设置 running = true
func (h *httpBroker) Connect() error {h.RLock()
    if h.running {h.RUnlock()
        return nil
    }
    h.RUnlock()

    h.Lock()
    defer h.Unlock()

    var l net.Listener
    var err error
    // 创建监听函数 判断是否有配置
    if h.opts.Secure || h.opts.TLSConfig != nil {
        config := h.opts.TLSConfig

        fn := func(addr string) (net.Listener, error) {
            if config == nil {hosts := []string{addr}

                // check if its a valid host:port
                if host, _, err := net.SplitHostPort(addr); err == nil {if len(host) == 0 {hosts = maddr.IPs()
                    } else {hosts = []string{host}
                    }
                }

                // generate a certificate
                cert, err := mls.Certificate(hosts...)
                if err != nil {return nil, err}
                config = &tls.Config{Certificates: []tls.Certificate{cert}}
            }
            return tls.Listen("tcp", addr, config)
        }
        l, err = mnet.Listen(h.address, fn)
    } else {fn := func(addr string) (net.Listener, error) {return net.Listen("tcp", addr)
        }

        l, err = mnet.Listen(h.address, fn)
    }

    if err != nil {return err}

    addr := h.address
    h.address = l.Addr().String()

    go http.Serve(l, h.mux)
    go func() {
        // 根据设置的 registerInterval 心跳时间,检测服务是否存活   
        h.run(l)
        h.Lock()
        h.opts.Addrs = []string{addr}
        h.address = addr
        h.Unlock()}()

    // get registry
    reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
    if !ok {reg = registry.DefaultRegistry}
    // set cache
    h.r = cache.New(reg)

    // set running
    h.running = true
    return nil
}

Subscribe

  1. 解析 address
  2. 创建唯一 id
  3. 拼装服务信息 最后的服务信息如下图
  4. 调用 Register(默认的是 mdns)注册服务
  5. 把 service 放到 subscribers map[string][]*httpSubscriber 中

func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {options := NewSubscribeOptions(opts...)

    // parse address for host, port
    parts := strings.Split(h.Address(), ":")
    host := strings.Join(parts[:len(parts)-1], ":")
    port, _ := strconv.Atoi(parts[len(parts)-1])

    addr, err := maddr.Extract(host)
    if err != nil {return nil, err}

    // create unique id
    id := h.id + "." + uuid.New().String()

    var secure bool

    if h.opts.Secure || h.opts.TLSConfig != nil {secure = true}

    // register service
    node := &registry.Node{
        Id:      id,
        Address: fmt.Sprintf("%s:%d", addr, port),
        Metadata: map[string]string{"secure": fmt.Sprintf("%t", secure),
        },
    }

    // check for queue group or broadcast queue
    version := options.Queue
    if len(version) == 0 {version = broadcastVersion}

    service := &registry.Service{
        Name:    "topic:" + topic,
        Version: version,
        Nodes:   []*registry.Node{node},
    }

    // 组装 sub
    subscriber := &httpSubscriber{
        opts:  options,
        hb:    h,
        id:    id,
        topic: topic,
        fn:    handler,
        svc:   service,
    }

    // 调用 subscribe 函数 内部调用 register 注册服务
    if err := h.subscribe(subscriber); err != nil {return nil, err}

    // return the subscriber
    return subscriber, nil
}

Publish

  1. 从 registry 获取 topic 的消费者节点
  2. 对消息进行编码
  3. 依次把编码后的消息异步 publish 到节点(http post)
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
    // create the message first
    m := &Message{Header: make(map[string]string),
        Body:   msg.Body,
    }

    for k, v := range msg.Header {m.Header[k] = v
    }

    m.Header[":topic"] = topic

    // encode the message
    b, err := h.opts.Codec.Marshal(m)
    if err != nil {return err}

    // save the message
    h.saveMessage(topic, b)

    // now attempt to get the service
    h.RLock()
    s, err := h.r.GetService("topic:" + topic)
    if err != nil {h.RUnlock()
        // ignore error
        return nil
    }
    h.RUnlock()

    pub := func(node *registry.Node, t string, b []byte) error {
        scheme := "http"

        // check if secure is added in metadata
        if node.Metadata["secure"] == "true" {scheme = "https"}

        vals := url.Values{}
        vals.Add("id", node.Id)

        uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode())
        r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
        if err != nil {return err}

        // discard response body
        io.Copy(ioutil.Discard, r.Body)
        r.Body.Close()
        return nil
    }

    srv := func(s []*registry.Service, b []byte) {
        for _, service := range s {
            // only process if we have nodes
            if len(service.Nodes) == 0 {continue}

            switch service.Version {
            // broadcast version means broadcast to all nodes
            case broadcastVersion:
                var success bool

                // publish to all nodes
                for _, node := range service.Nodes {
                    // publish async
                    if err := pub(node, topic, b); err == nil {success = true}
                }

                // save if it failed to publish at least once
                if !success {h.saveMessage(topic, b)
                }
            default:
                // select node to publish to
                node := service.Nodes[rand.Int()%len(service.Nodes)]

                // publish async to one node
                if err := pub(node, topic, b); err != nil {
                    // if failed save it
                    h.saveMessage(topic, b)
                }
            }
        }
    }

    // do the rest async
    go func() {
        // get a third of the backlog
        messages := h.getMessage(topic, 8)
        delay := (len(messages) > 1)

        // publish all the messages
        for _, msg := range messages {
            // serialize here
            srv(s, msg)

            // sending a backlog of messages
            if delay {time.Sleep(time.Millisecond * 100)
            }
        }
    }()

    return nil
}

总结

默认的 broker 实现是用 http,不过一般情况下不会使用 http 来实现发布和订阅的。我们可以使用 kafka,redis 等来实现发布和订阅。

正文完
 0