乐趣区

关于大数据:技术干货|如何利用-ChunJun-实现数据离线同步

ChunJun 是⼀款稳固、易⽤、⾼效、批流⼀体的数据集成框架,基于计算引擎 Flink 实现多种异构数据源之间的数据同步与计算。ChunJun 能够把不同起源、格局、特点性质的数据在逻辑上或物理上有机地集中,从⽽为企业提供全⾯的数据共享,目前已在上千家公司部署且稳固运⾏。

在之前,咱们已经为大家介绍过如何利用 ChunJun 实现数据实时同步(点击看注释),本篇将为大家介绍姊妹篇,如何利⽤ ChunJun 实现数据的离线同步。

ChunJun 离线同步案例

离线同步是 ChunJun 的⼀个重要个性,下⾯以最通⽤的 mysql -> hive 的同步工作来介绍离线同步。

配置环境

找⼀个空⽬录,接下来要配置 Flink 和 ChunJun 的环境,下⾯以 /root/chunjun_demo/ 为例⼦。

● 配置 Flink

下载 Flink

wget "http://archive.apache.org/dist/flink/flink-1.12.7/flink-1.12.7-bin-scala_2.12.tgz"
tar -zxvf chunjun-dist.tar.gz

● 配置 ChunJun

# 下载 chunjun, 外部依赖 flink 1.12.7
wget https://github.com/DTStack/chunjun/releases/download/v1.12.8/chunjun-dist-1.12-SNAPSHOT.tar.gz
#新创建⼀个⽬录
mkdir chunjun && cd chunjun
#解压到指定⽬录
tar -zxvf chunjun-dist-1.12-SNAPSHOT.tar.gz

解压好的 ChunJun 有如下⽬录:
bin
chunjun-dist
chunjun-examples
lib

● 配置环境变量

# 配置 Flink 环境变量
echo "FLINK_HOME=/root/chunjun_demo/flink-1.12.7" >> /etc/profile.d/sh.local
#配置 Chunjun 的环境变量
echo "CHUNJUN_DIST=/root/chunjun_demo/chunjun/chunjun-dist" >> /etc/profile.d/sh.local
#刷新换新变量
. /etc/profile.d/sh.local

● 在 Yarn 上⾯启动 Flink Session

# 启动 Flink Session
bash $FLINK_HOME/bin/yarn-session.sh -t $CHUNJUN_DIST -d

输入如下:

echo "stop" | $FLINK_HOME/bin/yarn-session.sh -id application_1683599622970_0270
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
yarn application -kill application_1683599622970_0270

下⾯提交工作会⽤到 Flink Session 这个 Yarn Application Id (application_1683599622970_0270)。

● 其余配置

如果⽤ parquet 格局,须要把 flink-parquet_2.12-1.12.7.jar 放⼊到 flink/lib 下⾯, 在上⾯的例⼦中,须要放到 $FLINK_HOME/lib ⾥⾯。

提交工作

● 在 MySQL 筹备数据

-- 创立⼀个名为 ecommerce_db 的数据库,⽤于存储电商⽹站的数据
CREATE DATABASE IF NOT EXISTS chunjun;
USE chunjun;
-- 创立⼀个名为 orders 的表,⽤于存储订单信息
CREATE TABLE IF NOT EXISTS orders (
 id INT AUTO_INCREMENT PRIMARY KEY, -- ⾃增主键
 order_id VARCHAR(50) NOT NULL, -- 订单编号,不能为空
 user_id INT NOT NULL, -- ⽤户 ID,不能为空
 product_id INT NOT NULL, -- 产品 ID,不能为空
 quantity INT NOT NULL, -- 订购数量,不能为空
 order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
-- 订单⽇期,默认值为以后工夫戳,不能为空
);
-- 插⼊⼀些测试数据到 orders 表
INSERT INTO orders (order_id, user_id, product_id, quantity)
VALUES ('ORD123', 1, 101, 2),
       ('ORD124', 2, 102, 1),
       ('ORD125', 3, 103, 3),  
       ('ORD126', 1, 104, 1),
       ('ORD127', 2, 105, 5);
       
select * from chunjun.orders;       

如果没有 MySQL 的话,能够⽤ docker 疾速创立⼀个。

docker pull mysql:8.0.12
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:8.0.12

● 创立 Hive 表

CREATE DATABASE IF NOT EXISTS chunjun;
USE chunjun;
-- 创立⼀个名为 orders 的表,⽤于存储订单信息
CREATE TABLE IF NOT EXISTS chunjun.orders (
 id INT,
 order_id VARCHAR(50),
 user_id INT,
 product_id INT,
 quantity INT,
 order_date TIMESTAMP
)
 STORED AS PARQUET;
-- 查看 hive 表,底层的 HDFS ⽂件地位,下⾯的 SQL 后果⾥⾯ Location 字段,就是 HDFS ⽂件的地位。desc formatted chunjun.orders;
-- Location: hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders
-- ⼀会配置同步工作的时候会⽤到 hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders

● 在以后⽬录 (/root/chunjun_demo/) 配置⼀个工作 mysql_hdfs.json

vim mysql_hdfs.json 输⼊如下内容:

{
"job": {
"content": [
 {
"reader": {
"parameter": {
"connection": [
 {
"schema": "chunjun",
"jdbcUrl": ["jdbc:mysql://172.16.85.200:3306/chunjun"],
"table": ["orders"]
 }
 ],
"username": "root",
"password": "123456",
"column": [{ "name": "id", "type": "INT"},
 {"name": "order_id", "type": "VARCHAR"},
 {"name": "user_id", "type": "INT"},
 {"name": "product_id", "type": "INT"},
 {"name": "quantity", "type": "INT"},
 {"name": "order_date", "type": "TIMESTAMP"}
 ]
 },
"name": "mysqlreader"
 },
"writer": {
"parameter": {
"path": "hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders",
"defaultFS": "hdfs://ns1",
"hadoopConfig": {
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "172.16.85.194:9000",
"dfs.namenode.rpc-address.ns1.nn2": "172.16.85.200:9000",
"dfs.client.failover.proxy.provider.ns1":
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
 },
"column": [{ "name": "id", "type": "INT"},
 {"name": "order_id", "type": "VARCHAR"},
 {"name": "user_id", "type": "INT"},
 {"name": "product_id", "type": "INT"},
 {"name": "quantity", "type": "INT"},
 {"name": "order_date", "type": "TIMESTAMP"}
 ],
"writeMode": "overwrite",
"encoding": "utf-8",
"fileType": "parquet",
"fullColumnName":
 ["id", "order_id", "user_id", "product_id", "quantity", "order_date"],
"fullColumnType":
 ["INT", "VARCHAR", "INT", "INT", "INT", "TIMESTAMP"]
 },
"name": "hdfswriter"
 }
 }
 ],
"setting": {
"errorLimit": {"record": 0},
"speed": {
"bytes": 0,
"channel": 1
 }
 }
 }
}

因为咱们要将 MySQL 同步到 Hive ⾥⾯,然而如果间接同步 Hive 的话,外部会⽤ jdbc,⽽ jdbc 的效率不⾼,因而咱们能够间接把数据同步到 Hive 底层的 HDFS 上⾯,所以 writer ⽤到了 hdfswriter。脚本解析如下:

{
"job": {
"content": [
 {
"reader": {
"parameter": {
"connectionComment": "数据库链接, 数据库, 表, 账号, 明码",
"connection": [
 {
"schema": "chunjun",
"jdbcUrl": ["jdbc:mysql://172.16.85.200:3306/chunjun"],
"table": ["orders"]
 }
 ],
"username": "root",
"password": "123456",
"columnComment": "要同步的列抉择, 能够抉择局部列",
"column": [{ "name": "id", "type": "INT"},
 {"name": "order_id", "type": "VARCHAR"},
 {"name": "user_id", "type": "INT"},
 {"name": "product_id", "type": "INT"},
 {"name": "quantity", "type": "INT"},
 {"name": "order_date", "type": "TIMESTAMP"}
 ]
 },
"nameComment" : "source 是 mysql",
"name": "mysqlreader"
 },
"writer": {
"parameter": {
"pathComment": "HDFS 上⾯的门路, 通过 hive 语句的 desc formatted 查看",
"path": "hdfs://ns1/dtInsight/hive/warehouse/chunjun.db/orders",
"defaultFS": "hdfs://ns1",
"hadoopConfigComment": "是 hdfs ⾼可⽤最根本的配置, 在 Hadoop 配置⽂件 hdfs-site.xml 能够找到",
"hadoopConfig": {
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "172.16.85.194:9000",
"dfs.namenode.rpc-address.ns1.nn2": "172.16.85.200:9000",
"dfs.client.failover.proxy.provider.ns1":
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
 },
"columnComment": "要同步的列抉择, 能够抉择局部列",
"column": [{ "name": "id", "type": "INT"},
 {"name": "order_id", "type": "VARCHAR"},
 {"name": "user_id", "type": "INT"},
 {"name": "product_id", "type": "INT"},
 {"name": "quantity", "type": "INT"},
 {"name": "order_date", "type": "TIMESTAMP"}
 ],
"writeModeComment": "笼罩写⼊到 hdfs 上⾯的⽂件, 可选 overwrite, append(默认模式)",
"writeMode": "overwrite",
"encoding": "utf-8",
"fileTypeComment": "可选 orc, parquet, text",
"fileType": "parquet",
"fullColumnNameComment": "全副字段,有时候 column ⾥⾯同步局部字段,然而⼜须要有全副字段的格局,例如 fileType : text",
"fullColumnName": ["id", "order_id", "user_id", "product_id", "quantity", "order_date"], 
"fullColumnTypeComment": "全副字段的类型",
"fullColumnType": ["INT", "VARCHAR", "INT", "INT", "INT", "TIMESTAMP"]
 },
"nameComment" : "sink 是 hdfs",
"name": "hdfswriter"
 }
 }
 ],
"setting": {
"errorLimit": {"record": 0},
"speed": {
"bytes": 0,
"channel": 1
 }
 }
 }
}

● 提交工作

bash chunjun/bin/chunjun-yarn-session.sh -job mysql_hdfs.json -confProp
{\"yarn.application.id\":\"application_1683599622970_0270\"}

● 查看工作

工作同步实现, 能够看⼀下 HDFS 上⾯的数据。

查看⼀下 Hive 表的数据。

留神, 如果是分区的 Hive 表,须要⼿动刷新⼀下 Hive 的元数据, 使⽤ MSCK 命令。(MSCK 是 Hive 中的⼀个命令,⽤于检查表中的分区,并将其增加到 Hive 元数据中)

MSCK REPAIR TABLE my_table;

ChunJun 离线同步原理解析

HDFS 文件同步原理

· 对于⽂件零碎,同步的时候会先把⽂件写⼊到 path + [filename] ⽬录⾥⾯的 .data 的⽂件⾥⾯,如果工作失败,那么 .data ⾥⾯的⽂件不会⽣效。

· 在 TaskManager 上⾯所有 task 工作完结的时候,会在 JobManager 执⾏ FinalizeOnMaster 的 finalizeGlobal ⽅法, 最终会调⽤到 moveAllTmpDataFileToDir , 把 .data ⾥⾯的⽂件移除到 .data 的上⼀层。

public interface FinalizeOnMaster {

/**
The method is invoked on the master (JobManager) after all (parallel) instances of an OutputFormat finished.
Params:parallelism – The parallelism with which the format or functions was run.
Throws:IOException – The finalization may throw exceptions, which may cause the job to abort.
*/
void finalizeGlobal(int parallelism) throws IOException; 
}
// 在 JobManager 执⾏
@Override
protected void moveAllTmpDataFileToDir() {if (fs == null) {openSource();
 }
String currentFilePath = "";
try {Path dir = new Path(outputFilePath);
Path tmpDir = new Path(tmpPath);

FileStatus[] dataFiles = fs.listStatus(tmpDir);
for (FileStatus dataFile : dataFiles) {currentFilePath = dataFile.getPath().getName();
fs.rename(dataFile.getPath(), dir);
LOG.info("move temp file:{} to dir:{}", dataFile.getPath(), dir);
 }
fs.delete(tmpDir, true);
 } catch (IOException e) {
throw new ChunJunRuntimeException(
String.format("can't move file:[%s] to dir:[%s]", currentFilePath, outputFilePath),
e);
 }
}

增量同步

增量同步次要针对某些只有 Insert 操作的表,随着业务增⻓,表内数据越来越多。如果每次都同步整表的话,耗费的工夫和资源会⽐较多。因而须要⼀个增量同步的性能,每次只读取减少局部的数据。

● 实现原理

其实现原理实际上就是配合增量键在查问的 sql 语句中拼接过滤条件,⽐如 where id > ?,将之前曾经读取过的数据过滤进来。

增量同步是针对于两个及以上的同步作业来说的。对于首次执⾏增量同步的作业⽽⾔,实际上是整表同步,不同于其余作业的在于增量同步作业会在作业执⾏实现后记录⼀个 endLocation 指标,并将这个指标上传到 prometheus 以供后续使⽤。

除第⼀次作业外,后续的所有增量同步作业都会取上⼀次作业的 endLocation 做为本次作业的过滤根据(startLocation)。⽐如第⼀次作业执⾏完后,endLocation 为 10,那么下⼀个作业就会构建出例如 SELECT id,name,age from table where id > 10 的 SQL 语句,达到增量读取的⽬的。

● 应用限度

· 只有 RDB 的 Reader 插件能够使⽤

· 通过构建 SQL 过滤语句实现,因而只能⽤于 RDB 插件

· 增量同步只关⼼读,不关⼼写,因而只与 Reader 插件无关

· 增量字段只能为数值类型和工夫类型

· 指标须要上传到 prometheus,⽽ prometheus 不⽀持字符串类型,因而只⽀持数据类型和工夫类型,工夫类型会转换成工夫戳后上传

· 增量键的值能够反复,但必须递增

· 因为使⽤ ‘>’ 的缘故,要求字段必须递增

断点续传

断点续传是为了在离线同步的时候,针对⻓工夫同步工作如超过 1 天,如果在同步过程中因为某些起因导致工作失败,从头再来的话老本⾮常⼤,因而须要⼀个断点续传的性能从工作失败的地⽅持续。

● 实现原理

· 基于 Flink 的 checkpoint,在 checkpoint 的时候 会存储 source 端最初⼀条数据的某个字段值,sink 端插件执⾏事务提交。

· 在工作失败,后续通过 checkpoint 从新运⾏时,source 端在⽣成 select 语句的时候将 state ⾥的值作为条件拼接进⾏数据的过滤,达到从上次失败位点进⾏复原。

· jdbcInputFormat 在拼接读取 SQL 时,如果从 checkpoint 复原的 state 不为空且 restoreColumn 不为空,则此时会将 checkpoint ⾥的 state 作为终点开始读取数据。

● 实用场景

通过上述原理咱们能够晓得 source 端必须是 RDB 类型插件,因为是通过 select 语句拼接 where 条件进⾏数据过滤达到断点续传的,同时断点续传须要指定⼀个字段作为过滤条件,且此字段要求是递增的。

· 工作须要开启 checkpoint

· reader 为 RDB 的插件均⽀持且 writer ⽀持事务的插件 (如 rdb filesystem 等),如果上游是幂等性则 writer 插件也不须要⽀持事务

· 作为断点续传的字段在源表⾥的数据是递增的,因为过滤条件是 >

《数栈产品白皮书》:https://www.dtstack.com/resources/1004?src=szsm

《数据治理行业实际白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm

想理解或征询更多无关袋鼠云大数据产品、行业解决方案、客户案例的敌人,浏览袋鼠云官网:https://www.dtstack.com/?src=szsf

同时,欢送对大数据开源我的项目有趣味的同学退出「袋鼠云开源框架钉钉技术 qun」,交换最新开源技术信息,qun 号码:30537511,我的项目地址:https://github.com/DTStack

退出移动版