乐趣区

关于大数据:数栈技术分享OTS数据迁移我们不生产数据我们是大数据的搬运工

数栈是云原生—站式数据中台 PaaS,咱们在 github 和 gitee 上有一个乏味的开源我的项目:FlinkX,FlinkX 是一个基于 Flink 的批流对立的数据同步工具,既能够采集动态的数据,也能够采集实时变动的数据,是全域、异构、批流一体的数据同步引擎。大家喜爱的话请给咱们点个 star!star!star!

github 开源我的项目:https://github.com/DTStack/fl…

gitee 开源我的项目:https://gitee.com/dtstack_dev…

「表格存储」是 NoSQL 的数据存储服务,是基于云计算技术构建的一个分布式结构化和半结构化数据的存储和治理服务。

表格存储的数据模型以「二维表」为核心。

表有行和列的概念,然而与传统数据库不一样,表格存储的表是稠密的

每一行能够有不同的列,能够动静减少或者缩小属性列,建表时不须要为表的属性列定义严格的 schema。
一、概述

OTS 的数据迁徙能够应用「DataX」实现全量数据迁徙。但因为局部数据表的数据量较大,无奈在指定的工夫窗口内实现全量迁徙,且目前 DataX 只能针对主键值进行范畴查问,暂不反对依照属性列范畴抽取数据。

所以能够按如下两种形式实现全量 + 增量的数据迁徙:

  • 分区键蕴含范畴信息(如工夫信息、自增 ID),则以指定 range 为切分点,分批次迁徙。
  • 分区键不蕴含范畴信息,则能够采纳在利用侧双写的模式将数据分批次迁徙,写入指标环境同一张业务表。利用 OTS 的主键唯一性,抉择对反复数据执行笼罩原有行的策略来保证数据唯一性。

总而言之,言而总之,咱们不生产数据,此刻,咱们是大数据的搬运工。

接下来呢,本文就以利用侧调整为双写模式为例,具体阐明 OTS 数据迁徙、校验过程。

其中 OTS 数据迁徙流程具体如下图所示:

OTS 数据迁徙之筹备工作

  • 预迁徙阶段:双写模式中的大表全量迁徙
  • 正式迁徙阶段:双写模式中的增量表全量迁徙、其余小表的全量迁徙

二、预迁徙阶段

1、筹备工作

为保障新老环境的数据一致性,须要在开始数据迁徙前,对指标环境的 OTS 数据表进行数据清空操作,Delete 操作是通过 DataX 工具间接删除表内数据,无需从新建表。

具体操作如下:

1)配置 DataX 工作

在应用 DataX 执行数据清空前,需配置对应数据表应用 DataX 执行 Delete 工作所需的 json 文件。在清空数据的配置中,reader 与 writer 均配置指标端的连贯信息,且数据写入模式配置 DeleteRow 即可, 具体内容如下:

{

"job": {
    "setting": {
        "speed": {"channel": "5"}
    },
    "content": [{
        "reader": {
            "name": "otsreader",
            "parameter": {
                "endpoint": "http://xxx.vpc.ots.yyy.com/",
                "accessId": "dest_accessId",
                "accessKey": "dest_accessKey",
                "instanceName": "dest_instanceName",
                "table": "tablename",
                "column": [{"name": "xxxxx"},
                    {"name": "xxxxx"}
                ],
                "range": {
                    "begin": [{"type": "INF_MIN"}],
                    "end": [{"type": "INF_MAX"}]
                }
            }
        },
        "writer": {
            "name": "otswriter",
            "parameter": {
                "endpoint": "http://xxx.vpc.ots.yun.yyy.com/",
                "accessId": "dest_accessId",
                "accessKey": "dest_accessKey",
                "instanceName": "dest_instanceName",
                "table": "tablename",
                "primaryKey": [{
                    "name": "xxxxx",
                    "type": "string"
                }],
                "column": [{
                        "name": "xxxxx",
                        "type": "string"
                    },
                    {
                        "name": "xxxxx",
                        "type": "string"
                    }
                ],
                "writeMode": "DeleteRow"
            }
        }
    }]
}

}

2)执行 datax 工作

  • 登录 datax 所在 ECS 后,进入 datax 所在门路
  • 在对应的工具机别离执行 del_pre.sh 脚本,即可开始指标环境对应表的数据清空,具体命令如下:

sh del_pre.sh

  • del_pre.sh 脚本内容如下:

!/bin/bash
nohup python datax.py del_table_1.json –jvm=”-Xms16G -Xmx16G” > del_table_1.log &

2、数据迁徙

在不停服务的状况下把源环境内数据量较大的数据表全副迁徙到指标环境内对应的数据表。

1)配置 DataX 工作

在 DataX 对数据表配置相应的 json 文件,迁徙配置的具体内容如下:

{

"job": {
    "setting": {
        "speed": {"channel": "5"}
    },
    "content": [{
        "reader": {
            "name": "otsreader",
            "parameter": {
                "endpoint": "http://xxx.vpc.ots.yyy.com/",
                "accessId": "src_accessId",
                "accessKey": "src_ accessKey",
                "instanceName": "src_instanceName",
                "table": "tablename",
                "column": [{"name": "xxxxx"},
                    {"name": "xxxxx"}
                ],
                "range": {
                    "begin": [{"type": "INF_MIN"}],
                    "end": [{"type": "INF_MAX"}]
                }
            }
        },
        "writer": {
            "name": "otswriter",
            "parameter": {
                "endpoint": "http://xxx.vpc.ots.yun.zzz.com/",
                "accessId": "dest_accessId",
                "accessKey": "dest_accessKey",
                "instanceName": "dest_instanceName",
                "table": "tablename",
                "primaryKey": [{
                    "name": "xxxxx",
                    "type": "string"
                }],
                "column": [{
                        "name": "xxxxx",
                        "type": "string"
                    },
                    {
                        "name": "xxxxx",
                        "type": "string"
                    }
                ],
                "writeMode": "PutRow"
            }
        }
    }]
}

}

需注意,因为 OTS 自身是 NoSQL 零碎,在迁徙数据的配置中,必须配置所有的属性列,否则会缺失对应属性列的值。

2)执行 datax 工作

  • 登录 datax 所在 ECS 后,进入 datax 所在门路
  • 在对应的工具机别离执行 pre_transfer.sh 脚本,即可开始专有域 OTS 到专有云 OTS 的数据迁徙,具体命令如下:

sh pre_transfer.sh

  • pre_transfer.sh 脚本内容如下:

!/bin/bash
nohup python datax.py table_1.json –jvm=”-Xms16G -Xmx16G” >table_1.log &

呐,此时,万事俱备,数据只待迁徙!

在迁徙之前,让咱们最初再对焦一下数据迁徙的指标:

上面,进入正式迁徙阶段!
三、正式迁徙阶段

1、OTS 数据静默

OTS 的数据静默次要是通过观察对应表的数据是否存在变动来判断,校验形式次要包含行数统计、内容统计。

1)行数统计

因 OTS 自身不提供 count 接口,所以采纳在 hive 创立 OTS 内部表的形式,读取 OTS 数据并计算对应数据表的行数,具体操作如下:

  • 创立内部表
  • 启动 hive,创立上述数据表对应的内部表;为进步统计效率,内部表能够只读取 OTS 的主键列,建表语句示例如下:

CREATE EXTERNAL TABLE t_oilcard_expenses_old
(h_card_no string)
STORED BY ‘com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler’
WITH SERDEPROPERTIES(
“tablestore.columns.mapping”=”card_no”)
TBLPROPERTIES (“tablestore.endpoint”=”$endpoint “,”tablestore.instance”=”instanceName”,”tablestore.access_key_id”=”ak”,”tablestore.access_key_secret”=”sk”,”tablestore.table.name”=”tableName”);

  • 进入脚本所在门路
  • 登录 Hadoop 集群 master 所在 ECS,进入 hive 所在目录
  • 执行行数统计
  • 执行 pre_all_count.sh 脚本,即可开始源环境内 OTS 对应表的行数统计

nohup sh pre_all_count.sh >pre_all_count.log &

  • pre_all_count.sh 脚本内容如下:

!/bin/bash
./bin/hive -e “use ots;select count(h_card_no) from tableName;” >table.rs &

间断执行两次行数统计,若两次统计后果统一则阐明数据曾经静默,数据写入以进行。

2)内容统计

因为局部数据表分区键对应的值比拟繁多,导致数据全副存储在同一个分区。若采纳 hive 统计行数会耗时太久,所以对于这个表应用 datax 将 OTS 数据导入 oss 的形式进行内容统计,具体操作如下:

  • 进入脚本所在门路
  • 登录上述表格对应的 ECS,进入 datax 所在门路;
  • 执行内容校验
    a、执行 check_table.sh 脚本,即可将源环境内 OTS 数据表导出到 OSS;

sh check_table.sh

  • check_table.sh 脚本内容如下:

!/bin/bash
nohup python datax.py table.json –jvm=”-Xms8G -Xmx8G”>ots2oss01.log &

b、获取 OSS object 的 ETAG 值,写入对应文件 table_check01.rs
间断执行两次内容统计,比照两次导出 object 的 ETAG 值,若后果统一则阐明数据曾经静默,数据写入以进行。

2、OTS 数据迁徙

1)筹备工作

为保障迁徙后新老环境数据统一,避免指标环境因测试产生遗留脏数据,在进行数据迁徙前,须要将指标环境的 OTS 的其余全量表进行数据清空。

「数据清空形式」次要有 Drop、Delete,两者的区别如下:

a、Drop 表操作

登录 OTS 图形化客户端所在工具机,应用如下信息连贯指定 OTS 实例,并进行对应表的 drop 操作;

AK: dest_accessId
SK: dest_accessKey
InstanceName: InstanceName
Endpoint:endpoint

确认删除后,再在客户端从新创立对应的数据。

b、Delete 表操作

Delete 操作是通过 DataX 工具间接删除表内数据,无需从新建表。DataX 所需的配置文件参考 2.1.1 所示。

  • 登录 datax 所在 ECS 后,进入 datax 所在门路
  • 在对应的工具机别离执行 delete 脚本,即可开始指标环境 OTS 的对应表的数据清空,具体命令如下:

sh del_table_01.sh

  • del_table_01.sh 脚本内容如下:

!/bin/bash
nohup python datax.py del_table.json –jvm=”-Xms16G -Xmx16G”>del_table.log &

2)数据迁徙

在源环境进行服务的状况下把双写模式中的增量表全量迁徙以及其余小表全副迁徙到指标环境内对应的数据表。

具体操作如下:

a、配置 DataX 工作

在 DataX 对上述数据表配置相应的 json 文件,迁徙配置的具体内容参考 2.2.1,在迁徙数据的配置中,须要列全所有的属性列。

b、执行 DataX 工作

  • 登录 DataX 所在 ECS 后,进入 DataX 所在门路
  • 在对应的工具机别离执行 transfer.sh 脚本,即可开始专有域 OTS 到专有云 OTS 的数据迁徙,具体命令如下:

sh transfer.sh

  • transfer.sh 脚本内容如下:

!/bin/bash
nohup python datax.py Table.json >Table.log &

3、OTS 数据校验

新老环境 OTS 的数据校验形式均包含行数统计、内容统计,具体如下:

1)源环境数据统计

源环境 OTS 数据表的数据量统计根据数据静默期间最初一次的统计后果即可。

2)指标环境数据统计

a、行数统计

因 OTS 自身不提供 count 接口,且指标环境 ODPS 反对创立 OTS 内部表,所以采纳在 ODPS 创立 OTS 内部表的形式,读取 OTS 数据并计算对应数据表的行数,具体操作如下:

  • 创立内部表
  • 登录 odpscmd,创立上述数据表对应的内部表;
  • 进入脚本所在门路
  • 登录 odpscmd 工具所在 ECS,进入 odps 所在门路;
  • 执行行数统计
  • 执行 newots_count.sh 脚本,即可进行指标环境内 OTS 对应表的行数统计;

nohup sh newots_count.sh >newots_count.log &

  • newots_count.sh 脚本内容如下:

!/bin/bash
./bin/odpscmd -e “select count(h_card_no) from tableName;” >table.rs &

b、内容统计

因为源环境的局部数据表采纳内容统计的形式进行数据校验,为了不便比照数据是否统一,所以指标环境也采纳内容统计的形式,具体操作参考 3.1.2。

退出移动版