简介: 如何应用 Spark On MaxCompute 连贯 Phonix,将 Hbase 的数据写入到 MaxCompute 的对应表中,目前没有对应的案例,为了满足用户的需要。本文次要解说应用 Spark 连贯 Phonix 拜访 Hbase 的数据再写入到 MaxCompute 计划实际。该计划的验证是应用 hbase1.1 对应 Phonix 为 4.12.0。本文从阿里云 Hbase 版本的抉择、确认 VPC、vswitchID、设置白名单和拜访形式,Phonix4.12.0 的客户端装置,在客户端实现 Phonix 表的创立和写入,Spark 代码在本地 IDEA 的编写以及 pom 文件以及 vpcList 的配置, 打包上传 jar 包并进行冒烟测试。
一、购买 Hbase1.1 并设置对应资源
1.1 购买 hbase
hbase 次要版本为 2.0 与 1.1,这边抉择对应 hbase 对应的版本为 1.1
Hbase 与 Hbase2.0 版本的区别
HBase1.1 版本
1.1 版本基于 HBase 社区 1.1.2 版本开发。
HBase2.0 版本
2.0 版本是基于社区 2018 年公布的 HBase2.0.0 版本开发的全新版本。同样,在此基础上,做了大量的改良和优化,排汇了泛滥阿里外部成功经验,比社区 HBase 版本具备更好的稳定性和性能。
1.2 确认 VPC,vsWitchID
确保测试联通性的能够不便可行,该 hbase 的 VPCId,vsWitchID 尽量与购买的独享集成资源组的为统一的,独享集成资源的文档能够参考 https://help.aliyun.com/document_detail/137838.html
1.3 设置 hbase 白名单, 其中 DataWorks 白名单如下,集体 ECS 也可增加
依据文档链接抉择对应的 DataWorks 的 region 下的白名单进行增加 https://help.aliyun.com/document_detail/137792.html
1.4 查看 hbase 对应的版本和拜访地址
关上数据库链接的按钮,能够查看到 Hbase 的主版本以及 Hbase 的专有网络拜访地址,以及是否开明公网拜访的形式进行连贯。
二、装置 Phonix 客户端,并创立表和插入数据
2.1 装置客户端
依据 hbase 的版本为 1.1 抉择 Phonix 的版本为 4.12.0 依据文档 https://help.aliyun.com/document_detail/53600.html 下载对应的客户端文件 ali-phoenix-4.12.0-AliHBase-1.1-0.9.tar.gz
登陆客户端执行命令
./bin/sqlline.py 172.16.0.13,172.16.0.15,172.16.0.12:2181
创立表:
CREATE TABLE IF NOT EXISTS users_phonix
(
id INT ,
username STRING,
password STRING
) ;
插入数据:
UPSERT INTO users (id, username, password) VALUES (1, 'admin', 'Letmein');
2.2 查看是否创立和插入胜利
在客户端执行命令, 查看以后表与数据是否上传胜利
select * from users;
三、编写对应代码逻辑
3.1 编写代码逻辑
在 IDEA 依照对应得 Pom 文件进行配置本地得开发环境,将代码波及到得配置信息填写残缺,进行编写测试,这里能够先应用 Hbase 得公网拜访链接进行测试,代码逻辑验证胜利后可调整配置参数,具体代码如下
package com.git.phonix
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.phoenix.spark._
/**
* 本实例实用于 Phoenix 4.x 版本
*/
object SparkOnPhoenix4xSparkSession {def main(args: Array[String]): Unit = {
//HBase 集群的 ZK 链接地址。// 格局为:xxx-002.hbase.rds.aliyuncs.com,xxx-001.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181
val zkAddress = args(0)
//Phoenix 侧的表名,须要在 Phoenix 侧提前创立。Phoenix 表创立能够参考:https://help.aliyun.com/document_detail/53716.html?spm=a2c4g.11186623.4.2.4e961ff0lRqHUW
val phoenixTableName = args(1)
//Spark 侧的表名。val ODPSTableName = args(2)
val sparkSession = SparkSession
.builder()
.appName("SparkSQL-on-MaxCompute")
.config("spark.sql.broadcastTimeout", 20 * 60)
.config("spark.sql.crossJoin.enabled", true)
.config("odps.exec.dynamic.partition.mode", "nonstrict")
//.config("spark.master", "local[4]") // 需设置 spark.master 为 local[N]能力间接运行,N 为并发数
.config("spark.hadoop.odps.project.name", "***")
.config("spark.hadoop.odps.access.id", "***")
.config("spark.hadoop.odps.access.key", "***")
//.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
.config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
.config("spark.sql.catalogImplementation", "odps")
.getOrCreate()
// 第一种插入方式
var df = sparkSession.read.format("org.apache.phoenix.spark").option("table", phoenixTableName).option("zkUrl",zkAddress).load()
df.show()
df.write.mode("overwrite").insertInto(ODPSTableName)
}
}
3.2 对应 Pom 文件
pom 文件中分为 Spark 依赖,与 ali-phoenix-spark 相干的依赖,因为波及到 ODPS 的 jar 包,会在集群中引起 jar 抵触,所以要将 ODPS 的包排除掉
<?xml version="1.0" encoding="UTF-8"?>
_<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->_
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<spark.version>2.3.0</spark.version>
<cupid.sdk.version>3.3.8-public</cupid.sdk.version>
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<phoenix.version>4.12.0-HBase-1.1</phoenix.version>
</properties>
<groupId>com.aliyun.odps</groupId>
<artifactId>Spark-Phonix</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.jpmml</groupId>
<artifactId>pmml-model</artifactId>
<version>1.3.8</version>
</dependency>
<dependency>
<groupId>org.jpmml</groupId>
<artifactId>pmml-evaluator</artifactId>
<version>1.3.10</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>cupid-sdk</artifactId>
<version>${cupid.sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-core</artifactId>
<version>4.12.0-AliHBase-1.1-0.8</version>
<exclusions>
<exclusion>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
</exclusion>
<exclusion>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-commons</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-spark</artifactId>
<version>4.12.0-AliHBase-1.1-0.8</version>
<exclusions>
<exclusion>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>false</minimizeJar>
<shadedArtifactAttached>true</shadedArtifactAttached>
<artifactSet>
<includes>
_<!-- Include here the dependencies you
want to be packed in your fat jar -->_
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>**/log4j.properties</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
四、打包上传到 DataWorks 进行冒烟测试
4.1 创立要传入的 MaxCompute 表
CREATE TABLE IF NOT EXISTS users_phonix
(
id INT ,
username STRING,
password STRING
) ;
4.2 打包上传到 MaxCompute
在 IDEA 打包要打成 shaded 包,将所有的依赖包,打入 jar 包中,因为 DatadWork 界面形式上传 jar 包有 50M 的限度,因而采纳 MaxCompute 客户端进行 jar 包
4.3 抉择对应的 project 环境,查看上传资源,并点击增加到数据开发
进入 DataWorks 界面抉择左侧资源图标,抉择对应的环境位开发换进,输出删除文件时的文件名称进行搜寻,列表中展现该资源曾经上传成,点击提交到数据开发
点击提交按钮
4.4 配置对应的 vpcList 参数并提交工作测试
其中的配置 vpcList 文件的配置信息如下,可具体依据集体 hbase 的链接,进行配置
{
"regionId":"cn-beijing",
"vpcs":[
{
"vpcId":"vpc-2ze7cqx2bqodp9ri1vvvk",
"zones":[
{
"urls":[
{
"domain":"172.16.0.12",
"port":2181
},
{
"domain":"172.16.0.13",
"port":2181
},
{
"domain":"172.16.0.15",
"port":2181
},
{
"domain":"172.16.0.14",
"port":2181
},
{
"domain":"172.16.0.12",
"port":16000
},
{
"domain":"172.16.0.13",
"port":16000
},
{
"domain":"172.16.0.15",
"port":16000
},
{
"domain":"172.16.0.14",
"port":16000
},
{
"domain":"172.16.0.12",
"port":16020
},
{
"domain":"172.16.0.13",
"port":16020
},
{
"domain":"172.16.0.15",
"port":16020
},
{
"domain":"172.16.0.14",
"port":16020
}
]
}
]
}
]
}
Spark 工作提交工作的配置参数,主类,以及对应的参数
该参数次要为 3 个参数第一个为 Phonix 的链接,第二个为 Phonix 的表名称,第三个为传入的 MaxCompute 表
点击冒烟测试按钮,能够看到工作执行胜利
在长期查问节点中执行查问语句,能够失去数据曾经写入 MaxCompute 的表中
总结:
应用 Spark on MaxCompute 拜访 Phonix 的数据,并将数据写入到 MaxCompute 的表中通过实际,该计划时可行的。但在实际的时有几点注意事项:
1. 结合实际应用状况抉择对应的 Hbase 以及 Phonix 版本,对应的版本统一,并且所应用的客户端,以及代码依赖都会有所扭转。
2. 应用公网在 IEAD 进行本地测试,要留神 Hbase 白名单,不仅要设置 DataWorks 的白名单,还需将本人本地的地址退出到白名单中。
3. 代码打包时须要将 pom 中的依赖关系进行梳理,防止 ODPS 所存在的包在对应的依赖中,进而引起 jar 包抵触,并且打包时打成 shaded 包,防止缺失脱漏对应的依赖。
原文链接
本文为阿里云原创内容,未经容许不得转载。