micro.newService()中newOptions
func newOptions(opts ...Option) Options { opt := Options{ Auth: auth.DefaultAuth, Broker: broker.DefaultBroker, Cmd: cmd.DefaultCmd, Config: config.DefaultConfig, Client: client.DefaultClient, Server: server.DefaultServer, Store: store.DefaultStore, Registry: registry.DefaultRegistry, Router: router.DefaultRouter, Runtime: runtime.DefaultRuntime, Transport: transport.DefaultTransport, Context: context.Background(), Signal: true, } for _, o := range opts { o(&opt) } return opt}
初始化了一堆根底设置,先来看看Broker broker.DefaultBroker,
在broker/broker.go中 DefaultBroker Broker = NewBroker()
// NewBroker returns a new http brokerfunc NewBroker(opts ...Option) Broker { return newHttpBroker(opts...)}func newHttpBroker(opts ...Option) Broker { options := Options{ Codec: json.Marshaler{}, Context: context.TODO(), Registry: registry.DefaultRegistry, } for _, o := range opts { o(&options) } // set address addr := DefaultAddress if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 { addr = options.Addrs[0] } h := &httpBroker{ id: uuid.New().String(), address: addr, opts: options, r: options.Registry, c: &http.Client{Transport: newTransport(options.TLSConfig)}, subscribers: make(map[string][]*httpSubscriber), exit: make(chan chan error), mux: http.NewServeMux(), inbox: make(map[string][][]byte), } // specify the message handler h.mux.Handle(DefaultPath, h) // get optional handlers if h.opts.Context != nil { handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler) if ok { for pattern, handler := range handlers { h.mux.Handle(pattern, handler) } } } return h}
这里做了几件事
- 初始化options,设置Codec为json,设置ctx,Registry
- 初始化httpBroker,设置http.Client时调用newTransport()设置代理,同时启用http2,最初指定message handler
h.mux.Handle(DefaultPath, h)
h就是httpBroker,在httpBroker中实现了ServeHTTP(),则所有申请都通过他来解决,即所有订阅的音讯解决都是通过httpBroker.ServeHTTP()来解决的
- 如果ctx不为空,就取http_handlers数组,顺次注册http handle
上面看看example/broker, 一个broker的示例
var ( topic = "go.micro.topic.foo")func pub() { tick := time.NewTicker(time.Second) i := 0 for _ = range tick.C { msg := &broker.Message{ Header: map[string]string{ "id": fmt.Sprintf("%d", i), }, Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), } if err := broker.Publish(topic, msg); err != nil { log.Printf("[pub] failed: %v", err) } else { fmt.Println("[pub] pubbed message:", string(msg.Body)) } i++ }}func sub() { _, err := broker.Subscribe(topic, func(p broker.Event) error { fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) return nil }) if err != nil { fmt.Println(err) }}func main() { cmd.Init() if err := broker.Init(); err != nil { log.Fatalf("Broker Init error: %v", err) } if err := broker.Connect(); err != nil { log.Fatalf("Broker Connect error: %v", err) } go pub() go sub() <-time.After(time.Second * 10)}
cmd.init()请见micro cmd篇,有具体介绍
cmd.opts.Broker默认应用的是上文剖析的httpBroker
先看broker.Init()
func (h *httpBroker) Init(opts ...Option) error { h.RLock() if h.running { h.RUnlock() return errors.New("cannot init while connected") } h.RUnlock() h.Lock() defer h.Unlock() for _, o := range opts { o(&h.opts) } if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 { h.address = h.opts.Addrs[0] } if len(h.id) == 0 { h.id = "go.micro.http.broker-" + uuid.New().String() } // get registry reg := h.opts.Registry if reg == nil { reg = registry.DefaultRegistry } // get cache if rc, ok := h.r.(cache.Cache); ok { rc.Stop() } // set registry h.r = cache.New(reg) // reconfigure tls config if c := h.opts.TLSConfig; c != nil { h.c = &http.Client{ Transport: newTransport(c), } } return nil}
这里做了以下几件事件
- 上读锁,查看是否正在运行
上读写锁,在进行前面操作
- 设置opt
- 设置address,id
- 获取Registry,cache,设置registry
- 设置http.Client中Transport的tls
再看broker.Connect()
func (h *httpBroker) Connect() error { h.RLock() if h.running { h.RUnlock() return nil } h.RUnlock() h.Lock() defer h.Unlock() var l net.Listener var err error if h.opts.Secure || h.opts.TLSConfig != nil { config := h.opts.TLSConfig fn := func(addr string) (net.Listener, error) { if config == nil { hosts := []string{addr} // check if its a valid host:port if host, _, err := net.SplitHostPort(addr); err == nil { if len(host) == 0 { hosts = maddr.IPs() } else { hosts = []string{host} } } // generate a certificate cert, err := mls.Certificate(hosts...) if err != nil { return nil, err } config = &tls.Config{Certificates: []tls.Certificate{cert}} } return tls.Listen("tcp", addr, config) } l, err = mnet.Listen(h.address, fn) } else { fn := func(addr string) (net.Listener, error) { return net.Listen("tcp", addr) } l, err = mnet.Listen(h.address, fn) } if err != nil { return err } addr := h.address h.address = l.Addr().String() go http.Serve(l, h.mux) go func() { h.run(l) h.Lock() h.opts.Addrs = []string{addr} h.address = addr h.Unlock() }() // get registry reg := h.opts.Registry if reg == nil { reg = registry.DefaultRegistry } // set cache h.r = cache.New(reg) // set running h.running = true return nil}
这里做了以下几件事件
- 上读锁,查看是否正在运行
上读写锁,在进行前面操作
- 如果有Secure和TLSConfig,做一些tls的设置,没有则间接返回默认net.Listener
- 开一个协程运行http.Serve,解决申请是
newHttpBroker()
中指定的handle函数ServeHTTP()
( 规范库http提供了Handler接口,用于开发者实现本人的handler。只有实现接口的ServeHTTP办法即可。) - 开一个协程运行run(),前面看,设置地址
- 获取Registry,设置cache
- 标记httpBroker正在运行
看看方才的run()
做了什么
func (h *httpBroker) run(l net.Listener) { t := time.NewTicker(registerInterval) defer t.Stop() for { select { // heartbeat for each subscriber case <-t.C: h.RLock() for _, subs := range h.subscribers { for _, sub := range subs { _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) } } h.RUnlock() // received exit signal case ch := <-h.exit: ch <- l.Close() h.RLock() for _, subs := range h.subscribers { for _, sub := range subs { _ = h.r.Deregister(sub.svc) } } h.RUnlock() return } }}
这里做了以下几件事件
- 设置默认30秒一次的工夫触发器,每个周期都在服务发现核心顺次注册subscribers中的订阅
- 承受退出音讯,顺次调用
Deregister()
登记subscribers中的订阅
接下来看看怎么订阅音讯broker.Subscribe()
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { var err error var host, port string options := NewSubscribeOptions(opts...) // parse address for host, port host, port, err = net.SplitHostPort(h.Address()) if err != nil { return nil, err } addr, err := maddr.Extract(host) if err != nil { return nil, err } var secure bool if h.opts.Secure || h.opts.TLSConfig != nil { secure = true } // register service node := ®istry.Node{ Id: topic + "-" + h.id, Address: mnet.HostPort(addr, port), Metadata: map[string]string{ "secure": fmt.Sprintf("%t", secure), "broker": "http", "topic": topic, }, } // check for queue group or broadcast queue version := options.Queue if len(version) == 0 { version = broadcastVersion } service := ®istry.Service{ Name: serviceName, Version: version, Nodes: []*registry.Node{node}, } // generate subscriber subscriber := &httpSubscriber{ opts: options, hb: h, id: node.Id, topic: topic, fn: handler, svc: service, } // subscribe now if err := h.subscribe(subscriber); err != nil { return nil, err } // return the subscriber return subscriber, nil}func (h *httpBroker) subscribe(s *httpSubscriber) error { h.Lock() defer h.Unlock() if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil { return err } h.subscribers[s.topic] = append(h.subscribers[s.topic], s) return nil}
这里做了以下几件事件
- 初始化SubscribeOptions
- 解析Address中host, port,并验证ip
- 初始化registry.Node{}
- 查看options.Queue,设置registry.Service{},
- 生成订阅信息结构体httpSubscriber{}
调用subscribe(),返回subscriber
- 开读写锁,把订阅服务(Connect()中开了http.Serve())注册到服务发现(默认mdns),发消息的时候通过服务发现找node,就是往这些注册的服务中发了
- 订阅频道记录到h.subscribers中
获取音讯通过调用Subscribe时传递的处理函数,示例如下
broker.Subscribe(topic, func(p broker.Event) error { fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) return nil })
Event及httpEvent定义
// Event is given to a subscription handler for processingtype Event interface { Topic() string Message() *Message Ack() error Error() error}type httpEvent struct { m *Message t string err error}func (h *httpEvent) Ack() error { return nil}func (h *httpEvent) Error() error { return h.err}func (h *httpEvent) Message() *Message { return h.m}func (h *httpEvent) Topic() string { return h.t}
能够看到httpEvent实现了Event,这样p.Message()就能够失去音讯了
获取音讯在ServeHTTP()中,收到音讯,调用传入的fn解决即可
上面再看公布音讯,只须要定义broker.Message
,再调用broker.Publish()
即可,示例如下
msg := &broker.Message{ Header: map[string]string{ "id": fmt.Sprintf("%d", i), }, Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), } if err := broker.Publish(topic, msg); err != nil { log.Printf("[pub] failed: %v", err) } else { fmt.Println("[pub] pubbed message:", string(msg.Body)) }
看看broker.Publish()
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { // create the message first m := &Message{ Header: make(map[string]string), Body: msg.Body, } for k, v := range msg.Header { m.Header[k] = v } m.Header["Micro-Topic"] = topic // encode the message b, err := h.opts.Codec.Marshal(m) if err != nil { return err } // save the message h.saveMessage(topic, b) // now attempt to get the service h.RLock() s, err := h.r.GetService(serviceName) if err != nil { h.RUnlock() return err } h.RUnlock() pub := func(node *registry.Node, t string, b []byte) error { scheme := "http" // check if secure is added in metadata if node.Metadata["secure"] == "true" { scheme = "https" } vals := url.Values{} vals.Add("id", node.Id) uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode()) r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) if err != nil { return err } // discard response body io.Copy(ioutil.Discard, r.Body) r.Body.Close() return nil } srv := func(s []*registry.Service, b []byte) { for _, service := range s { var nodes []*registry.Node for _, node := range service.Nodes { // only use nodes tagged with broker http if node.Metadata["broker"] != "http" { continue } // look for nodes for the topic if node.Metadata["topic"] != topic { continue } nodes = append(nodes, node) } // only process if we have nodes if len(nodes) == 0 { continue } switch service.Version { // broadcast version means broadcast to all nodes case broadcastVersion: var success bool // publish to all nodes for _, node := range nodes { // publish async if err := pub(node, topic, b); err == nil { success = true } } // save if it failed to publish at least once if !success { h.saveMessage(topic, b) } default: // select node to publish to node := nodes[rand.Int()%len(nodes)] // publish async to one node if err := pub(node, topic, b); err != nil { // if failed save it h.saveMessage(topic, b) } } } } // do the rest async go func() { // get a third of the backlog messages := h.getMessage(topic, 8) delay := (len(messages) > 1) // publish all the messages for _, msg := range messages { // serialize here srv(s, msg) // sending a backlog of messages if delay { time.Sleep(time.Millisecond * 100) } } }() return nil}
这里做了以下几件事件
- 创立音讯,header中增加一个
Micro-Topic
,值是topic - 编码音讯,默认json
调用saveMessage()保留音讯
- httpBroker.mtx加锁
- 在httpBroker.inbox中增加这条音讯,如果inbox
数组大于64条则只取前64条重赋值给inbox,【`震惊,前面间接丢了!!!`】
- 调用
httpBroker.r.GetService(serviceName)
获取service,serviceName默认"micro.http.broker" 创立处理函数
pub()
- 确定scheme[http或https]
- url参数中减少id,值为node.id
- 拼接uri,scheme://(node.Address)(DefaultPath)(vals.Encode())
- post发动申请,返回后果放入ioutil.Discard(就是丢掉了),敞开返回body
创立处理函数
srv()
- 收集所有可用的registry.Node
- 判断音讯播送还是指定了Queue(指定了Queue随机选一个node),依据状况异步调用
pub()
发送,失败则从新调用saveMessage()
创立协程
调用getMessage()
- 开读写锁h.mtx.Lock()
- 从inbox取出指定条数的音讯
- 顺次用第6步的
srv()
解决每条音讯,如果方才取出的音讯大于1条,每次发送距离Millisecond*100
至此,整个broker的流程比拟清晰了。
总结:
- 默认的http broker,订阅就是开了一个http服务手音讯,公布就是从服务发现拿到节点信息,往节点post数据。
- 理论应用中通常能够指定etcd,consul这样的服务发现。
- 音讯inbox最多只放64条,尽量避免音讯沉积,音讯最好写日志
写到这里又要举荐大家看micro in action了
Micro In Action(四):Pub/Sub
Micro In Action(五):Message Broker