在 go-micro 中异步音讯的收发是通过 Broker 这个组件来实现的,底层实现有 RabbitMQ、Kafka、Redis 等等很多种形式,这篇文章次要介绍 go-micro 应用 RabbitMQ 收发数据的办法和原理。
Broker 的外围性能
Broker 的外围性能是 Publish 和 Subscribe,也就是公布和订阅。它们的定义是:
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
公布
公布第一个参数是 topic(主题),用于标识某类音讯。
公布的数据是通过 Message 承载的,其包含音讯头和音讯体,定义如下:
type Message struct {Header map[string]string
Body []byte}
音讯头是 map,也就是一组 KV(键值对)。
音讯体是字节数组,在发送和接管时须要开发者进行编码和解码的解决。
订阅
订阅的第一个参数也是 topic(主题),用于过滤出要接管的音讯。
订阅的数据是通过 Handler 解决的,Handler 是一个函数,其定义如下:
type Handler func(Event) error
其中的参数 Event 是一个接口,须要具体的 Broker 来实现,其定义如下:
type Event interface {Topic() string
Message() *Message
Ack() error
Error() error}
- Topic() 用于获取以后音讯的 topic,也是发布者发送时的 topic。
- Message() 用于获取音讯体,也是发布者发送时的 Message,其中包含 Header 和 Body。
- Ack() 用于告诉 Broker 音讯曾经收到了,Broker 能够删除音讯了,可用来保障音讯至多被生产一次。
- Error() 用于获取 Broker 解决音讯过胜利的谬误。
开发者订阅数据时,须要实现 Handler 这个函数,接管 Event 的实例,提取数据进行解决,依据不同的 Broker,可能还须要调用 Ack(),解决呈现谬误时,返回 error。
go-micro 集成 RabbitMQ 实战
大略理解了 Broker 的定义之后,再来看下如何应用 go-micro 收发 RabbitMQ 音讯。
启动一个 RabbitMQ
如果你曾经有一个 RabbitMQ 服务器,请跳过这个步骤。
这里介绍一个应用 docker 疾速启动 RabbitMQ 的办法,当然前提是你得装置了 docker。
执行如下命令启动一个 rabbitmq 的 docker 容器:
docker run --name rabbitmq1 -p 5672:5672 -p 15672:15672 -d rabbitmq
而后进入容器进行一些设置:
docker exec -it rabbitmq1 /bin/bash
启动管理工具、禁用指标采集(会导致某些 API500 谬误):
rabbitmq-plugins enable rabbitmq_management
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
最初重启容器:
docker restart rabbitmq1
最初浏览器中输出 http://127.0.0.0:15672 即可拜访,默认用户名和明码都是 guest。
编写收发函数
为了不便演示,先来定义公布音讯和接管音讯的函数。其中公布函数应用了 go-micro 提供的 Event 类型,还有其它类型也能够提供 Publish 的性能,这里发送的数据格式是 Json 字符串。接管音讯的函数名称能够随便取,然而参数和返回值必须符合规范,也就是下边代码中的样子,这个函数也能够是绑定到某个类型的。
// 定义一个公布音讯的函数:每隔 1 秒公布一条音讯
func loopPublish(event micro.Event) {
for {time.Sleep(time.Duration(1) * time.Second)
curUnix := strconv.FormatInt(time.Now().Unix(), 10)
msg := "{\"Id\":" + curUnix + ",\"Name\":\" 张三 \"}"
event.Publish(context.TODO(), msg)
}
}
// 定义一个接管音讯的函数:将收到的音讯打印进去
func handle(ctx context.Context, msg interface{}) (err error) {defer func() {if r := recover(); r != nil {err = errors.New(fmt.Sprint(r))
log.Println(err)
}
}()
b, err := json.Marshal(msg)
if err != nil {log.Println(err)
return
}
log.Println(string(b))
return
}
编写主体代码
这里先给出代码,外面提供了一些正文,后边还会有具体介绍。
func main() {
// RabbitMQ 的连贯参数
rabbitmqUrl := "amqp://guest:guest@127.0.0.1:5672/"
exchangeName := "amq.topic"
subcribeTopic := "test"
queueName := "rabbitmqdemo_test"
// 默认是 application/protobuf,这里演示用的是 Json,所以要改下
server.DefaultContentType = "application/json"
// 创立 RabbitMQ Broker
b := rabbitmq.NewBroker(broker.Addrs(rabbitmqUrl), // RabbitMQ 拜访地址,含 VHost
rabbitmq.ExchangeName(exchangeName), // 交换机的名称
rabbitmq.DurableExchange(), // 音讯在 Exchange 中时会进行长久化解决
rabbitmq.PrefetchCount(1), // 同时生产的最大音讯数量
)
// 创立 Service,外部会初始化一些货色,必须在 NewSubscribeOptions 前边
service := micro.NewService(micro.Broker(b),
)
service.Init()
// 初始化订阅上下文:这里不是必须的,订阅会有默认值
subOpts := broker.NewSubscribeOptions(rabbitmq.DurableQueue(), // 队列长久化,消费者断开连接后,音讯依然保留到队列中
rabbitmq.RequeueOnError(), // 音讯处理函数返回 error 时,音讯再次入队列
rabbitmq.AckOnSuccess(), // 音讯处理函数没有 error 返回时,go-micro 发送 Ack 给 RabbitMQ)
// 注册订阅
micro.RegisterSubscriber(
subcribeTopic, // 订阅的 Topic
service.Server(), // 注册到的 rpcServer
handle, // 音讯处理函数
server.SubscriberContext(subOpts.Context), // 订阅上下文,也能够应用默认的
server.SubscriberQueue(queueName), // 队列名称
)
// 公布事件音讯
event := micro.NewEvent(subcribeTopic, service.Client())
go loopPublish(event)
log.Println("Service is running ...")
if err := service.Run(); err != nil {log.Println(err)
}
}
次要逻辑是:
1、先创立一个 RabbitMQ Broker,它实现了规范的 Broker 接口。其中次要的参数是 RabbitMQ 的拜访地址和 RabbitMQ 交换机,PrefetchCount 是订阅者(或称为消费者)应用的。
2、而后通过 NewService 创立 go-micro 服务,并将 broker 设置进去。这里边会初始化很多货色,最外围的是创立一个 rpcServer,并将 rpcServer 和这个 broker 绑定起来。
3、而后是通过 RegisterSubscriber 注册订阅,这个注册有两个层面的性能:一是如果 RabbitMQ 上还不存在这个队列时创立队列,并订阅指定 topic 的音讯;二是定义 go-micro 程序从这个 RabbitMQ 队列接收数据的解决形式。
这里具体看下订阅的参数:
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
- topic:go-micro 应用的是 Topic 模式,发布者发送音讯的时候要指定一个 topic,订阅者依据须要只接管某个或某几个 topic 的音讯;
- s:音讯从 RabbitMQ 接管后会进入这个 Server 进行解决,它是 NewService 的时候外部创立的;
- h:应用了上一步创立的接管音讯的函数 handle,Server 中的办法会调用这个函数;
- opts 是订阅的一些选项,这里须要指定 RabbitMQ 队列的名称;另外 SubscriberContext 定义了订阅的一些行为,这里 DurableQueue 设置 RabbitMQ 订阅音讯的长久化形式,个别咱们都心愿音讯不失落,这个设置的作用是即便程序与 RabbitMQ 的连贯断开,音讯也会保留在 RabbitMQ 队列中;AckOnSuccess 和 RequeueOnError 定义了程序处理音讯呈现谬误时的行为,如果 handle 返回 error,音讯会从新返回 RabbitMQ,而后再投递给程序。
4、而后这里为了演示,通过 NewEvent 创立了一个 Event,通过它每隔一秒发送 1 条音讯。
5、最初通过 service.Run() 把这个程序启动起来。
辛苦写了半天,看一下这个程序的运行成果:
留神个别发布者和订阅者是在不同的程序中,这里只是为了不便演示,才把他们放在一个程序中。所以如果只是公布音讯,就不须要订阅的代码,如果只是订阅,也不须要公布音讯的代码,大家应用的时候依据须要本人裁剪吧。
go-micro 集成 RabbitMQ 的解决流程
这个局部来看一下音讯在 go-micro 和 RabbitMQ 中是怎么流转的,我画了一个示意图:
这个图有点简单,这里具体解说下。
首先分成三块:RabbitMQ、音讯公布局部、音讯接管局部,这里用不同的色彩进行了辨别。
- RabbitMQ 不是本文的重点,就把它看成一个整体就行了。
- 音讯公布局部:从生产者程序调用 Event.Publish 开始,而后调用 Client.Publish,到这里为止,都是在 go-micro 的外围模块中进行解决;而后再调用 Broker.Publish,这里的 Broker 是 RabbitMQ 插件的 Broker 实例,从这里开始进入了 RabbiitMQ 插件局部,而后再顺次通过 RabbitMQ Connection 的 Publish 办法、RabbitMQ Channle 的 Publish 办法,最终发送到 RabbitMQ 中。
- 音讯接管局部:Service.Run 外部会调用 rpcServer.Start,这个办法外部会调用 Broker.Subscribe,这个办法是 RabbitMQ 插件中定义的,它会读取 RegisterSubscriber 时的一些 RabbitMQ 队列设置,而后再顺次传递到 RabbitMQ Connection 的 Consume 办法、RabbitMQ Channel 的 ConsumeQueue 办法,最终连贯到 RabbitMQ,并在 RabbitMQ 上设置好要订阅的队列;这些办法还会返回一个类型为 amqp.Delivery 的 Go Channel,Broker.Subscribe 一直的从这个 Go Channel 中读取数据,而后再发送到调用 Broker.Subscribe 时传入的一个音讯解决办法中,这里就是 rpcServer.HandleEvnet,音讯通过一些解决后再进入 rpcServer 外部的路由解决模块,这里就是 route.ProcessMessage,这个办法外部会依据以后音讯的 topic 查找 RegisterSubscriber 时注册的订阅,并最终调用到过后注册的用于接管音讯的函数。
这个处理过程还能够划分为业务局部、外围模块局部和插件局部。
- 首先创立一个插件的 Broker 实现,把它注册到外围模块的 rpcServer 中;
- 音讯的发送从业务局部进入外围模块局部,再进入具体实现 Broker 的插件局部;
- 音讯的接管则首先进入插件局部,而后再流转到外围模块局部,再流转到业务局部。
从上边的图中能够看到音讯都须要通过这个 RabbitMQ 插件进行解决,实际上能够只应用这个插件,就能实现音讯的发送和接管。这个演示代码我曾经提交到了 Github,有趣味的同学能够在文末获取 Github 仓库的地址。
从上边这些划分中,咱们能够了解到设计者的整体设计思路,把握要害节点,用好用对,呈现问题时能够疾速定位。
填的几个坑
不能接管其它框架公布的音讯
这个是因为 route.ProcessMessage 查找订阅时应用了 go-micro 专用的一个头信息:
// get the subscribers by topic
subs, ok := router.subscribers[msg.Topic()]
这个 msg.Topic 返回的是如下实例中的 topic 字段:
rpcMsg := &rpcMessage{topic: msg.Header["Micro-Topic"],
contentType: ct,
payload: &raw.Frame{Data: msg.Body},
codec: cf,
header: msg.Header,
body: msg.Body,
}
其它框架不会有这么一个头信息,除非专门适配 go-micro。
因为应用 RabbitMQ 的场景下,整个开发都是围绕 RabbitMQ 做的,而且 go-micro 的解决逻辑没有思考 RabbitMQ 订阅能够应用通配符的状况,公布音讯的 Topic、接管音讯的 Topic 与 Micro-Topic 的值匹配时都是依照是否相等的准则解决的,因而能够用 RabbitMQ 音讯自带的 topic 来设置这个音讯头。rabbitmq.rbroker.Subscribe 中接管到音讯后,就能够进行这个设置:
// Messages sent from other frameworks to rabbitmq do not have this header.
// The 'RoutingKey' in the message can be used as this header.
// Then the message can be transfered to the subscriber which bind this topic.
msgTopic := header["Micro-Topic"]
if msgTopic == "" {header["Micro-Topic"] = msg.RoutingKey
}
这样 go-micro 开发的消费者程序就能接管其它框架公布的音讯了,其它框架无需适配。
RabbitMQ 重启后订阅者和发布者有限阻塞
go-micro 的 RabbitMQ 插件底层应用另一个库:github.com/streadway/amqp
对于发布者,RabbitMQ 断开连接时 amqp 库会通过 Go Channel 同步告诉 go-micro,而后 go-micro 能够发动从新连贯。问题呈现在这个同步告诉上,go-micro 的 RabbitMQ 插件设置了接管连贯和通道的敞开告诉,然而只解决了一个告诉就去从新连贯了,这就导致有一个 Go Channel 始终阻塞,而这个阻塞会导致某个锁不能开释,这个锁又是 Publish 时候须要的,因而导致发布者有限阻塞。解决办法就是外层减少一个循环,等所有的告诉都收到了,再去做从新连贯。
对于订阅者,RabbitMQ 断开连接时,它会始终阻塞在某个 Go Channel 上,直到它返回一个值,这个值代表连贯曾经从新建设,订阅者能够重建生产通道。问题也是呈现在这个阻塞的 Go Channel 上,因为这个 Go Channel 在每次收到 amqp 的敞开告诉时会从新赋值,而订阅者期待的 Go Channel 可能是之前的旧值,永远也不会返回,订阅者也就有限阻塞了。解决办法呢,就是在 select 时减少一个 time.After,让期待的 Go Channel 有机会更新到新值。
代码就不贴了,有趣味的能够到 Github 中去看:https://github.com/go-micro/p…
对于这两个问题的批改曾经合并到官网仓库中,大家去 get 最新的代码就能够了。
这两个坑填了,基本上就能满足我的须要了。当然可能还有其它的坑,比方 go-micro 的 RabbitMQ 插件如同没有发布者确认的性能,这个要实现,还得好好想想怎么改。
好了,以上就是本文的次要内容。
老规矩,代码曾经上传到 Github,欢送拜访:https://github.com/bosima/go-…
播种更多架构常识,请关注微信公众号 萤火架构。原创内容,转载请注明出处。