共计 6363 个字符,预计需要花费 16 分钟才能阅读完成。
概述
在第一篇概述文章中已经提到了在 Micro 中 Broker 的作用,Go Micro 总体设计。我们也知道 Micro 是一个可插拔的分布式框架,我们可以使用 kafka,rabbitmq,cache,redis,nats 等各种实现具体可以在 git 上的插件库中找到 go-plugins
我们再来看一下接口:
type Broker interface {Init(...Option) error | |
Options() Options | |
Address() string | |
Connect() error | |
Disconnect() error | |
Publish(topic string, m *Message, opts ...PublishOption) error | |
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) | |
String() string} |
Connect 开启 broker
Subsribe 注册对某个 topic 的监听
Publish 发布某个 topic 的消息
Disconnect 关闭 broker
简单使用
本片文章使用的是默认 broker 实现 httpbroker 分析,是一个比较简单和容易理解的方式,代码地址。
PubTopic 函数为发布函数
SubTopic 函数为订阅函数
下面的例子其实不需要很多注释,就是简单的使用一个发布一个订阅函数来实现。
package main | |
import ( | |
"fmt" | |
"github.com/micro/go-micro/broker" | |
"log" | |
"time" | |
) | |
func main() {if err := broker.Connect(); err != nil {fmt.Println("Broker Connect error: %v", err) | |
} | |
go PubTopic("SubServerName") | |
go SubTopic("SubServerName") | |
time.Sleep(time.Second * 10) | |
} | |
func PubTopic(topic string) {tick := time.NewTicker(time.Second) | |
i := 0 | |
for _ = range tick.C { | |
msg := &broker.Message{Header: map[string]string{"id": fmt.Sprintf("%d", i), | |
}, | |
Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), | |
} | |
if err := broker.Publish(topic, msg); err != nil {log.Printf("[pub] failed: %v", err) | |
} else {fmt.Println("[pub] pubbed message:", string(msg.Body)) | |
} | |
i++ | |
} | |
} | |
func SubTopic(topic string) {_, err := broker.Subscribe(topic, func(p broker.Event) error {fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) | |
return nil | |
}) | |
if err != nil {fmt.Println(err) | |
} | |
} |
结果
[pub] pubbed message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860 | |
[sub] received message: 0: 2019-07-15 15:48:00.377247298 +0800 CST m=+1.005764860 header map[id:0] | |
[pub] pubbed message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632 | |
[sub] received message: 1: 2019-07-15 15:48:01.376383294 +0800 CST m=+2.004891632 header map[id:1] | |
[pub] pubbed message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892 | |
[sub] received message: 2: 2019-07-15 15:48:02.377595797 +0800 CST m=+3.006094892 header map[id:2] | |
[pub] pubbed message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327 | |
[sub] received message: 3: 2019-07-15 15:48:03.376685455 +0800 CST m=+4.005175327 header map[id:3] | |
[pub] pubbed message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526 | |
[sub] received message: 4: 2019-07-15 15:48:04.377715895 +0800 CST m=+5.006196526 header map[id:4] |
源码分析
Connect
- 开启 tcp 监听
- 启一个 goroutine,在 registerInterval 间隔对 subscriber 就行注册,类似心跳
- 设置服务发现注册服务
- 设置缓存对象
- 设置 running = true
func (h *httpBroker) Connect() error {h.RLock() | |
if h.running {h.RUnlock() | |
return nil | |
} | |
h.RUnlock() | |
h.Lock() | |
defer h.Unlock() | |
var l net.Listener | |
var err error | |
// 创建监听函数 判断是否有配置 | |
if h.opts.Secure || h.opts.TLSConfig != nil { | |
config := h.opts.TLSConfig | |
fn := func(addr string) (net.Listener, error) { | |
if config == nil {hosts := []string{addr} | |
// check if its a valid host:port | |
if host, _, err := net.SplitHostPort(addr); err == nil {if len(host) == 0 {hosts = maddr.IPs() | |
} else {hosts = []string{host} | |
} | |
} | |
// generate a certificate | |
cert, err := mls.Certificate(hosts...) | |
if err != nil {return nil, err} | |
config = &tls.Config{Certificates: []tls.Certificate{cert}} | |
} | |
return tls.Listen("tcp", addr, config) | |
} | |
l, err = mnet.Listen(h.address, fn) | |
} else {fn := func(addr string) (net.Listener, error) {return net.Listen("tcp", addr) | |
} | |
l, err = mnet.Listen(h.address, fn) | |
} | |
if err != nil {return err} | |
addr := h.address | |
h.address = l.Addr().String() | |
go http.Serve(l, h.mux) | |
go func() { | |
// 根据设置的 registerInterval 心跳时间,检测服务是否存活 | |
h.run(l) | |
h.Lock() | |
h.opts.Addrs = []string{addr} | |
h.address = addr | |
h.Unlock()}() | |
// get registry | |
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry) | |
if !ok {reg = registry.DefaultRegistry} | |
// set cache | |
h.r = cache.New(reg) | |
// set running | |
h.running = true | |
return nil | |
} |
Subscribe
- 解析 address
- 创建唯一 id
- 拼装服务信息 最后的服务信息如下图
- 调用 Register(默认的是 mdns)注册服务
- 把 service 放到 subscribers map[string][]*httpSubscriber 中
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {options := NewSubscribeOptions(opts...) | |
// parse address for host, port | |
parts := strings.Split(h.Address(), ":") | |
host := strings.Join(parts[:len(parts)-1], ":") | |
port, _ := strconv.Atoi(parts[len(parts)-1]) | |
addr, err := maddr.Extract(host) | |
if err != nil {return nil, err} | |
// create unique id | |
id := h.id + "." + uuid.New().String() | |
var secure bool | |
if h.opts.Secure || h.opts.TLSConfig != nil {secure = true} | |
// register service | |
node := ®istry.Node{ | |
Id: id, | |
Address: fmt.Sprintf("%s:%d", addr, port), | |
Metadata: map[string]string{"secure": fmt.Sprintf("%t", secure), | |
}, | |
} | |
// check for queue group or broadcast queue | |
version := options.Queue | |
if len(version) == 0 {version = broadcastVersion} | |
service := ®istry.Service{ | |
Name: "topic:" + topic, | |
Version: version, | |
Nodes: []*registry.Node{node}, | |
} | |
// 组装 sub | |
subscriber := &httpSubscriber{ | |
opts: options, | |
hb: h, | |
id: id, | |
topic: topic, | |
fn: handler, | |
svc: service, | |
} | |
// 调用 subscribe 函数 内部调用 register 注册服务 | |
if err := h.subscribe(subscriber); err != nil {return nil, err} | |
// return the subscriber | |
return subscriber, nil | |
} |
Publish
- 从 registry 获取 topic 的消费者节点
- 对消息进行编码
- 依次把编码后的消息异步 publish 到节点(http post)
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { | |
// create the message first | |
m := &Message{Header: make(map[string]string), | |
Body: msg.Body, | |
} | |
for k, v := range msg.Header {m.Header[k] = v | |
} | |
m.Header[":topic"] = topic | |
// encode the message | |
b, err := h.opts.Codec.Marshal(m) | |
if err != nil {return err} | |
// save the message | |
h.saveMessage(topic, b) | |
// now attempt to get the service | |
h.RLock() | |
s, err := h.r.GetService("topic:" + topic) | |
if err != nil {h.RUnlock() | |
// ignore error | |
return nil | |
} | |
h.RUnlock() | |
pub := func(node *registry.Node, t string, b []byte) error { | |
scheme := "http" | |
// check if secure is added in metadata | |
if node.Metadata["secure"] == "true" {scheme = "https"} | |
vals := url.Values{} | |
vals.Add("id", node.Id) | |
uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode()) | |
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) | |
if err != nil {return err} | |
// discard response body | |
io.Copy(ioutil.Discard, r.Body) | |
r.Body.Close() | |
return nil | |
} | |
srv := func(s []*registry.Service, b []byte) { | |
for _, service := range s { | |
// only process if we have nodes | |
if len(service.Nodes) == 0 {continue} | |
switch service.Version { | |
// broadcast version means broadcast to all nodes | |
case broadcastVersion: | |
var success bool | |
// publish to all nodes | |
for _, node := range service.Nodes { | |
// publish async | |
if err := pub(node, topic, b); err == nil {success = true} | |
} | |
// save if it failed to publish at least once | |
if !success {h.saveMessage(topic, b) | |
} | |
default: | |
// select node to publish to | |
node := service.Nodes[rand.Int()%len(service.Nodes)] | |
// publish async to one node | |
if err := pub(node, topic, b); err != nil { | |
// if failed save it | |
h.saveMessage(topic, b) | |
} | |
} | |
} | |
} | |
// do the rest async | |
go func() { | |
// get a third of the backlog | |
messages := h.getMessage(topic, 8) | |
delay := (len(messages) > 1) | |
// publish all the messages | |
for _, msg := range messages { | |
// serialize here | |
srv(s, msg) | |
// sending a backlog of messages | |
if delay {time.Sleep(time.Millisecond * 100) | |
} | |
} | |
}() | |
return nil | |
} |
总结
默认的 broker 实现是用 http,不过一般情况下不会使用 http 来实现发布和订阅的。我们可以使用 kafka,redis 等来实现发布和订阅。
正文完