本文作者:周波,阿里云智能高级开发工程师, Apache RocketMQ Committer 。

01 从问题中来的RocketMQ Connect

在电商零碎、金融零碎及物流零碎,咱们常常能够看到 RocketMQ 的身影。起因不难理解,随着数字化转型范畴的扩充及过程的放慢,业务零碎的数据也在每日暴增,此时为了保证系统的稳固运行,就须要把运行压力分担进来。RocketMQ就负责着这样的角色,它的异步音讯解决与高并发读写能力,决定了零碎底层的重构不会影响下层利用的性能。而RocketMQ的另一个劣势——可伸缩能力,使零碎在面临流量的不确定性时,实现对流量的缓冲解决。此外,RocketMQ的程序设计个性使其成为一个人造的排队引擎,例如,三个利用同时对一个后盾引擎发动申请,确保不引起“撞车”事变。因而,RocketMQ被用在异步解耦、削峰填谷以及事务音讯等场景中。

然而,数字化转型浪潮也带来了更多用户对数据价值的关注——如何让数据产生更大利用价值?RocketMQ本身不具备数据分析能力,然而有不少用户心愿从 RocketMQ Topic 中获取数据并进行在线或离线的数据分析。然而,应用市面上的数据集成或数据同步工具,将 RocketMQ Topic 数据同步到一些剖析零碎中尽管是一种可行计划,却会引入新的组件,造成数据同步的链路较长,时延绝对较高,用户体验不佳。

举个例子,假如业务场景中应用 OceanBase 作为数据存储,同时心愿将这些数据同步到 Elasticsearch进行全文搜寻,有两种可行的数据同步计划。

计划一:从 OceanBase 中获取数据,写入 Elasticsearch组件并进行数据同步,在数据源较少时此计划没什么问题,一旦数据增多,开发和保护都非常复杂,此时就要用到第二种计划。

计划二:引入消息中间件对上下游进行解藕,这能解决第一种计划的问题,然而一些较为简单的问题还没有齐全解决。比方,如何将数据从源数据同步到指标零碎并保障高性能,如果保障同步工作的局部节点挂掉,数据同步仍然失常进行,节点复原仍然能够断点续传,同时随着数据管道的增多,如何治理数据管道也变得十分困难。

总的来说,数据集成过程中的挑战次要有五个。

挑战一: 数据源多,市面上可能有上百个数据源,且各数据源的零碎差别较大,实现任意数据源之间的数据同步工作量较大,研发周期很长。

挑战二: 高性能问题,如何高效地从源数据系统同步到目标数据系统,并保障其性能。

挑战三: 高可用问题,即Failover能力,当一个节点挂掉是否这个节点的工作就进行了,工作重新启动是否还能够断点续传。

挑战四: 弹性扩缩容能力,依据零碎流量动静减少或缩小节点数量,既能通过扩容满足高峰期业务,也能在低峰期缩减节点,节省成本。

挑战五: 数据管道的治理运维,随着数据管道的增多,运维监控的数据管道也会变得越来越简单,如何高效治理监控泛滥的同步工作。

面对上述挑战 RocketMQ 如何解决?

第一,标准化数据集成 API (Open Messaging Connect API)。在 RocketMQ 生态中减少 Connect 组件,一方面对数据集成过程形象,形象规范的数据格式以及形容数据的 Schema,另一方面对同步工作进行形象,工作的创立、分片都形象成一套标准化的流程。

第二,基于规范的 API 实现 Connect Runtime。Runtime提供了集群治理、配置管理、位点治理、负载平衡相干的能力,领有了这些能力,开发者或者用户就只须要关注数据如何获取或如何写入,从而疾速构建数据生态,如与OceanBase、MySQL、Elasticsearc等疾速建设连贯,搭建数据集成平台。整个数据集成平台的构建也非常简单,通过 Runtime 提供的 RESTFull API 进行简略调用即可。

第三,提供欠缺的运维工具,方便管理同步工作,同时提供丰盛的Metrics 信息,不便查看同步工作的TPS,流量等信息。

02 RocketMQ Connect 两大应用场景

这里为大家整顿了 RocketMQ Connect的两大应用场景。

场景一,RocketMQ 作为两头媒介,能够将上下游数据买通,比方在新旧零碎迁徙的过程中,如果在业务量不大时应用 MySQL 就能够满足业务需要,而随着业务的增长,MySQL 性能无奈满足业务要求时,须要对系统进行降级,选用分布式数据库OceanBase 晋升零碎性能。

如何将旧零碎数据无缝迁徙到 OceanBase 中呢?在这个场景中RocketMQ Connect 就能够发挥作用,RocketMQ Connect能够构建一个从 MySQL 到 OceanBase 的数据管道,实现数据的平滑迁徙。RocketMQ Connect 还能够用在搭建数据湖、搜索引擎、ETL 平台等场景。例如将各个数据源的数据集成到 RocketMQ Topic当中,指标存储只须要对接 Elasticsearch 就能够构建一个搜寻平台,指标存储如果是数据湖就能够构建一个数据湖平台。

除此之外,RocketMQ 本身也能够作为一个数据源,将一个 RocketMQ 集群的数据同步到另一个集群,能够构建 RocketMQ 多活容灾能力,这是社区正在孵化的Replicator能够实现的能力。

场景二,RocketMQ 作为端点。 RocketMQ 的生态中提供了流计算能力组件-RocketMQ Streams,Connector将各个存储系统的数据集成到RocketMQ Topic当中,上游应用 RocketMQ Streams流计算的能力就能够构建一个实时的流计算平台。当然也能够配合业务零碎的Service 实现业务零碎疾速从其它存储对立疾速获取数据的能力。

![](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/a90e49f8c2a...)

还能够将 RocketMQ 作为端点的上游,将业务音讯发到 Topic 中,应用 Connector 对数据做长久化或转存的操作。

如此一来,RocketMQ 就具备数据集成能力,能够实现任意任意异构数据源之间的数据同步,同时也具备对立的集群治理、监控能力及配置化搭建数据管道搭建能力,开发者或者用户只须要专一于数据拷贝,简略配置就能够失去一个具备配置化、低代码、低延时、高可用,反对故障解决和动静扩缩容数据集成平台。

那么, RocketMQ Connect 是如何实现的呢?

03RocketMQ Connect 实现原理

在介绍实现原理前,先来理解两个概念。

概念一,什么是 Connector(连接器)? 它定义数据从哪复制到哪,是从源数据系统读取数据写入RocketMQ,这种是SourceConnector,或从RocketMQ读数据写入到指标零碎,这种是SinkConnector。Connector决定须要创立工作的数量,从Worker接管配置传递给工作。

[]()

概念二,什么是 Task ? Task 是 Connector 工作分片的最小调配单位,是理论将源数据源数据复制到 RocketMQ(SourceTask),或者将数据从RocketMQ 读出写入到指标零碎(SinkTask)真正的执行者,Task是无状态的,能够动静的启停工作,多个Task能够并行执行,Connector 复制数据的并行度次要体现在 Task 上。一个 Task 工作能够了解为一个线程,多个 Task 则以多线程的形式运行。

通过Connect的API也能够看到Connector和Task各自的职责,Connector实现时就曾经确定数据复制的流向,Connector接收数据源相干的配置,taskClass获取须要创立的工作类型,通过taskConfigs的数量确定工作数量,并且为Task调配好配置。Task拿到配置当前数据源建设连贯并获取数据写入到指标存储。通过上面的两张图能够分明的看到,Connector和Task解决根本流程。

一个 RocketMQ Connect 集群中会有多个 Connector ,每个 Connector 会对应一个或多个 Task,这些工作运行在 Worker(过程)中。Worker过程是Connector和Task运行环境,它提供RESTFull能力,接管HTTP申请,将获取到的配置传递给Connector和Task,它还负责启动Connector和Task,保留Connector配置信息,保留Task同步数据的位点信息,除此以外,Worker还提供负载平衡能力,Connect集群高可用、扩缩容、故障解决次要依赖Worker的负责平衡能力实现的。Worker 提供服务的流程如下:

[]()

Worker 提供的服务发现及负载平衡的实现原理如下:

服务发现:

用过 RocketMQ 的开发者应该晓得,它的应用很简略,就是发送和接管音讯。生产模式分为集群模式和播送模式两种,集群生产模式下一个 Topic 能够有多个 Consumer 生产音讯,任意一个 Consumer 的上线或下线 RocketMQ 服务端都有感知,并且还能够将客户端高低线信息告诉给其它节点,利用RocketMQ这个个性就实现了 Worker的服务发现。

配置/Offset同步:

Connector 的配置/Offset信息同步通过每个 Worker订阅雷同的 Topic,不同Worker 应用不同的 Consumer Group 实现的, Worker 节点能够通过这种形式生产到雷同Topic的所有数据,即Connector配置/Offset信息,这相似于播送生产模式,这种数据同步模式能够保障任何一个 Worker 挂掉,该Worker上的工作仍旧能够在存活的 Worker 失常拉起运行 ,并且能够获取到工作对应的 Offset 信息实现断点续传, 这是故障转移以及高可用能力的根底。

负载平衡

RocketMQ 生产场景中,生产客户端 与Topic Queue 之间有负载平衡能力,Connector 在这一部分也是相似的,只不过它负载平衡的对象不一样,Connector 是 Worker 节点和 Task之间的负载平衡,与RocketMQ客户端负载平衡一样,能够依据应用场景抉择不同负载平衡算法。

[]()

上文提到过 RocketMQ Connect 提供 RESTFull API能力。通过 RESTFull AP能够创立 Connector,治理Connector 以及查看 Connector 状态,简略列举:

  • POST /connectors/{connector name}
  • GET /connectors/{connector name}/config
  • GET /connectors/{connector name}/status
  • POST /connectors/{connector name}/stop

目前 Connector 反对单机、集群两种部署模式。集群模式至多要有两个节点,能力保障它的高可用。并且集群能够动静减少或者缩小,做到了动态控制晋升集群性能和节省成本节俭的能力。单机模式更多不便了开发者开发测试 Connector 。

如何如何实现一个 Connector呢? 还是联合一个具体的场景看一看,例如业务数据以后是写入MySQL 数据库中的,心愿将MySQL中数据实时同步到数据湖Hudi当中。只有实现 MySQL Source Connector 、Hudi Sink Connector这两个 Connector 即可。

上面就以 MySQLSource Connector 为例,来看一下具体的如何实现。

实现 Connector最次要的就是实现两个 API 。第一个是 Connector API ,除了实现它生命周期相干的 API 外,还有工作如何调配,是通过 Topic、Table 还是通过数据库的维度去分。第二个API是须要创立的 Task,Connector 通过任务分配将相干的配置信息传递给 Task, Task 拿到这些信息,例如数据库账号,明码,IP,端口后就会创立数据库连贯,再通过 MySQL 提供的 BINLOG 机智获取到表的数据,将这些数据写到一个阻塞队列中。Task有个 Poll 办法,实现Connector时只有调用到Poll办法时能够获取到数据即可,这样 Connector 就根本写完了。而后打包以Jar包的模式提供进去,将它加载到 Worker 的节点中。

创立 Connector 工作后, Worker 中会创立一个或者多个线程,不停的轮询 Poll办法,从而获取到MySQL表中的数据,再通过 RocketMQ Producer 发送到 RocketMQ Broker中,这就是 Connector 从实现到运行的整体过程(见下图)。

04 RocketMQ Connect 现状与将来

RocketMQ Connect 的倒退历程分为三个阶段。

第一阶段:Preview 阶段

RocketMQ Connect倒退的初期也即Preview阶段,实现了Open Messaging Connect API 1.0 版本,基于该版本实现了 RocketMQ Connect Runtime ,同时提供了 10+ Connector 实现(MySQL,Redis,Kafka,Jms,MongoDB……)。在该阶段,RocketMQ Connect 能够简略实现端到端的数据源同步,但性能还不够欠缺,不反对数据转换,序列化等能力,生态绝对还比拟贫乏。

第二阶段:1.0 阶段

在 1.0 阶段,Open Messaging Connect API 进行了降级,反对Schema、Transform,Converter等能力,在此基础上对 Connect Runtime 也进行了重大降级,对数据转换,序列化做了反对,简单Schema也做了欠缺的反对。该阶段的 API、Runtime 能力曾经根本欠缺,在此基础上,还有30+ Connecotor 实现,笼罩了 CDC、JDBC、SFTP、NoSQL、缓存Redis、HTTP、AMQP、JMS、数据湖、实时数仓、Replicator、等Connector实现,还做了Kafka Connector Adaptor能够运行Kafka生态的Connector。

第三阶段:2.0 阶段

RocketMQ Connect以后处于这个阶段,重点倒退Connector生态,当 RocketMQ 的 Connector生态达到 100 + 时,RocketMQ 基本上能够与任意的一个数据系统去做连贯。

目前 RocketMQ 社区正在和 OceanBase 社区单干,进行 OceanBase 到 RocketMQ Connect 的研发工作,提供 JDBC 和 CDC 两种模式接入模式,后续会在社区中公布,欢送感兴趣的同学试用。

05 总结

RocketMQ是一个牢靠的数据集成组件,具备分布式、伸缩性、故障容错等能力,能够实现RocketMQ与其余数据系统之间的数据流入与流出。通过 RocketMQ Connect 能够实现CDC,构建数据湖,联合流计算可实现数据价值。