关于golang:gomicro集成RabbitMQ实战和原理

30次阅读

共计 7178 个字符,预计需要花费 18 分钟才能阅读完成。

在 go-micro 中异步音讯的收发是通过 Broker 这个组件来实现的,底层实现有 RabbitMQ、Kafka、Redis 等等很多种形式,这篇文章次要介绍 go-micro 应用 RabbitMQ 收发数据的办法和原理。

Broker 的外围性能

Broker 的外围性能是 Publish 和 Subscribe,也就是公布和订阅。它们的定义是:

Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)

公布

公布第一个参数是 topic(主题),用于标识某类音讯。

公布的数据是通过 Message 承载的,其包含音讯头和音讯体,定义如下:

type Message struct {Header map[string]string
    Body   []byte}

音讯头是 map,也就是一组 KV(键值对)。

音讯体是字节数组,在发送和接管时须要开发者进行编码和解码的解决。

订阅

订阅的第一个参数也是 topic(主题),用于过滤出要接管的音讯。

订阅的数据是通过 Handler 解决的,Handler 是一个函数,其定义如下:

type Handler func(Event) error

其中的参数 Event 是一个接口,须要具体的 Broker 来实现,其定义如下:

type Event interface {Topic() string
    Message() *Message
    Ack() error
    Error() error}
  • Topic() 用于获取以后音讯的 topic,也是发布者发送时的 topic。
  • Message() 用于获取音讯体,也是发布者发送时的 Message,其中包含 Header 和 Body。
  • Ack() 用于告诉 Broker 音讯曾经收到了,Broker 能够删除音讯了,可用来保障音讯至多被生产一次。
  • Error() 用于获取 Broker 解决音讯过胜利的谬误。

开发者订阅数据时,须要实现 Handler 这个函数,接管 Event 的实例,提取数据进行解决,依据不同的 Broker,可能还须要调用 Ack(),解决呈现谬误时,返回 error。

go-micro 集成 RabbitMQ 实战

大略理解了 Broker 的定义之后,再来看下如何应用 go-micro 收发 RabbitMQ 音讯。

启动一个 RabbitMQ

如果你曾经有一个 RabbitMQ 服务器,请跳过这个步骤。

这里介绍一个应用 docker 疾速启动 RabbitMQ 的办法,当然前提是你得装置了 docker。

执行如下命令启动一个 rabbitmq 的 docker 容器:

docker run --name rabbitmq1 -p 5672:5672 -p 15672:15672 -d rabbitmq

而后进入容器进行一些设置:

docker exec -it rabbitmq1 /bin/bash

启动管理工具、禁用指标采集(会导致某些 API500 谬误):

rabbitmq-plugins enable rabbitmq_management
 
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf

最初重启容器:

docker restart rabbitmq1

最初浏览器中输出 http://127.0.0.0:15672 即可拜访,默认用户名和明码都是 guest。

编写收发函数

为了不便演示,先来定义公布音讯和接管音讯的函数。其中公布函数应用了 go-micro 提供的 Event 类型,还有其它类型也能够提供 Publish 的性能,这里发送的数据格式是 Json 字符串。接管音讯的函数名称能够随便取,然而参数和返回值必须符合规范,也就是下边代码中的样子,这个函数也能够是绑定到某个类型的。

// 定义一个公布音讯的函数:每隔 1 秒公布一条音讯
func loopPublish(event micro.Event) {
    for {time.Sleep(time.Duration(1) * time.Second)

        curUnix := strconv.FormatInt(time.Now().Unix(), 10)
        msg := "{\"Id\":" + curUnix + ",\"Name\":\" 张三 \"}"
        event.Publish(context.TODO(), msg)
    }
}

// 定义一个接管音讯的函数:将收到的音讯打印进去
func handle(ctx context.Context, msg interface{}) (err error) {defer func() {if r := recover(); r != nil {err = errors.New(fmt.Sprint(r))
            log.Println(err)
        }
    }()

    b, err := json.Marshal(msg)
    if err != nil {log.Println(err)
        return
    }

    log.Println(string(b))
    return
}

编写主体代码

这里先给出代码,外面提供了一些正文,后边还会有具体介绍。

func main() {
    // RabbitMQ 的连贯参数
    rabbitmqUrl := "amqp://guest:guest@127.0.0.1:5672/"
    exchangeName := "amq.topic"
    subcribeTopic := "test"
    queueName := "rabbitmqdemo_test"

    // 默认是 application/protobuf,这里演示用的是 Json,所以要改下
    server.DefaultContentType = "application/json"

    // 创立 RabbitMQ Broker
    b := rabbitmq.NewBroker(broker.Addrs(rabbitmqUrl),           // RabbitMQ 拜访地址,含 VHost
        rabbitmq.ExchangeName(exchangeName), // 交换机的名称
        rabbitmq.DurableExchange(),          // 音讯在 Exchange 中时会进行长久化解决
        rabbitmq.PrefetchCount(1),           // 同时生产的最大音讯数量
    )

    // 创立 Service,外部会初始化一些货色,必须在 NewSubscribeOptions 前边
    service := micro.NewService(micro.Broker(b),
    )
    service.Init()

    // 初始化订阅上下文:这里不是必须的,订阅会有默认值
    subOpts := broker.NewSubscribeOptions(rabbitmq.DurableQueue(),   // 队列长久化,消费者断开连接后,音讯依然保留到队列中
        rabbitmq.RequeueOnError(), // 音讯处理函数返回 error 时,音讯再次入队列
        rabbitmq.AckOnSuccess(),   // 音讯处理函数没有 error 返回时,go-micro 发送 Ack 给 RabbitMQ)

    // 注册订阅
    micro.RegisterSubscriber(
        subcribeTopic,    // 订阅的 Topic
        service.Server(), // 注册到的 rpcServer
        handle,           // 音讯处理函数
        server.SubscriberContext(subOpts.Context), // 订阅上下文,也能够应用默认的
        server.SubscriberQueue(queueName),         // 队列名称
    )

    // 公布事件音讯
    event := micro.NewEvent(subcribeTopic, service.Client())
    go loopPublish(event)

    log.Println("Service is running ...")
    if err := service.Run(); err != nil {log.Println(err)
    }
}

次要逻辑是:

1、先创立一个 RabbitMQ Broker,它实现了规范的 Broker 接口。其中次要的参数是 RabbitMQ 的拜访地址和 RabbitMQ 交换机,PrefetchCount 是订阅者(或称为消费者)应用的。

2、而后通过 NewService 创立 go-micro 服务,并将 broker 设置进去。这里边会初始化很多货色,最外围的是创立一个 rpcServer,并将 rpcServer 和这个 broker 绑定起来。

3、而后是通过 RegisterSubscriber 注册订阅,这个注册有两个层面的性能:一是如果 RabbitMQ 上还不存在这个队列时创立队列,并订阅指定 topic 的音讯;二是定义 go-micro 程序从这个 RabbitMQ 队列接收数据的解决形式。

这里具体看下订阅的参数:

func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
  • topic:go-micro 应用的是 Topic 模式,发布者发送音讯的时候要指定一个 topic,订阅者依据须要只接管某个或某几个 topic 的音讯;
  • s:音讯从 RabbitMQ 接管后会进入这个 Server 进行解决,它是 NewService 的时候外部创立的;
  • h:应用了上一步创立的接管音讯的函数 handle,Server 中的办法会调用这个函数;
  • opts 是订阅的一些选项,这里须要指定 RabbitMQ 队列的名称;另外 SubscriberContext 定义了订阅的一些行为,这里 DurableQueue 设置 RabbitMQ 订阅音讯的长久化形式,个别咱们都心愿音讯不失落,这个设置的作用是即便程序与 RabbitMQ 的连贯断开,音讯也会保留在 RabbitMQ 队列中;AckOnSuccess 和 RequeueOnError 定义了程序处理音讯呈现谬误时的行为,如果 handle 返回 error,音讯会从新返回 RabbitMQ,而后再投递给程序。

4、而后这里为了演示,通过 NewEvent 创立了一个 Event,通过它每隔一秒发送 1 条音讯。

5、最初通过 service.Run() 把这个程序启动起来。

辛苦写了半天,看一下这个程序的运行成果:

留神个别发布者和订阅者是在不同的程序中,这里只是为了不便演示,才把他们放在一个程序中。所以如果只是公布音讯,就不须要订阅的代码,如果只是订阅,也不须要公布音讯的代码,大家应用的时候依据须要本人裁剪吧。

go-micro 集成 RabbitMQ 的解决流程

这个局部来看一下音讯在 go-micro 和 RabbitMQ 中是怎么流转的,我画了一个示意图:

这个图有点简单,这里具体解说下。

首先分成三块:RabbitMQ、音讯公布局部、音讯接管局部,这里用不同的色彩进行了辨别。

  • RabbitMQ 不是本文的重点,就把它看成一个整体就行了。
  • 音讯公布局部:从生产者程序调用 Event.Publish 开始,而后调用 Client.Publish,到这里为止,都是在 go-micro 的外围模块中进行解决;而后再调用 Broker.Publish,这里的 Broker 是 RabbitMQ 插件的 Broker 实例,从这里开始进入了 RabbiitMQ 插件局部,而后再顺次通过 RabbitMQ Connection 的 Publish 办法、RabbitMQ Channle 的 Publish 办法,最终发送到 RabbitMQ 中。
  • 音讯接管局部:Service.Run 外部会调用 rpcServer.Start,这个办法外部会调用 Broker.Subscribe,这个办法是 RabbitMQ 插件中定义的,它会读取 RegisterSubscriber 时的一些 RabbitMQ 队列设置,而后再顺次传递到 RabbitMQ Connection 的 Consume 办法、RabbitMQ Channel 的 ConsumeQueue 办法,最终连贯到 RabbitMQ,并在 RabbitMQ 上设置好要订阅的队列;这些办法还会返回一个类型为 amqp.Delivery 的 Go Channel,Broker.Subscribe 一直的从这个 Go Channel 中读取数据,而后再发送到调用 Broker.Subscribe 时传入的一个音讯解决办法中,这里就是 rpcServer.HandleEvnet,音讯通过一些解决后再进入 rpcServer 外部的路由解决模块,这里就是 route.ProcessMessage,这个办法外部会依据以后音讯的 topic 查找 RegisterSubscriber 时注册的订阅,并最终调用到过后注册的用于接管音讯的函数。

这个处理过程还能够划分为业务局部、外围模块局部和插件局部。

  • 首先创立一个插件的 Broker 实现,把它注册到外围模块的 rpcServer 中;
  • 音讯的发送从业务局部进入外围模块局部,再进入具体实现 Broker 的插件局部;
  • 音讯的接管则首先进入插件局部,而后再流转到外围模块局部,再流转到业务局部。

从上边的图中能够看到音讯都须要通过这个 RabbitMQ 插件进行解决,实际上能够只应用这个插件,就能实现音讯的发送和接管。这个演示代码我曾经提交到了 Github,有趣味的同学能够在文末获取 Github 仓库的地址。

从上边这些划分中,咱们能够了解到设计者的整体设计思路,把握要害节点,用好用对,呈现问题时能够疾速定位。

填的几个坑

不能接管其它框架公布的音讯

这个是因为 route.ProcessMessage 查找订阅时应用了 go-micro 专用的一个头信息:

// get the subscribers by topic
    subs, ok := router.subscribers[msg.Topic()]

这个 msg.Topic 返回的是如下实例中的 topic 字段:

    rpcMsg := &rpcMessage{topic:       msg.Header["Micro-Topic"],
        contentType: ct,
        payload:     &raw.Frame{Data: msg.Body},
        codec:       cf,
        header:      msg.Header,
        body:        msg.Body,
    }

其它框架不会有这么一个头信息,除非专门适配 go-micro。

因为应用 RabbitMQ 的场景下,整个开发都是围绕 RabbitMQ 做的,而且 go-micro 的解决逻辑没有思考 RabbitMQ 订阅能够应用通配符的状况,公布音讯的 Topic、接管音讯的 Topic 与 Micro-Topic 的值匹配时都是依照是否相等的准则解决的,因而能够用 RabbitMQ 音讯自带的 topic 来设置这个音讯头。rabbitmq.rbroker.Subscribe 中接管到音讯后,就能够进行这个设置:

// Messages sent from other frameworks to rabbitmq do not have this header.
        // The 'RoutingKey' in the message can be used as this header.
        // Then the message can be transfered to the subscriber which bind this topic.
        msgTopic := header["Micro-Topic"]
        if msgTopic == "" {header["Micro-Topic"] = msg.RoutingKey
        }

这样 go-micro 开发的消费者程序就能接管其它框架公布的音讯了,其它框架无需适配。

RabbitMQ 重启后订阅者和发布者有限阻塞

go-micro 的 RabbitMQ 插件底层应用另一个库:github.com/streadway/amqp

对于发布者,RabbitMQ 断开连接时 amqp 库会通过 Go Channel 同步告诉 go-micro,而后 go-micro 能够发动从新连贯。问题呈现在这个同步告诉上,go-micro 的 RabbitMQ 插件设置了接管连贯和通道的敞开告诉,然而只解决了一个告诉就去从新连贯了,这就导致有一个 Go Channel 始终阻塞,而这个阻塞会导致某个锁不能开释,这个锁又是 Publish 时候须要的,因而导致发布者有限阻塞。解决办法就是外层减少一个循环,等所有的告诉都收到了,再去做从新连贯。

对于订阅者,RabbitMQ 断开连接时,它会始终阻塞在某个 Go Channel 上,直到它返回一个值,这个值代表连贯曾经从新建设,订阅者能够重建生产通道。问题也是呈现在这个阻塞的 Go Channel 上,因为这个 Go Channel 在每次收到 amqp 的敞开告诉时会从新赋值,而订阅者期待的 Go Channel 可能是之前的旧值,永远也不会返回,订阅者也就有限阻塞了。解决办法呢,就是在 select 时减少一个 time.After,让期待的 Go Channel 有机会更新到新值。

代码就不贴了,有趣味的能够到 Github 中去看:https://github.com/go-micro/p…

对于这两个问题的批改曾经合并到官网仓库中,大家去 get 最新的代码就能够了。

这两个坑填了,基本上就能满足我的须要了。当然可能还有其它的坑,比方 go-micro 的 RabbitMQ 插件如同没有发布者确认的性能,这个要实现,还得好好想想怎么改。


好了,以上就是本文的次要内容。

老规矩,代码曾经上传到 Github,欢送拜访:https://github.com/bosima/go-…

播种更多架构常识,请关注微信公众号 萤火架构。原创内容,转载请注明出处。

正文完
 0