关于消息中间件:基于-RocketMQ-Connect-构建数据流转处理平台

44次阅读

共计 5986 个字符,预计需要花费 15 分钟才能阅读完成。

本文作者:周波,阿里云智能高级开发工程师,Apache RocketMQ Committer。

01 从问题中来的 RocketMQ Connect

在电商零碎、金融零碎及物流零碎,咱们常常能够看到 RocketMQ 的身影。起因不难理解,随着数字化转型范畴的扩充及过程的放慢,业务零碎的数据也在每日暴增,此时为了保证系统的稳固运行,就须要把运行压力分担进来。RocketMQ 就负责着这样的角色,它的异步音讯解决与高并发读写能力,决定了零碎底层的重构不会影响下层利用的性能。而 RocketMQ 的另一个劣势——可伸缩能力,使零碎在面临流量的不确定性时,实现对流量的缓冲解决。此外,RocketMQ 的程序设计个性使其成为一个人造的排队引擎,例如,三个利用同时对一个后盾引擎发动申请,确保不引起“撞车”事变。因而,RocketMQ 被用在异步解耦、削峰填谷以及事务音讯等场景中。

然而,数字化转型浪潮也带来了更多用户对数据价值的关注——如何让数据产生更大利用价值?RocketMQ 本身不具备数据分析能力,然而有不少用户心愿从 RocketMQ Topic 中获取数据并进行在线或离线的数据分析。然而,应用市面上的数据集成或数据同步工具,将 RocketMQ Topic 数据同步到一些剖析零碎中尽管是一种可行计划,却会引入新的组件,造成数据同步的链路较长,时延绝对较高,用户体验不佳。

举个例子,假如业务场景中应用 OceanBase 作为数据存储,同时心愿将这些数据同步到 Elasticsearch 进行全文搜寻,有两种可行的数据同步计划。

计划一:从 OceanBase 中获取数据,写入 Elasticsearch 组件并进行数据同步,在数据源较少时此计划没什么问题,一旦数据增多,开发和保护都非常复杂,此时就要用到第二种计划。

计划二:引入消息中间件对上下游进行解藕,这能解决第一种计划的问题,然而一些较为简单的问题还没有齐全解决。比方,如何将数据从源数据同步到指标零碎并保障高性能,如果保障同步工作的局部节点挂掉,数据同步仍然失常进行,节点复原仍然能够断点续传,同时随着数据管道的增多,如何治理数据管道也变得十分困难。

总的来说,数据集成过程中的挑战次要有五个。

挑战一: 数据源多,市面上可能有上百个数据源,且各数据源的零碎差别较大,实现任意数据源之间的数据同步工作量较大,研发周期很长。

挑战二: 高性能问题,如何高效地从源数据系统同步到目标数据系统,并保障其性能。

挑战三: 高可用问题,即 Failover 能力,当一个节点挂掉是否这个节点的工作就进行了,工作重新启动是否还能够断点续传。

挑战四: 弹性扩缩容能力,依据零碎流量动静减少或缩小节点数量,既能通过扩容满足高峰期业务,也能在低峰期缩减节点,节省成本。

挑战五: 数据管道的治理运维,随着数据管道的增多,运维监控的数据管道也会变得越来越简单,如何高效治理监控泛滥的同步工作。

面对上述挑战 RocketMQ 如何解决?

第一,标准化数据集成 API(Open Messaging Connect API)。在 RocketMQ 生态中减少 Connect 组件,一方面对数据集成过程形象,形象规范的数据格式以及形容数据的 Schema,另一方面对同步工作进行形象,工作的创立、分片都形象成一套标准化的流程。

第二,基于规范的 API 实现 Connect Runtime。Runtime 提供了集群治理、配置管理、位点治理、负载平衡相干的能力,领有了这些能力,开发者或者用户就只须要关注数据如何获取或如何写入,从而疾速构建数据生态,如与 OceanBase、MySQL、Elasticsearc 等疾速建设连贯,搭建数据集成平台。整个数据集成平台的构建也非常简单,通过 Runtime 提供的 RESTFull API 进行简略调用即可。

第三,提供欠缺的运维工具,方便管理同步工作,同时提供丰盛的 Metrics 信息,不便查看同步工作的 TPS,流量等信息。

02 RocketMQ Connect 两大应用场景

这里为大家整顿了 RocketMQ Connect 的两大应用场景。

场景一,RocketMQ 作为两头媒介,能够将上下游数据买通 ,比方在新旧零碎迁徙的过程中,如果在业务量不大时应用 MySQL 就能够满足业务需要,而随着业务的增长,MySQL 性能无奈满足业务要求时,须要对系统进行降级,选用分布式数据库 OceanBase 晋升零碎性能。

如何将旧零碎数据无缝迁徙到 OceanBase 中呢?在这个场景中 RocketMQ Connect 就能够发挥作用,RocketMQ Connect 能够构建一个从 MySQL 到 OceanBase 的数据管道,实现数据的平滑迁徙。RocketMQ Connect 还能够用在搭建数据湖、搜索引擎、ETL 平台等场景。例如将各个数据源的数据集成到 RocketMQ Topic 当中,指标存储只须要对接 Elasticsearch 就能够构建一个搜寻平台,指标存储如果是数据湖就能够构建一个数据湖平台。

除此之外,RocketMQ 本身也能够作为一个数据源,将一个 RocketMQ 集群的数据同步到另一个集群,能够构建 RocketMQ 多活容灾能力,这是社区正在孵化的 Replicator 能够实现的能力。

场景二,RocketMQ 作为端点。 RocketMQ 的生态中提供了流计算能力组件 -RocketMQ Streams,Connector 将各个存储系统的数据集成到 RocketMQ Topic 当中,上游应用 RocketMQ Streams 流计算的能力就能够构建一个实时的流计算平台。当然也能够配合业务零碎的 Service 实现业务零碎疾速从其它存储对立疾速获取数据的能力。

![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/a90e49f8c2a…)

还能够将 RocketMQ 作为端点的上游,将业务音讯发到 Topic 中,应用 Connector 对数据做长久化或转存的操作。

如此一来,RocketMQ 就具备数据集成能力,能够实现任意任意异构数据源之间的数据同步,同时也具备对立的集群治理、监控能力及配置化搭建数据管道搭建能力,开发者或者用户只须要专一于数据拷贝, 简略配置就能够失去一个具备配置化、低代码、低延时、高可用,反对故障解决和动静扩缩容数据集成平台。

那么,RocketMQ Connect 是如何实现的呢?

03RocketMQ Connect 实现原理

在介绍实现原理前,先来理解两个概念。

概念一,什么是 Connector(连接器)? 它定义数据从哪复制到哪,是从源数据系统读取数据写入 RocketMQ,这种是 SourceConnector,或从 RocketMQ 读数据写入到指标零碎, 这种是 SinkConnector。Connector 决定须要创立工作的数量,从 Worker 接管配置传递给工作。

[]()

概念二,什么是 Task? Task 是 Connector 工作分片的最小调配单位,是理论将源数据源数据复制到 RocketMQ(SourceTask),或者将数据从 RocketMQ 读出写入到指标零碎(SinkTask)真正的执行者,Task 是无状态的,能够动静的启停工作,多个 Task 能够并行执行,Connector 复制数据的并行度次要体现在 Task 上。一个 Task 工作能够了解为一个线程,多个 Task 则以多线程的形式运行。

通过 Connect 的 API 也能够看到 Connector 和 Task 各自的职责,Connector 实现时就曾经确定数据复制的流向,Connector 接收数据源相干的配置,taskClass 获取须要创立的工作类型,通过 taskConfigs 的数量确定工作数量,并且为 Task 调配好配置。Task 拿到配置当前数据源建设连贯并获取数据写入到指标存储。通过上面的两张图能够分明的看到,Connector 和 Task 解决根本流程。

一个 RocketMQ Connect 集群中会有多个 Connector,每个 Connector 会对应一个或多个 Task,这些工作运行在 Worker(过程)中。Worker 过程是 Connector 和 Task 运行环境,它提供 RESTFull 能力,接管 HTTP 申请,将获取到的配置传递给 Connector 和 Task,它还负责启动 Connector 和 Task,保留 Connector 配置信息,保留 Task 同步数据的位点信息,除此以外,Worker 还提供负载平衡能力,Connect 集群高可用、扩缩容、故障解决次要依赖 Worker 的负责平衡能力实现的。Worker 提供服务的流程如下:

[]()

Worker 提供的服务发现及负载平衡的实现原理如下:

服务发现:

用过 RocketMQ 的开发者应该晓得,它的应用很简略,就是发送和接管音讯。生产模式分为集群模式和播送模式两种,集群生产模式下一个 Topic 能够有多个 Consumer 生产音讯,任意一个 Consumer 的上线或下线 RocketMQ 服务端都有感知,并且还能够将客户端高低线信息告诉给其它节点,利用 RocketMQ 这个个性就实现了 Worker 的服务发现。

配置 /Offset 同步:

Connector 的配置 /Offset 信息同步通过每个 Worker 订阅雷同的 Topic,不同 Worker 应用不同的 Consumer Group 实现的,Worker 节点能够通过这种形式生产到雷同 Topic 的所有数据,即 Connector 配置 /Offset 信息,这相似于播送生产模式,这种数据同步模式能够保障任何一个 Worker 挂掉,该 Worker 上的工作仍旧能够在存活的 Worker 失常拉起运行,并且能够获取到工作对应的 Offset 信息实现断点续传,这是故障转移以及高可用能力的根底。

负载平衡

RocketMQ 生产场景中,生产客户端 与 Topic Queue 之间有负载平衡能力,Connector 在这一部分也是相似的,只不过它负载平衡的对象不一样,Connector 是 Worker 节点和 Task 之间的负载平衡,与 RocketMQ 客户端负载平衡一样,能够依据应用场景抉择不同负载平衡算法。

[]()

上文提到过 RocketMQ Connect 提供 RESTFull API 能力。通过 RESTFull AP 能够创立 Connector,治理 Connector 以及查看 Connector 状态,简略列举:

  • POST /connectors/{connector name}
  • GET /connectors/{connector name}/config
  • GET /connectors/{connector name}/status
  • POST /connectors/{connector name}/stop

目前 Connector 反对单机、集群两种部署模式。集群模式至多要有两个节点,能力保障它的高可用。并且集群能够动静减少或者缩小,做到了动态控制晋升集群性能和节省成本节俭的能力。单机模式更多不便了开发者开发测试 Connector。

如何如何实现一个 Connector 呢? 还是联合一个具体的场景看一看,例如业务数据以后是写入 MySQL 数据库中的,心愿将 MySQL 中数据实时同步到数据湖 Hudi 当中。只有实现 MySQL Source Connector、Hudi Sink Connector 这两个 Connector 即可。

上面就以 MySQLSource Connector 为例,来看一下具体的如何实现。

实现 Connector 最次要的就是实现两个 API。第一个是 Connector API,除了实现它生命周期相干的 API 外,还有工作如何调配,是通过 Topic、Table 还是通过数据库的维度去分。第二个 API 是须要创立的 Task,Connector 通过任务分配将相干的配置信息传递给 Task,Task 拿到这些信息,例如数据库账号,明码,IP,端口后就会创立数据库连贯,再通过 MySQL 提供的 BINLOG 机智获取到表的数据,将这些数据写到一个阻塞队列中。Task 有个 Poll 办法,实现 Connector 时只有调用到 Poll 办法时能够获取到数据即可,这样 Connector 就根本写完了。而后打包以 Jar 包的模式提供进去,将它加载到 Worker 的节点中。

创立 Connector 工作后,Worker 中会创立一个或者多个线程,不停的轮询 Poll 办法,从而获取到 MySQL 表中的数据,再通过 RocketMQ Producer 发送到 RocketMQ Broker 中,这就是 Connector 从实现到运行的整体过程(见下图)。

04 RocketMQ Connect 现状与将来

RocketMQ Connect 的倒退历程分为三个阶段。

第一阶段:Preview 阶段

RocketMQ Connect 倒退的初期也即 Preview 阶段,实现了 Open Messaging Connect API 1.0 版本,基于该版本实现了 RocketMQ Connect Runtime,同时提供了 10+ Connector 实现(MySQL,Redis,Kafka,Jms,MongoDB……)。在该阶段,RocketMQ Connect 能够简略实现端到端的数据源同步,但性能还不够欠缺,不反对数据转换,序列化等能力,生态绝对还比拟贫乏。

第二阶段:1.0 阶段

在 1.0 阶段,Open Messaging Connect API 进行了降级,反对 Schema、Transform,Converter 等能力,在此基础上对 Connect Runtime 也进行了重大降级,对数据转换,序列化做了反对,简单 Schema 也做了欠缺的反对。该阶段的 API、Runtime 能力曾经根本欠缺,在此基础上,还有 30+ Connecotor 实现,笼罩了 CDC、JDBC、SFTP、NoSQL、缓存 Redis、HTTP、AMQP、JMS、数据湖、实时数仓、Replicator、等 Connector 实现,还做了 Kafka Connector Adaptor 能够运行 Kafka 生态的 Connector。

第三阶段:2.0 阶段

RocketMQ Connect 以后处于这个阶段,重点倒退 Connector 生态,当 RocketMQ 的 Connector 生态达到 100 + 时,RocketMQ 基本上能够与任意的一个数据系统去做连贯。

目前 RocketMQ 社区正在和 OceanBase 社区单干,进行 OceanBase 到 RocketMQ Connect 的研发工作,提供 JDBC 和 CDC 两种模式接入模式,后续会在社区中公布,欢送感兴趣的同学试用。

05 总结

RocketMQ 是一个牢靠的数据集成组件,具备分布式、伸缩性、故障容错等能力,能够实现 RocketMQ 与其余数据系统之间的数据流入与流出。通过 RocketMQ Connect 能够实现 CDC,构建数据湖,联合流计算可实现数据价值。

正文完
 0