关于pulsar:中国移动张浩AMQP-on-Pulsar-的设计与应用一览

57次阅读

共计 4172 个字符,预计需要花费 11 分钟才能阅读完成。

本期 TGIP-CN 直播,咱们邀请到了来自中国移动云能力核心的张浩,他在 AMQP on Pulsar 我的项目中参加了很多,也一起帮助推动了我的项目的开发与更新,接下来的内容次要基于此次直播中对于 AoP 的一些介绍和细节干货。

相干背景

首先一个很重要的问题,就是咱们挪动外部为什么要本人做 AMQP 音讯队列?答案很显著。

一是为了满足外部组件的需要,中国移动是 OpenStack 的重度应用用户,而 OpenStack 中默认应用 RabbitMQ 作为 RPC 通信组件。应用 RabbitMQ 在线上环境部署和运维中,遇到了很多问题。因为咱们中间件团队次要采纳 Java 架构,围绕 RabbitMQ 只能做一些外围革新。

同时,中国移动的私有云中有很多须要应用 AMQP 音讯队列的客户,然而现有的 RabbitMQ 不满足云拜访的条件,因而,中国移动的中间件团队开始钻研 AMQP 音讯队列。在比照 Qpid,RocketMQ 和 Pulsar 之后,发现 Pulsar 的计算拆散存储架构非常适合目前的需要。

同时在对 Pulsar 进行调研之后,发现 StreamNative 曾经开源了 KoP,这更加确定了咱们要基于 Pulsar 开发 AMQP 的可行性。同时 Pulsar 社区活跃度很高,社区方面对本我的项目的反对力度很高。

所以,咱们便开启了与 StreamNative 共同开发 AoP 协定解决插件的征程,携手实现将 AoP 从 0 到 1 的变质。

AMQP 0.9.1

AMQP 0.9.1(高级音讯队列协定) 是一种消息传递协定,它容许符合标准的客户端应用程序与符合标准的消息传递中间件代理进行通信。

在 AoP 的初始版本中,首先实现了 0.9.1 版本协定。在这里次要有以下几个概念须要大家了解:

  • VirtualHost:资源的逻辑分组以及隔离
  • Exchange:音讯路由
  • Queue:音讯存储
  • Binding:路由规定

大体操作流程如上图:首先音讯会发送到 Exchange 中,后依据不同的设置 / 类型等路由到不同的 queue 中。消费者生产音讯时,是间接从 queue 中进行。

Protocol Handler

在理解了 AMQP 0.9.1 的模型后,咱们来看下 AoP 实现依赖的组件——Protocol Handler。

在 Pulsar 架构中,基于 Managed Leger 曾经实现了一套分布式的流程封装,包含如何去存储音讯和避免音讯失落。Broker 自身也实现了一些个性,比方 load-balancer、geo-replicator 等。

下层中的 Protocol Handler 属于轻量级工具,次要解决 Pulsar 中生产者和消费者发送进去的 TCP 申请,将其转化为可读取状态的操作。

Pulsar 2.5 版本后,将 Protocol Handler 接口独自脱离了进去,利用这个框架就能够独自实现自定义协定的转换,比方 Kafka、AMQP、MQTT 等。实现了不同协定的 Protocol Handler,Pulsar broker 就具备读写 / 解析其余协定的能力。下图为 AMQP on Pulsar 采纳的架构模型。

如何实现 AoP

具体实现次要是四个局部:模型转换(将 AMQP 模型接入到 Pulsar 外部)、音讯发送、音讯生产和 Proxy。

模型转换

AMQP 0.9.1 引入了一些根底概念,如 Exchagne、Queue 等。这些与 Pulsar 的模型有着较大的区别。所以咱们须要找到一种办法,反对利用 Pulsar 现有的一些特色,并将它们分割在一起。下图展现了音讯在 AoP 中的流转,并探讨了对于音讯长久化,音讯路由,音讯投递的细节。

???? AmqpExchange

AmqpExchange 蕴含一个原始音讯 Topic,用来保留 AMQP producer 发送的音讯。AmqpExchange 的 replicator 会将音讯解决到 AMQP 队列中。Replicator 是基于 Pulsar 的长久化游标,能够确保胜利将音讯发送到队列,而不会失落音讯。

???? AmqpMessageRouter

AmqpMessageRouter 用于保护音讯路由类型以及将音讯从 AmqpExchange 路由到 AmqpQueue 的路由规定。路由类型和路由规定这些原数据都长久化在 Pulsar 的存储中。所以就算 broker 重启,咱们也能够复原 AmqpMessageRouter。

???? AmqpQueue

AmqpQueue 提供一个索引音讯 Topic,用来存储路由到这个队列的 IndexMessage。IndexMessage 由原始音讯的 ID 和存储音讯的 Exchange 的名称组成。当 AmqpQueue 向 consumer 发送音讯时,AmqpQueue 会依据 IndexMessage 读取原始音讯数据,而后将其发送给 consumer。

???? Vhost 调配

在 AoP 中,一个 AMQP Vhost 只能由一个 Pulsar broker 提供服务,而一个 Pulsar broker 能够为多个 Vhost 提供服务。所以减少 Vhost 和 broker 的数量能够达到横向扩容的成果。通过应用更多的 Vhost 能够使用户构建更大的 AoP 集群。

在 AoP 中,一个 Vhost 基于一个 Pulsar namespace,并且这个 namespace 只能有一个 bundle。如果一台 broker 解体,其余的 broker 能够接管这台解体的 broker 保护的 Vhost。Vhost 也能够利用 broker 的负载平衡机制。broker 能够将 Vhost 从一台高负载的机器转移到一台闲暇的机器。下图展现了 Vhost 在 broker 上的分配情况。

音讯发送

在音讯发送层面,比方通过 AMQP Producer 做 Basic-publish 的音讯发送到 broker 端。当 broker 接管到音讯后,会将 AMQP 音讯体转换为 Pulsar 音讯体,找到以后 Exchange 对应的 topic,并进行音讯写入。依据不同的路由关系、参数配置等,会将 topic 中的音讯散发到不同 queue 中。

音讯生产

在生产端,次要用到了 Pulsar 订阅模型里的 Exclusive(独占订阅)和 Shared(共享订阅)两种类型。

Shared(共享订阅)是能够将所需数量的 consumer 附加到同一订阅。音讯以多个 consumer 的循环尝试散发模式传递,并且任何给定的音讯仅传递给一个 consumer。当消费者断开连接时,所有传递给它并且未被确认的音讯将被重新安排,以便发送给该订阅上残余的 consumer。

依据 AMQP 协定的定义,队列收到的音讯是以轮循形式调配给所有消费者,与 Pulsar Shared 订阅形式吻合。所以在 AoP 的实现中将这种 Shared 订阅形式定义为 default 订阅形式。

以上实用于 queue 为非排他队列的情景,如果是排他队列则采纳 Exclusive 订阅模型。

不论有多少个 consumer 同时存在,只会有一个 consumer 是沉闷的,也就是只有这一个 consumer 能够接管到这个 topic 的所有音讯。这种模式就为 Pulsar 订阅模式中的独占订阅(Exclusive),这种模式跟 AMQP 中的排他队列比拟吻合。

Pulsar 的生产模型与 RocketMQ 模型相同,一个相似于“推”模型,一个相似于“拉”模型。在 Pulsar 中 consumer 发送 flow 申请给到 broker,告诉 broker 能够将音讯推送给 consumer。此时 broker 通过查看准许后,将 broker 内的音讯推送给 consumer。

AMQP 中有两种生产模型,一种是 consume,另一种是 get。Consume 对应的是“推”模型,Get 对应“拉”模型。

在推模型中,咱们自定义了一个 consumer 并命名为“AmqpPushConsumer”,它继承了 Pulsar broker 内的 consumer,最大的不同是批改了 sendMessage() 办法。将 Pulsar 音讯转换成 AMQP 音讯后,间接推送到 AMQP consumer 中。

同时将 AmqpPushConsumer 增加到订阅形式中,并调用 consumer.handleFlow 命令。

在拉模型中,为了保障生产位点的一致性、防止音讯失落,减少了一个“AmqpPullConsumer”。同样继承了 Pulsar broker consumer,然而对以下两个办法进行了批改:

  • getAvailablePermits()-> 0
  • isBlocked()-> true

这种模型是被动从 cursor 中读取音讯,而后和推模型中的的 consumer 共享 read position。其音讯签收的形式也与推模型中的形式统一。

Proxy

AoP Proxy 用于在客户端与 AMQP 服务连贯时,帮忙查找负责解决 Vhost 数据的 owner broker,并在客户端与 Vhost 的 owner broker 之间传输数据。下图阐明了 AoP Proxy 的服务流程。

  1. AMQP 客户端建设与 AoP Proxy 的连贯。
  2. AoP Proxy 向 Pulsar cluster 发送查找申请,以便确定 Vhost 的 owner broker 的 URL 地址。
  3. Pulsar 集群 将 owner broker 的 URL 地址返回给 AoP Proxy。
  4. AoP Proxy 建设与 Vhose 的 owner broker 的连贯并开始在 AMQP 客户端和 Vhost 的 owner broker 之间传输数据。

目前,AoP Proxy 与 Pulsar broker 独特工作。用户能够通过配置 amqpProxyEnable 来抉择是否开启 AoP Proxy 服务。

如何应用 AoP

目前 AoP 我的项目是开源在 StreamNative 我的项目库内,应用 Apache License V2 许可证,我的项目地址为可参考下方链接:https://github.com/streamnati…。

欢送大家参加奉献和应用,具体步骤能够参考 readme 的文字介绍。

后续打算

  • 目前根本实现了 AMQP 0.9.1 版本的协定反对,将来会反对 AMQP 1.0 协定:
  • 将 AMQP 元数据做集成化存储,不便信息集中化治理
  • 丰盛 AMQP 相干监控项
  • 后续将反对 namespace 的多 bundle 机制,目前仍为单 bundle 机制

总结

本次分享次要针对 AMQP on Pulsar 我的项目的产生过程、实现细节等层面进行了一些技术和概念上的分享,心愿通过本次分享,大家能够对 AoP 我的项目有了更清晰的了解,感兴趣的话也能够参加该我的项目奉献哦!

正文完
 0