关于数据同步:开源数据集成平台SeaTunnelMySQL实时同步到es

一、前言最近,我的项目有几个表要从 MySQL 实时同步到 另一个 MySQL,也有同步到 ElasticSearch 的。目前,公司生产环境同步,用的是 阿里云的 DTS,每个同步工作每月 500多元,有点小贵。其余环境:MySQL同步到ES,用的是 CloudCanal,不反对 数据转换,增加同步字段比拟麻烦,社区版限度5个工作,不够用;MySQL同步到MySQL,用的是 debezium,不反对写入 ES。恰好3年前用过 SeaTunnel 的 前身 WaterDrop,那就开始吧。本文以 2.3.1 版本,Ubuntu 零碎为例二、开源数据集成平台SeaTunnel1. 简介SeaTunnel 是 Apache 软件基金会下的一个高性能开源大数据集成工具,为数据集成场景提供灵便易用、易扩大并反对千亿级数据集成的解决方案。Seaunnel 为实时(CDC)和批量数据提供高性能数据同步能力,反对十种以上数据源,曾经在B站、腾讯云、字节等数百家公司应用。能够抉择 SeaTunnel Zeta 引擎上运行,也能够在 Apache Flink 或 Spark 引擎上运行。 2. 装置下载,这里抉择 2.3.1 版本,执行 tar -xzvf apache-seatunnel-*.tar.gz 解压缩因为 2.3.2 版本,MySQL-CDC 找不到驱动,bug修复详见 Caused by: java.sql.SQLException: No suitable driver at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298) at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106) ... 20 more ... 11 more at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122) at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)3. 装置 connectors 插件执行 bash bin/install-plugin.sh,国内倡议先配置 maven 镜像,不然容易失败 或者 慢官网文档写着执行 sh bin/install-plugin.sh,我在 Ubuntu 20.04.2 LTS 上执行报错(bin/install-plugin.sh: 54: Bad substitution),我提了PR 4. 编写配置文件config 目录下,新建配置文件:如 mysql-es-test.conf增加 env 配置因为是 实时同步,这里 job.mode = "STREAMING",execution.parallelism 是 并发数 ...

July 5, 2023 · 2 min · jiezi

关于数据同步:海量数据同步首选-SeaTunnel-Zeta-引擎正式发布

点亮 ⭐️ Star · 照亮开源之路https://github.com/apache/inc... Apache SeaTunnel(incubating) 正式推出 2.3.0 正式版本,并正式公布本人的外围同步引擎 Zeta!此外,SeaTunnel 2.3.0 还带来了许多大家期待已久的新个性,包含反对 CDC、以及近百种 Connector 等。 文档 https://seatunnel.apache.org/... 下载地址 https://seatunnel.apache.org/... 01 次要更新SeaTunnel 本人的同步引擎—Zeta 正式公布Zeta Engine 是一个专门为数据同步场景设计和开发的数据同步引擎,更快、更稳固、更省资源也更加易用,在寰球多种开源同步引擎测试比对状况下,Zeta 性能都遥遥领先。SeaTunnel Zeta 引擎,经验了数个研发版本,于 2022 年十月公布 beta 版本,通过社区探讨决定,将其命名为 Zeta(宇宙中最快的星,社区同学认为这充分体现了该引擎的个性),在社区用户贡献者的致力下,咱们明天正式公布 Zeta Engine 生产可用版本,其个性包含: 简略易用,新的引擎尽量减小第三方服务的依赖,能够不依赖 Zookeeper、HDFS 等大数据组件实现集群治理、快照存储和集群 HA 性能。这对于那些没有大数据平台或者不违心依赖大数据平台进行数据同步的用户十分有用。更省资源,在 CPU 层面,Zeta Engine 外部应用 Dynamic Thread Sharing(动静线程共享)技术 ,在实时同步的场景下,如果表的数量很多但每张表的数据量又很小,Zeta Engine 会将这些同步工作在共享线程中运行,这种形式能够缩小不必要的线程创立,节俭系统资源。在读取和数据写入端,Zeta Engine 的设计指标是尽量减少 JDBC 连贯的数量。在 CDC 场景下,Zeta Engine 会尽量复用日志读取解析资源。更稳固,在此版本中,Zeta Engine 将数据同步的工作以 Pipeline 作为 Checkpoint 和容错的最小粒度,一个 task 的失败只会影响到和它有上下游关系的 task,尽量避免 task 失败造成整个 Job 失败或回滚。同时,对于源端数据有存储工夫限度的场景,Zeta Engine 反对开启数据 Cache,主动缓存从源端读取的数据,再由上游工作读取缓存数据并写入指标端。此场景下,即便指标端呈现故障导致数据无奈写入,也不会影响源端的失常读取,避免源端数据过期被删除。更疾速,Zeta Engine 的执行打算优化器会以减小数据可能的网络传输为指标来做执行打算的优化,从而升高数据序列化和反序列化带来的整体同步性能的损耗,更快地实现数据同步操作。当然,它还反对速度限制,让同步作业以一个正当的速度进行。全场景数据同步反对。SeaTunnel 的指标是反对离线批量同步下的全量同步和增量同步,反对实时同步以及 CDC。 ...

January 5, 2023 · 4 min · jiezi

关于数据同步:百亿级数据同步如何基于-SeaTunnel-的-ClickHouse-实现

作者 | Apache SeaTunnel(Incubating) Contributor 范佳 整顿 | 测试工程师 冯秀兰 对于百亿级批数据的导入,传统的 JDBC 形式在一些海量数据同步场景下的体现并不尽如人意。为了提供更快的写入速度,Apache SeaTunnel(Incubating) 在刚刚公布的 2.1.1 版本中提供了 ClickhouseFile-Connector 的反对,以实现 Bulk load 数据写入。 Bulk load 指把海量数据同步到指标 DB 中,目前 SeaTunnel 已实现数据同步到 ClickHouse 中。 在 Apache SeaTunnel(Incubating) 4 月 Meetup 上,Apache SeaTunnel(Incubating) Contributor 范佳分享了《基于 SeaTunnel 的 ClickHouse bulk load 实现》,具体解说了 ClickHouseFile 高效解决海量数据的具体实现原理和流程。 感激本文整顿志愿者 测试工程师 冯秀兰 对 Apache SeaTunnel(Incubating) 我的项目的反对! 本次演讲次要蕴含七个局部: ClickHouse Sink 现状ClickHouse Sink 弱场景ClickHouseFile 插件介绍ClickHouseFile 核心技术点ClickHouseFile 插件的实现解析插件能力比照前期优化方向 范 佳白鲸开源 高级工程师 ...

January 5, 2023 · 2 min · jiezi

关于数据同步:解读重要功能特性新手入门-Apache-SeaTunnel-CDC

引言点亮 ⭐️ Star · 照亮开源之路 https://github.com/apache/inc... 为什么说 CDC 是SeaTunnel平台中的一个重要性能个性?明天这篇文章跟大家分享一下 CDC 是什么?目前市面上的 CDC 工具现有的痛点有哪些?SeaTunnel面对这些痛点设计的架构指标是什么?另外包含社区的瞻望和目前在做的一些事件。 总体来说,市面上曾经有这么多 CDC 工具了,咱们为什么还要反复去造一个轮子? 带着这个疑难,我先给大家简要介绍下 CDC 是什么! CDC 的全称是 Change Data Capture,它就是一个数据变更捕捉。变更数据捕捉 (CDC) 应用 Server 代理来记录利用于表的插入、更新和删除流动。 这样,就能够按易于应用的关系格局提供这些更改的详细信息。 将为批改的行捕捉列信息以及将更改利用于指标环境所需的元数据,并将其存储在镜像所跟踪源表的列构造的更改表中。 CDC的应用场景异构数据库之间的数据同步或备份 / 建设数据分析计算平台在 MySQL,PostgreSQL,MongoDB 等等数据库之间相互同步数据,或者把这些数据库的数据同步到 Elasticsearch 里以供全文搜寻,当然也能够基于 CDC 对数据库进行备份。而数据分析系统能够通过订阅感兴趣的数据表的变更,来获取所须要的剖析数据进行解决,不须要把剖析流程嵌入到已有零碎中,以实现解耦。 微服务之间共享数据状态在微服务大行其道的今日,微服务之间信息共享始终比较复杂,CDC 也是一种可能的解决方案,微服务能够通过 CDC 来获取其余微服务数据库的变更,从而获取数据的状态更新,执行本人相应的逻辑。 更新缓存 / CQRS 的 Query 视图更新通常缓存更新都比拟难搞,能够通过 CDC 来获取数据库的数据更新事件,从而管制对缓存的刷新或生效。 而 CQRS 是什么又是一个很大的话题,简略来讲,你能够把 CQRS 了解为一种高配版的读写拆散的设计模式。举个例子,咱们后面讲了能够利用 CDC 将 MySQL 的数据同步到 Elasticsearch 中以供搜寻,在这样的架构里,所有的查问都用 ES 来查,但在想批改数据时,并不间接批改 ES 里的数据,而是批改上游的 MySQL 数据,使之产生数据更新事件,事件被消费者生产来更新 ES 中的数据,这就基本上是一种 CQRS 模式。而在其余 CQRS 的零碎中,也能够利用相似的形式来更新查问视图。 ...

January 4, 2023 · 3 min · jiezi

关于数据同步:SeaTunnel-在天翼云数据集成平台的探索实践

点亮 ⭐️ Star · 照亮开源之路 https://github.com/apache/inc... 讲师简介 周利旺 天翼云大数据开发工程师 在11月26日,Apache SeaTunnel& APISIX 联结 Meetup 期间,天翼云科技大数据开发工程师周利旺给大家分享了天翼云数据集成平台引入 SeaTunnel 过程中的一些摸索实际,心愿对大家有所帮忙! 天翼云数据集成平台基于 Apache Nifi 二次封装而成,然而对于一些特定的需要 Apache Nifi 不可能很好地满足,因而须要引入第三方的数据集成工具进行能力上的补足。而SeaTunnel 恰是能用 Nifi 互补的好工具。本次讲座介绍了 SeaTunnel 整合到天翼云数据平台在架构层面的设计与思考。 本文次要包含四个局部: ● Apache SeaTunnel 简介 ● Apache Nifi 简介 ● SeaTunnel 与 Nifi 整合计划 ● 天翼云数据集成平台 ● 参加开源教训与心得 ✦01 SeaTunnel简介✦咱们平台次要是面向政企客户,目前是以 Apache Nifi 作为外围,辅之以原生的 Flink 利用,在此基础上进行封装和二次开发出各种各样的数据集成利用,提供面向不同行业的解决方案,目前Seatunnel在咱们外部尽管没上生产实践,然而也做了不少的摸索,所以本次议题次要讲下 SeaTunnel 和 Nifi 联合应用的一些教训。因为两者各有优劣,然而在能力上能够舍短取长,从而能够对标更多的性能,满足客户的更多需要。 SeaTunnel 实质上是对 Spark 和 Flink 进行了一层封装,当初新版本又退出了自研的 SeaTunnel 引擎,用户能够通过编辑配置文件来疾速构建工作流。 ...

December 26, 2022 · 3 min · jiezi

关于数据同步:马蜂窝毕博分析完这9点工作原理我们最终选择了-Apache-SeaTunnel

点亮 ⭐️ Star · 照亮开源之路 https://github.com/apache/inc... 讲师简介 毕博 马蜂窝 数据工程师 在10月15日,Apache SeaTunnel& IoTDB 联结 Meetup 期间,马蜂窝网数据工程师毕博给大家介绍了SeaTunnel的基本原理和相干企业实际思考、马蜂窝大数据开发调度平台典型场景下的痛点和优化思考,并分享了集体参加社区奉献的实践经验,心愿同时能帮忙大家疾速理解SeaTunnel及参加社区建设的门路和技巧。 ✦SeaTunnel的技术原理简介✦SeaTunnel 是一个分布式、高性能的数据集成平台,用于海量数据(离线和实时)的同步和转换 下面这张图展现的是 SeaTunnel 的工作流程,简略来说蕴含3个局部:输出、转换、输入;更简单的数据处理,也无非是几种行为的组合。 以一个同步场景为例,比方将 Kafka 导入到 Elasticsearch ,Kafka 就是 流程中的 Source ,而 Elasticsearch 就是流程中的 Sink 。 如果在导入的过程中,字段列跟待写入的内部数据列不匹配须要做一些列或者类型的转换,或者须要多数据源的 Join,而后做一些数据打宽,扩大字段等解决,那么在这个过程中就须要减少一些 Transform,对应图片两头的局部。 由此可见 SeaTunnel 外围的局部就是 Source、Transform 和 Sink 流程定义。 在 Source 外面咱们能够定义须要的读取数据源,在 Sink 定义数据 Pipeline 最终写出的内部存储,能够通过 Transform 进行两头数据的转换,能够应用 SQL 或者自定义的函数等形式。 1.1 SeaTunnel 连接器API V1版本 架构分析对于一个成熟组件框架来说,从设计模式到 API 的设计实现上,肯定有比拟独特的中央,从而使得框架有比拟好的扩展性。 SeaTunnel的架构次要包含三局部: 1、SeaTunnel 根底 API ; ...

November 4, 2022 · 4 min · jiezi

关于数据同步:SeaTunnel连接器V1到V2的架构演进与探究

外围概念整个SeaTunnel设计的外围是利用设计模式中的管制翻转或者叫依赖注入,次要概括为以下两点: 下层不依赖底层,两者都依赖形象流程代码与业务逻辑应该拆散对于整个数据处理过程,大抵能够分为以下几个流程:输出 -> 转换 -> 输入,对于更简单的数据处理,本质上也是这几种行为的组合: 内核原理SeaTunnel将数据处理的各种行为形象成Plugin,并应用SPI技术进行动静注册,设计思路保障了框架的灵便扩大,在以上实践根底上,数据的转换与解决还须要做对立的形象,譬如比拟有名异构数据源同步工具DataX,也同样对数据单条记录做了对立形象。 在SeaTunnel V1架构体系中,因为背靠Spark和Flink两大分布式计算框架,框架曾经为咱们做好了数据源形象的工作,Flink的DataStream、Spark的DataFrame曾经是对接入数据源的高度形象,在此基础上咱们只须要在插件中解决这些数据抽象即可,同时借助于Flink和Spark提供的SQL接口,还能够将每一次解决完的数据注册成表,不便用SQL进行解决,缩小代码的开发量。 实际上SeaTunnel最初的目标是主动生成一个Spark或者一个Flink作业,并提交到集群中运行。 SeaTunnel连接器V1 API解析架构概览目前在我的项目dev分支下,SeaTunnel连接器V1 API所在的模块如图所示: seatunnel-api-base:根底API层形象seatunnel-api-flink:Flink引擎API层形象seatunnel-api-spark:Spark引擎API层形象seatunnel-api-base在根底模块中,有以下代码: 为了更清晰的了解这些类之间的关系,笔者这里制作了一张简略的UML类图: 整个API的组成能够大体分为三局部: 插件层:提供Source、Transform、Sink插件定义执行层:提供执行器和运行上下文定义构建层:提供命令行接口定义构建层接管命令参数构建执行器,执行器初始化上下文,上下文注册插件并启动插件,至此,整个作业开始运行。 seatunnel-api-spark在Spark引擎API层有以下代码: 同样,笔者也整顿了一张UML类图来示意它们之间的关系: 整个流程与Base模块统一,在这里笔者不过多赘述,有趣味的读者能够自行观看源码。 seatunnel-api-flink在Flink引擎API层有以下代码: 同样,笔者也整顿了一张UML类图来示意它们之间的关系: 整个流程与Base模块统一,在这里笔者不过多赘述,有趣味的读者能够自行观看源码。 SeaTunnel连接器V1运行原理启动器模块概览整个我的项目的最外层的启动类都放在以下模块中: 跟连接器V1无关的模块如下: seatunnel-core-base:V1根底启动模块seatunnel-core-flink:V1flink引擎启动模块seatunnel-core-flink-sql:V1flink-sql引擎启动模块seatunnel-core-spark:V1spark引擎启动模块执行流程为了更好的了解SeaTunnel V1的启动流程,笔者在这里制作了一张简略的时序图: 程序最外层的启动由start-seatunnel-${engine}.sh开始,用户依据将配置文件从脚本传入,脚本调用org.apache.seatunnel.core.spark.SparkStarter或者org.apache.seatunnel.core.flink.FlinkStarter,实际上这个类只做一个工作:将所有参数拼接成spark-submit或者flink命令,而后脚本接管到spark-submit或者flink命令并提交到集群中;提交到集群中真正执行job的类实际上是org.apache.seatunnel.spark.SeatunnelSpark或是org.apache.seatunnel.flink.SeatunnelFlink,读者如果想间接深刻理解作业启动外围流程的话举荐浏览这两个类的源码。 执行原理SparkSparkSource插件将异构数据源接入为DataFrameSparkTransform插件将SparkSource接入的DataFrame进行转换解决SparkSink插件将SparkTransform解决好的DataFrame写入到指标数据源FlinkFlinkSource插件将异构数据源接入为DataStreamFlinkTransform插件将FlinkSource接入的DataStream进行转换解决SparkSink插件将FlinkTransform解决好的DataStream写入指标数据源SeaTunnel连接器V2 API解析架构概览目前在我的项目dev分支下,SeaTunnel连接器V2 API所在的模块如图所示: seatunnel-api:连接器V2所有的API定义数据抽象SeaTunnel连接器V2 API在数据层面做了形象,定义了本人的数据类型,这是与连接器V1最大的不同点,连接器V1应用的是引擎数据抽象的能力,然而连接器V2本人提供的这个异构数据源对立的能力: 在所有的Source连接器和Sink连接器中,解决的都是SeaTunnelRow类型数据,同时SeaTunnel也对内设置了数据类型标准,所有通过Source接入进来的数据会被对应的连接器转化为SeaTunnelRow送到上游。 API Common在API common包下有以下接口的定义: 在这里因为篇幅关系只介绍比拟外围的几个接口: PluginIdentifierInterface:插件惟一标识SeaTunnelContext:SeaTunnel利用上下文,每个SeaTunnel Job蕴含的上下文对象,保留了以后源表的元数据SeaTunnelPluginLifeCycle:插件申明周期具体接口中有哪些办法读者能够自行浏览对应类的源码,在这里笔者将不过多赘述。 API Source在API source包下有以下接口的定义: 在这里因为篇幅关系只介绍比拟外围的几个接口: Boundedness:标识数据有界无界,连接器V2设计理念基于批流一体,此接口用于区分流式作业还是批式作业Collector:数据收集器,用于收集Source连接器产生的数据并推往上游SeaTunnelSource:Source插件基类,所有的Source连接器主类均继承于这个接口SourceReader:Source插件真正解决数据接入的接口SourceSplit:数据分片接口,连接器V2反对数据并行读入,晋升数据接入效率SourceSplitEnumerator:数据分片器,此接口用于散发数据分片至对应的SourceReader中API Sink在API sink包下有以下接口的定义: 在这里因为篇幅关系只介绍比拟外围的几个接口: SeaTunnelSink:Sink插件基类,所有的Sink连接器均继承于这个接口SinkWriter:Sink插件真正实现数据输入的接口SinkCommitter:用于解决SinkWriter#prepareCommit返回的数据信息,蕴含须要提交的事务信息,连接器V2在Sink设计上提供二阶段提交的接口,从而使连接器有了实现Exactly-Once的可能性SinkAggregatedCommitter:用于解决SinkWriter#prepareCommit返回的数据信息,蕴含须要提交的事务信息等,用于在单节点多任务一起提交事务信息,这样能够防止提交阶段二局部失败导致状态不统一的问题(注:在实现连接器时优先实现这个接口,这样会兼容性更强)小结 连接器V2在架构分层上与计算引擎进行解耦,定义了本人的元数据定义以及数据类型定义,在API层和计算引擎层减少了翻译层,将SeaTunnel自定义的数据源通过翻译层接入到引擎中,从而真正实现接口和引擎拆散的目标。 SeaTunnel连接器V2运行原理启动器模块概览整个我的项目的最外层的启动类都放在以下模块中: 跟连接器V2无关的模块如下: ...

October 9, 2022 · 1 min · jiezi

关于数据同步:从启动到关闭-SeaTunnel211源码解析

点亮 ⭐️ Star · 照亮开源之路 GitHub:https://github.com/apache/incubator-seatunnel 目录 本文转载自Adobee Chen的博客-CSDN博客,看看是否有你感兴趣的吧! 如有出错,请多斧正。 一、启动脚本解析 二、源码解析 01 入口 02 execute()外围办法 其中 BaseSource、BaseTransform、BaseSink都是接口、都实现Plugin接口。他们的实现类就是对应的插件类型execute()办法向下走,创立一个执行环境。调用plugin.prepare(env)最初启动 execution.start(sources, transforms, sinks);执行flink 代码程序最初敞开一、启动脚本解析在 /bin/start-seatunnel-flink.sh #!/bin/bashfunction usage() { echo "Usage: start-seatunnel-flink.sh [options]" echo " options:" echo " --config, -c FILE_PATH Config file" echo " --variable, -i PROP=VALUE Variable substitution, such as -i city=beijing, or -i date=20190318" echo " --check, -t Check config" echo " --help, -h Show this help message"} if [[ "[email protected]" = *--help ]] || [[ "[email protected]" = *-h ]] || [[ $# -le 1 ]]; then usage exit 0fi is_exist() { if [ -z $1 ]; then usage exit -1 fi} PARAMS=""while (( "$#" )); do case "$1" in -c|--config) CONFIG_FILE=$2 is_exist ${CONFIG_FILE} shift 2 ;; -i|--variable) variable=$2 is_exist ${variable} java_property_value="-D${variable}" variables_substitution="${java_property_value} ${variables_substitution}" shift 2 ;; *) # preserve positional arguments PARAMS="$PARAMS $1" shift ;; esacdone if [ -z ${CONFIG_FILE} ]; then echo "Error: The following option is required: [-c | --config]" usage exit -1fi # set positional arguments in their proper placeeval set -- "$PARAMS" BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"APP_DIR=$(dirname ${BIN_DIR})CONF_DIR=${APP_DIR}/configPLUGINS_DIR=${APP_DIR}/libDEFAULT_CONFIG=${CONF_DIR}/application.confCONFIG_FILE=${CONFIG_FILE:-$DEFAULT_CONFIG} assemblyJarName=$(find ${PLUGINS_DIR} -name seatunnel-core-flink*.jar) if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then source ${CONF_DIR}/seatunnel-env.shfi string_trim() { echo $1 | awk '{$1=$1;print}'} export JVM_ARGS=$(string_trim "${variables_substitution}") exec ${FLINK_HOME}/bin/flink run \ ${PARAMS} \ -c org.apache.seatunnel.SeatunnelFlink \ ${assemblyJarName} --config ${CONFIG_FILE}其中: 启动脚本能接管的 --config --variable --check(还不反对) --help ...

October 8, 2022 · 6 min · jiezi

关于数据同步:可视化任务编排拖拉拽-Scaleph-基于-Apache-SeaTunnel的数据集成

这次在 6月 Meetup 为大家带来的是Scaleph 基于 Apache SeaTunnel (Incubating) 的数据集成介绍,心愿你有所播种。 本次演讲次要包含五个局部: 对于ScalephScaleph架构&性能简介SeaTunnel社区奉献零碎演示开发计划https://www.bilibili.com/vide... ↑↑直播回放视频入口↑↑ 王奇 Apache SeaTunnel Contributor 搜寻举荐工程师,大数据 Java 开发 01 Scaleph的缘起我最早是从事搜寻举荐工作,在团队外面负责保护Dump零碎,次要是为咱们的搜索引擎提供喂数据的性能,先给大家介绍在保护过程中次要的5个痛点问题: 及时性和稳定性搜寻举荐是电商平台的外围在线零碎,尤其是对数据的及时性和稳定性要求十分高。因为搜寻举荐会接管整个电商平台 C 端的绝大部分流量,所以一旦服务呈现稳定的时候,可能就造成服务受损,导致用户的体验大打折扣。 业务简单/大宽表设计Dump零碎会将电商平台的商品、类目、品牌、店铺、商品标签、数仓的实时/离线数据及模型数据会通过一系列的预处理,最终输入成一张大宽表,在这个过程中,业务的复杂性和多变性,会侵入到Dump零碎中来,所以应答的技术挑战绝对就更高了。 全量+实时索引全量索引每天跑一次,次要目标是更新 T+1 频率更新的数据。当全量索引完结之后,咱们会通过实时索引去刷新须要实时更新的数据,比如说商品的价格、库存变动相干的信息。 数据联动更新咱们的上游数据起源十分多,有音讯队列、数据库、大数据相干的存储以及 dubbo 接口,因为是大宽表设计,以商品索引为例,大宽表会以商品为主,如果是店铺索引,会以店铺为主,依据数据的不同,上游的数据变动不肯定是商品或店铺维度的,数据也会产生肯定的联动更新。 数据兜底搜寻举荐服务过后也承当着C端绝大部分的流量,当公司其余团队的性能跟不上的时候,他们个别会把数据通过Dump零碎送到搜索引擎,而后咱们团队代替他们返回给Web页面,防止后续对他们发动二次申请调用。 同时,如果其余团队的业务零碎产生了脏数据,也须要Dump零碎做数据保护,避免数据外泄给C端用户造成不好的影响,所以开发保护中的时候,也有很大的难度。 02 为什么引入Flink?作为国内 Flink 的晚期使用者,阿里巴巴在搜寻举荐畛域领有悠久的历史和胜利的教训,在搜寻举荐团队开发保护 Dump 零碎的职业经验促使我开始关注应用Flink做A/B试验的报表、数据实时流之外的相干工作,次要也就是用Flink来实现Dump零碎为搜寻去提供Dump平台的工作,应用Flink做数据集成有5个长处: 人造的分布式反对:Flink反对多种部署和运行形式,单机、yarn、Kubernetes;低提早、海量吞吐:在泛滥大厂中利用宽泛;生态反对:Flink提供了泛滥开箱即用的connector,反对csv、avro数据格式,kafka、pulsar等音讯零碎以及泛滥的存储系统,和大数据生态紧密结合;基于分布式轻量异步快照机制实现exactly-once语义,为工作的失败、重启、迁徙、降级等提供数据一致性保障;metrics。Flink除了本身提供的 metrics 外,metrics 框架能够让用户为工作开发自定义的 metrics,丰盛监控指标;03 为什么抉择SeaTunnel?起初接触到 SeaTunnel 的时候,很喜爱 SeaTunnel 的设计理念!SeaTunnel 是运行在 Flink 和Spark 之上,高性能和分布式海量数据的下一代集成框架。 重要的是它是开箱即用的,并且针对现有的生态能够实现无缝集成,因为运行在 Flink 和 Spark 之上,能够很不便地接入公司现有的 Flink 和 Spark 的基础设施。另一方面 SeaTunnel 也有很多的生产案例,在进入 Apache 基金会孵化之后,社区十分沉闷,将来可期。 ...

July 4, 2022 · 3 min · jiezi

关于数据同步:SOFARegistry-源码|数据同步模块解析

文|宋国磊(GitHub ID:glmapper ) SOFAStack Committer、华米科技高级研发工程师 负责华米账号零碎、框架治理方向的开发 本文 3024 字 浏览 10 分钟 |前言| 本文次要围绕 SOFARegistry 的数据同步模块进行了源码的解析。其中,对于注册核心的概念以及 SOFARegistry 的基础架构将不再做具体的论述,有趣味的小伙伴在《海量数据下的注册核心 - SOFARegistry 架构介绍》[1]一文中获取相干介绍。 本文次要的思路大抵分为以下 2 个局部: - 第一局部,借助 SOFARegistry 中的角色分类来阐明哪些角色之间会进行数据同步; - 第二局部,对数据同步的具体实现进行解析。 PART. 1——SOFARegistry 的角色分类 如上图所示,SOFARegistry 蕴含以下 4 个角色: Client 提供利用接入服务注册核心的根本 API 能力,利用零碎通过依赖客户端 JAR 包,通过编程形式调用服务注册核心的服务订阅和服务公布能力。 SessionServer 会话服务器,负责承受 Client 的服务公布和服务订阅申请,并作为一个中间层将写操作转发 DataServer 层。SessionServer 这一层可随业务机器数的规模的增长而扩容。 DataServer 数据服务器,负责存储具体的服务数据,数据按 dataInfoId 进行一致性 Hash 分片存储,反对多正本备份,保证数据高可用。这一层可随服务数据量的规模的增长而扩容。\ MetaServer 元数据服务器,负责保护集群 SessionServer 和 DataServer 的统一列表,作为 SOFARegistry 集群外部的地址发现服务,在 SessionServer 或 DataServer 节点变更时能够告诉到整个集群。 在这 4 个角色中,MetaServer 作为元数据服务器自身不解决理论的业务数据,仅负责保护集群 SessionServer 和 DataServer 的统一列表,不波及数据同步问题。 ...

June 29, 2022 · 2 min · jiezi

关于数据同步:数据传输-利用-DTLE-将-MySQL-数据同步到-DBLE

作者:任仲禹 爱可生 DBA 团队成员,善于故障剖析和性能优化,文章相干技术问题,欢送大家一起探讨。 本文起源:原创投稿 *爱可生开源社区出品,原创内容未经受权不得随便应用,转载请分割小编并注明起源。 背景源于某客户的需要,存在线上某业务 MySQL 库因为数据量及业务读写压力较大,须要将业务数据迁徙到 DBLE 分布式数据库,但同时因为业务为 7x24h,可能停机的工夫窗口较短,所以须要思考数据实时同步的计划。 过往 DBLE 的业务上线根本为全新部署,数据实时同步的状况极少施行,去年 DTLE 公布后这一问题失去了些改善,明天咱们来实际下。 环境筹备1. 指标端 DBLE 集群部署装置 DBLE 软件、后端分片 MySQL 库过程略 DBLE 版本 3.20.10.8、MySQL版本 5.7.25sharding.xml <?xml version="1.0"?> <!DOCTYPE dble:sharding SYSTEM "sharding.dtd"> <dble:sharding xmlns:dble="http://dble.cloud/" version="4.0"> <schema name="dtle" sqlMaxLimit="-1" shardingNode="dn_01"> <singleTable name="gtid_executed_v4" shardingNode="dn_01" sqlMaxLimit="-1"></singleTable> </schema> <schema name="ren" sqlMaxLimit="-1" shardingNode="dn_01"> <shardingTable name="test" shardingNode="dn_01,dn_02,dn_03,dn_04" sqlMaxLimit="-1" shardingColumn="id" function="func_jumphash"></shardingTable> </schema> <shardingNode name="dn_03" dbGroup="dh-mysql-cluster02" database="dh_dn_03"></shardingNode> <shardingNode name="dn_04" dbGroup="dh-mysql-cluster02" database="dh_dn_04"></shardingNode> <shardingNode name="dn_02" dbGroup="dh-mysql-cluster01" database="dh_dn_02"></shardingNode> <shardingNode name="dn_01" dbGroup="dh-mysql-cluster01" database="dh_dn_01"></shardingNode> <function name="func_jumphash" class="jumpStringHash"> <property name="partitionCount">4</property> <property name="hashSlice">0:-1</property> </function> </dble:sharding>db.xml <?xml version="1.0"?> <!DOCTYPE dble:db SYSTEM "db.dtd"> <dble:db xmlns:dble="http://dble.cloud/" version="4.0"> <dbGroup name="dh-mysql-cluster02" rwSplitMode="0" delayThreshold="-1"> <heartbeat timeout="0" errorRetryCount="0">show slave status</heartbeat> <dbInstance name="10.186.61.13-3326-dh-1" url="10.186.61.13:3326" user="dbleuser" password="jpfmxIeMt1vxAJ6zd6Q10PGRRi+Qj023Dl+YXuOr3C4VXTdV5+GJaOIv5iVmWCwpXcucn/zi02HVlT7ADX+m6Q==" maxCon="100" minCon="10" primary="true" readWeight="0" id="mysql-i63009" usingDecrypt="true"></dbInstance> </dbGroup> <dbGroup name="dh-mysql-cluster01" rwSplitMode="0" delayThreshold="-1"> <heartbeat timeout="0" errorRetryCount="0">show slave status</heartbeat> <dbInstance name="10.186.61.11-3316-dh-1" url="10.186.61.11:3316" user="dbleuser" password="QQWRF80AGNbx4jIAx/b2Ww7Myol1+ntlyzGmA1A3PXVISmRD/i5pgRnLLwYsXoLmH0jiv1qZAkqIBHv6Yg/XAg==" maxCon="100" minCon="10" primary="true" readWeight="0" id="mysql-47vn84" usingDecrypt="true"></dbInstance> </dbGroup> </dble:db>user.xml <?xml version="1.0"?> <!DOCTYPE dble:user SYSTEM "user.dtd"> <dble:user xmlns:dble="http://dble.cloud/" version="4.0"> <managerUser name="root" password="CrjpLhvVJkHk0EPW35Y07dUeTimf52zMqClYQkIAN3/dqiG1DVUe9Zr4JLh8Kl+1KH1zd7YTKu5w04QgdyQeDw==" usingDecrypt="true"></managerUser> <shardingUser name="ren" schemas="ren,dtle" password="P+C2KazQiS3ZZ6uojBJ91MZIqYqGczspQ/ebyBZOC9xKAAkAFrqEDC9OPn/vObAyO4P8Zu3vHQJ+rljM040Kdg==" usingDecrypt="true" readOnly="false" maxCon="0" blacklist="default_black_list"></shardingUser> </dble:user>2. 源端和指标端测试表创立源端 MySQL 数据库软件装置略源端MySQL与指标端DBLE都须要创立测试表名:test use ren; CREATE TABLE `test` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `city` varchar(20) COLLATE utf8mb4_bin DEFAULT NULL, `dt` datetime DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`), KEY `idx_ctiy` (`city`) ) ENGINE=InnoDB;3. 部署单节点DTLEDTLE社区版本GitHub下载地址:https://github.com/actiontech...下载实现后间接rpm装置(本示例应用外部QA验证版本) ...

March 29, 2022 · 7 min · jiezi

关于数据同步:otter数据增量同步

otter介绍原理形容: 基于Canal开源产品,获取数据库增量日志数据典型管理系统架构,manager(web治理)+node(工作节点) manager运行时推送同步配置到node节点 node节点将同步状态反馈到manager上 基于zookeeper,解决分布式状态调度的,容许多node节点之间协同工作.mysql数据库中自带的复制技术可分成三步: master将扭转记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,能够通过show binlog events进行查看)slave将master的binary log events拷贝到它的中继日志(relay log),这里是I/O thread线程slave重做中继日志中的事件,将扭转反映它本人的数据,这里是SQL thread线程基于canal&otter的复制技术和mysql复制相似,具备类比性. Canal对应于I/O thread,接管Master Binary Log.Otter对应于SQL thread,通过Canal获取Binary Log数据,执行同步插入数据库.两者的区别在于: otter目前嵌入式依赖canal,部署为同一个jvm,目前设计为不产生Relay Log,数据不落地.otter目前容许自定义同步逻辑,解决各类需要. a. ETL转化. 比方Slave上指标表的表名,字段名,字段类型不同,字段个数不同等. b. 异构数据库. 比方Slave能够是oracle或者其余类型的存储,nosql等. c. M-M部署,解决数据一致性问题d. 基于manager部署,不便监控同步状态和治理同步工作.部署manager官网下载 manager.deployer-4.2.18.tar.gz,otter-otter-4.2.18.zip https://github.com/alibaba/ot... 初始化otter库 解压包otter-otter-4.2.18.zip,找到数据库初始化脚本 otter-otter-4.2.18\manager\deployer\src\main\resources\sql\otter-manager-schema.sql 新建数据库otter,执行初始化语句otter-manager-schema.sql CREATE DATABASE /*!32312 IF NOT EXISTS*/ `otter` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;USE `otter`;SET sql_mode='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';CREATE TABLE `ALARM_RULE` ( `ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `MONITOR_NAME` varchar(1024) DEFAULT NULL, `RECEIVER_KEY` varchar(1024) DEFAULT NULL, `STATUS` varchar(32) DEFAULT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `DESCRIPTION` varchar(256) DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `MATCH_VALUE` varchar(1024) DEFAULT NULL, `PARAMETERS` text DEFAULT NULL, PRIMARY KEY (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `AUTOKEEPER_CLUSTER` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `CLUSTER_NAME` varchar(200) NOT NULL, `SERVER_LIST` varchar(1024) NOT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `CANAL` ( `ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `NAME` varchar(200) DEFAULT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `PARAMETERS` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `CANALUNIQUE` (`NAME`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `CHANNEL` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(200) NOT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `PARAMETERS` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `CHANNELUNIQUE` (`NAME`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `COLUMN_PAIR` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `SOURCE_COLUMN` varchar(200) DEFAULT NULL, `TARGET_COLUMN` varchar(200) DEFAULT NULL, `DATA_MEDIA_PAIR_ID` bigint(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_DATA_MEDIA_PAIR_ID` (`DATA_MEDIA_PAIR_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `COLUMN_PAIR_GROUP` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `DATA_MEDIA_PAIR_ID` bigint(20) NOT NULL, `COLUMN_PAIR_CONTENT` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_DATA_MEDIA_PAIR_ID` (`DATA_MEDIA_PAIR_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `DATA_MEDIA` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(200) NOT NULL, `NAMESPACE` varchar(200) NOT NULL, `PROPERTIES` varchar(1000) NOT NULL, `DATA_MEDIA_SOURCE_ID` bigint(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `DATAMEDIAUNIQUE` (`NAME`,`NAMESPACE`,`DATA_MEDIA_SOURCE_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `DATA_MEDIA_PAIR` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `PULLWEIGHT` bigint(20) DEFAULT NULL, `PUSHWEIGHT` bigint(20) DEFAULT NULL, `RESOLVER` text DEFAULT NULL, `FILTER` text DEFAULT NULL, `SOURCE_DATA_MEDIA_ID` bigint(20) DEFAULT NULL, `TARGET_DATA_MEDIA_ID` bigint(20) DEFAULT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `COLUMN_PAIR_MODE` varchar(20) DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_PipelineID` (`PIPELINE_ID`,`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `DATA_MEDIA_SOURCE` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(200) NOT NULL, `TYPE` varchar(20) NOT NULL, `PROPERTIES` varchar(1000) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `DATAMEDIASOURCEUNIQUE` (`NAME`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `DELAY_STAT` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `DELAY_TIME` bigint(20) NOT NULL, `DELAY_NUMBER` bigint(20) NOT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_PipelineID_GmtModified_ID` (`PIPELINE_ID`,`GMT_MODIFIED`,`ID`), KEY `idx_Pipeline_GmtCreate` (`PIPELINE_ID`,`GMT_CREATE`), KEY `idx_GmtCreate_id` (`GMT_CREATE`,`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `LOG_RECORD` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NID` varchar(200) DEFAULT NULL, `CHANNEL_ID` varchar(200) NOT NULL, `PIPELINE_ID` varchar(200) NOT NULL, `TITLE` varchar(1000) DEFAULT NULL, `MESSAGE` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `logRecord_pipelineId` (`PIPELINE_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `NODE` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(200) NOT NULL, `IP` varchar(200) NOT NULL, `PORT` bigint(20) NOT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `PARAMETERS` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `NODEUNIQUE` (`NAME`,`IP`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `PIPELINE` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NAME` varchar(200) NOT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `PARAMETERS` text DEFAULT NULL, `CHANNEL_ID` bigint(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `PIPELINEUNIQUE` (`NAME`,`CHANNEL_ID`), KEY `idx_ChannelID` (`CHANNEL_ID`,`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `PIPELINE_NODE_RELATION` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `NODE_ID` bigint(20) NOT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `LOCATION` varchar(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_PipelineID` (`PIPELINE_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `SYSTEM_PARAMETER` ( `ID` bigint(20) unsigned NOT NULL, `VALUE` text DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;CREATE TABLE `TABLE_HISTORY_STAT` ( `ID` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `FILE_SIZE` bigint(20) DEFAULT NULL, `FILE_COUNT` bigint(20) DEFAULT NULL, `INSERT_COUNT` bigint(20) DEFAULT NULL, `UPDATE_COUNT` bigint(20) DEFAULT NULL, `DELETE_COUNT` bigint(20) DEFAULT NULL, `DATA_MEDIA_PAIR_ID` bigint(20) DEFAULT NULL, `PIPELINE_ID` bigint(20) DEFAULT NULL, `START_TIME` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `END_TIME` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_DATA_MEDIA_PAIR_ID_END_TIME` (`DATA_MEDIA_PAIR_ID`,`END_TIME`), KEY `idx_GmtCreate_id` (`GMT_CREATE`,`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `TABLE_STAT` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `FILE_SIZE` bigint(20) NOT NULL, `FILE_COUNT` bigint(20) NOT NULL, `INSERT_COUNT` bigint(20) NOT NULL, `UPDATE_COUNT` bigint(20) NOT NULL, `DELETE_COUNT` bigint(20) NOT NULL, `DATA_MEDIA_PAIR_ID` bigint(20) NOT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_PipelineID_DataMediaPairID` (`PIPELINE_ID`,`DATA_MEDIA_PAIR_ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `THROUGHPUT_STAT` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `TYPE` varchar(20) NOT NULL, `NUMBER` bigint(20) NOT NULL, `SIZE` bigint(20) NOT NULL, `PIPELINE_ID` bigint(20) NOT NULL, `START_TIME` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `END_TIME` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `idx_PipelineID_Type_GmtCreate_ID` (`PIPELINE_ID`,`TYPE`,`GMT_CREATE`,`ID`), KEY `idx_PipelineID_Type_EndTime_ID` (`PIPELINE_ID`,`TYPE`,`END_TIME`,`ID`), KEY `idx_GmtCreate_id` (`GMT_CREATE`,`ID`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `USER` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `USERNAME` varchar(20) NOT NULL, `PASSWORD` varchar(20) NOT NULL, `AUTHORIZETYPE` varchar(20) NOT NULL, `DEPARTMENT` varchar(20) NOT NULL, `REALNAME` varchar(20) NOT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), UNIQUE KEY `USERUNIQUE` (`USERNAME`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE `DATA_MATRIX` ( `ID` bigint(20) NOT NULL AUTO_INCREMENT, `GROUP_KEY` varchar(200) DEFAULT NULL, `MASTER` varchar(200) DEFAULT NULL, `SLAVE` varchar(200) DEFAULT NULL, `DESCRIPTION` varchar(200) DEFAULT NULL, `GMT_CREATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', `GMT_MODIFIED` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`ID`), KEY `GROUPKEY` (`GROUP_KEY`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;CREATE TABLE IF NOT EXISTS `meta_history` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `gmt_create` datetime NOT NULL COMMENT '创立工夫', `gmt_modified` datetime NOT NULL COMMENT '批改工夫', `destination` varchar(128) DEFAULT NULL COMMENT '通道名称', `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog文件名', `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog偏移量', `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog节点id', `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog利用的工夫戳', `use_schema` varchar(1024) DEFAULT NULL COMMENT '执行sql时对应的schema', `sql_schema` varchar(1024) DEFAULT NULL COMMENT '对应的schema', `sql_table` varchar(1024) DEFAULT NULL COMMENT '对应的table', `sql_text` longtext DEFAULT NULL COMMENT '执行的sql', `sql_type` varchar(256) DEFAULT NULL COMMENT 'sql类型', `extra` text DEFAULT NULL COMMENT '额定的扩大信息', PRIMARY KEY (`id`), UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`), KEY `destination` (`destination`), KEY `destination_timestamp` (`destination`,`binlog_timestamp`), KEY `gmt_modified` (`gmt_modified`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='表构造变动明细表';CREATE TABLE IF NOT EXISTS `meta_snapshot` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键', `gmt_create` datetime NOT NULL COMMENT '创立工夫', `gmt_modified` datetime NOT NULL COMMENT '批改工夫', `destination` varchar(128) DEFAULT NULL COMMENT '通道名称', `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog文件名', `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog偏移量', `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog节点id', `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog利用的工夫戳', `data` longtext DEFAULT NULL COMMENT '表构造数据', `extra` text DEFAULT NULL COMMENT '额定的扩大信息', PRIMARY KEY (`id`), UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`), KEY `destination` (`destination`), KEY `destination_timestamp` (`destination`,`binlog_timestamp`), KEY `gmt_modified` (`gmt_modified`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='表构造记录表快照表';insert into USER(ID,USERNAME,PASSWORD,AUTHORIZETYPE,DEPARTMENT,REALNAME,GMT_CREATE,GMT_MODIFIED) values(null,'admin','801fc357a5a74743894a','ADMIN','admin','admin',now(),now());insert into USER(ID,USERNAME,PASSWORD,AUTHORIZETYPE,DEPARTMENT,REALNAME,GMT_CREATE,GMT_MODIFIED) values(null,'guest','471e02a154a2121dc577','OPERATOR','guest','guest',now(),now());linux服务器新建文件夹/data/program/otter/manager,解压manager.deployer-4.2.18.tar.gz ...

March 21, 2022 · 9 min · jiezi

关于数据同步:系统间数据同步场景及方法

一、场景示例本文的题目不是很顾名思义,然而你肯定在日常的开发工作中遇到过相似场景。 1. 场景一假如公司从事电商业务,除了电商零碎外,公司外部个别也会有一个客户关系零碎(CRM)。如果CRM零碎心愿获取到电商零碎注册的客户信息,以实现本人的后续业务。该如何实现? 2. 场景二仍然假如公司从事电商业务,电商零碎中通常有订单模块,模块中会有订单表。为了进步查问性能防止连表Join,订单表除了存储用户id数据外也存储用户名作为冗余字段。用户侧业务用户名能够被更改,用户一旦更改了本人的用户名,订单表的数据也须要被更新。如何实现? 场景一是新增数据同步场景,场景二是更新数据同步场景。 3. 概念导入在持续注释之前,咱们再细化两个概念。 3.1 如何定义“零碎”题目中的“零碎”是被打上了引号,表明它其实一个很宽泛的定义,大到能够是一个业务功残缺的利用零碎平台(电商零碎、客户关系零碎),小到能够是一个单体利用中的业务模块(用户模块、订单模块)。这里的“零碎”不是按复杂程度、代码规模和实现形式划分的。咱们将存储状态(数据)的业务组件定义为“零碎”。 依据这个定义,一个微服务体系中微服务组件(用户核心、订单核心、库存核心、评估核心),分布式系统中主节点、从节点(各类中间件的架构)都是咱们文中的“零碎”。因为两个零碎存储着状态(数据),所以“零碎”间有可能遇到须要状态(数据)同步的场景。 3.2 数据同步办法“零碎”间同步数据的形式次要有三种: 接口调用形式音讯队列形式数据库或文件形式(场景比拟小众,本文不开展形容)三、接口调用形式实现咱们应用接口调用的形式实现场景一、二中需要。 1. 场景一的实现形式首先,如果电商零碎和CRM不是同期间上线的话,咱们要思考电商零碎历史存量数据问题,通常状况历史数据在首次数据同步的时候离线一次性导入。存量数据解决之后咱们就须要思考电商零碎的增量数据即新注册用户。依据接口调用方向有两种形式,数据推送模式和数据拉取模式。 1.1 数据推送模式数据推送是一种绝对合乎直觉的形式,这是形式的语言形容就是“当你有新数据的时候通知我一声”,这里的“通知”指的就是接口调用,就是电商零碎在有新用户注册的时候调用CRM提供的接口,将新增用户信息以接口入参的模式传递。 1.2 数据拉取模式就像推和拉是一对反义词一样,数据拉取与数据推送在接口调用方向上是相同,由CRM调用电商零碎提供的接口,获取新增用户信息。 1.2.1 接口调用机会因为CRM不晓得电商零碎何时会有新用户注册,所以它只能采纳轮询的形式定期调用接口。 1.2.2 获取数据形式如何判断新增数据?就是在某个基线节点之后减少的数据,这个基线节点能够是工夫这个人造枯燥递增的数值也能够是其余枯燥递增的数值(数据库自增ID)。所以电商零碎须要提供这样一个接口,以工夫(或自增ID)为入参,批量查问出入参工夫(自增ID)之后的所有数据。这也就要求电商零碎和CRM都要存储用户的创立工夫(或自增ID)参数用于接口调用。 注:如果咱们用工夫作为接口参数,极其状况下可能呈现数据缺失问题,读者能够本人想一想起因 2. 场景二的实现形式和场景一一样,如果后期没有思考过数据同步,也存在历史存量数据问题,也须要一次性批量同步。咱们重点关注增量数据,即前期产生的用户名更改数据同步问题。仍然有两种形式: 2.1 数据推送模式和场景一的实现相似,只不过这次的语言形容变成了“当你有数据变动的时候通知我一声”,由用户模块调用订单模块接口,将用户名变动后的信息以接口入参的模式传递。 2.2 数据拉取模式由订单模块调用用户模块提供的接口,获取更新用户的信息。 2.2.1 接口调用机会和场景一雷同,订单模块也不晓得用户模块的数据何时被更新,所以它也只能采纳轮询的形式定期调用接口。 2.2.2 获取数据形式这里和场景一有些不同,场景一咱们获取的是新增数据,这里咱们要获取的是数据变动状况,所以有两种实现形式 逐条查问依照用户惟一标识(用户ID)查问用户详情,判断订单模块中冗余的用户名和查问进去的值比拟,不同就表明用户名已扭转,进行更新操作。此时只须要用户模块提供一个依据用户惟一标识(用户ID)查问用户详情的接口(更简略专用的接口就是依据ID查问用户名)。显然在须要大量数据同步的场景下这不是一个好的实现形式。 批量查问和场景一相似,用户模块须要提供一个以工夫为入参,批量查问进去入参工夫之后产生变动的所有数据的接口。这时就须要用户模块和订单模块都存储更新工夫。(在这里在略微深刻一下,通常用户表都会存储更新工夫,然而这个更新工夫在用户任意属性变更的时候都会被更新,这样咱们会获取到所有的用户属性有变更数据而不只是用户名更改的数据。要保障获取数据精准,除非咱们在用户表独自存储用户名更新工夫,显然这样得失相当) 注:和场景一一样,应用工夫作为接口参数,极其状况下也可能呈现数据缺失问题。 通过对场景一、二的实现办法的形容咱们可见。推送和拉取行为是绝对数据方提供方和数据需求方而言,从数据提供方调用数据需求方接口就是数据推送,反之就是数据拉取,尽管两种形式接口的调用方向不同,然而数据的流向是雷同的,都是通过数据提供方想数据需求方转移。 3. 推送模式和拉取模式的区别接下来咱们从两个方面,形容推送和拉取的区别: 3.1 零碎间依赖关系推送的形式下,数据提供方依赖数据需求方,场景一中电商零碎调用CRM零碎的接口,所以电商系统对CRM零碎产生了依赖,同理场景二中用户模块也依赖了订单模块。在这种依赖关系咱们嗅到了“坏滋味”。 首先,直觉通知咱们数据提供方不应该依赖数据需求方。是你须要我的数据,我反而要调用你的接口并解决异样(异样如何解决也是有需求方业务决定的,例如是否能够重试,是否容许数据失落),一旦你的业务发生变化导致接口或者异样解决逻辑变动,我不得不批改代码以应答。 其次,随着业务演进,将来可能有更多的数据需求方退出,每减少一个数据需求方都会造成数据提供方的代码层面批改,这也是咱们不能承受的。 反观拉取的形式,因为是数据需求方调用数据提供方的接口,依赖方向转变了,无论数据需求方业务如何变动或者减少了更多的数据需求方,数据提供方均不受影响。 通常不好的耦合关系不会影响以后零碎实现和运行,而是影响系统对将来业务变动的扩展性。如果咱们能确定将来业务不会变动,也就不须要关怀这类耦合关系,显然这种相对稳固的零碎少之又少。 3.2 数据同步的时效性和精确性时效性方面推送模式下,数据提供方能够在数据变动时同时调用数据需求方接口传递变动。而在拉取模式下,需求方无奈感知数据何时变动,只能定期轮询接口。时效性和接口轮序频率成正比,为了减少时效性,只能进步接口轮询频率。然而过高的频率通常不可取:一来在数据变动不频繁的场景下,频繁的接口调用是一种节约,二来过于频繁的接口调用也会个被调用方带来比拟大的性能压力。所以咱们须要在时效性和性能衡量,找到一个平衡点。 精确性方面推送模式下,数据提供方能够准确的感知本身哪些数据发生变化并通过调用接口传递给数据需求方。而在拉取模式下,数据需求方无奈感知哪些数据变动,只能通过单方保留数据变动状态(新增工夫、更新工夫)、减少查问条件的形式进行范畴查问或者采纳全量查问后一一比对的形式(不实用新增数据场景)。相较于数据推送模式,数据拉取模式很难及时的获取到精准的同步数据信息。 4. 两种模式如何抉择两种模式各有利弊,从零碎间依赖关系角度思考,采纳拉取模式的零碎有更好的业务扩展性、能更好的应答将来需要变动。然而拉取模式在数据同步时效性、精确性和零碎性能上有人造的劣势,实现起来也更加简单。所以在具体计划抉择上咱们须要依据具体场景判断。数据同步时效性和零碎性能要求不高的场景能够采纳推送形式以进步系统对将来业务的扩大能力,反之须要采纳推送的形式以零碎扩展性换取数据同步时效性和零碎性能。 5. 推送模式优化让咱们在回看一下推送模式的问题,既然推送模式场景下数据需求方的业务变动可能对数据提供方造成影响。咱们可不可以对推送模式进行优化以解决一个零碎变动影响到另一个零碎的问题。咱们能够借鉴设计模式中“策略模式”的思维,将零碎分为固定局部和变动局部,对变动的局部进行肯定维度的形象而后在将这种形象拆散进来独立实现。这种将固定逻辑和变动逻辑拆散的做法,肯定水平上能够解决咱们遇到的问题。 5.1 如何革新将变动进行形象首先,咱们将数据提供方中变动的逻辑,如调用哪些数据需求方的接口、调用接口签名(名称、地址、入参出参)是什么、如何解决调用过程中的异样等这些有可能变动并且须要代码实现的逻辑形象为通过自定义配置的形式实现,造成一个“接口配置核心”的概念。 对形象进行实现接下来,咱们就须要编码实现“接口配置核心”的逻辑。配置中的主要职责就是对数据提供方提供对立的数据同步接口,一旦数据提供方调用数据同步接口,“配置核心”依据提前定义的配置调用后端数据需求方的接口实现数据同步工作,同时会依据配置解决接口调用过程中产生的各种异常情况。 注:图中箭头示意接口调用方向和依赖关系方向。 革新后,数据提供方调用接口核心的接口传递须要同步的数据,有接口配置核心负责调用个数据需求方的接口进行二次传递。数据提供方只依赖配置核心,不再依赖各个数据需求方,接口配置核心尽管依赖后端的各个数据需求方,然而数据需求方的种种变动曾经通过配置来实现,所以无论数据需求方业务如何变动,咱们只须要调整配置核心配置即可,这种变动并不能传导到数据需求方。通过引入配置核心,咱们无效隔离了数据需求方对数据提供方的影响。 5.2 革新后的长处和有余长处解决了拉取模式下数据需求方影响数据提供方的问题。 毛病减少了零碎实现复杂度,显然“接口配置核心”实现起来并容易。 如何接口调用的相干行为形象为能够配置的形式并加以实现如何解决接口调用过程中遇到的种种异常情况如何解决接口调用的性能问题升高了零碎稳定性,进步了运维难度。更多的模块意味着更多的问题点,咱们须要更多运维监控伎俩在第一工夫发现并解决问题,以确保零碎整体稳固运行。 5.3 应用倡议是否要应用这种形式,还须要综合思考。在一个业务逻辑绝对简略的小型零碎,咱们没有必要引入如此简单的实现形式,依据场景须要从下面介绍的拉取模式和推送模式抉择即可,而对于业务逻辑较简单的大型零碎,能够引入这种形式以升高零碎间简单的依赖关系。 四、音讯队列模式实现注:图中箭头示意接口调用方向和依赖关系方向。 数据提供方以音讯的形式向数据需求方传递须要同步数据信息,数据提供发向音讯队列发送一条音讯,音讯队列负责将音讯发送给数据需求方。数据提供方和数据需求方都只依赖音讯队列,他们两者之间没有依赖也就不会相互影响。 置信大家对音讯队列并不生疏,在此我也不再赘述。你有没有发现音讯队列很像同咱们前文所述“接口配置核心”模式,只是实现略有不同。 “接口配置核心”中接口签名转换成了音讯队列中的主题、音讯概念“接口配置核心”中接口调用行为就像音讯队列中的公布订阅模式“接口配置核心”中异样解决机制就是音讯队列中的保障投递机制“接口配置核心”中的性能要求也能够通过音讯队列中的削峰作用来满足所以音讯队列能够齐全代替咱们上文所说的“接口配置核心”性能。免去咱们反复造轮子之苦。 ...

March 8, 2022 · 1 min · jiezi

TiDB-Binlog-源码阅读系列文章三Pump-client-介绍

作者:黄佳豪 在 上篇文章 中,我们介绍了 Pump 的作用是存储 TiDB 产生的 binlog。本篇将介绍 Pump client,希望大家了解 TiDB 把 binlog 写到 Pump,以及输出数据的过程。 gRPC APIPump client 的代码在 tidb-tools 下这个 路径,TiDB 会直接 import 这个路径使用 Pump client package。TiDB 跟 Pump 之间使用 gRPC 通信,相关的 proto 文件定义在 这里。Pump server 提供以下两个接口: // Interfaces exported by Pump.service Pump { // Writes a binlog to the local file on the pump machine. // A response with an empty errmsg is returned if the binlog is written successfully. rpc WriteBinlog(WriteBinlogReq) returns (WriteBinlogResp) {} // Sends binlog stream from a given location. rpc PullBinlogs(PullBinlogReq) returns (stream PullBinlogResp) {}}本文我们主要介绍 RPC WriteBinlog 这个接口,Pump client 会通过这个接口写 binlog 到 Pump。 ...

August 7, 2019 · 3 min · jiezi

基于-MySQL-Binlog-的-Elasticsearch-数据同步实践

一、背景随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张MySQL 表中,这张中间表对应了业务需要的 Elasticsearch 索引,每一列对应索引中的一个Mapping 字段。通过脚本以 Crontab 的方式,读取 MySQL 中间表中 UTime 大于上一次读取时间的所有数据,即该段时间内的增量,写入Elasticsearch。 所以,一旦业务逻辑中有相应字段的数据变更,需要同时顾及 MySQL 中间表的变更;如果需要 Elasticsearch 中的数据即时性较高,还需要同时写入 Elasticsearch。 随着业务数据越来越多,MySQL 中间表的数据量越来越大。当需要在 Elasticsearch 的索引中新增 Mapping 字段时,相应的 MySQL 中间表也需要新增列,在数据量庞大的表中,扩展列的耗时是难以忍受的。 而且 Elasticsearch 索引中的 Mapping 字段随着业务发展增多,需要由业务方增加相应的写入 MySQL 中间表方法,这也带来一部分开发成本。 三、方案设计1. 整体思路现有的一些开源数据同步工具,如阿里的 DataX 等,主要是基于查询来获取数据源,这会存在如何确定增量(比如使用utime字段解决等)和轮询频率的问题,而我们一些业务场景对于数据同步的实时性要求比较高。为了解决上述问题,我们提出了一种基于 MySQL Binlog 来进行 MySQL 数据同步到 Elasticsearch 的思路。Binlog 是 MySQL 通过 Replication 协议用来做主从数据同步的数据,所以它有我们需要写入 Elasticsearch 的数据,并符合对数据同步时效性的要求。 使用 Binlog 数据同步 Elasticsearch,业务方就可以专注于业务逻辑对 MySQL 的操作,不用再关心数据向 Elasticsearch 同步的问题,减少了不必要的同步代码,避免了扩展中间表列的长耗时问题。 经过调研后,我们采用开源项目 go-mysql-elasticsearch 实现数据同步,并针对马蜂窝技术栈和实际的业务环境进行了一些定制化开发。 ...

July 15, 2019 · 2 min · jiezi

TiDB-在小红书从-0-到-200-节点的探索和应用

作者介绍:张俊骏,小红书数据库与中间件团队负责人小红书使用 TiDB 历史可以追溯到 2017 年甚至更早,那时在物流、仓库等对新技术比较感兴趣的场景下应用,在 2018 年 5 月之后,我们就开始逐步铺开,延展到其他适合 TiDB 的场景中去。截止目前,小红书使用的 TiDB 节点数在 200+ 个,未来也有更大扩展空间。 本文根据近两年 TiDB 在小红书的落地过程,和大家一起探讨一下,小红书在新数据库选型的考虑因素、以及 TiDB 从场景分类的角度是如何考量及逐步推广使用的。具体包括以下内容: 目前小红书数据服务整体架构,以及从数据流角度如何对不同数据库服务进行定义和划分。从基本功能、数据同步、部署管理、运维、二次开发及优化、安全等多个维度解读小红书在数据库选型的考虑因素及思考。TiDB 的适用场景,以及在小红书如何进行场景选择、如何逐步进行上线规划。一、小红书数据服务整体架构 <center>图 1</center> 如图 1 所示,小红书数据服务整体架构最上层是在线应用层(online app),应用层往下肯定会依赖一些离线(offline)或者在线(online)的 database(其实它更多的意义应该算存储,比如 Redis 也被我们理解为 database,所以称之为“数据服务”可能会更好),这些在线数据服务(online database)会有两条线: 通过实时数据流(dataflow)将数据导入到离线数据库(offline database)支撑离线分析以及实时展示的场景,也就是图 1 最下层的展示类服务(presentation)和数仓(data warehouse)。这些数据还可能会回灌到线上其他 database 上,有些是离线,有些是实时。图 1 蓝框中的部分基本上都由我们团队负责。我们首先需要保证在线数据库(online database) 的稳定性、安全性以及性能优化等,其次我们的多种数据库数据同步服务(database to database replication) 有点像阿里提出的 data replication center 这个概念,这部分也基本上由我们团队全权负责。 二、小红书数据服务组件选型 RoadMap对于一个新的数据库或数据服务组件选型(如 TiDB),我们该从哪些方面去入手搞清楚它的特性?下面分享一下我们的经验。 1. 产品的基本功能第一步,我们需要考察该数据服务/组件的基本功能,首先,我们要了解它的读写场景,包括点查、批量获取(batch get)、范围扫描(range scan)、过滤查询(filter query)、聚合查询(aggregation)等等。然后我们看看它是否符合响应时间(latency) 以及带宽(bandwidth,即能承接多少并发)的要求。最后我们会关注可扩展性,比如 TiDB 可能最大的特点就是扩展性非常好。这几点是大家都会想到的最基本的要求,这里我就一笔略过。 2. 数据同步与处理相关解决方案第二部分是数据同步与处理相关解决方案。这里我们有以下 4 点考虑: ...

July 12, 2019 · 3 min · jiezi

TiDB-Binlog-源码阅读系列文章二初识-TiDB-Binlog-源码

作者:satoru TiDB Binlog 架构简介TiDB Binlog 主要由 Pump 和 Drainer 两部分组成,其中 Pump 负责存储 TiDB 产生的 binlog 并向 Drainer 提供按时间戳查询和读取 binlog 的服务,Drainer 负责将获取后的 binlog 合并排序再以合适的格式保存到对接的下游组件。 在《TiDB Binlog 架构演进与实现原理》一文中,我们对 TiDB Binlog 整体架构有更详细的说明,建议先行阅读该文。 相关源码仓库TiDB Binlog 的实现主要分布在 tidb-tools 和 tidb-binlog 两个源码仓库中,我们先介绍一下这两个源码仓库中的关键目录。 1. tidb-toolsRepo: https://github.com/pingcap/tidb-tools/ 这个仓库除了 TiDB Binlog 还有其他工具的组件,在这里与 TiDB Binlog 关系最密切的是 tidb-binlog/pump_client 这个 package。pump_client 实现了 Pump 的客户端接口,当 binlog 功能开启时,TiDB 使用它来给 pump 发送 binlog 。 2. tidb-binlogRepo: https://github.com/pingcap/tidb-binlog TiDB-Binlog 的核心组件都在这个仓库,下面是各个关键目录: cmd:包含 pump,drainer,binlogctl,reparo,arbiter 等 5 个子目录,分别对应 5 个同名命令行工具。这些子目录下面的 main.go 是对应命令行工具的入口,而主要功能的实现则依赖下面将介绍到的各个同名 packages。pump:Pump 源码,主要入口是 pump.NewServer 和 Server.Start;服务启动后,主要的功能是 WriteBinlog(面向 TiDB/pump_client) 和 PullBinlogs(面向 Drainer)。drainer:Drainer 源码,主要入口是 drainer.NewServer 和 Server.Start;服务启动后,Drainer 会先找到所有 Pump 节点,然后调用 Pump 节点的 PullBinlogs 接口同步 binlog 到下游。目前支持的下游有:mysql/tidb,file(文件增量备份),kafka 。binlogctl:Binlogctl 源码,实现一些常用的 Binlog 运维操作,例如用 -cmd pumps 参数可以查看当前注册的各个 Pump 节点信息,相应的实现就是 QueryNodesByKind。reparo:Reparo 源码,实现从备份文件(Drainer 选择 file 下游时保存的文件)恢复数据到指定数据库的功能。arbiter:Arbiter 源码,实现从 Kafka 消息队列中读取 binlog 同步到指定数据库的功能,binlog 在消息中以 Protobuf 格式编码。pkg:各个工具公用的一些辅助类的 packages,例如 pkg/util 下面有用于重试函数执行的 RetryOnError,pkg/version 下面有用于打印版本信息的 PrintVersionInfo。tests:集成测试。启动测试集群上个小节提到的 tests 目录里有一个名为 run.sh 脚本,我们一般会使用 make integration_test 命令,通过该脚本执行一次完整的集成测试,不过现在我们先介绍如何用它来启动一个测试集群。 ...

July 5, 2019 · 2 min · jiezi

基于Tablestore-Tunnel的数据复制实战

前言数据复制主要指通过互联的网络在多台机器上保存相同数据的副本,通过数据复制方案,人们通常希望达到以下目的:1)使数据在地理位置上更接近用户,进而降低访问延迟;2)当部分组件出现故障时,系统依旧可以继续工作,提高可用性;3)扩展至多台机器以同时提供数据访问服务,从而提升读吞吐量。如果复制的数据一成不变,那么数据复制就非常容易,只需要将数据复制到每个节点,一次性即可搞定,面对持续更改的数据如何正确而有效的完成数据复制是一个不小的挑战。 使用DataX进行Tablestore数据复制表格存储(Tablestore)是阿里云自研的NoSQL多模型数据库,提供海量结构化数据存储以及快速的查询和分析服务,表格存储的分布式存储和强大的索引引擎能够提供PB级存储、千万TPS以及毫秒级延迟的服务能力。DataX是阿里巴巴集团内被广泛使用的离线数据同步工具,DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件。通过使用DataX可以完成Tablestore表的数据复制,如下图所示,otsreader插件实现了从Tablestore读取数据,并可以通过用户指定抽取数据范围可方便的实现数据增量抽取的需求,otsstreamreader插件实现了Tablestore的增量数据导出,而otswriter插件则实现了向Tablestore中写入数据。通过在DataX中配置Tablestore相关的Reader和Writer插件,即可以完成Tablestore的表数据复制。 使用通道服务进行Tablestore数据复制通道服务(Tunnel Service)是基于表格存储数据接口之上的全增量一体化服务。通道服务为您提供了增量、全量、增量加全量三种类型的分布式数据实时消费通道。通过为数据表建立数据通道,可以简单地实现对表中历史存量和新增数据的消费处理。 借助于全增量一体的通道服务,我们可以轻松构建高效、弹性的数据复制解决方案。本文将逐步介绍如何结合通道服务进行Tablestore的数据复制,完整代码开源在github上的 tablestore-examples中。本次的实战将基于通道服务的Java SDK来完成,推荐先阅读下通道服务的相关文档,包括快速开始等。 1. 配置抽取配置抽取其实对应的是数据同步所具备的功能,在本次实战中,我们将完成指定时间点之前的表数据同步,指定的时间点可以是现在或者未来的某个时刻。具体的配置如下所示,ots-reader中记录的是源表的相关配置,ots-writer中记录的是目的表的相关配置。 { "ots-reader": { "endpoint": "https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com", "instanceName": "zhuoran-high", "tableName": "testSrcTable", "accessId": "", "accessKey": "", "tunnelName": "testTunnel", "endTime": "2019-06-19 17:00:00" }, "ots-writer": { "endpoint": "https://zhuoran-search.cn-hangzhou.ots.aliyuncs.com", "instanceName": "zhuoran-search", "tableName": "testDstTable", "accessId": "", "accessKey": "", "batchWriteCount": 100 }}ots-reader中各参数的说明如下: endpoint: Tablestore服务的Endpoint地址,例如https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com。在进行数据复制前,请检查下连通性(可以使用curl命令)。instanceName: Tablestore的实例名。tableName: Tablestore的表名。accessId: 访问Tablestore的云账号accessId。accessKey: 访问Tablestore的云账号accessKey。tunnelName: Tablestore的通道名,配置endTime: 数据同步的截止时间点,对应到Java里SimpleFormat的格式为:yyyy-MM-dd HH:mm:ss 。ots-writer中各参数的说明如下(略去相同的参数): batchWriteCount: Tablestore单次批量写入的条数,最大值为200。注:未来会开放更多的功能配置,比如指定时间范围的数据复制等。2. 编写主逻辑数据复制的主逻辑主要分为以下4步,在第一次运行时,会完整的进行所有步骤,而在程序重启或者断点续传场景时,只需要进行第3步和第4步。 1.创建复制目的表 通过使用DesribeTable接口,我们可以获取到源表的Schema,借此可以创建出目的表,值得注意的是需要把目的表的有效版本偏差设成一个足够大的值(默认为86400秒),因为服务端在处理写请求时会对属性列的版本号进行检查,写入的版本号需要在一个范围内才能写入成功,对于源表中的历史存量数据而言,时间戳往往是比较小的,会被服务端过滤掉,最终导致同步数据的丢失。sourceClient = new SyncClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(), config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());destClient = new SyncClient(config.getWriteConf().getEndpoint(), config.getWriteConf().getAccessId(), config.getWriteConf().getAccessKey(), config.getWriteConf().getInstanceName());if (destClient.listTable().getTableNames().contains(config.getWriteConf().getTableName())) { System.out.println("Table is already exist: " + config.getWriteConf().getTableName());} else { DescribeTableResponse describeTableResponse = sourceClient.describeTable( new DescribeTableRequest(config.getReadConf().getTableName())); describeTableResponse.getTableMeta().setTableName(config.getWriteConf().getTableName()); describeTableResponse.getTableOptions().setMaxTimeDeviation(Long.MAX_VALUE / 1000000); CreateTableRequest createTableRequest = new CreateTableRequest(describeTableResponse.getTableMeta(), describeTableResponse.getTableOptions(), new ReservedThroughput(describeTableResponse.getReservedThroughputDetails().getCapacityUnit())); destClient.createTable(createTableRequest); System.out.println("Create table success: " + config.getWriteConf().getTableName());}2.在源表上创建通道 ...

July 1, 2019 · 2 min · jiezi

DM-源码阅读系列文章八Online-Schema-Change-同步支持

作者:lan 本文为 DM 源码阅读系列文章的第八篇,上篇文章 对 DM 中的定制化数据同步功能进行详细的讲解,包括库表路由(Table routing)、黑白名单(Black & white table lists)、列值转化(Column mapping)、binlog 过滤(Binlog event filter)四个主要功能的实现。 本篇文章将会以 gh-ost 为例,详细地介绍 DM 是如何支持一些 MySQL 上的第三方 online schema change 方案同步,内容包括 online schema change 方案的简单介绍,online schema change 同步方案,以及同步实现细节。 MySQL 的 Online Schema Change 方案目前有一些第三方工具支持在 MySQL 上面进行 Online Schema Change,比较主流的包括 pt-online-schema-change 和 gh-ost。 这些工具的实现原理比较类似,本文会以 gh-ost 为例来进行分析讲解。 从上图可以大致了解到 gh-ost 的逻辑处理流程: 在操作目标数据库上使用 create table ghost table like origin table 来创建 ghost 表;按照需求变更表结构,比如 add column/index;gh-ost 自身变为 MySQL replica slave,将原表的全量数据和 binlog 增量变更数据同步到 ghost 表;数据同步完成之后执行 rename origin table to table_del, table_gho to origin table 完成 ghost 表和原始表的切换pt-online-schema-change 通过 trigger 的方式来实现数据同步,剩余流程类似。 ...

June 20, 2019 · 2 min · jiezi

TiDB-Binlog-源码阅读系列文章一序

作者:黄佳豪 TiDB Binlog 组件用于收集 TiDB 的 binlog,并准实时同步给下游,如 TiDB、MySQL 等。该组件在功能上类似于 MySQL 的主从复制,会收集各个 TiDB 实例产生的 binlog,并按事务提交的时间排序,全局有序的将数据同步至下游。利用 TiDB Binlog 可以实现数据准实时同步到其他数据库,以及 TiDB 数据准实时的备份与恢复。随着大家使用的广泛和深入,我们遇到了不少由于对 TiDB Binlog 原理不理解而错误使用的情况,也发现了一些 TiDB Binlog 支持并不完善的场景和可以改进的设计。 在这样的背景下,我们开展 TiDB Binlog 源码阅读分享活动,通过对 TiDB Binlog 代码的分析和设计原理的解读,帮助大家理解 TiDB Binlog 的实现原理,和大家进行更深入的交流,同时也有助于社区参与 TiDB Binlog 的设计、开发和测试。 背景知识本系列文章会聚焦 TiDB Binlog 本身,读者需要有一些基本的知识,包括但不限于: Go 语言,TiDB Binlog 由 Go 语言实现,有一定的 Go 语言基础有助于快速理解代码。数据库基础知识,包括 MySQL、TiDB 的功能、配置和使用等;了解基本的 DDL、DML 语句和事务的基本常识。了解 Kafka 的基本原理。基本的后端服务知识,比如后台服务进程管理、RPC 工作原理等。总体而言,读者需要有一定 MySQL/TiDB/Kafka 的使用经验,以及可以读懂 Go 语言程序。在阅读 TiDB Binlog 源码之前,可以先从阅读 《TiDB Binlog 架构演进与实现原理》 入手。 ...

June 18, 2019 · 2 min · jiezi

拔掉数据库的电源会怎样阿里云数据库新型灾备架构让云端容灾有备无患

拔掉数据库的电源会怎样?假设我们拔掉数据库的电源会怎样? 在日前举行的阿里云“企业级”云灾备解决方案发布会上,阿里云智能技术战略总监陈绪就来了一场现场“断电”演示,拔掉了数据库的电源。 (直播回放:https://yq.aliyun.com/live/1104/event) 猜猜现场发生了什么? 数据丢失,业务瘫痪,企业资金受损? 上述情况统统没有出现!没有出现任何数据丢失,也没有业务瘫痪,10秒后,上云企业的业务就完全恢复了。 那么这是如何实现的呢? 在会上,阿里云智能数据库产品事业部技术总监天羽为大家全面解析《云时代,数据库新型灾备架构》,详细介绍了在混合云架构下,从异地备份、容灾、双活到统一管理的数据库一站式灾备解决方案。 有了云数据库新型灾备架构,即使断电又何妨?通过阿里云提供的DBS、DTS、HDM等服务,您的企业也可轻松构建灾备体系,做到“有备无患”。 墨菲定律 鸡蛋不能放在一个篮子里对于每个企业而言,数据库都是其最为核心的资产。但是单点故障是不可避免的,因此为了提升数据安全,需要做的就是数据冗余。 国家对于数据库灾难恢复能力也定义了相应的标准。对于位于等级2~3的一般业务而言,需要每天进行备份;对于位于等级4的重要业务而言,需要每天全量+增量备份;对于等级5的关键业务而言,要求数据丢失不能超过半个小时,并且要求在分钟级别恢复业务;对于位于等级6的核心业务而言,则需要做到数据零丢失。 阿里巴巴数据库从备份到多活的发展经过了以下历程: 2012年之前,阿里巴巴采用的是异地冷备+热备方案,提供只读副本,当时异地冷备和热备可能出现异地延时比较长的问题,导致出现灾难之后敢不敢进行数据库切换成为一个问题,可能现在很多传统企业还在使用该方案。 2013年,阿里巴巴通过数据库实时日志的解析能力实现了同城双活。 2014年,阿里巴巴实现了异地双活。 2015年,阿里巴巴就实现了中美同步以及多个地域、多点写入的数据同步策略。 2016年,阿里巴巴实现了分布式数据强一致的能力以及异地多活能力。 在不断提升阿里巴巴灾备能力的过程中,我们也在阿里云上孵化了数据库备份(DBS)、数据传输(DTS)、混合云数据库管理(HDM),搭建从备份、容灾、双活及混合云统一管理的一站式云灾备解决方案。 对于等级1到等级4的业务而言,可以通过DBS将数据实时备份到阿里云OSS上,该方案具有低成本、秒级RPO的优势; 对于等级5的业务而言,可以通过DTS数据传输服务将本地IDC或者其他云产商的数据库备份到阿里云上去,实现热备或者双活解决方案,实现秒级RPO和秒级RTO。 阿里云数据库新型灾备方案众所周知,传统灾备解决方案存在成本高昂、实施困难、运维复杂、RTO和RPO无法保障等问题。 阿里云拥有遍布全球安全可靠的数据中心,是企业用户天然的异地灾备中心。阿里云的新型灾备方案可以为您提供低成本、高质量、开箱即用的数据库灾备服务。 1、数据库备份服务DBS数据库备份服务DBS结合阿里云对象存储服务OSS,能够为用户提供秒级RPO以及低成本的特性,并且实现了国家灾备等级4的相应能力。 用户自建的IDC或者来自其他云厂商的数据库可以通过DBS备份到阿里云OSS之上,而且整个备份的实现过程非常简单,只需要打通网络就可以通过DBS实现数据备份到云上,当出现灾难的时候就能够完成云上数据库快速恢复。 除了和云上数据库进行打通之外,对于数据的备份集而言,也可以通过数据湖服务直接进行查询和验证(无需恢复),这也是阿里云特有的能力之一。 阿里云数据库备份服务DBS主要有如下优势: 秒级RPO:因为数据库发生变更的时候,首先会记录日志,再刷新数据。而阿里巴巴沉淀了一整套数据库解析技术,通过这个技术能够实现秒级冷备到阿里云上的能力,并且其冷备数据和在线数据之间仅存在秒级延时。低成本:借助OSS的能力可以实现对于数据的周期性归档,并且允许数据库只备份核心关注的数据业务表,仅备份有效数据,同时进行加密和压缩。备份数据可在线读,验证有效性:基于DLA的数据湖能力,备份逻辑数据集允许用户直接进行备份集查询,查询里面的数据内容并且校验其中的数据。基于RDS的能力能够帮助用户在出现灾难时实现数据库的快速恢复。丰富的备份数据源:阿里云数据库备份服务DBS能够支持非常丰富的数据源,包括Oracle、MySQL、SQLServer、MongoDB以及Redis等。2、数据库热备以及双活架构DTS 结合DTS和RDS就能够实现云上数据库热备,可以实现国标等级5的灾备能力。无论是将业务中心建立在自建IDC还是其他云厂商上,通过DTS热备到阿里云上,当出现本地IDC出现数据库故障或者误操作的时候,用户就可以一键切换到云热备之上,实现秒级RPO和秒级RTO。 您还可以更进一步,借助DTS和RDS实现多活,除了将业务切换到阿里云上之外,还可以反向建立阿里云到本地IDC数据库的同步链路,从而建立双向同步通道,这样就能够提供异地双活能力,两端都可以进行写入和切换。业务也可以在云上和本地IDC之间进行分流,从而实现就近写入和就近服务的查询能力,同时能够支持实现容灾。 如果采用传统热备方案,将数据热备到云上之后可以支持实现秒级RPO的数据库切换,但是当切换完成之后如果想要去恢复灾备系统,则需要一定的恢复过程,但是当建立了双向同步通道之后,可以很快地切换到阿里云,同时很快地切换回来,因此能够支持企业实现在线的容灾演练。 关于阿里云数据库传输服务DTS: 阿里巴巴在2011年左右开始投入做数据库的日志解析,而DTS除了能够实现日志解析之外,还能够实现高效的数据同步,是阿里巴巴内部实现异地多活的基础设施,也是阿里巴巴的数据从生产到消费的数据流基础设施。 DTS也支持了非常丰富的数据源,包括关系型数据库、NoSQL及大数据等17种数据源,承担了阿里云上的40多万的数据传输任务。 3、基于DMS+HDM的数据库统一管理方案 除了上述的DBS和DTS两款灾备产品之外,当用户使用线下到线上的数据同步或者线下到线上数据热备之后,就会形成一个混合云数据库架构。 阿里云为此提供了一整套数据库混合云统一管理解决方案,该方案沉淀了阿里在脱敏审计、变更管控以及研发协同等多方面的能力。 在混合云上,如果数据库分布在自建的IDC、其他云厂商以及阿里云上,就可以通过阿里云的混合云数据库管理(HDM)进行统一管理,通过One Console实现统一监控、告警、性能优化和风险识别。 了解企业级云灾备解决方案——“十万先行者计划”,请点击:https://promotion.aliyun.com/ntms/act/hclouddr/index.html 相关阅读阿里云发布企业级云灾备解决方案,十万先行者计划开启普惠灾备专访阿里数据库备份专家 教你Pick最有效的备份系统 本文作者:七幕阅读原文 本文为云栖社区原创内容,未经允许不得转载。

June 10, 2019 · 1 min · jiezi

DM-源码阅读系列文章七定制化数据同步功能的实现

作者:王相 本文为 DM 源码阅读系列文章的第七篇,在 上篇文章 中我们介绍了 relay log 的实现,主要包括 relay log 目录结构定义、relay log 数据的处理流程、主从切换支持、relay log 的读取等逻辑。本篇文章我们将会对 DM 的定制化数据同步功能进行详细的讲解。 在一般的数据同步中,上下游的数据是一一对应的,即上下游的库名、表名、列名以及每一列的值都是相同的,但是很多用户因为业务的原因希望 DM 在同步数据到 TiDB 时进行一些定制化的转化。下面我们将主要介绍数据同步定制化中的库表路由(Table routing)、黑白名单(Black & white table lists)、列值转化(Column mapping)、binlog 过滤(Binlog event filter)四个主要功能的实现。值得注意的是,由于其他一些工具(例如 TiDB Lightning 和 TiDB Binlog)也需要类似的功能,所以这四个功能都以 package 的形式维护在 tidb-tools 项目下,这样方便使用和维护。 库表路由(Table routing)库表路由顾名思义就是对库名和表名根据一定的路由规则进行转换。比如用户在上游多个 MySQL 实例或者 schema 有多个逻辑上相同的表,需要把这些表的数据同步到 TiDB 集群的同一个表中,这个时候就可以使用 table-router 功能,如下图所示: 该功能实现在 pkg/table-router 中,库表路由的规则定义在结构 TableRule 中,其中的属性 SchemaPattern 和 TablePattern 用于配置原库名和表名的模式,TargetSchema 和 TargetTable 用于配置目标库和表名,即符合指定 pattern 的库和表名都将转化成目标库名和表名。 ...

June 6, 2019 · 2 min · jiezi

使用EMR-Spark-Relational-Cache跨集群同步数据

背景Relational Cache是EMR Spark支持的一个重要特性,主要通过对数据进行预组织和预计算加速数据分析,提供了类似传统数据仓库物化视图的功能。除了用于提升数据处理速度,Relational Cache还可以应用于其他很多场景,本文主要介绍如何使用Relational Cache跨集群同步数据表。通过统一的Data Lake管理所有数据是许多公司追求的目标,但是在现实中,由于多个数据中心,不同网络Region,甚至不同部门的存在,不可避免的会存在多个不同的大数据集群,不同集群的数据同步需求普遍存在,此外,集群迁移,搬站涉及到的新老数据同步也是一个常见的问题。数据同步的工作通常是一个比较痛苦的过程,迁移工具的开发,增量数据处理,读写的同步,后续的数据比对等等,需要很多的定制开发和人工介入。基于Relational Cache,用户可以简化这部分的工作,以较小的代价实现跨集群的数据同步。下面我们以具体示例展示如何通过EMR Spark Relational Cache实现跨集群的数据同步。 使用Relational Cache同步数据假设我们有A,B两个集群,需要把activity_log表的数据从集群A同步到集群B中,且在整个过程中,会持续有新的数据插入到activity_log表中,A集群中activity_log的建表语句如下: CREATE TABLE activity_log ( user_id STRING, act_type STRING, module_id INT, d_year INT)USING JSONPARTITIONED BY (d_year)插入两条信息代表历史信息: INSERT INTO TABLE activity_log PARTITION (d_year = 2017) VALUES("user_001", "NOTIFICATION", 10), ("user_101", "SCAN", 2)为activity_log表建一个Relational Cache: CACHE TABLE activity_log_syncREFRESH ON COMMITDISABLE REWRITEUSING JSONPARTITIONED BY (d_year)LOCATION "hdfs://192.168.1.36:9000/user/hive/data/activity_log"AS SELECT user_id, act_type, module_id, d_year FROM activity_logREFRESH ON COMMIT表示当源表数据发生更新时,自动更新cache数据。通过LOCATION可以指定cache的数据的存储地址,我们把cache的地址指向B集群的HDFS从而实现数据从集群A到集群B的同步。此外Cache的字段和Partition信息均与源表保持一致。 在集群B中,我们也创建一个activity_log表,创建语句如下: CREATE TABLE activity_log ( user_id STRING, act_type STRING, module_id INT, d_year INT)USING JSONPARTITIONED BY (d_year)LOCATION "hdfs:///user/hive/data/activity_log"执行MSCK REPAIR TABLE activity_log自动修复相关meta信息,然后执行查询语句,可以看到在集群B中,已经能够查到之前集群A的表中插入的两条数据。 ...

June 6, 2019 · 1 min · jiezi

蚂蚁金服面对亿级并发场景的组件体系设计

作者:吕丹(凝睇),2011 年加入支付宝,先后负责了支付宝 Wap、alipass 卡券、SYNC 数据同步等项目,并参与了多次双十一、双十二、春节红包大促活动,在客户端基础服务方面有一定的项目实践经验与积累。目前负责蚂蚁金服移动开发平台 mPaaS 服务端组件体系优化与架构设计。5 月 6 日,InfoQ 主办的 QCon 2019 全球软件开发大会在北京举行。蚂蚁金服技术专家吕丹(凝睇)在大会上做了《蚂蚁金服面对亿级并发场景的组件体系设计》的分享,我们根据演讲整理如下: 今天,我主要想和大家分享一下移动领域基础组件体系,内容大致可以分为四大块,第一块是标准移动研发所需的基础服务体系,第二块是支撑亿级并发的核心组件“移动接入”的架构演进过程,第三块是双十一、双十二、新春红包这种大促活动的的应付方法,最后一块是目前已经对外输出的基础服务产品。 0. 移动研发基础服务体系 首先介绍一下支付宝客户端的演进过程。之前,支付宝客户端的主要功能是转账、订单支付、交易查询等等,更像是一个工具类的 APP,在需要付钱的时候才会掏出来,用完了就放回去了。2013 年,蚂蚁金服 all in 无线之后,加入了很多服务,例如余额宝、卡券、探索发现等,基本是把支付宝网站上的功能都尽量迁移到客户端,支付宝也逐渐演化成一个平台级别的客户端。之后,随着移动互联网的快速发展,公司内部孵化出了更多的 APP,其他行业也在移动互联网圈内铺开了大量的业务,为了提升用户量、用户粘性,APP 之间也开始进行了大量的业务融合,超级 APP 也因此而诞生,APP 开始朝着生态化的模式发展。 截止到目前为止,支付宝客户端的年活跃用户数超过 8 亿,在大促场景下,同时在线量超过 3 亿,并发请求超过 1 亿,同时上线的用户数超过百万每秒。 而在这些数据的背后一定需要一套庞大、复杂、完整的支撑体系来支持支付宝的运作,移动研发基础服务体系就是其中的重要组成部分。 按照研发过程,我们把移动研发基础服务体系分成四大块:APP 研发阶段,主要包括 App 框架、基础组件、云端服务和研发工具;App 测试阶段 ,主要包括研发协作平台和真机测试平台,其中研发协作平台包含版本管理、迭代管理、安装包编译、构建和打包的能力,而真机测试主要是代替人工服务,减少人工消耗,提升测试效率; App 运维阶段 ,主要包括智能发布、日志回溯、应急管理和动态配置;App 运营阶段,主要包括舆情反馈、实时分析、离线计算和智能营销。 1. 蚂蚁移动接入架构演进 今天的主题为支撑亿级并发下的基础服务,而在亿级并发下移动接入又是最核心、最重要的一个环节。移动接入并不是单个系统,而是一整套组件的总称,包括:Spanner+ 连接管理、API 网关、PUSH 通知和 SYNC 数据同步,它是所有移动业务的流量入口,需要维持客户端的状态,支持进行统一的管控,同时还需要进行部分的业务数据处理。 其实,一开始并没有移动接入这个说法,与支付宝客户端的演进过程类似,后端移动接入也是逐步迭代演进的。最开始,各个业务服务都是自己提供 API 或者直接暴露能力给客户端,没有统一的架构,没有统一的模型,也没有统一的管控。 为了解决这个问题,在 all in 阶段我们引申出了一个 API 网关,由它来做集中式管理,同时添加了 PUSH 推送的能力。因为公司内部有很多 APP,我们希望这些能力能够复用,所以在架构上,我们支持多 APP 同构,客户端会提供多个 SDK,可以随时进行集成。 ...

May 21, 2019 · 2 min · jiezi

TiDB-Binlog-组件正式开源前排开坑走起

TiDB Binlog 组件用于收集 TiDB 的 binlog,并准实时同步给下游,如:TiDB/MySQL等。该组件在功能上类似于 MySQL 的主从复制,会收集各个 TiDB 实例产生的 binlog,并按事务提交的时间排序,全局有序的将数据同步至下游。利用 TiDB Binlog 可以实现数据准实时同步到其他数据库,以及 TiDB 数据准实时的备份与恢复。TiDB Binlog 作为 TiDB 的核心组件之一,已经在上百家用户的生产环境中长时间稳定运行。 为方便用户和开发者更加深入理解和使用 TiDB Binlog 组件,以及基于 TiDB Binlog 组件做二次开发用于更多的业务场景, 我们决定今天正式开源 TiDB Binlog 组件。 TiDB Binlog 适用的功能场景准实时数据同步:同步 TiDB 数据到其他数据库或消息队列(如TiDB/MySQL/MariaDB/Kafka)。准实时备份和恢复:增量备份 TiDB 集群数据到外部系统,利用备份的数据在系统故障或者其他场景时可将数据恢复到任意时间点。TiDB Binlog 架构 TiDB Binlog 核心特性支持类似 MySQL ROW 复制模式。准实时并按事务提交的时间顺序将数据同步至下游。分布式架构设计,支持水平弹性扩容和服务高可用。数据高可靠,系统实时将数据持久化到本地磁盘。支持多种输出方式,如下: 文件:系统准实时将 binlog 写入文件系统作为增量备份,利用此增量备份文件可将数据恢复到任意时间点。消息队列:按照 binlog slave protocol 输出到 Kafka。下游目标数据库:TiDB/MySQL/MariaDB。TiDB Binlog 代码及文档资源TiDB Binlog 源代码TiDB Binlog 使用手册深入理解 TiDB Binlog 组件实现原理定制输出方式或者输出到其他下游存储系统欢迎大家一起参与 TiDB Binlog 的设计、研发、测试共同推进 TiDB Binlog 走向更成熟,更稳定。近期我们将发布 TiDB Binlog 源码阅读指南,敬请期待。 ...

May 7, 2019 · 1 min · jiezi

Elasticsearch-查询和数据同步-记一次技术实践

前言前段时间与同事一起为产品接入了 Elasticsearch 框架技术。从参与方案会议到搭建开发上线过程中有很多讨论点,故产生本文,希望藉此总结和分享一些经验。 1. 业务模型接触已有的业务时,数据模型是最早需要知道的信息。我和同事负责接入 Elasticsearch 的产品是一个业务繁多的通讯录,简化下来就是 3 个关键的模型,如下: 部门(Department)人员(User)标签(Tag)它们的用途和联系,就跟它们的词义一样。由此产生的业务如下: 通过 标签 查询 部门、人员通过 部门 查询 人员 基于以上模型和业务,在典型的关系型数据库下,为了实现关联关系,自然会有额外的关联表: 部门人员关联表:每条记录包含1个部门,1个人员标签对象关联表:每条记录包含1个标签,1个部门或人员2. 需求Elasticsearch 的特点有全文检索、分布式、海量数据下近实时查询。当时为通讯录业务引入 Elasticsearch 的需求和目标如下: 多字段的匹配或模糊查询。这些部门、人员、标签数据原本存储在 MySQL 中,如果要做匹配多个字段的模糊查询就比较吃力了,考虑一个常用功能 “输入姓名/手机号/拼音/首字母来查询人员”。而快速查询此类业务是 Elasticsearch 可以提供的。基础模块能力。其他业务模块也提出了类似全文检索的需求,因此在通讯录业务首次应用 es 时,要定义和提供好 es 的访问和工具方法,供其他模块在未来接入时,能复用一些实现,能保持一致的接口和命名风格等。3. 索引设计从原 MySQL 数据库表,到 Elasticsearch 的索引,数据模型的变化称为异构。Elasticsearch 适合解决在 MySQL 中多条件或连表这样比较慢的查询业务,因此除了原有的信息字段,我们会再附加 3 个模型的关联关系到 es 索引中。 索引 字段原有关联关系部门部门名<br/>完整部门路径名(无)人员姓名<br/>拼音<br/>首字母<br/>手机号父部门Id<br/>所有父级部门Id<br/>标签Id标签标签名部门Id<br/>人员Id(上表略去了一些无关本篇内容的字段,如 SaaS 平台的租户Id、每个对象的信息详情字段) 是否需要添加关联关系的字段,是由业务需求决定的。拿人员索引的 “所有父级部门Id” 举例子,因为有查询部门下所有人员(包括直属、子部门下的)的业务需求,所以会设计这么一个字段。可以使用 Elasticsearch 的分词功能来记录关联关系的字段中。为该字段定义一个分隔模式为竖线 “|” 的分词器,把若干个关联Id存成一个拼接的字符串。4. 版本选择同事是个版本控,在选择版本时了解和考虑了非常多的信息。不过版本选择确实是为平台接入新技术时的一个重要考虑点。我们提出这个方案的当时(2018年4月),对比了主要使用的云服务提供商的几个版本,考虑项可以按优先级概括为: 稳定的案例资料多的时新程度,包括 Elasticsearch版本 和 Lucene版本我们已经使用了某家云服务提供商,会偏向再用其提供的服务几个版本对比我们当时选择了 Elasticsearch 6.2.2 版本。 v5.6.4是 Spring 整合的各个框架中,支持数最多的版本市面使用人数较多,资料较多其依赖的 Lucene 大版本是v6,较旧v6.2.2是当时稳定的版本中最新的,性能比 v5 好v6.2.4是当时最新的版本,修复了许多 bug性能更好,是官方推荐的版本官方的技术文档部分还没更新,得看旧文档市面上找不到相应的人的使用资料版本发展(于2019年4月)在写本篇文章时,我再去了解了和 Elasticsearch版本 相关的变更: ...

April 28, 2019 · 1 min · jiezi

DM-源码阅读系列文章四dumpload-全量同步的实现

作者:杨非 本文为 DM 源码阅读系列文章的第四篇,上篇文章 介绍了数据同步处理单元实现的功能,数据同步流程的运行逻辑以及数据同步处理单元的 interface 设计。本篇文章在此基础上展开,详细介绍 dump 和 load 两个数据同步处理单元的设计实现,重点关注数据同步处理单元 interface 的实现,数据导入并发模型的设计,以及导入任务在暂停或出现异常后如何恢复。 dump 处理单元dump 处理单元的代码位于 github.com/pingcap/dm/mydumper 包内,作用是从上游 MySQL 将表结构和数据导出到逻辑 SQL 文件,由于该处理单元总是运行在任务的第一个阶段(full 模式和 all 模式),该处理单元每次运行不依赖于其他处理单元的处理结果。另一方面,如果在 dump 运行过程中被强制终止(例如在 dmctl 中执行 pause-task 或者 stop-task),也不会记录已经 dump 数据的 checkpoint 等信息。不记录 checkpoint 是因为每次运行 mydumper 从上游导出数据,上游的数据都可能发生变更,为了能得到一致的数据和 metadata 信息,每次恢复任务或重新运行任务时该处理单元会 清理旧的数据目录,重新开始一次完整的数据 dump。 导出表结构和数据的逻辑并不是在 DM 内部直接实现,而是 通过 os/exec 包调用外部 mydumper 二进制文件 来完成。在 mydumper 内部,我们需要关注以下几个问题: 数据导出时的并发模型是如何实现的。no-locks, lock-all-tables, less-locking 等参数有怎样的功能。库表黑白名单的实现方式。mydumper 的实现细节mydumper 的一次完整的运行流程从主线程开始,主线程按照以下步骤执行: 解析参数。创建到数据库的连接。会根据 no-locks 选项进行一系列的备份安全策略,包括 long query guard 和 lock all tables or FLUSH TABLES WITH READ LOCK。START TRANSACTION WITH CONSISTENT SNAPSHOT。记录 binlog 位点信息。less locking 处理线程的初始化。普通导出线程初始化。如果配置了 trx-consistency-only 选项,执行 UNLOCK TABLES /* trx-only */ 释放之前获取的表锁。注意,如果开启该选项,是无法保证非 InnoDB 表导出数据的一致性。更多关于一致性读的细节可以参考 MySQL 官方文档 Consistent Nonlocking Reads 部分。根据配置规则(包括 --database, --tables-list 和 --regex 配置)读取需要导出的 schema 和表信息,并在这个过程中有区分的记录 innodb_tables 和 non_innodb_table。为工作子线程创建任务,并将任务 push 到相关的工作队列。如果没有配置 no-locks 和 trx-consistency-only 选项,执行 UNLOCK TABLES / FTWRL / 释放锁。如果开启 less-locking,等待所有 less locking 子线程退出。等待所有工作子线程退出。工作线程的并发控制包括了两个层面,一层是在不同表级别的并发,另一层是同一张表级别的并发。mydumper 的主线程会将一次同步任务拆分为多个同步子任务,并将每个子任务分发给同一个异步队列 conf.queue_less_locking/conf.queue,工作子线程从队列中获取任务并执行。具体的子任务划分包括以下策略: ...

April 28, 2019 · 3 min · jiezi

DM 源码阅读系列文章(三)数据同步处理单元介绍

作者:lan本文为 DM 源码阅读系列文章的第三篇,上篇文章 介绍了 DM 的整体架构,DM 组件 DM-master 和 DM-worker 的入口代码,以及两者之间的数据交互模型。本篇文章详细地介绍 DM 数据同步处理单元(DM-worker 内部用来同步数据的逻辑单元),包括数据同步处理单元实现了什么功能,数据同步流程、运行逻辑,以及数据同步处理单元的 interface 设计。数据同步处理单元从上图可以了解到目前 DM 包含 relay log、dump、load、binlog replication(sync) 4 个数据同步处理单元,涵盖了以下数据同步处理的功能:处理单元功能relay log持久化 MySQL/MariaDB Binlog 到磁盘dump从 MySQL/MariaDB dump 全量数据load加载全量数据到 TiDB clusterbinlog replication(sync)复制 relay log 存储的 Binlog 到 TiDB cluster数据同步流程Task 数据同步流程初始化操作步骤:DM-master 接收到 task,将 task 拆分成 subtask 后 分发给对应的各个 DM-worker;DM-worker 接收到 subtask 后 创建一个 subtask 对象,然后 初始化数据同步流程。从 初始化数据同步流程 的代码中我们可以看到,根据 task 配置项 task-mode 的不同,DM-worker 会初始化不同的数据同步流程:task-mode同步流程需要的数据同步处理单元all全量同步 -> 增量数据同步relay log、dump、load、binlog replication(sync)full全量同步dump、loadincremental增量同步relay log,binlog replication(sync)运行逻辑DM 数据同步处理单元 interface 定义在 dm/unit,relay log、dump、load、binlog replication(sync)都实现了该 interface(golang interface 介绍)。实际上 DM-worker 中的数据同步处理单元分为两类:全局共享单例。dm-worker 启动的时候只初始化一次这类数据同步处理单元,所有的 subtask 都可以使用这类数据同步处理单元的服务;relay log 属于这种类型。subtask 独享。dm-worker 会为每个 subtask 初始化一系列的数据同步处理单元;dump、load、binlog replication(sync)属于这种类型。两类数据同步处理单元的使用逻辑不同,这篇文档会着重讲一下 subtask 独享的数据同步处理单元的使用逻辑,不会囊括更多的 relay log 相关的内容,后面会有单独一篇文章详细介绍它。relay log 相关使用代码在 dm/worker/relay.go 、具体功能实现代码在 relay/relay.go,有兴趣的同学也可以先行阅读一下相关代码,relay log 的代码注释也是比较丰富,并且简单易懂。subtask 独享数据同步处理单元使用逻辑相关代码在 dm/worker/subtask.go。subtask 对象包含的主要属性有:units:初始化后要运行的数据同步处理单元。currUnit:当前正在运行的数据同步处理单元。prevUnit:上一个运行的数据同步处理单元。stage:subtask 的运行阶段状态, 包含 New、Running、Paused,Stopped,Finished,具体定义的代码在 dm/proto/dmworker.proto。result:subtask 当前数据同步处理单元的运行结果,对应着 stage = Paused/Stopped/Finished 的详细信息。主要的逻辑有:初始化 subtask 对象实例的时候会 编排数据同步处理单元的运行先后顺序。所有的数据同步处理单元都实现了 dm/unit interface,所以接下来的运行中就不需要关心具体的数据同步处理单元的类型,可以按照统一的 interface 方法来运行数据同步处理单元,以及对其进行状态监控。初始化各个数据同步处理单元。subtask 在运行前集中地初始化所有的数据同步处理单元,我们计划之后优化成在各个数据同步处理单元运行前再进行初始化,这样子减少资源的提前或者无效的占用。数据同步处理单元运行状态监控。通过监控当前运行的数据同步处理单元的结果,将 subtask 的 stage 设置为 Paused/Stopped/Finished。如果 当前的数据同步处理单元工作已经完成,则会根据 units 来 选取下一个需要运行的数据同步处理单元,如果没有需要的数据同步处理单元,那么会将 subtask 的 stage 设置为 Finished。这里有个注意点,因为 binlog replication 单元永远不会结束,所以不会进入 Finished 的状态。如果 返回的 result 里面包含有错误信息,则会将 subtask 的 stage 设置为 Paused,并且打印具体的错误信息。如果是用户手动暂停或者停止,则会将 subtask 的 stage 设置为 Paused/Stopped。这里有个注意点,这个时候 stage=Paused 是没有错误信息的。数据同步处理单元之间的运行交接处理逻辑。部分数据同步处理单元在开始工作的时候需要满足一些前置条件,例如 binlog replication(sync)的运行需要等待 relay log 处理单元已经储存下来其开始同步需要的 binlog 文件,否则 subtask 将处于 stage=Paused 的暂停等待状态。小结本篇文章主要介绍了数据同步处理单元实现了什么功能,数据同步流程、运行逻辑,以及数据同步处理单元的 interface 设计。后续会分三篇文章详细地介绍数据同步处理单元的实现,包括:dump/load 全量同步实现binlog replication 增量同步实现relay log 实现 ...

April 11, 2019 · 1 min · jiezi

通过DataWorks数据集成归档日志服务数据至MaxCompute进行离线分析

通过DataWorks归档日志服务数据至MaxCompute官方指导文档:https://help.aliyun.com/document_detail/68322.html但是会遇到大家在分区上或者DataWorks调度参数配置问题,具体拿到真实的case模拟如下:创建数据源:步骤1、进入数据集成,点击作业数据源,进入Tab页面。步骤2、 点击右上角新增数据源,选择消息队列 loghub。步骤3、编辑LogHub数据源中的必填项,包括数据源名称、LogHubEndpoint、Project、AK信息等,并点击 测试连通性。创建目标表:步骤1、在左侧tab也中找到临时查询,并右键>新建ODPS SQL节点。步骤2、编写建表DDL。步骤3、点击执行 按钮进行创建目标表,分别为ods_client_operation_log、ods_vedio_server_log、ods_web_tracking_log。步骤4、直到日志打印成本,表示三条DDL语句执行完毕。步骤5、可以通过desc 查看创建的表。其他两张表也可以通过desc 进行查询。确认数据表的存在情况。创建数据同步任务数据源端以及在DataWorks中的数据源连通性都已经配置好,接下来就可以通过数据同步任务进行采集数据到MaxCompute上。操作步骤步骤1、点击新建业务流程 并 确认提交,名称为 直播日志采集。步骤2、在业务流程开发面板中依次创建如下依赖并命名。依次配置数据同步任务节点配置:web_tracking_log_syn、client_operation_log_syn、vedio_server_log_syn。步骤3、双击web_tracking_log_syn 进入节点配置,配置项包括数据源(数据来源和数据去向)、字段映射(源头表和目标表)、通道控制。根据采集的时间窗口自定义参数为:步骤4、可以点击高级运行进行测试。可以分别手工收入自定义参数值进行测试。步骤5、使用SQL脚本确认是否数据已经写进来。如下图所示:日志服务的日志正式的被采集入库,接下来就可以进行数据加工。比如可以通过上述来统计热门房间、地域分布和卡顿率,如下所示:具体SQL逻辑不在这里展开,可以根据具体业务需求来统计分析。依赖关系配置如上图所示。本文作者:祎休阅读原文本文为云栖社区原创内容,未经允许不得转载。

April 2, 2019 · 1 min · jiezi

DM 源码阅读系列文章(一)序

作者:杨非前言TiDB-DM 是由 PingCAP 开发的一体化数据同步任务管理平台,支持从 MySQL 或 MariaDB 到 TiDB 的全量数据迁移和增量数据同步,在 TiDB DevCon 2019 正式开源。作为一款连接 MySQL/MariaDB 生态和 TiDB 生态的中台类型产品,DM 获得了广泛的关注,很多公司、开发者和社区的伙伴已经在使用 DM 来进行数据迁移和管理。随着大家使用的广泛和深入,遇到了不少由于对 DM 原理不理解而错误使用的情况,也发现了一些 DM 支持并不完善的场景和很多可以改进的地方。在这样的背景下,我们希望开展 DM 源码阅读分享活动,通过对 DM 代码的分析和设计原理的解读,帮助大家理解 DM 的实现原理,和大家进行更深入的交流,也有助于我们和社区共同进行 DM 的设计、开发和测试。背景知识本系列文章会聚焦 DM 自身,读者需要有一些基本的知识,包括但不限于:Go 语言,DM 由 Go 语言实现,有一定的 Go 语言基础有助于快速理解代码。数据库基础知识,包括 MySQL、TiDB 的功能、配置和使用等;知道基本的 DDL、DML 语句和事务的基本常识;MySQL 数据备份、主从同步的原理等。基本的后端服务知识,比如后台服务进程管理、RPC 工作原理等。总体而言,读者需要有一定 MySQL/TiDB 的使用经验,了解 MySQL 数据备份和主从同步的原理,以及可以读懂 Go 语言程序。在阅读 DM 源码之前,可以先从阅读《TiDB-DM 架构设计与实现原理》入手,并且参考 使用文档 在本地搭建一个 DM 的测试环境,从基础原理和使用对 DM 有一个初步的认识,然后再进一步分析源码,深入理解代码的设计和实现。内容概要源码阅读系列将会从两条线进行展开,一条是围绕 DM 的系统架构和重要模块进行分析,另一条线围绕 DM 内部的同步机制展开分析。源码阅读不仅是对代码实现的分析,更重要的是深入的分析背后的设计思想,源码阅读和原理分析的覆盖范围包括但不限于以下列出的内容(因为目前 DM 仍处于快速迭代的阶段,会有新的功能和模块产生,部分模块在未来也会进行优化和重构,后续源码阅读的内容会随着 DM 的功能演进做适当的调整):整体架构介绍,包括 DM 有哪些模块,分别实现什么功能,模块之间交互的数据模型和 RPC 实现。DM-worker 内部组件设计原理(relay-unit, dump-unit, load-unit, sync-unit)和数据同步的并发模型设计与实现。基于 binlog 的数据同步模型设计和实现。relay log 的原理和实现。定制化数据同步功能的实现原理(包括库表路由,库表黑白名单,binlog event 过滤,列值转换)。DM 如何支持上游 online DDL 工具(pt-osc, gh-ost)的 DDL 同步场景。sharding DDL 处理的具体实现。checkpoint 的设计原理和实现,深入介绍 DM 如何在各类异常情况下保证上下游数据同步的一致性。DM 测试的架构和实现。代码简介DM 源代码完全托管在 GitHub 上,从 项目主页 可以看到所有信息,整个项目使用 Go 语言开发,按照功能划分了很多 package,下表列出了 DM 每个 package 的基本功能:PackageIntroductionchecker同步任务上下游数据库配置、权限前置检查模块cmd/dm-ctl, cmd/dm-master, cmd/dm-workerdmctl, DM-master, DM-worker 的 main 文件所在模块dm/config同步任务配置、子任务配置、前置检查配置定义模块dm/ctldmctl 所有 RPC 调用实现的模块dm/masterDM-master 的核心实现,包含了 DM-master 后台服务,对 dmctl 到 DM-master 的 RPC 调用的处理逻辑,对 DM-worker 的管理,对 sharding DDL 进行协调调度等功能dm/pb, dm/protodm/proto 定义了 DM-master 和 DM-worker 相关交互的 protobuf 协议,dm/pb 是对应的生成代码dm/unit定义了子任务执行的逻辑单元(包括 dump unit, load unit, sync unit, relay unit)接口,在每个不同逻辑单元对应的 package 内都有对应的 接口实现dm/workerDM-worker 的核心实现,实现 DM-worker 后台服务,管理维护每个任务的 relay 逻辑单元,管理、调度每个子任务的逻辑单元loader子任务 load 逻辑单元的实现,用于全量数据的导入mydumper子任务 dump 逻辑单元的实现,用于全量数据的导出pkg包含了一些基础功能的实现,例如 gtid 操作、SQL parser 封装、binlog 文件流读写封装等relay处理 relay log 同步的核心模块syncer子任务 sync 逻辑单元的实现,用于增量数据的同步对于理解代码最直接的手段就是从 DM-server, DM-worker 和 dmctl 三个 binary 对应的 main 文件入手,看 DM-worker, DM-master 是如何启动,DM-worker 如何管理一个上游实例和同步任务;如何从 dmctl 开始同步子任务;然后看一个同步子任务从全量状态,到增量同步状态,binlog 如何处理、sql 任务如何分发等。通过这样一个流程对 DM 的整体架构就会有全面的理解。进一步就可以针对每个使用细节去了解 DM 背后的设计逻辑和代码实现,可以从具体每个 package 入手,也可以从感兴趣的功能入手。实际上 DM 代码中使用了很多优秀的第三方开源代码,包括但不仅限于:借助 grpc 实现各组件之间的 RPC 通信借助 pingcap/parser 进行 DDL 的语法解析和语句还原借助 pingcap/tidb-tools 提供的工具实现复杂的数据同步定制借助 go-mysql 解析 MySQL/MariaDB binlog 等在源码阅读过程中对于比较重要的、与实现原理有很高相关度的第三方模块,我们会进行相应的扩展阅读。工具链工欲善其事,必先利其器,在阅读 DM 源码之前,我们先来介绍 DM 项目使用到的一些外部工具,这些工具通常用于 DM 的构建、部署、运行和测试,在逐步使用 DM,阅读代码、理解原理的过程中都会使用到这些工具。golang 工具链:构建 DM 需要 go >= 1.11.4,目前支持 Linux 和 MacOS 环境。gogoprotobuf:用于从 proto 描述文件生成 protobuf 代码,DM 代码仓库的 generate-dm.sh 文件封装了自动生成 DM 内部 protobuf 代码的脚本。Ansible:DM 封装了 DM-Ansible 脚本用于 DM 集群的自动化部署,部署流程可以参考 使用 ansible 部署 DM。pt-osc, gh-ost:用于上游 MySQL 进行 online-ddl 的同步场景。mydumper:DM 的全量数据 dump 阶段直接使用 mydumper 的 binary。MySQL, TiDB, sync_diff_inspector:这些主要用于单元测试和集成测试,可以参考 tests#preparations 这部分描述。小结本篇文章主要介绍了 DM 源码阅读的目的和源码阅读的规划,简单介绍了 DM 的源码结构和工具链。下一篇文章我们会从 DM 的整体架构入手,详细分析 DM-master、DM-worker 和 dmctl 三个组件服务逻辑的实现和功能抽象,RPC 数据模型和交互接口。更多的代码阅读内容会在后面的章节中逐步展开,敬请期待。 ...

March 20, 2019 · 2 min · jiezi

JSON数据从MongoDB迁移到MaxCompute最佳实践

摘要: 本文为您介绍如何利用DataWorks数据集成直接从MongoDB提取JSON字段到MaxCompute。数据及账号准备首先您需要将数据上传至您的MongoDB数据库。本例中使用阿里云的云数据库 MongoDB 版,网络类型为VPC(需申请公网地址,否则无法与DataWorks默认资源组互通),测试数据如下。{ “store”: { “book”: [ { “category”: “reference”, “author”: “Nigel Rees”, “title”: “Sayings of the Century”, “price”: 8.95 }, { “category”: “fiction”, “author”: “Evelyn Waugh”, “title”: “Sword of Honour”, “price”: 12.99 }, { “category”: “fiction”, “author”: “J. R. R. Tolkien”, “title”: “The Lord of the Rings”, “isbn”: “0-395-19395-8”, “price”: 22.99 } ], “bicycle”: { “color”: “red”, “price”: 19.95 } }, “expensive”: 10}登录MongoDB的DMS控制台,本例中使用的数据库为 admin,集合为 userlog,您可以在查询窗口使用db.userlog.find().limit(10)命令查看已上传好的数据,如下图所示。 此外,需提前在数据库内新建用户,用于DataWorks添加数据源。本例中使用命令db.createUser({user:“bookuser”,pwd:“123456”,roles:[“root”]}),新建用户名为 bookuser,密码为 123456,权限为root。使用DataWorks提取数据到MaxCompute新增MongoDB数据源进入DataWorks数据集成控制台,新增MongoDB类型数据源。 具体参数如下所示,测试数据源连通性通过即可点击完成。由于本文中MongoDB处于VPC环境下,因此 数据源类型需选择 有公网IP。 访问地址及端口号可通过在MongoDB管理控制台点击实例名称获取,如下图所示。 新建数据同步任务在DataWorks上新建数据同步类型节点。 新建的同时,在DataWorks新建一个建表任务,用于存放JSON数据,本例中新建表名为mqdata。 表参数可以通过图形化界面完成。本例中mqdata表仅有一列,类型为string,列名为MQ data。 完成上述新建后,您可以在图形化界面进行数据同步任务参数的初步配置,如下图所示。选择目标数据源名称为odps_first,选择目标表为刚建立的mqdata。数据来源类型为MongoDB,选择我们刚创建的数据源mongodb_userlog。完成上述配置后, 点击转换为脚本,跳转到脚本模式。 脚本模式代码示例如下。{ “type”: “job”, “steps”: [ { “stepType”: “mongodb”, “parameter”: { “datasource”: “mongodb_userlog”, //数据源名称 “column”: [ { “name”: “store.bicycle.color”, //JSON字段路径,本例中提取color值 “type”: “document.document.string” //本栏目的字段数需和name一致。假如您选取的JSON字段为一级字段,如本例中的expensive,则直接填写string即可。 } ], “collectionName //集合名称”: “userlog” }, “name”: “Reader”, “category”: “reader” }, { “stepType”: “odps”, “parameter”: { “partition”: “”, “isCompress”: false, “truncate”: true, “datasource”: “odps_first”, “column”: [ //MaxCompute表列名 “mqdata” ], “emptyAsNull”: false, “table”: “mqdata” }, “name”: “Writer”, “category”: “writer” } ], “version”: “2.0”, “order”: { “hops”: [ { “from”: “Reader”, “to”: “Writer” } ] }, “setting”: { “errorLimit”: { “record”: "" }, “speed”: { “concurrent”: 2, “throttle”: false, “dmu”: 1 } }}完成上述配置后,点击运行接即可。运行成功日志示例如下所示。 结果验证在您的业务流程中新建一个ODPS SQL节点。 您可以输入 SELECT * from mqdata;语句,查看当前mqdata表中数据。当然这一步您也可以直接在MaxCompute客户端中输入命令运行。 本文作者:付帅阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

November 28, 2018 · 1 min · jiezi

JSON数据从OSS迁移到MaxCompute最佳实践

本文为您介绍如何利用DataWorks数据集成将JSON数据从OSS迁移到MaxCompute,并使用MaxCompute内置字符串函数GET_JSON_OBJECT提取JSON信息。数据上传OSS将您的JSON文件重命名后缀为TXT文件,并上传到OSS。本文中使用的JSON文件示例如下。{ “store”: { “book”: [ { “category”: “reference”, “author”: “Nigel Rees”, “title”: “Sayings of the Century”, “price”: 8.95 }, { “category”: “fiction”, “author”: “Evelyn Waugh”, “title”: “Sword of Honour”, “price”: 12.99 }, { “category”: “fiction”, “author”: “J. R. R. Tolkien”, “title”: “The Lord of the Rings”, “isbn”: “0-395-19395-8”, “price”: 22.99 } ], “bicycle”: { “color”: “red”, “price”: 19.95 } }, “expensive”: 10}将applog.txt文件上传到OSS,本文中OSS Bucket位于华东2区。 使用DataWorks导入数据到MaxCompute新增OSS数据源进入DataWorks数据集成控制台,新增OSS类型数据源。 具体参数如下所示,测试数据源连通性通过即可点击完成。Endpoint地址请参见OSS各区域的外网、内网地址,本例中为http://oss-cn-shanghai.aliyun… http://oss-cn-shanghai-internal.aliyuncs.com(由于本文中OSS和DataWorks项目处于同一个region中,本文选用后者,通过内网连接)。 新建数据同步任务在DataWorks上新建数据同步类型节点。 新建的同时,在DataWorks新建一个建表任务,用于存放JSON数据,本例中新建表名为mqdata。 表参数可以通过图形化界面完成。本例中mqdata表仅有一列,类型为string,列名为MQ data。 完成上述新建后,您可以在图形化界面配置数据同步任务参数,如下图所示。选择目标数据源名称为odps_first,选择目标表为刚建立的mqdata。数据来源类型为OSS,Object前缀可填写文件路径及名称。列分隔符使用TXT文件中不存在的字符即可,本文中使用 ^(对于OSS中的TXT格式数据源,Dataworks支持多字符分隔符,所以您可以使用例如 %&%#^$$^%这样很难出现的字符作为列分隔符,保证分割为一列)。 映射方式选择默认的同行映射即可。 点击左上方的切换脚本按钮,切换为脚本模式。修改fileFormat参数为: “fileFormat”:“binary”。该步骤可以保证OSS中的JSON文件同步到MaxCompute之后存在同一行数据中,即为一个字段。其他参数保持不变,脚本模式代码示例如下。{ “type”: “job”, “steps”: [ { “stepType”: “oss”, “parameter”: { “fieldDelimiterOrigin”: “^”, “nullFormat”: “”, “compress”: “”, “datasource”: “OSS_userlog”, “column”: [ { “name”: 0, “type”: “string”, “index”: 0 } ], “skipHeader”: “false”, “encoding”: “UTF-8”, “fieldDelimiter”: “^”, “fileFormat”: “binary”, “object”: [ “applog.txt” ] }, “name”: “Reader”, “category”: “reader” }, { “stepType”: “odps”, “parameter”: { “partition”: “”, “isCompress”: false, “truncate”: true, “datasource”: “odps_first”, “column”: [ “mqdata” ], “emptyAsNull”: false, “table”: “mqdata” }, “name”: “Writer”, “category”: “writer” } ], “version”: “2.0”, “order”: { “hops”: [ { “from”: “Reader”, “to”: “Writer” } ] }, “setting”: { “errorLimit”: { “record”: "" }, “speed”: { “concurrent”: 2, “throttle”: false, “dmu”: 1 } }}完成上述配置后,点击运行接即可。运行成功日志示例如下所示。 获取JSON字段信息在您的业务流程中新建一个ODPS SQL节点。 您可以首先输入 SELECT*from mqdata;语句,查看当前mqdata表中数据。当然这一步及后续步骤,您也可以直接在MaxCompute客户端中输入命令运行。 确认导入表中的数据结果无误后,您可以使用MaxCompute内建字符串函数GET_JSON_OBJECT获取您想要的JSON数据。本例中使用 SELECT GET_JSON_OBJECT(mqdata.MQdata,’$.expensive’) FROM mqdata;获取JSON文件中的 expensive值。如下图所示,可以看到已成功获取数据。 本文作者:付帅阅读原文本文为云栖社区原创内容,未经允许不得转载。 ...

November 15, 2018 · 1 min · jiezi