在go-micro中异步音讯的收发是通过Broker这个组件来实现的,底层实现有RabbitMQ、Kafka、Redis等等很多种形式,这篇文章次要介绍go-micro应用RabbitMQ收发数据的办法和原理。
Broker的外围性能
Broker的外围性能是Publish和Subscribe,也就是公布和订阅。它们的定义是:
Publish(topic string, m *Message, opts ...PublishOption) errorSubscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
公布
公布第一个参数是topic(主题),用于标识某类音讯。
公布的数据是通过Message承载的,其包含音讯头和音讯体,定义如下:
type Message struct { Header map[string]string Body []byte}
音讯头是map,也就是一组KV(键值对)。
音讯体是字节数组,在发送和接管时须要开发者进行编码和解码的解决。
订阅
订阅的第一个参数也是topic(主题),用于过滤出要接管的音讯。
订阅的数据是通过Handler解决的,Handler是一个函数,其定义如下:
type Handler func(Event) error
其中的参数Event是一个接口,须要具体的Broker来实现,其定义如下:
type Event interface { Topic() string Message() *Message Ack() error Error() error}
- Topic() 用于获取以后音讯的topic,也是发布者发送时的topic。
- Message() 用于获取音讯体,也是发布者发送时的Message,其中包含Header和Body。
- Ack() 用于告诉Broker音讯曾经收到了,Broker能够删除音讯了,可用来保障音讯至多被生产一次。
- Error() 用于获取Broker解决音讯过胜利的谬误。
开发者订阅数据时,须要实现Handler这个函数,接管Event的实例,提取数据进行解决,依据不同的Broker,可能还须要调用Ack(),解决呈现谬误时,返回error。
go-micro集成RabbitMQ实战
大略理解了Broker的定义之后,再来看下如何应用go-micro收发RabbitMQ音讯。
启动一个RabbitMQ
如果你曾经有一个RabbitMQ服务器,请跳过这个步骤。
这里介绍一个应用docker疾速启动RabbitMQ的办法,当然前提是你得装置了docker。
执行如下命令启动一个rabbitmq的docker容器:
docker run --name rabbitmq1 -p 5672:5672 -p 15672:15672 -d rabbitmq
而后进入容器进行一些设置:
docker exec -it rabbitmq1 /bin/bash
启动管理工具、禁用指标采集(会导致某些API500谬误):
rabbitmq-plugins enable rabbitmq_management cd /etc/rabbitmq/conf.d/echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
最初重启容器:
docker restart rabbitmq1
最初浏览器中输出 http://127.0.0.0:15672 即可拜访,默认用户名和明码都是 guest 。
编写收发函数
为了不便演示,先来定义公布音讯和接管音讯的函数。其中公布函数应用了go-micro提供的Event类型,还有其它类型也能够提供Publish的性能,这里发送的数据格式是Json字符串。接管音讯的函数名称能够随便取,然而参数和返回值必须符合规范,也就是下边代码中的样子,这个函数也能够是绑定到某个类型的。
// 定义一个公布音讯的函数:每隔1秒公布一条音讯func loopPublish(event micro.Event) { for { time.Sleep(time.Duration(1) * time.Second) curUnix := strconv.FormatInt(time.Now().Unix(), 10) msg := "{\"Id\":" + curUnix + ",\"Name\":\"张三\"}" event.Publish(context.TODO(), msg) }}// 定义一个接管音讯的函数:将收到的音讯打印进去func handle(ctx context.Context, msg interface{}) (err error) { defer func() { if r := recover(); r != nil { err = errors.New(fmt.Sprint(r)) log.Println(err) } }() b, err := json.Marshal(msg) if err != nil { log.Println(err) return } log.Println(string(b)) return}
编写主体代码
这里先给出代码,外面提供了一些正文,后边还会有具体介绍。
func main() { // RabbitMQ的连贯参数 rabbitmqUrl := "amqp://guest:guest@127.0.0.1:5672/" exchangeName := "amq.topic" subcribeTopic := "test" queueName := "rabbitmqdemo_test" // 默认是application/protobuf,这里演示用的是Json,所以要改下 server.DefaultContentType = "application/json" // 创立 RabbitMQ Broker b := rabbitmq.NewBroker( broker.Addrs(rabbitmqUrl), // RabbitMQ拜访地址,含VHost rabbitmq.ExchangeName(exchangeName), // 交换机的名称 rabbitmq.DurableExchange(), // 音讯在Exchange中时会进行长久化解决 rabbitmq.PrefetchCount(1), // 同时生产的最大音讯数量 ) // 创立Service,外部会初始化一些货色,必须在NewSubscribeOptions前边 service := micro.NewService( micro.Broker(b), ) service.Init() // 初始化订阅上下文:这里不是必须的,订阅会有默认值 subOpts := broker.NewSubscribeOptions( rabbitmq.DurableQueue(), // 队列长久化,消费者断开连接后,音讯依然保留到队列中 rabbitmq.RequeueOnError(), // 音讯处理函数返回error时,音讯再次入队列 rabbitmq.AckOnSuccess(), // 音讯处理函数没有error返回时,go-micro发送Ack给RabbitMQ ) // 注册订阅 micro.RegisterSubscriber( subcribeTopic, // 订阅的Topic service.Server(), // 注册到的rpcServer handle, // 音讯处理函数 server.SubscriberContext(subOpts.Context), // 订阅上下文,也能够应用默认的 server.SubscriberQueue(queueName), // 队列名称 ) // 公布事件音讯 event := micro.NewEvent(subcribeTopic, service.Client()) go loopPublish(event) log.Println("Service is running ...") if err := service.Run(); err != nil { log.Println(err) }}
次要逻辑是:
1、先创立一个RabbitMQ Broker,它实现了规范的Broker接口。其中次要的参数是RabbitMQ的拜访地址和RabbitMQ交换机,PrefetchCount是订阅者(或称为消费者)应用的。
2、而后通过 NewService 创立go-micro服务,并将broker设置进去。这里边会初始化很多货色,最外围的是创立一个rpcServer,并将rpcServer和这个broker绑定起来。
3、而后是通过 RegisterSubscriber 注册订阅,这个注册有两个层面的性能:一是如果RabbitMQ上还不存在这个队列时创立队列,并订阅指定topic的音讯;二是定义go-micro程序从这个RabbitMQ队列接收数据的解决形式。
这里具体看下订阅的参数:
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
- topic:go-micro应用的是Topic模式,发布者发送音讯的时候要指定一个topic,订阅者依据须要只接管某个或某几个topic的音讯;
- s:音讯从RabbitMQ接管后会进入这个Server进行解决,它是NewService的时候外部创立的;
- h:应用了上一步创立的接管音讯的函数 handle,Server中的办法会调用这个函数;
- opts 是订阅的一些选项,这里须要指定RabbitMQ队列的名称;另外SubscriberContext定义了订阅的一些行为,这里DurableQueue设置RabbitMQ订阅音讯的长久化形式,个别咱们都心愿音讯不失落,这个设置的作用是即便程序与RabbitMQ的连贯断开,音讯也会保留在RabbitMQ队列中;AckOnSuccess和RequeueOnError定义了程序处理音讯呈现谬误时的行为,如果handle返回error,音讯会从新返回RabbitMQ,而后再投递给程序。
4、而后这里为了演示,通过NewEvent创立了一个Event,通过它每隔一秒发送1条音讯。
5、最初通过service.Run()把这个程序启动起来。
辛苦写了半天,看一下这个程序的运行成果:
留神个别发布者和订阅者是在不同的程序中,这里只是为了不便演示,才把他们放在一个程序中。所以如果只是公布音讯,就不须要订阅的代码,如果只是订阅,也不须要公布音讯的代码,大家应用的时候依据须要本人裁剪吧。
go-micro集成RabbitMQ的解决流程
这个局部来看一下音讯在go-micro和RabbitMQ中是怎么流转的,我画了一个示意图:
这个图有点简单,这里具体解说下。
首先分成三块:RabbitMQ、音讯公布局部、音讯接管局部,这里用不同的色彩进行了辨别。
- RabbitMQ不是本文的重点,就把它看成一个整体就行了。
- 音讯公布局部:从生产者程序调用Event.Publish开始,而后调用Client.Publish,到这里为止,都是在go-micro的外围模块中进行解决;而后再调用Broker.Publish,这里的Broker是RabbitMQ插件的Broker实例,从这里开始进入了RabbiitMQ插件局部,而后再顺次通过RabbitMQ Connection的Publish办法、RabbitMQ Channle的Publish办法,最终发送到RabbitMQ中。
- 音讯接管局部:Service.Run外部会调用rpcServer.Start,这个办法外部会调用Broker.Subscribe,这个办法是RabbitMQ插件中定义的,它会读取RegisterSubscriber时的一些RabbitMQ队列设置,而后再顺次传递到RabbitMQ Connection的Consume办法、RabbitMQ Channel的ConsumeQueue办法,最终连贯到RabbitMQ,并在RabbitMQ上设置好要订阅的队列;这些办法还会返回一个类型为amqp.Delivery的Go Channel,Broker.Subscribe一直的从这个Go Channel中读取数据,而后再发送到调用Broker.Subscribe时传入的一个音讯解决办法中,这里就是rpcServer.HandleEvnet,音讯通过一些解决后再进入rpcServer外部的路由解决模块,这里就是route.ProcessMessage,这个办法外部会依据以后音讯的topic查找RegisterSubscriber时注册的订阅,并最终调用到过后注册的用于接管音讯的函数。
这个处理过程还能够划分为业务局部、外围模块局部和插件局部。
- 首先创立一个插件的Broker实现,把它注册到外围模块的rpcServer中;
- 音讯的发送从业务局部进入外围模块局部,再进入具体实现Broker的插件局部;
- 音讯的接管则首先进入插件局部,而后再流转到外围模块局部,再流转到业务局部。
从上边的图中能够看到音讯都须要通过这个RabbitMQ插件进行解决,实际上能够只应用这个插件,就能实现音讯的发送和接管。这个演示代码我曾经提交到了Github,有趣味的同学能够在文末获取Github仓库的地址。
从上边这些划分中,咱们能够了解到设计者的整体设计思路,把握要害节点,用好用对,呈现问题时能够疾速定位。
填的几个坑
不能接管其它框架公布的音讯
这个是因为route.ProcessMessage查找订阅时应用了go-micro专用的一个头信息:
// get the subscribers by topic subs, ok := router.subscribers[msg.Topic()]
这个msg.Topic返回的是如下实例中的topic字段:
rpcMsg := &rpcMessage{ topic: msg.Header["Micro-Topic"], contentType: ct, payload: &raw.Frame{Data: msg.Body}, codec: cf, header: msg.Header, body: msg.Body, }
其它框架不会有这么一个头信息,除非专门适配go-micro。
因为应用RabbitMQ的场景下,整个开发都是围绕RabbitMQ做的,而且go-micro的解决逻辑没有思考RabbitMQ订阅能够应用通配符的状况,公布音讯的Topic、接管音讯的Topic与Micro-Topic的值匹配时都是依照是否相等的准则解决的,因而能够用RabbitMQ音讯自带的topic来设置这个音讯头。rabbitmq.rbroker.Subscribe 中接管到音讯后,就能够进行这个设置:
// Messages sent from other frameworks to rabbitmq do not have this header. // The 'RoutingKey' in the message can be used as this header. // Then the message can be transfered to the subscriber which bind this topic. msgTopic := header["Micro-Topic"] if msgTopic == "" { header["Micro-Topic"] = msg.RoutingKey }
这样go-micro开发的消费者程序就能接管其它框架公布的音讯了,其它框架无需适配。
RabbitMQ重启后订阅者和发布者有限阻塞
go-micro的RabbitMQ插件底层应用另一个库:github.com/streadway/amqp
对于发布者,RabbitMQ断开连接时amqp库会通过Go Channel同步告诉go-micro,而后go-micro能够发动从新连贯。问题呈现在这个同步告诉上,go-micro的RabbitMQ插件设置了接管连贯和通道的敞开告诉,然而只解决了一个告诉就去从新连贯了,这就导致有一个Go Channel始终阻塞,而这个阻塞会导致某个锁不能开释,这个锁又是Publish时候须要的,因而导致发布者有限阻塞。解决办法就是外层减少一个循环,等所有的告诉都收到了,再去做从新连贯。
对于订阅者,RabbitMQ断开连接时,它会始终阻塞在某个Go Channel上,直到它返回一个值,这个值代表连贯曾经从新建设,订阅者能够重建生产通道。问题也是呈现在这个阻塞的Go Channel上,因为这个Go Channel在每次收到amqp的敞开告诉时会从新赋值,而订阅者期待的Go Channel可能是之前的旧值,永远也不会返回,订阅者也就有限阻塞了。解决办法呢,就是在select时减少一个time.After,让期待的Go Channel有机会更新到新值。
代码就不贴了,有趣味的能够到Github中去看:https://github.com/go-micro/p...
对于这两个问题的批改曾经合并到官网仓库中,大家去get最新的代码就能够了。
这两个坑填了,基本上就能满足我的须要了。当然可能还有其它的坑,比方go-micro的RabbitMQ插件如同没有发布者确认的性能,这个要实现,还得好好想想怎么改。
好了,以上就是本文的次要内容。
老规矩,代码曾经上传到Github,欢送拜访:https://github.com/bosima/go-...
播种更多架构常识,请关注微信公众号 萤火架构。原创内容,转载请注明出处。