关于数据同步:开源数据集成平台SeaTunnelMySQL实时同步到es

一、前言最近,我的项目有几个表要从 MySQL 实时同步到 另一个 MySQL,也有同步到 ElasticSearch 的。目前,公司生产环境同步,用的是 阿里云的 DTS,每个同步工作每月 500多元,有点小贵。其余环境:MySQL同步到ES,用的是 CloudCanal,不反对 数据转换,增加同步字段比拟麻烦,社区版限度5个工作,不够用;MySQL同步到MySQL,用的是 debezium,不反对写入 ES。恰好3年前用过 SeaTunnel 的 前身 WaterDrop,那就开始吧。本文以 2.3.1 版本,Ubuntu 零碎为例二、开源数据集成平台SeaTunnel1. 简介SeaTunnel 是 Apache 软件基金会下的一个高性能开源大数据集成工具,为数据集成场景提供灵便易用、易扩大并反对千亿级数据集成的解决方案。Seaunnel 为实时(CDC)和批量数据提供高性能数据同步能力,反对十种以上数据源,曾经在B站、腾讯云、字节等数百家公司应用。能够抉择 SeaTunnel Zeta 引擎上运行,也能够在 Apache Flink 或 Spark 引擎上运行。 2. 装置下载,这里抉择 2.3.1 版本,执行 tar -xzvf apache-seatunnel-*.tar.gz 解压缩因为 2.3.2 版本,MySQL-CDC 找不到驱动,bug修复详见 Caused by: java.sql.SQLException: No suitable driver at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298) at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106) ... 20 more ... 11 more at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)3. 装置 connectors 插件执行 bash bin/install-plugin.sh,国内倡议先配置 maven 镜像,不然容易失败 或者 慢官网文档写着执行 sh bin/install-plugin.sh,我在 Ubuntu 20.04.2 LTS 上执行报错(bin/install-plugin.sh: 54: Bad substitution),我提了PR 4. 编写配置文件config 目录下,新建配置文件:如 mysql-es-test.conf增加 env 配置因为是 实时同步,这里 job.mode = "STREAMING",execution.parallelism 是 并发数 ...

July 5, 2023 · 2 min · jiezi

关于数据同步:海量数据同步首选-SeaTunnel-Zeta-引擎正式发布

点亮 ⭐️ Star · 照亮开源之路https://github.com/apache/inc... Apache SeaTunnel(incubating) 正式推出 2.3.0 正式版本,并正式公布本人的外围同步引擎 Zeta!此外,SeaTunnel 2.3.0 还带来了许多大家期待已久的新个性,包含反对 CDC、以及近百种 Connector 等。 文档 https://seatunnel.apache.org/... 下载地址 https://seatunnel.apache.org/... 01 次要更新SeaTunnel 本人的同步引擎—Zeta 正式公布Zeta Engine 是一个专门为数据同步场景设计和开发的数据同步引擎,更快、更稳固、更省资源也更加易用,在寰球多种开源同步引擎测试比对状况下,Zeta 性能都遥遥领先。SeaTunnel Zeta 引擎,经验了数个研发版本,于 2022 年十月公布 beta 版本,通过社区探讨决定,将其命名为 Zeta(宇宙中最快的星,社区同学认为这充分体现了该引擎的个性),在社区用户贡献者的致力下,咱们明天正式公布 Zeta Engine 生产可用版本,其个性包含: 简略易用,新的引擎尽量减小第三方服务的依赖,能够不依赖 Zookeeper、HDFS 等大数据组件实现集群治理、快照存储和集群 HA 性能。这对于那些没有大数据平台或者不违心依赖大数据平台进行数据同步的用户十分有用。更省资源,在 CPU 层面,Zeta Engine 外部应用 Dynamic Thread Sharing(动静线程共享)技术 ,在实时同步的场景下,如果表的数量很多但每张表的数据量又很小,Zeta Engine 会将这些同步工作在共享线程中运行,这种形式能够缩小不必要的线程创立,节俭系统资源。在读取和数据写入端,Zeta Engine 的设计指标是尽量减少 JDBC 连贯的数量。在 CDC 场景下,Zeta Engine 会尽量复用日志读取解析资源。更稳固,在此版本中,Zeta Engine 将数据同步的工作以 Pipeline 作为 Checkpoint 和容错的最小粒度,一个 task 的失败只会影响到和它有上下游关系的 task,尽量避免 task 失败造成整个 Job 失败或回滚。同时,对于源端数据有存储工夫限度的场景,Zeta Engine 反对开启数据 Cache,主动缓存从源端读取的数据,再由上游工作读取缓存数据并写入指标端。此场景下,即便指标端呈现故障导致数据无奈写入,也不会影响源端的失常读取,避免源端数据过期被删除。更疾速,Zeta Engine 的执行打算优化器会以减小数据可能的网络传输为指标来做执行打算的优化,从而升高数据序列化和反序列化带来的整体同步性能的损耗,更快地实现数据同步操作。当然,它还反对速度限制,让同步作业以一个正当的速度进行。全场景数据同步反对。SeaTunnel 的指标是反对离线批量同步下的全量同步和增量同步,反对实时同步以及 CDC。 ...

January 5, 2023 · 4 min · jiezi

关于数据同步:百亿级数据同步如何基于-SeaTunnel-的-ClickHouse-实现

作者 | Apache SeaTunnel(Incubating) Contributor 范佳 整顿 | 测试工程师 冯秀兰 对于百亿级批数据的导入,传统的 JDBC 形式在一些海量数据同步场景下的体现并不尽如人意。为了提供更快的写入速度,Apache SeaTunnel(Incubating) 在刚刚公布的 2.1.1 版本中提供了 ClickhouseFile-Connector 的反对,以实现 Bulk load 数据写入。 Bulk load 指把海量数据同步到指标 DB 中,目前 SeaTunnel 已实现数据同步到 ClickHouse 中。 在 Apache SeaTunnel(Incubating) 4 月 Meetup 上,Apache SeaTunnel(Incubating) Contributor 范佳分享了《基于 SeaTunnel 的 ClickHouse bulk load 实现》,具体解说了 ClickHouseFile 高效解决海量数据的具体实现原理和流程。 感激本文整顿志愿者 测试工程师 冯秀兰 对 Apache SeaTunnel(Incubating) 我的项目的反对! 本次演讲次要蕴含七个局部: ClickHouse Sink 现状ClickHouse Sink 弱场景ClickHouseFile 插件介绍ClickHouseFile 核心技术点ClickHouseFile 插件的实现解析插件能力比照前期优化方向 范 佳白鲸开源 高级工程师 ...

January 5, 2023 · 2 min · jiezi

关于数据同步:解读重要功能特性新手入门-Apache-SeaTunnel-CDC

引言点亮 ⭐️ Star · 照亮开源之路 https://github.com/apache/inc... 为什么说 CDC 是SeaTunnel平台中的一个重要性能个性?明天这篇文章跟大家分享一下 CDC 是什么?目前市面上的 CDC 工具现有的痛点有哪些?SeaTunnel面对这些痛点设计的架构指标是什么?另外包含社区的瞻望和目前在做的一些事件。 总体来说,市面上曾经有这么多 CDC 工具了,咱们为什么还要反复去造一个轮子? 带着这个疑难,我先给大家简要介绍下 CDC 是什么! CDC 的全称是 Change Data Capture,它就是一个数据变更捕捉。变更数据捕捉 (CDC) 应用 Server 代理来记录利用于表的插入、更新和删除流动。 这样,就能够按易于应用的关系格局提供这些更改的详细信息。 将为批改的行捕捉列信息以及将更改利用于指标环境所需的元数据,并将其存储在镜像所跟踪源表的列构造的更改表中。 CDC的应用场景异构数据库之间的数据同步或备份 / 建设数据分析计算平台在 MySQL,PostgreSQL,MongoDB 等等数据库之间相互同步数据,或者把这些数据库的数据同步到 Elasticsearch 里以供全文搜寻,当然也能够基于 CDC 对数据库进行备份。而数据分析系统能够通过订阅感兴趣的数据表的变更,来获取所须要的剖析数据进行解决,不须要把剖析流程嵌入到已有零碎中,以实现解耦。 微服务之间共享数据状态在微服务大行其道的今日,微服务之间信息共享始终比较复杂,CDC 也是一种可能的解决方案,微服务能够通过 CDC 来获取其余微服务数据库的变更,从而获取数据的状态更新,执行本人相应的逻辑。 更新缓存 / CQRS 的 Query 视图更新通常缓存更新都比拟难搞,能够通过 CDC 来获取数据库的数据更新事件,从而管制对缓存的刷新或生效。 而 CQRS 是什么又是一个很大的话题,简略来讲,你能够把 CQRS 了解为一种高配版的读写拆散的设计模式。举个例子,咱们后面讲了能够利用 CDC 将 MySQL 的数据同步到 Elasticsearch 中以供搜寻,在这样的架构里,所有的查问都用 ES 来查,但在想批改数据时,并不间接批改 ES 里的数据,而是批改上游的 MySQL 数据,使之产生数据更新事件,事件被消费者生产来更新 ES 中的数据,这就基本上是一种 CQRS 模式。而在其余 CQRS 的零碎中,也能够利用相似的形式来更新查问视图。 ...

January 4, 2023 · 3 min · jiezi

关于数据同步:SeaTunnel-在天翼云数据集成平台的探索实践

点亮 ⭐️ Star · 照亮开源之路 https://github.com/apache/inc... 讲师简介 周利旺 天翼云大数据开发工程师 在11月26日,Apache SeaTunnel& APISIX 联结 Meetup 期间,天翼云科技大数据开发工程师周利旺给大家分享了天翼云数据集成平台引入 SeaTunnel 过程中的一些摸索实际,心愿对大家有所帮忙! 天翼云数据集成平台基于 Apache Nifi 二次封装而成,然而对于一些特定的需要 Apache Nifi 不可能很好地满足,因而须要引入第三方的数据集成工具进行能力上的补足。而SeaTunnel 恰是能用 Nifi 互补的好工具。本次讲座介绍了 SeaTunnel 整合到天翼云数据平台在架构层面的设计与思考。 本文次要包含四个局部: ● Apache SeaTunnel 简介 ● Apache Nifi 简介 ● SeaTunnel 与 Nifi 整合计划 ● 天翼云数据集成平台 ● 参加开源教训与心得 ✦01 SeaTunnel简介✦咱们平台次要是面向政企客户,目前是以 Apache Nifi 作为外围,辅之以原生的 Flink 利用,在此基础上进行封装和二次开发出各种各样的数据集成利用,提供面向不同行业的解决方案,目前Seatunnel在咱们外部尽管没上生产实践,然而也做了不少的摸索,所以本次议题次要讲下 SeaTunnel 和 Nifi 联合应用的一些教训。因为两者各有优劣,然而在能力上能够舍短取长,从而能够对标更多的性能,满足客户的更多需要。 SeaTunnel 实质上是对 Spark 和 Flink 进行了一层封装,当初新版本又退出了自研的 SeaTunnel 引擎,用户能够通过编辑配置文件来疾速构建工作流。 ...

December 26, 2022 · 3 min · jiezi

关于数据同步:马蜂窝毕博分析完这9点工作原理我们最终选择了-Apache-SeaTunnel

点亮 ⭐️ Star · 照亮开源之路 https://github.com/apache/inc... 讲师简介 毕博 马蜂窝 数据工程师 在10月15日,Apache SeaTunnel& IoTDB 联结 Meetup 期间,马蜂窝网数据工程师毕博给大家介绍了SeaTunnel的基本原理和相干企业实际思考、马蜂窝大数据开发调度平台典型场景下的痛点和优化思考,并分享了集体参加社区奉献的实践经验,心愿同时能帮忙大家疾速理解SeaTunnel及参加社区建设的门路和技巧。 ✦SeaTunnel的技术原理简介✦SeaTunnel 是一个分布式、高性能的数据集成平台,用于海量数据(离线和实时)的同步和转换 下面这张图展现的是 SeaTunnel 的工作流程,简略来说蕴含3个局部:输出、转换、输入;更简单的数据处理,也无非是几种行为的组合。 以一个同步场景为例,比方将 Kafka 导入到 Elasticsearch ,Kafka 就是 流程中的 Source ,而 Elasticsearch 就是流程中的 Sink 。 如果在导入的过程中,字段列跟待写入的内部数据列不匹配须要做一些列或者类型的转换,或者须要多数据源的 Join,而后做一些数据打宽,扩大字段等解决,那么在这个过程中就须要减少一些 Transform,对应图片两头的局部。 由此可见 SeaTunnel 外围的局部就是 Source、Transform 和 Sink 流程定义。 在 Source 外面咱们能够定义须要的读取数据源,在 Sink 定义数据 Pipeline 最终写出的内部存储,能够通过 Transform 进行两头数据的转换,能够应用 SQL 或者自定义的函数等形式。 1.1 SeaTunnel 连接器API V1版本 架构分析对于一个成熟组件框架来说,从设计模式到 API 的设计实现上,肯定有比拟独特的中央,从而使得框架有比拟好的扩展性。 SeaTunnel的架构次要包含三局部: 1、SeaTunnel 根底 API ; ...

November 4, 2022 · 4 min · jiezi

关于数据同步:SeaTunnel连接器V1到V2的架构演进与探究

外围概念整个SeaTunnel设计的外围是利用设计模式中的管制翻转或者叫依赖注入,次要概括为以下两点: 下层不依赖底层,两者都依赖形象流程代码与业务逻辑应该拆散对于整个数据处理过程,大抵能够分为以下几个流程:输出 -> 转换 -> 输入,对于更简单的数据处理,本质上也是这几种行为的组合: 内核原理SeaTunnel将数据处理的各种行为形象成Plugin,并应用SPI技术进行动静注册,设计思路保障了框架的灵便扩大,在以上实践根底上,数据的转换与解决还须要做对立的形象,譬如比拟有名异构数据源同步工具DataX,也同样对数据单条记录做了对立形象。 在SeaTunnel V1架构体系中,因为背靠Spark和Flink两大分布式计算框架,框架曾经为咱们做好了数据源形象的工作,Flink的DataStream、Spark的DataFrame曾经是对接入数据源的高度形象,在此基础上咱们只须要在插件中解决这些数据抽象即可,同时借助于Flink和Spark提供的SQL接口,还能够将每一次解决完的数据注册成表,不便用SQL进行解决,缩小代码的开发量。 实际上SeaTunnel最初的目标是主动生成一个Spark或者一个Flink作业,并提交到集群中运行。 SeaTunnel连接器V1 API解析架构概览目前在我的项目dev分支下,SeaTunnel连接器V1 API所在的模块如图所示: seatunnel-api-base:根底API层形象seatunnel-api-flink:Flink引擎API层形象seatunnel-api-spark:Spark引擎API层形象seatunnel-api-base在根底模块中,有以下代码: 为了更清晰的了解这些类之间的关系,笔者这里制作了一张简略的UML类图: 整个API的组成能够大体分为三局部: 插件层:提供Source、Transform、Sink插件定义执行层:提供执行器和运行上下文定义构建层:提供命令行接口定义构建层接管命令参数构建执行器,执行器初始化上下文,上下文注册插件并启动插件,至此,整个作业开始运行。 seatunnel-api-spark在Spark引擎API层有以下代码: 同样,笔者也整顿了一张UML类图来示意它们之间的关系: 整个流程与Base模块统一,在这里笔者不过多赘述,有趣味的读者能够自行观看源码。 seatunnel-api-flink在Flink引擎API层有以下代码: 同样,笔者也整顿了一张UML类图来示意它们之间的关系: 整个流程与Base模块统一,在这里笔者不过多赘述,有趣味的读者能够自行观看源码。 SeaTunnel连接器V1运行原理启动器模块概览整个我的项目的最外层的启动类都放在以下模块中: 跟连接器V1无关的模块如下: seatunnel-core-base:V1根底启动模块seatunnel-core-flink:V1flink引擎启动模块seatunnel-core-flink-sql:V1flink-sql引擎启动模块seatunnel-core-spark:V1spark引擎启动模块执行流程为了更好的了解SeaTunnel V1的启动流程,笔者在这里制作了一张简略的时序图: 程序最外层的启动由start-seatunnel-${engine}.sh开始,用户依据将配置文件从脚本传入,脚本调用org.apache.seatunnel.core.spark.SparkStarter或者org.apache.seatunnel.core.flink.FlinkStarter,实际上这个类只做一个工作:将所有参数拼接成spark-submit或者flink命令,而后脚本接管到spark-submit或者flink命令并提交到集群中;提交到集群中真正执行job的类实际上是org.apache.seatunnel.spark.SeatunnelSpark或是org.apache.seatunnel.flink.SeatunnelFlink,读者如果想间接深刻理解作业启动外围流程的话举荐浏览这两个类的源码。 执行原理SparkSparkSource插件将异构数据源接入为DataFrameSparkTransform插件将SparkSource接入的DataFrame进行转换解决SparkSink插件将SparkTransform解决好的DataFrame写入到指标数据源FlinkFlinkSource插件将异构数据源接入为DataStreamFlinkTransform插件将FlinkSource接入的DataStream进行转换解决SparkSink插件将FlinkTransform解决好的DataStream写入指标数据源SeaTunnel连接器V2 API解析架构概览目前在我的项目dev分支下,SeaTunnel连接器V2 API所在的模块如图所示: seatunnel-api:连接器V2所有的API定义数据抽象SeaTunnel连接器V2 API在数据层面做了形象,定义了本人的数据类型,这是与连接器V1最大的不同点,连接器V1应用的是引擎数据抽象的能力,然而连接器V2本人提供的这个异构数据源对立的能力: 在所有的Source连接器和Sink连接器中,解决的都是SeaTunnelRow类型数据,同时SeaTunnel也对内设置了数据类型标准,所有通过Source接入进来的数据会被对应的连接器转化为SeaTunnelRow送到上游。 API Common在API common包下有以下接口的定义: 在这里因为篇幅关系只介绍比拟外围的几个接口: PluginIdentifierInterface:插件惟一标识SeaTunnelContext:SeaTunnel利用上下文,每个SeaTunnel Job蕴含的上下文对象,保留了以后源表的元数据SeaTunnelPluginLifeCycle:插件申明周期具体接口中有哪些办法读者能够自行浏览对应类的源码,在这里笔者将不过多赘述。 API Source在API source包下有以下接口的定义: 在这里因为篇幅关系只介绍比拟外围的几个接口: Boundedness:标识数据有界无界,连接器V2设计理念基于批流一体,此接口用于区分流式作业还是批式作业Collector:数据收集器,用于收集Source连接器产生的数据并推往上游SeaTunnelSource:Source插件基类,所有的Source连接器主类均继承于这个接口SourceReader:Source插件真正解决数据接入的接口SourceSplit:数据分片接口,连接器V2反对数据并行读入,晋升数据接入效率SourceSplitEnumerator:数据分片器,此接口用于散发数据分片至对应的SourceReader中API Sink在API sink包下有以下接口的定义: 在这里因为篇幅关系只介绍比拟外围的几个接口: SeaTunnelSink:Sink插件基类,所有的Sink连接器均继承于这个接口SinkWriter:Sink插件真正实现数据输入的接口SinkCommitter:用于解决SinkWriter#prepareCommit返回的数据信息,蕴含须要提交的事务信息,连接器V2在Sink设计上提供二阶段提交的接口,从而使连接器有了实现Exactly-Once的可能性SinkAggregatedCommitter:用于解决SinkWriter#prepareCommit返回的数据信息,蕴含须要提交的事务信息等,用于在单节点多任务一起提交事务信息,这样能够防止提交阶段二局部失败导致状态不统一的问题(注:在实现连接器时优先实现这个接口,这样会兼容性更强)小结 连接器V2在架构分层上与计算引擎进行解耦,定义了本人的元数据定义以及数据类型定义,在API层和计算引擎层减少了翻译层,将SeaTunnel自定义的数据源通过翻译层接入到引擎中,从而真正实现接口和引擎拆散的目标。 SeaTunnel连接器V2运行原理启动器模块概览整个我的项目的最外层的启动类都放在以下模块中: 跟连接器V2无关的模块如下: ...

October 9, 2022 · 1 min · jiezi

关于数据同步:从启动到关闭-SeaTunnel211源码解析

点亮 ⭐️ Star · 照亮开源之路 GitHub:https://github.com/apache/incubator-seatunnel 目录 本文转载自Adobee Chen的博客-CSDN博客,看看是否有你感兴趣的吧! 如有出错,请多斧正。 一、启动脚本解析 二、源码解析 01 入口 02 execute()外围办法 其中 BaseSource、BaseTransform、BaseSink都是接口、都实现Plugin接口。他们的实现类就是对应的插件类型execute()办法向下走,创立一个执行环境。调用plugin.prepare(env)最初启动 execution.start(sources, transforms, sinks);执行flink 代码程序最初敞开一、启动脚本解析在 /bin/start-seatunnel-flink.sh #!/bin/bashfunction usage() { echo "Usage: start-seatunnel-flink.sh [options]" echo " options:" echo " --config, -c FILE_PATH Config file" echo " --variable, -i PROP=VALUE Variable substitution, such as -i city=beijing, or -i date=20190318" echo " --check, -t Check config" echo " --help, -h Show this help message"} if [[ "[email protected]" = *--help ]] || [[ "[email protected]" = *-h ]] || [[ $# -le 1 ]]; then usage exit 0fi is_exist() { if [ -z $1 ]; then usage exit -1 fi} PARAMS=""while (( "$#" )); do case "$1" in -c|--config) CONFIG_FILE=$2 is_exist ${CONFIG_FILE} shift 2 ;; -i|--variable) variable=$2 is_exist ${variable} java_property_value="-D${variable}" variables_substitution="${java_property_value} ${variables_substitution}" shift 2 ;; *) # preserve positional arguments PARAMS="$PARAMS $1" shift ;; esacdone if [ -z ${CONFIG_FILE} ]; then echo "Error: The following option is required: [-c | --config]" usage exit -1fi # set positional arguments in their proper placeeval set -- "$PARAMS" BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"APP_DIR=$(dirname ${BIN_DIR})CONF_DIR=${APP_DIR}/configPLUGINS_DIR=${APP_DIR}/libDEFAULT_CONFIG=${CONF_DIR}/application.confCONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG} assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink*.jar) if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then source ${CONF_DIR}/seatunnel-env.shfi string_trim() { echo $1 | awk '{$1=$1;print}'} export JVM_ARGS=$(string_trim "${variables_substitution}") exec ${FLINK_HOME}/bin/flink run \ ${PARAMS} \ -c org.apache.seatunnel.SeatunnelFlink \ ${assemblyJarName} --config ${CONFIG_FILE}其中: 启动脚本能接管的 --config --variable --check(还不反对) --help ...

October 8, 2022 · 6 min · jiezi

关于数据同步:可视化任务编排拖拉拽-Scaleph-基于-Apache-SeaTunnel的数据集成

这次在 6月 Meetup 为大家带来的是Scaleph 基于 Apache SeaTunnel (Incubating) 的数据集成介绍,心愿你有所播种。 本次演讲次要包含五个局部: 对于ScalephScaleph架构&性能简介SeaTunnel社区奉献零碎演示开发计划https://www.bilibili.com/vide... ↑↑直播回放视频入口↑↑ 王奇 Apache SeaTunnel Contributor 搜寻举荐工程师,大数据 Java 开发 01 Scaleph的缘起我最早是从事搜寻举荐工作,在团队外面负责保护Dump零碎,次要是为咱们的搜索引擎提供喂数据的性能,先给大家介绍在保护过程中次要的5个痛点问题: 及时性和稳定性搜寻举荐是电商平台的外围在线零碎,尤其是对数据的及时性和稳定性要求十分高。因为搜寻举荐会接管整个电商平台 C 端的绝大部分流量,所以一旦服务呈现稳定的时候,可能就造成服务受损,导致用户的体验大打折扣。 业务简单/大宽表设计Dump零碎会将电商平台的商品、类目、品牌、店铺、商品标签、数仓的实时/离线数据及模型数据会通过一系列的预处理,最终输入成一张大宽表,在这个过程中,业务的复杂性和多变性,会侵入到Dump零碎中来,所以应答的技术挑战绝对就更高了。 全量+实时索引全量索引每天跑一次,次要目标是更新 T+1 频率更新的数据。当全量索引完结之后,咱们会通过实时索引去刷新须要实时更新的数据,比如说商品的价格、库存变动相干的信息。 数据联动更新咱们的上游数据起源十分多,有音讯队列、数据库、大数据相干的存储以及 dubbo 接口,因为是大宽表设计,以商品索引为例,大宽表会以商品为主,如果是店铺索引,会以店铺为主,依据数据的不同,上游的数据变动不肯定是商品或店铺维度的,数据也会产生肯定的联动更新。 数据兜底搜寻举荐服务过后也承当着C端绝大部分的流量,当公司其余团队的性能跟不上的时候,他们个别会把数据通过Dump零碎送到搜索引擎,而后咱们团队代替他们返回给Web页面,防止后续对他们发动二次申请调用。 同时,如果其余团队的业务零碎产生了脏数据,也须要Dump零碎做数据保护,避免数据外泄给C端用户造成不好的影响,所以开发保护中的时候,也有很大的难度。 02 为什么引入Flink?作为国内 Flink 的晚期使用者,阿里巴巴在搜寻举荐畛域领有悠久的历史和胜利的教训,在搜寻举荐团队开发保护 Dump 零碎的职业经验促使我开始关注应用Flink做A/B试验的报表、数据实时流之外的相干工作,次要也就是用Flink来实现Dump零碎为搜寻去提供Dump平台的工作,应用Flink做数据集成有5个长处: 人造的分布式反对:Flink反对多种部署和运行形式,单机、yarn、Kubernetes;低提早、海量吞吐:在泛滥大厂中利用宽泛;生态反对:Flink提供了泛滥开箱即用的connector,反对csv、avro数据格式,kafka、pulsar等音讯零碎以及泛滥的存储系统,和大数据生态紧密结合;基于分布式轻量异步快照机制实现exactly-once语义,为工作的失败、重启、迁徙、降级等提供数据一致性保障;metrics。Flink除了本身提供的 metrics 外,metrics 框架能够让用户为工作开发自定义的 metrics,丰盛监控指标;03 为什么抉择SeaTunnel?起初接触到 SeaTunnel 的时候,很喜爱 SeaTunnel 的设计理念!SeaTunnel 是运行在 Flink 和Spark 之上,高性能和分布式海量数据的下一代集成框架。 重要的是它是开箱即用的,并且针对现有的生态能够实现无缝集成,因为运行在 Flink 和 Spark 之上,能够很不便地接入公司现有的 Flink 和 Spark 的基础设施。另一方面 SeaTunnel 也有很多的生产案例,在进入 Apache 基金会孵化之后,社区十分沉闷,将来可期。 ...

July 4, 2022 · 3 min · jiezi

关于数据同步:SOFARegistry-源码|数据同步模块解析

文|宋国磊(GitHub ID:glmapper ) SOFAStack Committer、华米科技高级研发工程师 负责华米账号零碎、框架治理方向的开发 本文 3024 字 浏览 10 分钟 |前言| 本文次要围绕 SOFARegistry 的数据同步模块进行了源码的解析。其中,对于注册核心的概念以及 SOFARegistry 的基础架构将不再做具体的论述,有趣味的小伙伴在《海量数据下的注册核心 - SOFARegistry 架构介绍》[1]一文中获取相干介绍。 本文次要的思路大抵分为以下 2 个局部: - 第一局部,借助 SOFARegistry 中的角色分类来阐明哪些角色之间会进行数据同步; - 第二局部,对数据同步的具体实现进行解析。 PART. 1——SOFARegistry 的角色分类 如上图所示,SOFARegistry 蕴含以下 4 个角色: Client 提供利用接入服务注册核心的根本 API 能力,利用零碎通过依赖客户端 JAR 包,通过编程形式调用服务注册核心的服务订阅和服务公布能力。 SessionServer 会话服务器,负责承受 Client 的服务公布和服务订阅申请,并作为一个中间层将写操作转发 DataServer 层。SessionServer 这一层可随业务机器数的规模的增长而扩容。 DataServer 数据服务器,负责存储具体的服务数据,数据按 dataInfoId 进行一致性 Hash 分片存储,反对多正本备份,保证数据高可用。这一层可随服务数据量的规模的增长而扩容。\ MetaServer 元数据服务器,负责保护集群 SessionServer 和 DataServer 的统一列表,作为 SOFARegistry 集群外部的地址发现服务,在 SessionServer 或 DataServer 节点变更时能够告诉到整个集群。 在这 4 个角色中,MetaServer 作为元数据服务器自身不解决理论的业务数据,仅负责保护集群 SessionServer 和 DataServer 的统一列表,不波及数据同步问题。 ...

June 29, 2022 · 2 min · jiezi

关于数据同步:数据传输-利用-DTLE-将-MySQL-数据同步到-DBLE

作者:任仲禹 爱可生 DBA 团队成员,善于故障剖析和性能优化,文章相干技术问题,欢送大家一起探讨。 本文起源:原创投稿 *爱可生开源社区出品,原创内容未经受权不得随便应用,转载请分割小编并注明起源。 背景源于某客户的需要,存在线上某业务 MySQL 库因为数据量及业务读写压力较大,须要将业务数据迁徙到 DBLE 分布式数据库,但同时因为业务为 7x24h,可能停机的工夫窗口较短,所以须要思考数据实时同步的计划。 过往 DBLE 的业务上线根本为全新部署,数据实时同步的状况极少施行,去年 DTLE 公布后这一问题失去了些改善,明天咱们来实际下。 环境筹备1. 指标端 DBLE 集群部署装置 DBLE 软件、后端分片 MySQL 库过程略 DBLE 版本 3.20.10.8、MySQL版本 5.7.25sharding.xml <?xml version="1.0"?> <!DOCTYPE dble:sharding SYSTEM "sharding.dtd"> <dble:sharding xmlns:dble="http://dble.cloud/" version="4.0"> <schema name="dtle" sqlMaxLimit="-1" shardingNode="dn_01"> <singleTable name="gtid_executed_v4" shardingNode="dn_01" sqlMaxLimit="-1"></singleTable> </schema> <schema name="ren" sqlMaxLimit="-1" shardingNode="dn_01"> <shardingTable name="test" shardingNode="dn_01,dn_02,dn_03,dn_04" sqlMaxLimit="-1" shardingColumn="id" function="func_jumphash"></shardingTable> </schema> <shardingNode name="dn_03" dbGroup="dh-mysql-cluster02" database="dh_dn_03"></shardingNode> <shardingNode name="dn_04" dbGroup="dh-mysql-cluster02" database="dh_dn_04"></shardingNode> <shardingNode name="dn_02" dbGroup="dh-mysql-cluster01" database="dh_dn_02"></shardingNode> <shardingNode name="dn_01" dbGroup="dh-mysql-cluster01" database="dh_dn_01"></shardingNode> <function name="func_jumphash" class="jumpStringHash"> <property name="partitionCount">4</property> <property name="hashSlice">0:-1</property> </function> </dble:sharding>db.xml <?xml version="1.0"?> <!DOCTYPE dble:db SYSTEM "db.dtd"> <dble:db xmlns:dble="http://dble.cloud/" version="4.0"> <dbGroup name="dh-mysql-cluster02" rwSplitMode="0" delayThreshold="-1"> <heartbeat timeout="0" errorRetryCount="0">show slave status</heartbeat> <dbInstance name="10.186.61.13-3326-dh-1" url="10.186.61.13:3326" user="dbleuser" password="jpfmxIeMt1vxAJ6zd6Q10PGRRi+Qj023Dl+YXuOr3C4VXTdV5+GJaOIv5iVmWCwpXcucn/zi02HVlT7ADX+m6Q==" maxCon="100" minCon="10" primary="true" readWeight="0" id="mysql-i63009" usingDecrypt="true"></dbInstance> </dbGroup> <dbGroup name="dh-mysql-cluster01" rwSplitMode="0" delayThreshold="-1"> <heartbeat timeout="0" errorRetryCount="0">show slave status</heartbeat> <dbInstance name="10.186.61.11-3316-dh-1" url="10.186.61.11:3316" user="dbleuser" password="QQWRF80AGNbx4jIAx/b2Ww7Myol1+ntlyzGmA1A3PXVISmRD/i5pgRnLLwYsXoLmH0jiv1qZAkqIBHv6Yg/XAg==" maxCon="100" minCon="10" primary="true" readWeight="0" id="mysql-47vn84" usingDecrypt="true"></dbInstance> </dbGroup> </dble:db>user.xml <?xml version="1.0"?> <!DOCTYPE dble:user SYSTEM "user.dtd"> <dble:user xmlns:dble="http://dble.cloud/" version="4.0"> <managerUser name="root" password="CrjpLhvVJkHk0EPW35Y07dUeTimf52zMqClYQkIAN3/dqiG1DVUe9Zr4JLh8Kl+1KH1zd7YTKu5w04QgdyQeDw==" usingDecrypt="true"></managerUser> <shardingUser name="ren" schemas="ren,dtle" password="P+C2KazQiS3ZZ6uojBJ91MZIqYqGczspQ/ebyBZOC9xKAAkAFrqEDC9OPn/vObAyO4P8Zu3vHQJ+rljM040Kdg==" usingDecrypt="true" readOnly="false" maxCon="0" blacklist="default_black_list"></shardingUser> </dble:user>2. 源端和指标端测试表创立源端 MySQL 数据库软件装置略源端MySQL与指标端DBLE都须要创立测试表名:test use ren; CREATE TABLE `test` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `city` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `dt` datetime DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), KEY `idx_ctiy` (`city`) ) ENGINE=InnoDB;3. 部署单节点DTLEDTLE社区版本GitHub下载地址:https://github.com/actiontech...下载实现后间接rpm装置(本示例应用外部QA验证版本) ...

March 29, 2022 · 7 min · jiezi

关于数据同步:otter数据增量同步

otter介绍原理形容: 基于Canal开源产品,获取数据库增量日志数据典型管理系统架构,manager(web治理)+node(工作节点) manager运行时推送同步配置到node节点 node节点将同步状态反馈到manager上 基于zookeeper,解决分布式状态调度的,容许多node节点之间协同工作.mysql数据库中自带的复制技术可分成三步: master将扭转记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,能够通过show binlog events进行查看)slave将master的binary log events拷贝到它的中继日志(relay log),这里是I/O thread线程slave重做中继日志中的事件,将扭转反映它本人的数据,这里是SQL thread线程基于canal&otter的复制技术和mysql复制相似,具备类比性. Canal对应于I/O thread,接管Master Binary Log.Otter对应于SQL thread,通过Canal获取Binary Log数据,执行同步插入数据库.两者的区别在于: otter目前嵌入式依赖canal,部署为同一个jvm,目前设计为不产生Relay Log,数据不落地.otter目前容许自定义同步逻辑,解决各类需要. a. ETL转化. 比方Slave上指标表的表名,字段名,字段类型不同,字段个数不同等. b. 异构数据库. 比方Slave能够是oracle或者其余类型的存储,nosql等. c. M-M部署,解决数据一致性问题d. 基于manager部署,不便监控同步状态和治理同步工作.部署manager官网下载 manager.deployer-4.2.18.tar.gz,otter-otter-4.2.18.zip https://github.com/alibaba/ot... 初始化otter库 解压包otter-otter-4.2.18.zip,找到数据库初始化脚本 otter-otter-4.2.18\manager\deployer\src\main\resources\sql\otter-manager-schema.sql 新建数据库otter,执行初始化语句otter-manager-schema.sql CREATE DATABASE /*!32312 IF NOT EXISTS*/ `otter` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;USE `otter`;SET sql_mode='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';CREATE TABLE `ALARM_RULE` ( `ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `MONITOR_NAME` varchar(1024) DEFAULT NULL, `RECEIVER_KEY` varchar(1024) DEFAULT NULL, `STATUS` varchar(32) DEFAULT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `DESCRIPTION` varchar(256) DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `MATCH_VALUE` varchar(1024) DEFAULT NULL, `PARAMETERS` text DEFAULT NULL, PRIMARY KEY (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `AUTOKEEPER_CLUSTER` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `CLUSTER_NAME` varchar(200) NOT NULL, `SERVER_LIST` varchar(1024) NOT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `CANAL` ( `ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `NAME` varchar(200) DEFAULT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `PARAMETERS` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `CANALUNIQUE` (`NAME`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `CHANNEL` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(200) NOT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `PARAMETERS` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `CHANNELUNIQUE` (`NAME`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `COLUMN_PAIR` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `SOURCE_COLUMN` varchar(200) DEFAULT NULL, `TARGET_COLUMN` varchar(200) DEFAULT NULL, `DATA_MEDIA_PAIR_ID` bigint(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_DATA_MEDIA_PAIR_ID` (`DATA_MEDIA_PAIR_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `COLUMN_PAIR_GROUP` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `DATA_MEDIA_PAIR_ID` bigint(20) NOT NULL, `COLUMN_PAIR_CONTENT` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_DATA_MEDIA_PAIR_ID` (`DATA_MEDIA_PAIR_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `DATA_MEDIA` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(200) NOT NULL, `NAMESPACE` varchar(200) NOT NULL, `PROPERTIES` varchar(1000) NOT NULL, `DATA_MEDIA_SOURCE_ID` bigint(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `DATAMEDIAUNIQUE` (`NAME`,`NAMESPACE`,`DATA_MEDIA_SOURCE_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `DATA_MEDIA_PAIR` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `PULLWEIGHT` bigint(20) DEFAULT NULL, `PUSHWEIGHT` bigint(20) DEFAULT NULL, `RESOLVER` text DEFAULT NULL, `FILTER` text DEFAULT NULL, `SOURCE_DATA_MEDIA_ID` bigint(20) DEFAULT NULL, `TARGET_DATA_MEDIA_ID` bigint(20) DEFAULT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `COLUMN_PAIR_MODE` varchar(20) DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_PipelineID` (`PIPELINE_ID`,`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `DATA_MEDIA_SOURCE` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(200) NOT NULL, `TYPE` varchar(20) NOT NULL, `PROPERTIES` varchar(1000) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `DATAMEDIASOURCEUNIQUE` (`NAME`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `DELAY_STAT` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `DELAY_TIME` bigint(20) NOT NULL, `DELAY_NUMBER` bigint(20) NOT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_PipelineID_GmtModified_ID` (`PIPELINE_ID`,`GMT_MODIFIED`,`ID`), KEY `idx_Pipeline_GmtCreate` (`PIPELINE_ID`,`GMT_CREATE`), KEY `idx_GmtCreate_id` (`GMT_CREATE`,`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `LOG_RECORD` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NID` varchar(200) DEFAULT NULL, `CHANNEL_ID` varchar(200) NOT NULL, `PIPELINE_ID` varchar(200) NOT NULL, `TITLE` varchar(1000) DEFAULT NULL, `MESSAGE` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `logRecord_pipelineId` (`PIPELINE_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `NODE` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(200) NOT NULL, `IP` varchar(200) NOT NULL, `PORT` bigint(20) NOT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `PARAMETERS` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `NODEUNIQUE` (`NAME`,`IP`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `PIPELINE` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(200) NOT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `PARAMETERS` text DEFAULT NULL, `CHANNEL_ID` bigint(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `PIPELINEUNIQUE` (`NAME`,`CHANNEL_ID`), KEY `idx_ChannelID` (`CHANNEL_ID`,`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `PIPELINE_NODE_RELATION` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NODE_ID` bigint(20) NOT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `LOCATION` varchar(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_PipelineID` (`PIPELINE_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `SYSTEM_PARAMETER` ( `ID` bigint(20) unsigned NOT NULL, `VALUE` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;CREATE TABLE `TABLE_HISTORY_STAT` ( `ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `FILE_SIZE` bigint(20) DEFAULT NULL, `FILE_COUNT` bigint(20) DEFAULT NULL, `INSERT_COUNT` bigint(20) DEFAULT NULL, `UPDATE_COUNT` bigint(20) DEFAULT NULL, `DELETE_COUNT` bigint(20) DEFAULT NULL, `DATA_MEDIA_PAIR_ID` bigint(20) DEFAULT NULL, `PIPELINE_ID` bigint(20) DEFAULT NULL, `START_TIME` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `END_TIME` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_DATA_MEDIA_PAIR_ID_END_TIME` (`DATA_MEDIA_PAIR_ID`,`END_TIME`), KEY `idx_GmtCreate_id` (`GMT_CREATE`,`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `TABLE_STAT` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `FILE_SIZE` bigint(20) NOT NULL, `FILE_COUNT` bigint(20) NOT NULL, `INSERT_COUNT` bigint(20) NOT NULL, `UPDATE_COUNT` bigint(20) NOT NULL, `DELETE_COUNT` bigint(20) NOT NULL, `DATA_MEDIA_PAIR_ID` bigint(20) NOT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_PipelineID_DataMediaPairID` (`PIPELINE_ID`,`DATA_MEDIA_PAIR_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `THROUGHPUT_STAT` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `TYPE` varchar(20) NOT NULL, `NUMBER` bigint(20) NOT NULL, `SIZE` bigint(20) NOT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `START_TIME` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `END_TIME` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_PipelineID_Type_GmtCreate_ID` (`PIPELINE_ID`,`TYPE`,`GMT_CREATE`,`ID`), KEY `idx_PipelineID_Type_EndTime_ID` (`PIPELINE_ID`,`TYPE`,`END_TIME`,`ID`), KEY `idx_GmtCreate_id` (`GMT_CREATE`,`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `USER` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `USERNAME` varchar(20) NOT NULL, `PASSWORD` varchar(20) NOT NULL, `AUTHORIZETYPE` varchar(20) NOT NULL, `DEPARTMENT` varchar(20) NOT NULL, `REALNAME` varchar(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `USERUNIQUE` (`USERNAME`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `DATA_MATRIX` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `GROUP_KEY` varchar(200) DEFAULT NULL, `MASTER` varchar(200) DEFAULT NULL, `SLAVE` varchar(200) DEFAULT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `GROUPKEY` (`GROUP_KEY`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE IF NOT EXISTS `meta_history` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `gmt_create` datetime NOT NULL COMMENT '创立工夫', `gmt_modified` datetime NOT NULL COMMENT '批改工夫', `destination` varchar(128) DEFAULT NULL COMMENT '通道名称', `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog文件名', `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog偏移量', `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog节点id', `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog利用的工夫戳', `use_schema` varchar(1024) DEFAULT NULL COMMENT '执行sql时对应的schema', `sql_schema` varchar(1024) DEFAULT NULL COMMENT '对应的schema', `sql_table` varchar(1024) DEFAULT NULL COMMENT '对应的table', `sql_text` longtext DEFAULT NULL COMMENT '执行的sql', `sql_type` varchar(256) DEFAULT NULL COMMENT 'sql类型', `extra` text DEFAULT NULL COMMENT '额定的扩大信息', PRIMARY KEY (`id`), UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`), KEY `destination` (`destination`), KEY `destination_timestamp` (`destination`,`binlog_timestamp`), KEY `gmt_modified` (`gmt_modified`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='表构造变动明细表';CREATE TABLE IF NOT EXISTS `meta_snapshot` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `gmt_create` datetime NOT NULL COMMENT '创立工夫', `gmt_modified` datetime NOT NULL COMMENT '批改工夫', `destination` varchar(128) DEFAULT NULL COMMENT '通道名称', `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog文件名', `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog偏移量', `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog节点id', `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog利用的工夫戳', `data` longtext DEFAULT NULL COMMENT '表构造数据', `extra` text DEFAULT NULL COMMENT '额定的扩大信息', PRIMARY KEY (`id`), UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`), KEY `destination` (`destination`), KEY `destination_timestamp` (`destination`,`binlog_timestamp`), KEY `gmt_modified` (`gmt_modified`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='表构造记录表快照表';insert into USER(ID,USERNAME,PASSWORD,AUTHORIZETYPE,DEPARTMENT,REALNAME,GMT_CREATE,GMT_MODIFIED) values(null,'admin','801fc357a5a74743894a','ADMIN','admin','admin',now(),now());insert into USER(ID,USERNAME,PASSWORD,AUTHORIZETYPE,DEPARTMENT,REALNAME,GMT_CREATE,GMT_MODIFIED) values(null,'guest','471e02a154a2121dc577','OPERATOR','guest','guest',now(),now());linux服务器新建文件夹/data/program/otter/manager,解压manager.deployer-4.2.18.tar.gz ...

March 21, 2022 · 9 min · jiezi

关于数据同步:系统间数据同步场景及方法

一、场景示例本文的题目不是很顾名思义,然而你肯定在日常的开发工作中遇到过相似场景。 1. 场景一假如公司从事电商业务,除了电商零碎外,公司外部个别也会有一个客户关系零碎(CRM)。如果CRM零碎心愿获取到电商零碎注册的客户信息,以实现本人的后续业务。该如何实现? 2. 场景二仍然假如公司从事电商业务,电商零碎中通常有订单模块,模块中会有订单表。为了进步查问性能防止连表Join,订单表除了存储用户id数据外也存储用户名作为冗余字段。用户侧业务用户名能够被更改,用户一旦更改了本人的用户名,订单表的数据也须要被更新。如何实现? 场景一是新增数据同步场景,场景二是更新数据同步场景。 3. 概念导入在持续注释之前,咱们再细化两个概念。 3.1 如何定义“零碎”题目中的“零碎”是被打上了引号,表明它其实一个很宽泛的定义,大到能够是一个业务功残缺的利用零碎平台(电商零碎、客户关系零碎),小到能够是一个单体利用中的业务模块(用户模块、订单模块)。这里的“零碎”不是按复杂程度、代码规模和实现形式划分的。咱们将存储状态(数据)的业务组件定义为“零碎”。 依据这个定义,一个微服务体系中微服务组件(用户核心、订单核心、库存核心、评估核心),分布式系统中主节点、从节点(各类中间件的架构)都是咱们文中的“零碎”。因为两个零碎存储着状态(数据),所以“零碎”间有可能遇到须要状态(数据)同步的场景。 3.2 数据同步办法“零碎”间同步数据的形式次要有三种: 接口调用形式音讯队列形式数据库或文件形式(场景比拟小众,本文不开展形容)三、接口调用形式实现咱们应用接口调用的形式实现场景一、二中需要。 1. 场景一的实现形式首先,如果电商零碎和CRM不是同期间上线的话,咱们要思考电商零碎历史存量数据问题,通常状况历史数据在首次数据同步的时候离线一次性导入。存量数据解决之后咱们就须要思考电商零碎的增量数据即新注册用户。依据接口调用方向有两种形式,数据推送模式和数据拉取模式。 1.1 数据推送模式数据推送是一种绝对合乎直觉的形式,这是形式的语言形容就是“当你有新数据的时候通知我一声”,这里的“通知”指的就是接口调用,就是电商零碎在有新用户注册的时候调用CRM提供的接口,将新增用户信息以接口入参的模式传递。 1.2 数据拉取模式就像推和拉是一对反义词一样,数据拉取与数据推送在接口调用方向上是相同,由CRM调用电商零碎提供的接口,获取新增用户信息。 1.2.1 接口调用机会因为CRM不晓得电商零碎何时会有新用户注册,所以它只能采纳轮询的形式定期调用接口。 1.2.2 获取数据形式如何判断新增数据?就是在某个基线节点之后减少的数据,这个基线节点能够是工夫这个人造枯燥递增的数值也能够是其余枯燥递增的数值(数据库自增ID)。所以电商零碎须要提供这样一个接口,以工夫(或自增ID)为入参,批量查问出入参工夫(自增ID)之后的所有数据。这也就要求电商零碎和CRM都要存储用户的创立工夫(或自增ID)参数用于接口调用。 注:如果咱们用工夫作为接口参数,极其状况下可能呈现数据缺失问题,读者能够本人想一想起因 2. 场景二的实现形式和场景一一样,如果后期没有思考过数据同步,也存在历史存量数据问题,也须要一次性批量同步。咱们重点关注增量数据,即前期产生的用户名更改数据同步问题。仍然有两种形式: 2.1 数据推送模式和场景一的实现相似,只不过这次的语言形容变成了“当你有数据变动的时候通知我一声”,由用户模块调用订单模块接口,将用户名变动后的信息以接口入参的模式传递。 2.2 数据拉取模式由订单模块调用用户模块提供的接口,获取更新用户的信息。 2.2.1 接口调用机会和场景一雷同,订单模块也不晓得用户模块的数据何时被更新,所以它也只能采纳轮询的形式定期调用接口。 2.2.2 获取数据形式这里和场景一有些不同,场景一咱们获取的是新增数据,这里咱们要获取的是数据变动状况,所以有两种实现形式 逐条查问依照用户惟一标识(用户ID)查问用户详情,判断订单模块中冗余的用户名和查问进去的值比拟,不同就表明用户名已扭转,进行更新操作。此时只须要用户模块提供一个依据用户惟一标识(用户ID)查问用户详情的接口(更简略专用的接口就是依据ID查问用户名)。显然在须要大量数据同步的场景下这不是一个好的实现形式。 批量查问和场景一相似,用户模块须要提供一个以工夫为入参,批量查问进去入参工夫之后产生变动的所有数据的接口。这时就须要用户模块和订单模块都存储更新工夫。(在这里在略微深刻一下,通常用户表都会存储更新工夫,然而这个更新工夫在用户任意属性变更的时候都会被更新,这样咱们会获取到所有的用户属性有变更数据而不只是用户名更改的数据。要保障获取数据精准,除非咱们在用户表独自存储用户名更新工夫,显然这样得失相当) 注:和场景一一样,应用工夫作为接口参数,极其状况下也可能呈现数据缺失问题。 通过对场景一、二的实现办法的形容咱们可见。推送和拉取行为是绝对数据方提供方和数据需求方而言,从数据提供方调用数据需求方接口就是数据推送,反之就是数据拉取,尽管两种形式接口的调用方向不同,然而数据的流向是雷同的,都是通过数据提供方想数据需求方转移。 3. 推送模式和拉取模式的区别接下来咱们从两个方面,形容推送和拉取的区别: 3.1 零碎间依赖关系推送的形式下,数据提供方依赖数据需求方,场景一中电商零碎调用CRM零碎的接口,所以电商系统对CRM零碎产生了依赖,同理场景二中用户模块也依赖了订单模块。在这种依赖关系咱们嗅到了“坏滋味”。 首先,直觉通知咱们数据提供方不应该依赖数据需求方。是你须要我的数据,我反而要调用你的接口并解决异样(异样如何解决也是有需求方业务决定的,例如是否能够重试,是否容许数据失落),一旦你的业务发生变化导致接口或者异样解决逻辑变动,我不得不批改代码以应答。 其次,随着业务演进,将来可能有更多的数据需求方退出,每减少一个数据需求方都会造成数据提供方的代码层面批改,这也是咱们不能承受的。 反观拉取的形式,因为是数据需求方调用数据提供方的接口,依赖方向转变了,无论数据需求方业务如何变动或者减少了更多的数据需求方,数据提供方均不受影响。 通常不好的耦合关系不会影响以后零碎实现和运行,而是影响系统对将来业务变动的扩展性。如果咱们能确定将来业务不会变动,也就不须要关怀这类耦合关系,显然这种相对稳固的零碎少之又少。 3.2 数据同步的时效性和精确性时效性方面推送模式下,数据提供方能够在数据变动时同时调用数据需求方接口传递变动。而在拉取模式下,需求方无奈感知数据何时变动,只能定期轮询接口。时效性和接口轮序频率成正比,为了减少时效性,只能进步接口轮询频率。然而过高的频率通常不可取:一来在数据变动不频繁的场景下,频繁的接口调用是一种节约,二来过于频繁的接口调用也会个被调用方带来比拟大的性能压力。所以咱们须要在时效性和性能衡量,找到一个平衡点。 精确性方面推送模式下,数据提供方能够准确的感知本身哪些数据发生变化并通过调用接口传递给数据需求方。而在拉取模式下,数据需求方无奈感知哪些数据变动,只能通过单方保留数据变动状态(新增工夫、更新工夫)、减少查问条件的形式进行范畴查问或者采纳全量查问后一一比对的形式(不实用新增数据场景)。相较于数据推送模式,数据拉取模式很难及时的获取到精准的同步数据信息。 4. 两种模式如何抉择两种模式各有利弊,从零碎间依赖关系角度思考,采纳拉取模式的零碎有更好的业务扩展性、能更好的应答将来需要变动。然而拉取模式在数据同步时效性、精确性和零碎性能上有人造的劣势,实现起来也更加简单。所以在具体计划抉择上咱们须要依据具体场景判断。数据同步时效性和零碎性能要求不高的场景能够采纳推送形式以进步系统对将来业务的扩大能力,反之须要采纳推送的形式以零碎扩展性换取数据同步时效性和零碎性能。 5. 推送模式优化让咱们在回看一下推送模式的问题,既然推送模式场景下数据需求方的业务变动可能对数据提供方造成影响。咱们可不可以对推送模式进行优化以解决一个零碎变动影响到另一个零碎的问题。咱们能够借鉴设计模式中“策略模式”的思维,将零碎分为固定局部和变动局部,对变动的局部进行肯定维度的形象而后在将这种形象拆散进来独立实现。这种将固定逻辑和变动逻辑拆散的做法,肯定水平上能够解决咱们遇到的问题。 5.1 如何革新将变动进行形象首先,咱们将数据提供方中变动的逻辑,如调用哪些数据需求方的接口、调用接口签名(名称、地址、入参出参)是什么、如何解决调用过程中的异样等这些有可能变动并且须要代码实现的逻辑形象为通过自定义配置的形式实现,造成一个“接口配置核心”的概念。 对形象进行实现接下来,咱们就须要编码实现“接口配置核心”的逻辑。配置中的主要职责就是对数据提供方提供对立的数据同步接口,一旦数据提供方调用数据同步接口,“配置核心”依据提前定义的配置调用后端数据需求方的接口实现数据同步工作,同时会依据配置解决接口调用过程中产生的各种异常情况。 注:图中箭头示意接口调用方向和依赖关系方向。 革新后,数据提供方调用接口核心的接口传递须要同步的数据,有接口配置核心负责调用个数据需求方的接口进行二次传递。数据提供方只依赖配置核心,不再依赖各个数据需求方,接口配置核心尽管依赖后端的各个数据需求方,然而数据需求方的种种变动曾经通过配置来实现,所以无论数据需求方业务如何变动,咱们只须要调整配置核心配置即可,这种变动并不能传导到数据需求方。通过引入配置核心,咱们无效隔离了数据需求方对数据提供方的影响。 5.2 革新后的长处和有余长处解决了拉取模式下数据需求方影响数据提供方的问题。 毛病减少了零碎实现复杂度,显然“接口配置核心”实现起来并容易。 如何接口调用的相干行为形象为能够配置的形式并加以实现如何解决接口调用过程中遇到的种种异常情况如何解决接口调用的性能问题升高了零碎稳定性,进步了运维难度。更多的模块意味着更多的问题点,咱们须要更多运维监控伎俩在第一工夫发现并解决问题,以确保零碎整体稳固运行。 5.3 应用倡议是否要应用这种形式,还须要综合思考。在一个业务逻辑绝对简略的小型零碎,咱们没有必要引入如此简单的实现形式,依据场景须要从下面介绍的拉取模式和推送模式抉择即可,而对于业务逻辑较简单的大型零碎,能够引入这种形式以升高零碎间简单的依赖关系。 四、音讯队列模式实现注:图中箭头示意接口调用方向和依赖关系方向。 数据提供方以音讯的形式向数据需求方传递须要同步数据信息,数据提供发向音讯队列发送一条音讯,音讯队列负责将音讯发送给数据需求方。数据提供方和数据需求方都只依赖音讯队列,他们两者之间没有依赖也就不会相互影响。 置信大家对音讯队列并不生疏,在此我也不再赘述。你有没有发现音讯队列很像同咱们前文所述“接口配置核心”模式,只是实现略有不同。 “接口配置核心”中接口签名转换成了音讯队列中的主题、音讯概念“接口配置核心”中接口调用行为就像音讯队列中的公布订阅模式“接口配置核心”中异样解决机制就是音讯队列中的保障投递机制“接口配置核心”中的性能要求也能够通过音讯队列中的削峰作用来满足所以音讯队列能够齐全代替咱们上文所说的“接口配置核心”性能。免去咱们反复造轮子之苦。 ...

March 8, 2022 · 1 min · jiezi