Introduction: There is currently no ready-made example of using Spark on MaxCompute to connect to Phoenix and write HBase data into the corresponding MaxCompute table, so this article is written to fill that need. It mainly explains the practice of using Spark to connect to Phoenix, read data from HBase, and write it into MaxCompute. The solution is verified with HBase 1.1 and the matching Phoenix 4.12.0. The article walks through choosing the Alibaba Cloud HBase version; confirming the VPC and vSwitchID, and configuring the whitelist and access method; installing the Phoenix 4.12.0 client; creating the Phoenix table and writing data from the client; writing the Spark code locally in IDEA, together with the pom file and the vpcList configuration; and packaging and uploading the jar for a smoke test.
1. Purchase HBase 1.1 and set up the corresponding resources
1.1 Purchase HBase
The main HBase versions are 2.0 and 1.1; here we choose HBase 1.1.
Differences between HBase 1.1 and HBase 2.0
HBase 1.1
The 1.1 version is developed based on HBase community version 1.1.2.
HBase 2.0
The 2.0 version is a brand-new version developed on top of HBase 2.0.0, which the community released in 2018. On that basis it adds a large number of improvements and optimizations, absorbs much of Alibaba's internal experience, and offers better stability and performance than the community HBase version.
1.2 Confirm the VPC and vSwitchID
To make connectivity testing convenient, the VPC ID and vSwitchID of the HBase instance should, as far as possible, match those of the purchased exclusive resource group for Data Integration; see the documentation for exclusive Data Integration resources: https://help.aliyun.com/document_detail/137838.html
1.3 Configure the HBase whitelist; add the DataWorks whitelist entries listed in the document below, and your own ECS address if needed
Add the whitelist entries for the corresponding DataWorks region according to this document: https://help.aliyun.com/document_detail/137792.html
1.4 Check the HBase version and access address
Click the database connection button to view the HBase major version, the HBase VPC (private network) access address, and whether public network access has been enabled for the connection.
2. Install the Phoenix client, create a table, and insert data
2.1 Install the client
Since the HBase version is 1.1, choose Phoenix 4.12.0. Following the document https://help.aliyun.com/document_detail/53600.html, download the corresponding client package ali-phoenix-4.12.0-AliHBase-1.1-0.9.tar.gz.
Log in to the client and run:
./bin/sqlline.py 172.16.0.13,172.16.0.15,172.16.0.12:2181
Create the table (note that Phoenix requires a primary key and uses VARCHAR rather than STRING; the table name users matches the UPSERT and SELECT statements below):
CREATE TABLE IF NOT EXISTS users (id INTEGER NOT NULL PRIMARY KEY, username VARCHAR, password VARCHAR);
Insert data:
UPSERT INTO users (id, username, password) VALUES (1, 'admin', 'Letmein');
2.2 Check that the table was created and the data inserted successfully
Run the following command on the client to check whether the table exists and the data was written successfully:
select * from users;
3. Write the corresponding code logic
3.1 Write the code
Set up the local development environment in IDEA according to the pom file below, fill in all the configuration information referenced in the code, and then write and test it. You can use the HBase public network endpoint for the first tests; once the code logic is verified, adjust the configuration parameters. The code is as follows:
package com.git.phonix

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.phoenix.spark._

/**
 * This example applies to Phoenix 4.x.
 */
object SparkOnPhoenix4xSparkSession {
  def main(args: Array[String]): Unit = {
    // ZK connection address of the HBase cluster.
    // Format: xxx-002.hbase.rds.aliyuncs.com,xxx-001.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181
    val zkAddress = args(0)
    // Table name on the Phoenix side; it must be created in Phoenix in advance.
    // For creating Phoenix tables, see: https://help.aliyun.com/document_detail/53716.html?spm=a2c4g.11186623.4.2.4e961ff0lRqHUW
    val phoenixTableName = args(1)
    // Table name on the Spark (MaxCompute) side.
    val ODPSTableName = args(2)

    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQL-on-MaxCompute")
      .config("spark.sql.broadcastTimeout", 20 * 60)
      .config("spark.sql.crossJoin.enabled", true)
      .config("odps.exec.dynamic.partition.mode", "nonstrict")
      //.config("spark.master", "local[4]") // spark.master must be set to local[N] to run directly; N is the degree of parallelism
      .config("spark.hadoop.odps.project.name", "***")
      .config("spark.hadoop.odps.access.id", "***")
      .config("spark.hadoop.odps.access.key", "***")
      //.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
      .config("spark.sql.catalogImplementation", "odps")
      .getOrCreate()

    // First way to insert: load the Phoenix table as a DataFrame and write it into the MaxCompute table.
    var df = sparkSession.read.format("org.apache.phoenix.spark").option("table", phoenixTableName).option("zkUrl", zkAddress).load()
    df.show()
    df.write.mode("overwrite").insertInto(ODPSTableName)
  }
}
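The comment above mentions a "first way" to insert. As a hedged sketch of a second approach (not part of the original code), the DataFrame could also be registered as a temporary view and written through Spark SQL; the view name phoenix_tmp is an illustrative assumption, and the snippet is meant to replace the last two lines of main() above:

    // Second way to insert (sketch): go through a temporary view and Spark SQL.
    df.createOrReplaceTempView("phoenix_tmp")
    sparkSession.sql(s"INSERT OVERWRITE TABLE $ODPSTableName SELECT * FROM phoenix_tmp")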
3.2 The corresponding pom file
The pom file contains the Spark dependencies and the dependencies related to ali-phoenix-spark. Because ODPS jars are involved and would cause jar conflicts in the cluster, the ODPS packages must be excluded.
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <properties>
        <spark.version>2.3.0</spark.version>
        <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <phoenix.version>4.12.0-HBase-1.1</phoenix.version>
    </properties>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>Spark-Phonix</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-model</artifactId>
            <version>1.3.8</version>
        </dependency>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-evaluator</artifactId>
            <version>1.3.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scalap</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>cupid-sdk</artifactId>
            <version>${cupid.sdk.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-core</artifactId>
            <version>4.12.0-AliHBase-1.1-0.8</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.odps</groupId>
                    <artifactId>odps-sdk-mapred</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.aliyun.odps</groupId>
                    <artifactId>odps-sdk-commons</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-spark</artifactId>
            <version>4.12.0-AliHBase-1.1-0.8</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.phoenix</groupId>
                    <artifactId>ali-phoenix-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <artifactSet>
                                <includes>
                                    <!-- Include here the dependencies you want to be packed in your fat jar -->
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>**/log4j.properties</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
4. Package, upload to DataWorks, and run a smoke test
4.1 Create the MaxCompute table to write into
CREATE TABLE IF NOT EXISTS users_phonix( id INT , username STRING, password STRING) ;
4.2 Package and upload to MaxCompute
In IDEA, build a shaded package so that all dependencies are bundled into the jar. Because uploading jars through the DataWorks UI is limited to 50 MB, use the MaxCompute client to upload the jar.
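For reference, a possible command sequence is sketched below. The first command is run in a local shell, the second inside the MaxCompute client (odpscmd); the shaded jar name Spark-Phonix-1.0.0-SNAPSHOT-shaded.jar is inferred from the artifactId and the shadedArtifactAttached setting in the pom above, so the actual file name in your build may differ.

mvn clean package
add jar target/Spark-Phonix-1.0.0-SNAPSHOT-shaded.jar -f;

The -f flag overwrites an existing resource of the same name in the MaxCompute project.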
4.3 Select the corresponding project environment, check the uploaded resource, and add it to DataStudio
In the DataWorks console, click the resource icon on the left, select the development environment of the corresponding project, and search for the jar by the file name it was uploaded under. The list shows that the resource has been uploaded successfully; click to submit it to DataStudio (data development).
Click the Submit button.
4.4 Configure the vpcList parameter and submit the task for testing
The vpcList configuration is shown below; adjust it according to your own HBase connection details.
{ "regionId":"cn-beijing", "vpcs":[ { "vpcId":"vpc-2ze7cqx2bqodp9ri1vvvk", "zones":[ { "urls":[ { "domain":"172.16.0.12", "port":2181 }, { "domain":"172.16.0.13", "port":2181 }, { "domain":"172.16.0.15", "port":2181 }, { "domain":"172.16.0.14", "port":2181 }, { "domain":"172.16.0.12", "port":16000 }, { "domain":"172.16.0.13", "port":16000 }, { "domain":"172.16.0.15", "port":16000 }, { "domain":"172.16.0.14", "port":16000 }, { "domain":"172.16.0.12", "port":16020 }, { "domain":"172.16.0.13", "port":16020 }, { "domain":"172.16.0.15", "port":16020 }, { "domain":"172.16.0.14", "port":16020 } ] } ] } ]}
Configure the Spark task submission settings: the main class and the corresponding arguments.
There are three arguments: the first is the Phoenix (ZooKeeper) connection address, the second is the Phoenix table name, and the third is the target MaxCompute table.
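As an illustration only, the submit configuration of the Spark node might look like the sketch below. The main class comes from the code above and the three arguments reuse the sample ZooKeeper addresses and table names from this article; passing the vpcList through the spark.hadoop.odps.cupid.vpc.domain.list property as single-line JSON is an assumption based on the Spark on MaxCompute VPC access mechanism, so verify the exact property name against the current documentation.

Main class: com.git.phonix.SparkOnPhoenix4xSparkSession
Arguments: 172.16.0.13,172.16.0.15,172.16.0.12:2181 users users_phonix
Configuration item: spark.hadoop.odps.cupid.vpc.domain.list=<the vpcList JSON above, compacted onto one line>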
Click the smoke test button; the task executes successfully.
Run a query in an ad-hoc query node to confirm that the data has been written into the MaxCompute table.
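For example, with the sample table name used in this article, the ad-hoc verification query could simply be:

select * from users_phonix;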
Summary:
Practice shows that using Spark on MaxCompute to access Phoenix data and write it into a MaxCompute table is feasible. A few points deserve attention in practice:
1. Choose the HBase and Phoenix versions according to your actual usage and keep them consistent; the client you use and the code dependencies change with the versions.
2. When testing locally in IDEA over the public network, mind the HBase whitelist: besides the DataWorks whitelist, your own local IP address must also be added.
3. When packaging, sort out the dependencies in the pom so that no ODPS packages remain among the transitive dependencies (which would cause jar conflicts), and build a shaded jar to avoid missing dependencies.
Original article link
This article is original content from Alibaba Cloud and may not be reproduced without permission.