关于java:RocketMQ-Schema让消息成为流动的结构化数据

48次阅读

共计 5282 个字符,预计需要花费 14 分钟才能阅读完成。

本文作者:许奕斌,阿里云智能高级研发工程师。

Why we need schema

RocketMQ 目前对于音讯体没有任何数据格式的束缚,能够是 JSON,能够是对象 toString,也能够只是 word 或一段日志,序列化与反序列化过程齐全交给用户。业务上下游也须要对于音讯体的了解达成统一,方可基于 RocketMQ 进行通信。而以上现状会导致两个问题。

首先,类型平安问题。如果生产者或消费者来自齐全不同的团队,上游对数据格式进行了渺小但不兼容的改变,可能导致上游无奈失常地解决数据,且复原速度很慢。

其次,利用扩大问题。对于研发场景,尽管 RocketMQ 实现了链路上的解耦,但研发阶段的上游与上游仍然须要基于音讯了解做很多沟通和联调,耦合仍然较强,生产端的重构也须要牵累生产端一起变更。对于数据流场景,如果没有 schema 定义,每次在构建 ETL 时须要重写整个数据解析逻辑。

RocketMQ schema 提供了对音讯的数据结构托管服务,同时也为原生客户端提供了较为丰盛的序列化 / 反序列化 SDK,包含 Avro、JSON、PB 等,补齐了 RocketMQ 在数据治理和业务上下游解耦方面的短板。

如上图所示,在商业版 Kafka 上创立 topic 时,会揭示保护该 topic 相干 schema。如果保护了 schema,业务上下游看到该 topic 时,可能清晰地理解到须要传入什么数据,无效晋升研发效率。

咱们心愿 RocketMQ 既可能面向 App 业务场景,也可能面向 IoT 微音讯场景,还能面向大数据场景,以成为整个企业的业务中枢。

退出 RSQLDB 之后,用户能够用 SQL 形式剖析 RocketMQ 数据。RocketMQ 既能够作为通信管道,具备管道的流个性,又能够作为数据积淀,即具备数据库个性。如果 RocketMQ 要同时向流式引擎和 DB 引擎凑近,其数据定义、标准以及治理变得异样重要。

面对业务音讯场景时,咱们冀望 RocketMQ 退出 schema 之后可能领有以下劣势:

①数据治理:防止音讯脏数据产生,防止 producer 产生格局不标准的音讯。

②晋升研发效率:业务上下游研发阶段或联调阶段沟通老本升高。

③托管“契约”:将契约托管后,能够实现真正意义上的业务上下游解耦。

④晋升整个零碎的健壮性:躲避上游忽然无奈解析等数据异样。

面对流场景,咱们冀望 RocketMQ 具备下列劣势:

①数据治理:可能保障整条链路数据解析的流畅性。

②晋升传输效率:schema 独立托管,无需附加到数据之上,晋升了整个链路传输的效率。

③推动音讯 - 流 - 表的交融,topic 能够成为动静表。

④反对更丰盛的序列化形式,节约音讯存储老本。以后大部分业务场景均应用 JSON 解析数据,而大数据场景罕用的 Avro 形式更能节俭音讯存储老本。

整体架构

引入了 Schema Registry 后的整体架构如上图所示。在原有最外围的 producer、broker 和 Consumer 架构下引入 Schema Registry 用于托管音讯体的数据结构。

上层是 schema 的治理 API,包含创立、更新、删除、绑定等。与 producer 和 Consumer 的交互中,producer 发送给 broker 之前会做序列化。序列化时会向 registey 查问元数据而后做解析。Consumer 侧能够依据 ID、topic 查问,再做反序列化。RocketMQ 的用户在收发音讯时只须要关怀构造体,无需关怀如何将数据序列化和反序列化。

服务端

Schema Registry 的部署形式与 NameServer 相似,与 broker 拆散部署,因而 broker 不用强依赖于 Schema Registry,采纳了无状态部署模式,能够动静扩缩容。长久化方面,默认应用 Compact Topic5.0 新个性,用户也可自行实现存储插件,比方基于 MySQL 或 Git。治理接口上提供 Restful 接口做增删改查,也反对 schema 与多个 topic 绑定 \ 解绑。

利用启动之后,提供了自带 Swagger UI 做交互版本演进,提供 SchemaName 维度的版本演进和相应的兼容性校验,反对七种兼容性策略。元信息方面,每一个 schema 版本都会向用户裸露全局惟一 RecordID,用户获取到 RecordID 后能够到 registry 查找惟一 schema 版本。

代码设计如上图。次要为 spring boot 利用,暴露出一个 restful 接口。Controller 底下是 Service 层,波及到权限校验、jar 包治理、StoreManager,其中 StoreManager 包含本地缓存和远端长久化。

Schema Registry 的外围概念与 RocketMQ 内核做了对齐。比方 registry 有 cluster 概念,对应内核中的 cluster,Tenant 对应 NameSpace 概念,subject 对应内核中的 topic。每一个 schema 有惟一名称 SchemaName,用户能够将本人利用的 Java 类名称或全门路名称作为 SchemaName,保障全局惟一即可,能够绑定到 subject 上。每一个 schema 有惟一 ID,通过服务端雪花算法生成。SchemaVersion 的每一次更新都不会扭转 ID,然而会生成枯燥递增的版本号,因而一个 schema 能够具备多个不同版本。

ID 和 version 叠加在一起生成了一个新概念 record ID,裸露给用户用于惟一定位某一个 schema 版本。SchemaType 包含 Avro、Json、Protobuf 等罕用序列化类型,IDL 用于具体形容 schema 的结构化信息。

每一个 schema 有一个 ID,ID 放弃不变,但能够有版本迭代,比方从 version 1 到 version 2 到 version 3,每一个 version 反对绑定不同的 subject。Subject 能够近似地了解为 Flink table。比方右图为 应用 Flink SQL 创立一张表,先创立 RocketMQ topic 注册到 NameServer。因为有表构造,同时要创立 schema 注册到 subject 上。因而,引入 schema 之后,能够与 Flink 等数据引擎做无缝兼容。

Schema 次要存储以下类型的信息。

  • 元信息:包含类型、名称、ID、归属于以及兼容性。
  • 个版本具体内容:包含版本号、IDL、IDL 中字段、jar 包信息、绑定的 subject。
  • 命名信息:包含集群、租户、subject。
  • 审计信息。
  • 预留属性。

具体存储设计分为三层。

客户端缓存 :如果 producer Consumer 每一次收发音讯都要与 registy 交互,则十分影响性能和稳定性。因而 RocketMQ 实现了一层缓存,schema 更新频率比拟低,缓存能够满足大部分收发音讯的申请。

服务端缓存 :通过 RocksDB 做了一层缓存。得益于 RocksDB,服务重启和降级均不会影响自身的数据。

服务端长久化 :远端存储通过插件化形式实现,应用 RocketMQ5.0 的 compact topic 个性,其自身可能反对 KV 存储的模式。

远端长久化与本地缓存同步通过 registey 的 PushConsumer 做监听和同步。

目前 Schema Registry 反对 7 种兼容性策略。默认为 backward,小米公司外部实际也验证了默认策略根本够用。校验方向为消费者兼容生产者,即演进了 schema 之后,是须要先降级 Consumer,Consumer 的高版本能够兼容生产者的低版本。

如果兼容策略是 backward_transative,则能够兼容生产者的所有版本。

接口设计均遵循 Open Schema 规范,启动 registry 服务之后,只有拜访 local host 的 swagger UI 页面即可发动 http 申请,本人做 schema 治理。

客户端设计

客户端在音讯收发过程中,须要提供 SDK 做 schema 查问以及音讯的序列化和反序列化解决。

如上图,以前用户在发送时传递字节数组,接管时也是字节数组。当初咱们心愿发送端关怀一个对象,生产端也关怀一个对象。如果生产端没有感知到对象属于什么类,也能够通过 generate record 等通用类型了解音讯。因而,用户视角发送和接管到的均为相似于 public class Order 等结构化数据。

Producer 也能够反对主动创立和更新 schema,也反对 Avro、JSON 等支流的序列化形式。

设计准则为不入侵原客户端代码,不应用 schema 则音讯收发齐全不受影响,用户不感知 schema,感知的是序列化和反序化类型。且反对在序列化过程中按最新版本解析、按指定 ID 解析。另外,为了满足 streams 等十分强调轻量的场景,还反对了 without Schema Registry 的音讯解析。

上图代码为 schema 外围 API 序列化和反序列化。参数非常简单,只有传入 topic、原始音讯对象,即可序列化为 message body 格局。反序列化同理,传入 subject 和原始字节数组,即可将对象解析并传递给用户。

上图为集成了 schema 之后的 producer 样例。创立 producer 须要传入 registry URL 和序列化类型。发送时传入的并非字节数组,而是原始对象。

生产端创立时,需指定 registry URL 和序列化类型,而后通过 getMessage 办法间接获取泛型或理论对象。

ETL 场景落地

RocketMQ flink catlog 次要用于形容 RocketMQ Flink 的 Table、Database 等元数据,因而基于 Schema Registry 实现时须要人造对齐一些概念。比方 catalog 对应 cluster,database 对应 Tenant,subject 对应 table。

异构数据源的转化过程中,十分重要的一个环节为异构数据源 schema 如何做转换,波及到 converter。ConnectRecord 会将 data 和 schema 放在一起做传输,如果 converter 依赖 registry 做 schema 的第三方托管,则 ConnectRecord 无需将原来的 data 和 schema 放于一起,传输效率将会进步,这也是 connect 集成 Schema Registry 的出发点。

集成到 RocketMQ streams 场景的出发点在于心愿 RocketMQ streams API 的应用能够更加敌对。没有集成 schema 时,用户须要被动将数据转化成 JSON。集成后,在流剖析时,要凑近 Flink 或 streams 的应用习惯能够间接通过对象操作,用户应用更敌对。

上图代码中新增了参数 schemaConfig 用于配置 schema,包含序列化类型、指标 java 类,之后的 filter、map 以及 window 算子的计算均可基于对象操作,十分不便。

另外,集成 streams 目前还可反对根本类型解析、音讯自身做 group by 操作以及自定义反序列化优化器。

后续布局

将来,咱们将在以下后果方面继续精进。

第一,社区 SIG 倒退:小组刚经验了从 0 到 1 的建设,还有很多 todo list 尚未实现,也有很多 good first issue 适宜给社区新人做尝试。

第二,强化 Table 概念。RocketMQ 想要凑近流式引擎,须要一直强化 table 概念。因而,引入 schema 之后是比拟好的契机,能够将 RocketMQ 的 topic 概念晋升至 table 的概念,促成音讯和流表的深度交融。

第三,No-server 的 schema 治理。引入了 registry 组件后减少了肯定的内部组件依赖。因而一些强调轻量化的场景仍然心愿做 no-server 的 schema 治理。比方间接与 RocketMQ 交互,将信息长久化到 compact topic 上,做间接读、间接写或基于 Git 存储。

第四,列式查问。集成到 streams 之后,咱们发现能够依照字段去生产音讯、了解音讯。以后的 RocketMQ 音讯按行了解,解析计算时须要生产整个音讯体。streams 目前依照字段生产音讯曾经根本实现,后续冀望可能实现依照条件查问音讯、按字段查问音讯,将 RocketMQ 革新成查问引擎。

第五,数据血统 / 数据地图。当 RocketMQ 通过分级存储等个性缩短音讯的生命周期,它将能够被视为企业的数据资产。目前的痛点在于 RocketMQ 提供的 dashboard 上,业务人员很难感知到 topic 背地的业务语义。如果做好数据血统、理清数据 topic 上下游关系,比方谁在生产数据、被提供了哪些字段、哪些信息,则整个 dashboard 能够提供音讯角度的业务大盘,这其实具备很大的设想空间。

退出 Apache RocketMQ 社区

十年铸剑,Apache RocketMQ 的成长离不开寰球靠近 500 位开发者的积极参与奉献,置信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅能够结识社区大牛,晋升技术水平,也能够晋升集体影响力,促成本身成长。

社区 5.0 版本正在进行着热火朝天的开发,另外还有靠近 30 个 SIG(兴趣小组)等你退出,欢送立志打造世界级分布式系统的同学退出社区,增加社区开发者:rocketmq666 即可进群,参加奉献,打造下一代音讯、事件、流交融解决平台。

正文完
 0