micro.newService()中 newOptions
func newOptions(opts ...Option) Options {
opt := Options{
Auth: auth.DefaultAuth,
Broker: broker.DefaultBroker,
Cmd: cmd.DefaultCmd,
Config: config.DefaultConfig,
Client: client.DefaultClient,
Server: server.DefaultServer,
Store: store.DefaultStore,
Registry: registry.DefaultRegistry,
Router: router.DefaultRouter,
Runtime: runtime.DefaultRuntime,
Transport: transport.DefaultTransport,
Context: context.Background(),
Signal: true,
}
for _, o := range opts {o(&opt)
}
return opt
}
初始化了一堆根底设置,先来看看 Broker broker.DefaultBroker,
在 broker/broker.go 中 DefaultBroker Broker = NewBroker()
// NewBroker returns a new http broker
func NewBroker(opts ...Option) Broker {return newHttpBroker(opts...)
}
func newHttpBroker(opts ...Option) Broker {
options := Options{Codec: json.Marshaler{},
Context: context.TODO(),
Registry: registry.DefaultRegistry,
}
for _, o := range opts {o(&options)
}
// set address
addr := DefaultAddress
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {addr = options.Addrs[0]
}
h := &httpBroker{id: uuid.New().String(),
address: addr,
opts: options,
r: options.Registry,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
inbox: make(map[string][][]byte),
}
// specify the message handler
h.mux.Handle(DefaultPath, h)
// get optional handlers
if h.opts.Context != nil {handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler)
if ok {
for pattern, handler := range handlers {h.mux.Handle(pattern, handler)
}
}
}
return h
}
这里做了几件事
- 初始化 options,设置 Codec 为 json,设置 ctx,Registry
- 初始化 httpBroker,设置 http.Client 时调用 newTransport()设置代理,同时启用 http2,最初指定 message handler
h.mux.Handle(DefaultPath, h)
h 就是 httpBroker,在 httpBroker 中实现了 ServeHTTP(),则所有申请都通过他来解决,即所有订阅的音讯解决都是通过 httpBroker.ServeHTTP()来解决的
- 如果 ctx 不为空,就取 http_handlers 数组,顺次注册 http handle
上面看看 example/broker, 一个 broker 的示例
var (topic = "go.micro.topic.foo")
func pub() {tick := time.NewTicker(time.Second)
i := 0
for _ = range tick.C {
msg := &broker.Message{Header: map[string]string{"id": fmt.Sprintf("%d", i),
},
Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
}
if err := broker.Publish(topic, msg); err != nil {log.Printf("[pub] failed: %v", err)
} else {fmt.Println("[pub] pubbed message:", string(msg.Body))
}
i++
}
}
func sub() {_, err := broker.Subscribe(topic, func(p broker.Event) error {fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
})
if err != nil {fmt.Println(err)
}
}
func main() {cmd.Init()
if err := broker.Init(); err != nil {log.Fatalf("Broker Init error: %v", err)
}
if err := broker.Connect(); err != nil {log.Fatalf("Broker Connect error: %v", err)
}
go pub()
go sub()
<-time.After(time.Second * 10)
}
cmd.init()请见 micro cmd 篇,有具体介绍
cmd.opts.Broker 默认应用的是上文剖析的 httpBroker
先看broker.Init()
func (h *httpBroker) Init(opts ...Option) error {h.RLock()
if h.running {h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
h.Lock()
defer h.Unlock()
for _, o := range opts {o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {h.id = "go.micro.http.broker-" + uuid.New().String()}
// get registry
reg := h.opts.Registry
if reg == nil {reg = registry.DefaultRegistry}
// get cache
if rc, ok := h.r.(cache.Cache); ok {rc.Stop()
}
// set registry
h.r = cache.New(reg)
// reconfigure tls config
if c := h.opts.TLSConfig; c != nil {
h.c = &http.Client{Transport: newTransport(c),
}
}
return nil
}
这里做了以下几件事件
- 上读锁,查看是否正在运行
-
上读写锁,在进行前面操作
- 设置 opt
- 设置 address,id
- 获取 Registry,cache,设置 registry
- 设置 http.Client 中 Transport 的 tls
再看broker.Connect()
func (h *httpBroker) Connect() error {h.RLock()
if h.running {h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
if h.opts.Secure || h.opts.TLSConfig != nil {
config := h.opts.TLSConfig
fn := func(addr string) (net.Listener, error) {
if config == nil {hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {if len(host) == 0 {hosts = maddr.IPs()
} else {hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {return nil, err}
config = &tls.Config{Certificates: []tls.Certificate{cert}}
}
return tls.Listen("tcp", addr, config)
}
l, err = mnet.Listen(h.address, fn)
} else {fn := func(addr string) (net.Listener, error) {return net.Listen("tcp", addr)
}
l, err = mnet.Listen(h.address, fn)
}
if err != nil {return err}
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go func() {h.run(l)
h.Lock()
h.opts.Addrs = []string{addr}
h.address = addr
h.Unlock()}()
// get registry
reg := h.opts.Registry
if reg == nil {reg = registry.DefaultRegistry}
// set cache
h.r = cache.New(reg)
// set running
h.running = true
return nil
}
这里做了以下几件事件
- 上读锁,查看是否正在运行
-
上读写锁,在进行前面操作
- 如果有 Secure 和 TLSConfig,做一些 tls 的设置,没有则间接返回默认 net.Listener
- 开一个协程运行 http.Serve,解决申请是
newHttpBroker()
中指定的 handle 函数ServeHTTP()
(规范库 http 提供了 Handler 接口,用于开发者实现本人的 handler。只有实现接口的 ServeHTTP 办法即可。) - 开一个协程运行 run(), 前面看, 设置地址
- 获取 Registry,设置 cache
- 标记 httpBroker 正在运行
看看方才的 run()
做了什么
func (h *httpBroker) run(l net.Listener) {t := time.NewTicker(registerInterval)
defer t.Stop()
for {
select {
// heartbeat for each subscriber
case <-t.C:
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
}
}
h.RUnlock()
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {_ = h.r.Deregister(sub.svc)
}
}
h.RUnlock()
return
}
}
}
这里做了以下几件事件
- 设置默认 30 秒一次的工夫触发器,每个周期都在服务发现核心顺次注册 subscribers 中的订阅
- 承受退出音讯,顺次调用
Deregister()
登记 subscribers 中的订阅
接下来看看怎么订阅音讯broker.Subscribe()
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
var err error
var host, port string
options := NewSubscribeOptions(opts...)
// parse address for host, port
host, port, err = net.SplitHostPort(h.Address())
if err != nil {return nil, err}
addr, err := maddr.Extract(host)
if err != nil {return nil, err}
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {secure = true}
// register service
node := ®istry.Node{
Id: topic + "-" + h.id,
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{"secure": fmt.Sprintf("%t", secure),
"broker": "http",
"topic": topic,
},
}
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {version = broadcastVersion}
service := ®istry.Service{
Name: serviceName,
Version: version,
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: node.Id,
topic: topic,
fn: handler,
svc: service,
}
// subscribe now
if err := h.subscribe(subscriber); err != nil {return nil, err}
// return the subscriber
return subscriber, nil
}
func (h *httpBroker) subscribe(s *httpSubscriber) error {h.Lock()
defer h.Unlock()
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {return err}
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
return nil
}
这里做了以下几件事件
- 初始化 SubscribeOptions
- 解析 Address 中 host, port,并验证 ip
- 初始化 registry.Node{}
- 查看 options.Queue,设置 registry.Service{},
- 生成订阅信息结构体 httpSubscriber{}
-
调用 subscribe(), 返回 subscriber
- 开读写锁,把订阅服务 (Connect() 中开了 http.Serve())注册到服务发现(默认 mdns),发消息的时候通过服务发现找 node,就是往这些注册的服务中发了
- 订阅频道记录到 h.subscribers 中
获取音讯通过调用 Subscribe 时传递的处理函数,示例如下
broker.Subscribe(topic, func(p broker.Event) error {fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
})
Event 及 httpEvent 定义
// Event is given to a subscription handler for processing
type Event interface {Topic() string
Message() *Message
Ack() error
Error() error}
type httpEvent struct {
m *Message
t string
err error
}
func (h *httpEvent) Ack() error {return nil}
func (h *httpEvent) Error() error {return h.err}
func (h *httpEvent) Message() *Message {return h.m}
func (h *httpEvent) Topic() string {return h.t}
能够看到 httpEvent 实现了 Event,这样 p.Message()就能够失去音讯了
获取音讯在 ServeHTTP()中,收到音讯,调用传入的 fn 解决即可
上面再看公布音讯,只须要定义 broker.Message
,再调用broker.Publish()
即可,示例如下
msg := &broker.Message{Header: map[string]string{"id": fmt.Sprintf("%d", i),
},
Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
}
if err := broker.Publish(topic, msg); err != nil {log.Printf("[pub] failed: %v", err)
} else {fmt.Println("[pub] pubbed message:", string(msg.Body))
}
看看broker.Publish()
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
// create the message first
m := &Message{Header: make(map[string]string),
Body: msg.Body,
}
for k, v := range msg.Header {m.Header[k] = v
}
m.Header["Micro-Topic"] = topic
// encode the message
b, err := h.opts.Codec.Marshal(m)
if err != nil {return err}
// save the message
h.saveMessage(topic, b)
// now attempt to get the service
h.RLock()
s, err := h.r.GetService(serviceName)
if err != nil {h.RUnlock()
return err
}
h.RUnlock()
pub := func(node *registry.Node, t string, b []byte) error {
scheme := "http"
// check if secure is added in metadata
if node.Metadata["secure"] == "true" {scheme = "https"}
vals := url.Values{}
vals.Add("id", node.Id)
uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode())
r, err := h.c.Post(uri, "application/json", bytes.NewReader(b))
if err != nil {return err}
// discard response body
io.Copy(ioutil.Discard, r.Body)
r.Body.Close()
return nil
}
srv := func(s []*registry.Service, b []byte) {
for _, service := range s {var nodes []*registry.Node
for _, node := range service.Nodes {
// only use nodes tagged with broker http
if node.Metadata["broker"] != "http" {continue}
// look for nodes for the topic
if node.Metadata["topic"] != topic {continue}
nodes = append(nodes, node)
}
// only process if we have nodes
if len(nodes) == 0 {continue}
switch service.Version {
// broadcast version means broadcast to all nodes
case broadcastVersion:
var success bool
// publish to all nodes
for _, node := range nodes {
// publish async
if err := pub(node, topic, b); err == nil {success = true}
}
// save if it failed to publish at least once
if !success {h.saveMessage(topic, b)
}
default:
// select node to publish to
node := nodes[rand.Int()%len(nodes)]
// publish async to one node
if err := pub(node, topic, b); err != nil {
// if failed save it
h.saveMessage(topic, b)
}
}
}
}
// do the rest async
go func() {
// get a third of the backlog
messages := h.getMessage(topic, 8)
delay := (len(messages) > 1)
// publish all the messages
for _, msg := range messages {
// serialize here
srv(s, msg)
// sending a backlog of messages
if delay {time.Sleep(time.Millisecond * 100)
}
}
}()
return nil
}
这里做了以下几件事件
- 创立音讯,header 中增加一个
Micro-Topic
,值是 topic - 编码音讯,默认 json
-
调用 saveMessage()保留音讯
- httpBroker.mtx 加锁
- 在 httpBroker.inbox 中增加这条音讯,如果 inbox
数组大于 64 条则只取前 64 条重赋值给 inbox,【` 震惊,前面间接丢了!!!`】
- 调用
httpBroker.r.GetService(serviceName)
获取 service,serviceName 默认 ”micro.http.broker” -
创立处理函数
pub()
- 确定 scheme[http 或 https]
- url 参数中减少 id,值为 node.id
- 拼接 uri,scheme://(node.Address)(DefaultPath)(vals.Encode())
- post 发动申请,返回后果放入 ioutil.Discard(就是丢掉了),敞开返回 body
-
创立处理函数
srv()
- 收集所有可用的 registry.Node
- 判断音讯播送还是指定了 Queue(指定了 Queue 随机选一个 node),依据状况异步调用
pub()
发送,失败则从新调用saveMessage()
-
创立协程
-
调用 getMessage()
- 开读写锁 h.mtx.Lock()
- 从 inbox 取出指定条数的音讯
- 顺次用第 6 步的
srv()
解决每条音讯,如果方才取出的音讯大于 1 条,每次发送距离 Millisecond*100
-
至此,整个 broker 的流程比拟清晰了。
总结:
- 默认的 http broker,订阅就是开了一个 http 服务手音讯,公布就是从服务发现拿到节点信息,往节点 post 数据。
- 理论应用中通常能够指定 etcd,consul 这样的服务发现。
- 音讯 inbox 最多只放 64 条,尽量避免音讯沉积,音讯最好写日志
写到这里又要举荐大家看 micro in action 了
Micro In Action(四):Pub/Sub
Micro In Action(五):Message Broker