关于java:RocketMQ-Connect-构建流式数据处理平台

36次阅读

共计 7300 个字符,预计需要花费 19 分钟才能阅读完成。

本文作者:孙晓健,Apache RocketMQ Committer

01 RocketMQ Connect

RocketMQ Connect 是一款可扩大的在 RocketMQ 与其余零碎之间做流式数据传输的工具,可能轻松将 RocketMQ 与其余存储技术进行集成。RocketMQ Connect 应用特定的 Source 插件类型,将数据发送到 RocketMQ  Topics 中,并通过 Sink 监听 Topics 将数据写到上游指定数据存储中。应用过程中 Connector 能够通过 JSON 形式进行配置,无需编码。数据流转过程从源到目标,通过 RocketMQ 进行桥接。

RocketMQ Connect 具备以下个性:

①通用性:Connect 制订了规范 API,包含 Connector、Task、Converter、Transform, 开发者能够通过规范 API 扩大本人插件,达到本人需要。

②Offset 主动治理(断点续传):Source 方面——用户在开发 Connect 时,能够通过 Offset 进行增量数据拉取。零碎外部会主动对 Offset 做治理,会将上次拉取 Offset 信息进行长久化。下次工作重启时,能够通过上次提交的 Offset 持续进行数据增量拉取,无需从头进行数据同步;Sink 方面——基于 RocketMQ 本身的 Offset 提交策略,在外部实现了主动提交形式,工作运行时会主动解决,容许用户配置 Offset 提交距离;如果零碎自带 offset 曾经能够满足需要,则毋庸另外保护 Offset;如果零碎自带 Offset 无奈满足需要,则能够通过 Task API 进行保护。Task API 中自带 Offset 保护能力,能够在 Connect 中自行决定 Offset 长久化逻辑,比方长久化到 MySQL、Redis 中。下次工作启动时,能够主动从 Offset 存储位点获取下一次执行 Offset,持续做增量拉取。

③分布式、可扩大、容错:能够分布式的形式进行部署,自带容错能力。Worker 宕机或增加 Worker 时,工作会主动做重新分配、运行,在各集群 Worker 之间做均衡。工作失败后,也会主动重试。重试完可主动 Rebalance 到不同 Worker 机器上。

④运维和监控:Connect 提供了规范的集群治理性能,包含 Connect 治理性能以及插件治理性能。能够通过 API 形式对工作做启停操作,也能够查看工作在运行过程中的运行状态以及异样状态。并且能够进行指标上报,工作在数据拉取与数据写入后,数据总量、数据速率等都能够通过 Metrics 形式做数据上报。此外,Metrics 也提供了规范的上报 API , 能够基于规范 API 做指标扩大和上报形式的扩大,比方上报到 RocketMQ topic 中、Prometheus 等。

⑤批流一体:Source 在做数据拉取时,能够通过 JDBC 或 指定插件 sdk 的形式,做批量数据拉取,转换为流形式,也能够应用 CDC 形式,通过 增量快照 或类 Mysql binlog 监听形式获取源端全量与增量变更数据,推给 RocketMQ, 上游能够通过 Flink 或 RocketMQ Stream 进行流式解决做状态计算, 也可间接落到数据存储引擎中,如 Hudi、Elasticsearch、Mysql 等。

⑥Standalone、Distributed 模式:Standalone 模式次要用于测试环境,Distributed 模式次要用于生产环境。在试用过程中能够用 Standalone 形式做部署,得益于其不会做 Config 存储,每次启动时都能够带独立工作,帮忙调试。

Connect 组件蕴含以下几类:

  • Connector:作为工作协调的高级形象,形容了 Task 运行形式以及如何做 Task 拆分。
  • Task:负责理论数据拉取操作,并负责 offset 的保护和 Task Metrics 数据的收集。
  • Worker:执行 Task 工作的过程。
  • Record Converter:在 Source 与 Sink 之间做数据转换,Record 通过 Schema 制订数据契约,Schema 能够随数据传输,也能够通过 RocketMQ Schema Registry 进行近程存储,目前反对了 Avro 和 JSON 两种类型的 Converter。
  • Transform:数据传输过程中做数据转换。如进行字段变更、类型变更、做空值或已知谬误值过滤等;还能够通过扩大 groovy transform、python transform 等脚本对数据进行简单的转换, 亦可做近程调用来进行静态数据的补全或做函数计算。
  • Dead Letter Queue:在数据从 Source 端到 Sink 端的过程中,数据 Convert 转化谬误、网络超时、逻辑谬误造成写入失败等状况,能够依据本人编写的插件逻辑来决定是将数据写入到谬误队列中、或疏忽谬误持续进行、或呈现谬误后进行工作等。写入谬误队列中的数据,在不计较数据有序的状况下可自助进行异步修复后再写入。
  • Metrics:进步工作运行过程中的可观测性,工作在数据拉取与数据写入时,须要监测工作拉取的数据量、写入数据量、拉取速率、写入速率、差值、内存占用等,都能够通过 Metrics 进行指标上报,供零碎经营和运维应用。

上图为数据在 Connect 中的流转过程。

分布式部署下,Source 与 Sink 能够在不同 Worker 中,不相互依赖,一个 Connector 下可蕴含 Task、Transform、Converter 程序执行。Task 负责从源端拉取数据,Task 并发数量由自定义插件的分片形式决定。拉取到数据后,若两头配置了数据处理 Transform,数据会顺次通过配置的一个或者多个 Transform 后,再将数据传送给 Converter, Converter 会将数据进行从新组织成可传输的形式,若应用了 RocketMQ Schema Registry,则会进行 Schema 的校验、注册或降级,通过转换后的数据,最终写入至两头 Topic 中供上游 Sink 应用。上游 Sink 能够选择性的监听一个或者多个 Topic,Topic 中传输来的数据能够是雷同存储引擎中的,也能够是异构存储引擎中的数据,数据在 Sink 转换后,最终传给流计算引擎或者间接写入到目标存储中。

在转换过程中,Source Converter 与 Sink Converter 要保持一致。不同的 Converter 解析的 Schema 格局会有差别,若 Converter 不统一,会造成 Sink 解析数据的失败。不同组件之间的差异化,能够通过自定义 Transform 来进行兼容。

以上架构具备如下几点劣势:

①涣散架构:Source 与 Sink 之间通过 Topic 进行解耦,E、T、L 不再是一个整体。个别雷同存储引擎的数据的读取和写入 QPS 差距很大,所以一体化的 ETL 在数据的读取时会受到指标库写入性能的制约。

而 RocketMQ Connect 中的 Source 和 Sink 解耦后,能够做 Source 和 Sink 两端独立扩缩容,实现数据读取和写入的动态平衡,互不影响。

②规范 API:升高应用难度,扩大简便,在 API 中形象了编写并发的具体形式,插件开发者可自定义拆分。

③标准的数据抽象:应用 Topic 做解耦后,须要在 Source 和 Sink 之间建设数据契约。Connect 次要通过 Schema 进行数据束缚。以此来反对异构数据源之间的数据集成。

④专一数据拷贝:Connect 次要专一于与异构数据源之间的数据集成,不做流计算,反对数据拷贝到流(Flink、RocketMQ Stream)零碎中,再做流计算。

⑤轻量:依赖少。如果集群中已有 RocketMQ 集群,能够间接部署 RocketMQ Connect 做数据同步工作,部署非常简单,无需额定部署调度组件。RocketMQ Connect 自带任务分配组件,无需额定关注。

另外,依靠 RocketMQ 弱小的性能,能够在不同零碎之间做大规模数据的迁徙。Source 次要依赖于 RocketMQ 的写入能力,无需期待事务尾端数据写入。Sink 依靠于 Topic 的扩大能力,能够依据两头 Topic 的分区数量来决定上游 Sink 并发度,主动做扩大。工作做完扩大后,零碎会对 Connector 进行重新分配 , 保障负载平衡,Offset 不会丢,能够基于上次运行状态持续向下运行,无需人工干预。也能够依赖 RocketMQ 的有序策略来做程序数据的同步。

02 RocketMQ Connect 原理

管理区 — 次要做工作配置变更或查问的接管,包含创立、删除、更新、启停和查看 Connector 等操作。变更工作后,治理端会将工作提交到 RocketMQ 共享配置的 Topic 中。因为每一个 Worker 都监听了雷同 Topic,所以每个 Worker 都能获取 Config 信息,而后触发集群 Rebalance 再从新做任务分配,最终达到全局工作均衡。

运行时区 – 次要为曾经被调配到以后 Worker 的 Task 提供运行空间。包含工作的初始化、数据拉取、Offset 保护、工作启停状态上报、Metrics 指标上报等。

调度区  — Connect 自带任务分配调度工具,通过 hash 或 一致性 hash 在 Worker 间进行工作均衡,次要监听 Worker 和 Connector 的变更。比方 Worker 增加或删除、Connector 配置变更、工作启停等。获取状态变更用来更新本地工作状态,并决定是否进行下一轮 Rebalance 操作,以达到整个集群的负载平衡。

治理端、运行时区与调度区存在每个集群的每个 Worker 中,集群 Worker 间通信次要通过共享 Topic 来进行告诉,Worker 之间无主节、备节点之分,这让集群运维起来十分的不便,只须要在 Broker 中建对应共享 Topic 即可,但因为 Task 状态变动的动作只会产生在一个 Worker 中,集群之间共享会存在短暂提早,所以通过 Rest Api 查问 Connector 状态时可能会呈现短暂不统一的景象。

服务发现过程。有变更时,每一个 Worker 都能够发现节点变更,实现服务主动发现的成果。

①启动新的 Worker 时,Worker 会向依赖的 RocketMQ Topic 注册客户端变更监听。雷同的 Consumer Group,当有新客户端增加时,注册了该事件的客户端会收到变更告诉,Worker 收到变更事件后,会被动更新以后集群的 Worker 列表。

②当 Worker 宕机或者缩容时也会产生雷同的成果。

RocketMQ Connect 任务分配流程如下:

通过调用 Rest API 形式创立 Connector。如果 Connector 不存在,则主动进行创立,若存在则更新。创立后,会向 Config Topic 发送告诉,告诉 Worker 有工作变更。Worker 获取工作变更后,再进行重新分配,以达到负载平衡的成果。进行工作也会产生雷同的成果,目前每个 Worker 都会存储全量的工作及状态,但只运行调配给以后 Worker 的 Task。

目前零碎默认提供了简略 hash 或 一致性 hash 两种工作分配模式,倡议抉择一致性 hash 模式。因为在一致性 hash 状况下,做 Rebalance 时变更比一般 hash 变更范畴小,局部曾经被调配好的工作不会再进行负载。

Connector 扩大因素分为自定义配置、并发和 Task 信息。

自定义配置蕴含连贯信息(外围配置项)、Convertor 信息、Transform 信息等。Connector 仅作为工作全局概要和协调器,理论产生成果的仍然是调配后的 Task。比方 1 亿数据分为多个工作拉取,别离放在不同 Task 中执行,因而须要通过 Connector 去依照正当的逻辑做 Task 的拆分,这些拆分的操作须要在申明 Connector 时制订。Connecor 将配置拆分后,将理论数据拉取逻辑配置告知 Task,Task 决定数据拉取的具体形式。

Task 扩大因素包含配置初始化、连贯开启与敞开、拉取频率、错误处理、理论数据拉取逻辑以及 Offset 保护。

整个零碎中全局 Converter 转换都应用同一套 API,分为两种模式:

本地模式 :从 Source Connect 拉取到数据后,由 Converter 做数据转换。转换过程中,本地操作会将 Schema 与 value 值合并为 Connect record 向上游传递。上游通过雷同 Converter 再将其转换为 Record,推给 Sink task 做数据写入。两头通过 Convert Schema 做了数据契约,能够在 Source 与 Sink 之间转换。本地模式下,Schema 与 Value 作为一个整体传输,数据 Body 十分臃肿,每一条数据都带有 Schema 信息。但其长处为不存在版本兼容问题。

近程模式 :在数据转换时,会将 Schema 存到近程 RocketMQ Schema Registry 零碎中,在数据传输过程中只带 Value 值,不带 Schema 束缚信息。当 Sink 订阅 Topic 时,通过信息头带有的 Record ID 获取 Schema 信息、进行 Schema 校验,校验后再做数据转换。

Schema 保护在 RocketMQ Schema Registry 零碎中。因而在转换过程中能够在零碎中手工更新 Schema,而后用指定的 SchemaID 做转换,然而须要在 Converter 插件中做数据兼容。

Connect Converter 内置了扩大,有本地的 JSON、一般数据类型 Converter 等。如果内置扩大无奈满足需要,能够通过 Record Converter API 本人进行扩大。扩大后,将 Converter 包置于 Worker 运行插件目录下,零碎即可主动加载。

配置形式分为 Key 和 Value 两种。其中 Key 标注数据的惟一,也能够是 Struct 结构化数据;Value 是实在传输的数据。

Transform 是在 Connector 与 Convertor 之间做数据映射转换与简略计算的辅助工具。当 Source Converter 与 Sink Connector 在应用过程中达不到业务需要时,能够通过编写 Transform 插件的形式做数据适配。比方不同业务、不同数据源插件之间的数据转换,如字段映射、字段派生、类型转换、字段补全、简单函数计算等。

零碎中内置的 Transform 模式有比方字段扩大、替换等。如果不满足需要,能够通过 API 自行扩大 Transform。部署时,只需将编写后的扩大打好包搁置对应插件目录下,即可主动加载。

具体配置形式如上图左下方所示,Transform 的运行为串行,能够对一个值做多个转换,能够配置多个 Transform。须要配置多个 Transform 的状况下,通过逗号进行分隔, 名称不能反复。

Source Task 做数据拉取或变更监听时,例如,通过 JDBC Mysql 形式做数据增量拉取时,须要指定 Offset 增量拉取的形式,能够通过自增 ID 或 Modify time 的形式。每次数据拉取实现发送胜利后,会向 Offset writer 中提交增量信息(id 或者 modify time),零碎会异步进行长久化。工作下次启动时,会主动获取 Offset,从上次位点开始解决数据,达到断点续传的成果。

封装 Offset 时没有固定模式,能够通过本人的形式拼接 Offset key 或 value 值,惟一依赖的是 RocketMQ 中的 Connect offset topic 信息,次要为推送给其余 worker 做本地 Offset 更新。如果应用零碎的 Offset 保护,则用户只须要决定保护上报逻辑,无需关注如何保障 Offset 提交、Offset 回滚模式等,所有都由零碎保障。

运行过程中,若开启了死信队列,正确的数据会输送到目标端,谬误数据会输送到谬误队列中。业务方能够通过异步形式做数据处理,然而该种状况下无奈保障有序。如果要保证数据有序,须要在触发报错的状况下将 Task 进行,先进行数据修复,修复后再启动 Task。

如果单个 Task 解决数据报错,只需进行出错的 Task,其余 Task 不受影响。因为每个 Task 在解决数据时生产的 Query 不一样,如果指定了 Key,会依照 Key 做数据分区,而后保障分区内每个 Query 有序,因而单个 Task 进行不会影响全局有序性。

03 RocketMQ Connect 应用场景

RocketMQ Connect 可能实用于大部分传统 ETL 实用的场景。另外,传统 ETL 无奈实现的比方实时流传输、流批一体、快照性能等,RocketMQ Connect 亦可能实现。

新旧零碎迁徙场景 :业务部降级变更过程中呈现了类型变更、表拆分或扩容操作、增加索引的状况下可能导致停机耗时十分久,能够通过 RocketMQ Connect 做数据从新搬迁。

分库分表场景 :以后市面上有很多分库分表插件,能够通过 Connect 适配开源分库分表客户端做分库分表工作,也能够基于 RocketMQ 本人做分库分表逻辑,源端与目标端不变。数据从单表中取出后,能够在 Transform 中做分库分表逻辑。能够通过 Transform 做路由。路由到不同 Topic 中,在上游能够通过监听不同 Topic 落到曾经分好的库表中。

多活 :RocketMQ Connect 反对集群间 Topic 及元数据的拷贝,可保障多核心的 Offset 统一。

数据订阅场景 :通过 CDC 模式做数据监听,向上游做数据告诉。供上游做数据订阅以及即时数据更新。同时也能够将数据拉取后通过 HTTP 的形式间接推送到上游业务零碎中,相似于 Webhook 的形式,然而须要对申请做验权、限流等。

其次,还有数据入仓入湖、冷数据备份、异构数据源数据集成等业务场景都能够通过 RocketMQ Connect 作为数据处理计划

从整体应用场景来看,大抵能够分为两局部,数据集成和流式解决。数据集成次要为将数据从一个零碎搬到另一个零碎,能够在异构数据源中进行数据同步。流式解决次要为将批处理信息通过批量数据拉取,或 CDC 模式将增量数据同步到对应流解决零碎中,做数据聚合、窗口计算等操作,最终再通过 Sink 写入到存储引擎中。

04 RocketMQ Connect 生态

RocketMQ Connect 目前对上图中产品均可能提供反对,平台也提供了 Kafka Connect 插件的适配。

正文完
 0