在go-micro中异步音讯的收发是通过Broker这个组件来实现的,底层实现有RabbitMQ、Kafka、Redis等等很多种形式,这篇文章次要介绍go-micro应用RabbitMQ收发数据的办法和原理。

Broker的外围性能

Broker的外围性能是Publish和Subscribe,也就是公布和订阅。它们的定义是:

Publish(topic string, m *Message, opts ...PublishOption) errorSubscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)

公布

公布第一个参数是topic(主题),用于标识某类音讯。

公布的数据是通过Message承载的,其包含音讯头和音讯体,定义如下:

type Message struct {    Header map[string]string    Body   []byte}

音讯头是map,也就是一组KV(键值对)。

音讯体是字节数组,在发送和接管时须要开发者进行编码和解码的解决。

订阅

订阅的第一个参数也是topic(主题),用于过滤出要接管的音讯。

订阅的数据是通过Handler解决的,Handler是一个函数,其定义如下:

type Handler func(Event) error

其中的参数Event是一个接口,须要具体的Broker来实现,其定义如下:

type Event interface {    Topic() string    Message() *Message    Ack() error    Error() error}
  • Topic() 用于获取以后音讯的topic,也是发布者发送时的topic。
  • Message() 用于获取音讯体,也是发布者发送时的Message,其中包含Header和Body。
  • Ack() 用于告诉Broker音讯曾经收到了,Broker能够删除音讯了,可用来保障音讯至多被生产一次。
  • Error() 用于获取Broker解决音讯过胜利的谬误。

开发者订阅数据时,须要实现Handler这个函数,接管Event的实例,提取数据进行解决,依据不同的Broker,可能还须要调用Ack(),解决呈现谬误时,返回error。

go-micro集成RabbitMQ实战

大略理解了Broker的定义之后,再来看下如何应用go-micro收发RabbitMQ音讯。

启动一个RabbitMQ

如果你曾经有一个RabbitMQ服务器,请跳过这个步骤。

这里介绍一个应用docker疾速启动RabbitMQ的办法,当然前提是你得装置了docker。

执行如下命令启动一个rabbitmq的docker容器:

docker run --name rabbitmq1 -p 5672:5672 -p 15672:15672 -d rabbitmq

而后进入容器进行一些设置:

docker exec -it rabbitmq1 /bin/bash

启动管理工具、禁用指标采集(会导致某些API500谬误):

rabbitmq-plugins enable rabbitmq_management cd /etc/rabbitmq/conf.d/echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf

最初重启容器:

docker restart rabbitmq1

最初浏览器中输出 http://127.0.0.0:15672 即可拜访,默认用户名和明码都是 guest 。

编写收发函数

为了不便演示,先来定义公布音讯和接管音讯的函数。其中公布函数应用了go-micro提供的Event类型,还有其它类型也能够提供Publish的性能,这里发送的数据格式是Json字符串。接管音讯的函数名称能够随便取,然而参数和返回值必须符合规范,也就是下边代码中的样子,这个函数也能够是绑定到某个类型的。

// 定义一个公布音讯的函数:每隔1秒公布一条音讯func loopPublish(event micro.Event) {    for {        time.Sleep(time.Duration(1) * time.Second)        curUnix := strconv.FormatInt(time.Now().Unix(), 10)        msg := "{\"Id\":" + curUnix + ",\"Name\":\"张三\"}"        event.Publish(context.TODO(), msg)    }}// 定义一个接管音讯的函数:将收到的音讯打印进去func handle(ctx context.Context, msg interface{}) (err error) {    defer func() {        if r := recover(); r != nil {            err = errors.New(fmt.Sprint(r))            log.Println(err)        }    }()    b, err := json.Marshal(msg)    if err != nil {        log.Println(err)        return    }    log.Println(string(b))    return}

编写主体代码

这里先给出代码,外面提供了一些正文,后边还会有具体介绍。

func main() {    // RabbitMQ的连贯参数    rabbitmqUrl := "amqp://guest:guest@127.0.0.1:5672/"    exchangeName := "amq.topic"    subcribeTopic := "test"    queueName := "rabbitmqdemo_test"    // 默认是application/protobuf,这里演示用的是Json,所以要改下    server.DefaultContentType = "application/json"    // 创立 RabbitMQ Broker    b := rabbitmq.NewBroker(        broker.Addrs(rabbitmqUrl),           // RabbitMQ拜访地址,含VHost        rabbitmq.ExchangeName(exchangeName), // 交换机的名称        rabbitmq.DurableExchange(),          // 音讯在Exchange中时会进行长久化解决        rabbitmq.PrefetchCount(1),           // 同时生产的最大音讯数量    )    // 创立Service,外部会初始化一些货色,必须在NewSubscribeOptions前边    service := micro.NewService(        micro.Broker(b),    )    service.Init()    // 初始化订阅上下文:这里不是必须的,订阅会有默认值    subOpts := broker.NewSubscribeOptions(        rabbitmq.DurableQueue(),   // 队列长久化,消费者断开连接后,音讯依然保留到队列中        rabbitmq.RequeueOnError(), // 音讯处理函数返回error时,音讯再次入队列        rabbitmq.AckOnSuccess(),   // 音讯处理函数没有error返回时,go-micro发送Ack给RabbitMQ    )    // 注册订阅    micro.RegisterSubscriber(        subcribeTopic,    // 订阅的Topic        service.Server(), // 注册到的rpcServer        handle,           // 音讯处理函数        server.SubscriberContext(subOpts.Context), // 订阅上下文,也能够应用默认的        server.SubscriberQueue(queueName),         // 队列名称    )    // 公布事件音讯    event := micro.NewEvent(subcribeTopic, service.Client())    go loopPublish(event)    log.Println("Service is running ...")    if err := service.Run(); err != nil {        log.Println(err)    }}

次要逻辑是:

1、先创立一个RabbitMQ Broker,它实现了规范的Broker接口。其中次要的参数是RabbitMQ的拜访地址和RabbitMQ交换机,PrefetchCount是订阅者(或称为消费者)应用的。

2、而后通过 NewService 创立go-micro服务,并将broker设置进去。这里边会初始化很多货色,最外围的是创立一个rpcServer,并将rpcServer和这个broker绑定起来。

3、而后是通过 RegisterSubscriber 注册订阅,这个注册有两个层面的性能:一是如果RabbitMQ上还不存在这个队列时创立队列,并订阅指定topic的音讯;二是定义go-micro程序从这个RabbitMQ队列接收数据的解决形式。

这里具体看下订阅的参数:

func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
  • topic:go-micro应用的是Topic模式,发布者发送音讯的时候要指定一个topic,订阅者依据须要只接管某个或某几个topic的音讯;
  • s:音讯从RabbitMQ接管后会进入这个Server进行解决,它是NewService的时候外部创立的;
  • h:应用了上一步创立的接管音讯的函数 handle,Server中的办法会调用这个函数;
  • opts 是订阅的一些选项,这里须要指定RabbitMQ队列的名称;另外SubscriberContext定义了订阅的一些行为,这里DurableQueue设置RabbitMQ订阅音讯的长久化形式,个别咱们都心愿音讯不失落,这个设置的作用是即便程序与RabbitMQ的连贯断开,音讯也会保留在RabbitMQ队列中;AckOnSuccess和RequeueOnError定义了程序处理音讯呈现谬误时的行为,如果handle返回error,音讯会从新返回RabbitMQ,而后再投递给程序。

4、而后这里为了演示,通过NewEvent创立了一个Event,通过它每隔一秒发送1条音讯。

5、最初通过service.Run()把这个程序启动起来。

辛苦写了半天,看一下这个程序的运行成果:

留神个别发布者和订阅者是在不同的程序中,这里只是为了不便演示,才把他们放在一个程序中。所以如果只是公布音讯,就不须要订阅的代码,如果只是订阅,也不须要公布音讯的代码,大家应用的时候依据须要本人裁剪吧。

go-micro集成RabbitMQ的解决流程

这个局部来看一下音讯在go-micro和RabbitMQ中是怎么流转的,我画了一个示意图:

这个图有点简单,这里具体解说下。

首先分成三块:RabbitMQ、音讯公布局部、音讯接管局部,这里用不同的色彩进行了辨别。

  • RabbitMQ不是本文的重点,就把它看成一个整体就行了。
  • 音讯公布局部:从生产者程序调用Event.Publish开始,而后调用Client.Publish,到这里为止,都是在go-micro的外围模块中进行解决;而后再调用Broker.Publish,这里的Broker是RabbitMQ插件的Broker实例,从这里开始进入了RabbiitMQ插件局部,而后再顺次通过RabbitMQ Connection的Publish办法、RabbitMQ Channle的Publish办法,最终发送到RabbitMQ中。
  • 音讯接管局部:Service.Run外部会调用rpcServer.Start,这个办法外部会调用Broker.Subscribe,这个办法是RabbitMQ插件中定义的,它会读取RegisterSubscriber时的一些RabbitMQ队列设置,而后再顺次传递到RabbitMQ Connection的Consume办法、RabbitMQ Channel的ConsumeQueue办法,最终连贯到RabbitMQ,并在RabbitMQ上设置好要订阅的队列;这些办法还会返回一个类型为amqp.Delivery的Go Channel,Broker.Subscribe一直的从这个Go Channel中读取数据,而后再发送到调用Broker.Subscribe时传入的一个音讯解决办法中,这里就是rpcServer.HandleEvnet,音讯通过一些解决后再进入rpcServer外部的路由解决模块,这里就是route.ProcessMessage,这个办法外部会依据以后音讯的topic查找RegisterSubscriber时注册的订阅,并最终调用到过后注册的用于接管音讯的函数。

这个处理过程还能够划分为业务局部、外围模块局部和插件局部。

  • 首先创立一个插件的Broker实现,把它注册到外围模块的rpcServer中;
  • 音讯的发送从业务局部进入外围模块局部,再进入具体实现Broker的插件局部;
  • 音讯的接管则首先进入插件局部,而后再流转到外围模块局部,再流转到业务局部。

从上边的图中能够看到音讯都须要通过这个RabbitMQ插件进行解决,实际上能够只应用这个插件,就能实现音讯的发送和接管。这个演示代码我曾经提交到了Github,有趣味的同学能够在文末获取Github仓库的地址。

从上边这些划分中,咱们能够了解到设计者的整体设计思路,把握要害节点,用好用对,呈现问题时能够疾速定位。

填的几个坑

不能接管其它框架公布的音讯

这个是因为route.ProcessMessage查找订阅时应用了go-micro专用的一个头信息:

// get the subscribers by topic    subs, ok := router.subscribers[msg.Topic()]

这个msg.Topic返回的是如下实例中的topic字段:

    rpcMsg := &rpcMessage{        topic:       msg.Header["Micro-Topic"],        contentType: ct,        payload:     &raw.Frame{Data: msg.Body},        codec:       cf,        header:      msg.Header,        body:        msg.Body,    }

其它框架不会有这么一个头信息,除非专门适配go-micro。

因为应用RabbitMQ的场景下,整个开发都是围绕RabbitMQ做的,而且go-micro的解决逻辑没有思考RabbitMQ订阅能够应用通配符的状况,公布音讯的Topic、接管音讯的Topic与Micro-Topic的值匹配时都是依照是否相等的准则解决的,因而能够用RabbitMQ音讯自带的topic来设置这个音讯头。rabbitmq.rbroker.Subscribe 中接管到音讯后,就能够进行这个设置:

// Messages sent from other frameworks to rabbitmq do not have this header.        // The 'RoutingKey' in the message can be used as this header.        // Then the message can be transfered to the subscriber which bind this topic.        msgTopic := header["Micro-Topic"]        if msgTopic == "" {            header["Micro-Topic"] = msg.RoutingKey        }

这样go-micro开发的消费者程序就能接管其它框架公布的音讯了,其它框架无需适配。

RabbitMQ重启后订阅者和发布者有限阻塞

go-micro的RabbitMQ插件底层应用另一个库:github.com/streadway/amqp

对于发布者,RabbitMQ断开连接时amqp库会通过Go Channel同步告诉go-micro,而后go-micro能够发动从新连贯。问题呈现在这个同步告诉上,go-micro的RabbitMQ插件设置了接管连贯和通道的敞开告诉,然而只解决了一个告诉就去从新连贯了,这就导致有一个Go Channel始终阻塞,而这个阻塞会导致某个锁不能开释,这个锁又是Publish时候须要的,因而导致发布者有限阻塞。解决办法就是外层减少一个循环,等所有的告诉都收到了,再去做从新连贯。

对于订阅者,RabbitMQ断开连接时,它会始终阻塞在某个Go Channel上,直到它返回一个值,这个值代表连贯曾经从新建设,订阅者能够重建生产通道。问题也是呈现在这个阻塞的Go Channel上,因为这个Go Channel在每次收到amqp的敞开告诉时会从新赋值,而订阅者期待的Go Channel可能是之前的旧值,永远也不会返回,订阅者也就有限阻塞了。解决办法呢,就是在select时减少一个time.After,让期待的Go Channel有机会更新到新值。

代码就不贴了,有趣味的能够到Github中去看:https://github.com/go-micro/p...

对于这两个问题的批改曾经合并到官网仓库中,大家去get最新的代码就能够了。

这两个坑填了,基本上就能满足我的须要了。当然可能还有其它的坑,比方go-micro的RabbitMQ插件如同没有发布者确认的性能,这个要实现,还得好好想想怎么改。


好了,以上就是本文的次要内容。

老规矩,代码曾经上传到Github,欢送拜访:https://github.com/bosima/go-...

播种更多架构常识,请关注微信公众号 萤火架构。原创内容,转载请注明出处。