本文适用有入门 spark 基础的同学,一些最基础知识不再赘述
通过阅读本文即可掌握使用 Spark 跨集群同步 Hive 数据的技巧!
众所周知,业界比较成熟的同步数据工具是 Sqoop,它是连接关系型数据库和 Hadoop 的桥梁
比较常用的场景是从 MySQL 等 RDB 同步到 Hive、Hbase 或者将 Hive、Hbase 的数据导出到 MySQL
但是在公司项目技术选型时选用了用 Spark 来从一个集群同步数据到另一个集群
下面来比较一下这两种方式:
①Sqoop 使用配置化,Spark 需要代码开发
学习成本上使用 Spark 稍微高一些,但是同步数据复杂一些 Sqoop 的配置也不见得很简单
②Sqoop 在同步数据进行存储时仍需要启动 MapReduce 任务,Spark 使用自身框架进行内存计算
在效率上 Spark 比 Sqoop 要高
③Sqoop1 和 Sqoop2 不能完全兼容,且网上在不同集群的 Hive 间同步数据的资料也比较少,出现问题时不保证能解决
所以选用了 Spark 作为天池的数据同步方式
其实当时选用 Spark 最重要的原因是本人使用 Spark 比较熟!
下面就来介绍如何使用 Spark 进行数据同步
1. 抽取数据文件
1.1 创建 Spark 入口对象指定源数据的 Hive 元数据服务
扩展:在远程连接 Hive 时 Hive 有两种服务 Hive Metastore Server 和 HiveServer2
HiveServer2 是 JDBC 连接,使用这种方式会占用数据本地集群的计算资源(常用默认端口 10000 的那种)
Spark 连接 Hive 可以使用 Hive Metastore Server,这样只需连接 Hive 的元数据,通过元数据记录的数据路径拉取数据使用 Spark 对数据进行计算(常用默认端口 9083 的那种)
在创建 SparkSession 对象时动态传参传入源数据集群的 Hive Metastore Server 地址
1.2 将 Spark 的 hadoopConfiguration 对象切换到源数据集群
连接了数据源集群的 Hive Metastore Server 元数据服务,Spark 会根据里面记录的元数据信息去拉取数据,而其它集群记录的是其它 HDFS 集群的路径
默认根据配置文件本集群跑 Spark 程序时连接的是本集群的 HDFS 地址,所以根据元数据获取的数据路径会找不到
而 MetaServer 元数据服务地址属于在 Spark 创建时指定的,在这个对象生命周期内无法被改变,但是 Hadoop 相关配置是可以在运行时切换的
所以需要在取数时切换 Hadoop 配置
1.3 通过 SQL 指定需要获取的数据将数据加载到目标集群的 HDFS 文件中
上例中是抽取 APP 埋点日志表,这个表中源数据是根据 site 和 year, month, day 进行分区的
我们需要每天同步每日的增量数据,Spark 在拉取数据时会根据分区去拉取对应的数据而不是全量数据
同时在 SQL 中可以看到我在同步过来数据时将 year, month, day 字段拼成 dt 字段作为日期新字段,且原表数据中字段比这里多得多,只取了需要的字段以节省时间空间
并且在写出成 HDFS 文件的时候通过.partitionBy(“site”, “dt”)也指定了目标表的分区
写出的文件路径就是这种格式的:/tianchi/data/ods_app_burial_log/site=zz/dt=2019-06-11
另外 $HDFS_ADDR 是通过 API 动态获取的目标集群 active nameNode 的 ip:port
防止写死后 NN 主备切换
由于这里是连接 spark 本地集群的 hadoop 集群, 只需要创建 Configuration 对象读取集群默认配置, 如果是获取其它集群的还需要把连接信息赋入
2. 将导入的数据文件加载到本集群的 Hive 表中
由于上面说过的原因,连接的 MetaServer 无法在运行时改变,所以抽取数据文件和加载数据进目标集群的 Hive 表无法放在同一个任务中进行,要另起一个任务进行加载
2.1 新的任务中创建 SparkSession 对象指定本集群的 Hive Metastore Server 元数据服务地址
2.2 使用 HiveSQL 的语法加载数据
LOAD DATA INPATH ‘/datapath/’ OVERWRITE INTO TABLE tableName PARTITION (par=’xxx’)