共计 6214 个字符,预计需要花费 16 分钟才能阅读完成。
背景
Spark on MaxCompute 可以访问位于阿里云 VPC 内的实例(例如 ECS、HBase、RDS), 默认 MaxCompute 底层网络和外网是隔离的,Spark on MaxCompute 提供了一种方案通过配置 spark.hadoop.odps.cupid.vpc.domain.list 来访问阿里云的 vpc 网络环境的 Hbase。Hbase 标准版和增强版的配置不同,本文通过访问阿里云的标准版和增强版的 Hbase 简单的描述需要加的配置。
Hbase 标准版
环境准备 Hbase 的网络环境是存在 vpc 下的,所以我们首先要添加安全组开放端口 2181、10600、16020. 同时 Hbase 有白名单限制我们需要把对应的 MaxCompute 的 IP 加入到 Hbase 的白名单。设置对应 vpc 的安全组
找到对应的 vpc id 然后添加安全组设置端口
添加 Hbase 的白名单
在 hbase 的白名单添加
100.104.0.0/16
创建 Hbase 表
create ‘test’,’cf’
编写 Spark 程序 需要的 Hbase 依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>2.0.5</version>
</dependency>
编写代码
object App {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("HbaseTest")
.config("spark.sql.catalogImplementation", "odps")
.config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
.config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
.getOrCreate()
val sc = spark.sparkContext
val config = HBaseConfiguration.create()
val zkAddress = "hb-2zecxg2ltnpeg8me4-master\*-\*\*\*:2181,hb-2zecxg2ltnpeg8me4-master\*-\*\*\*:2181,hb-2zecxg2ltnpeg8me4-master\*-\*\*\*:2181"
config.set(HConstants.ZOOKEEPER\_QUORUM, zkAddress);
val jobConf = new JobConf(config)
jobConf.setOutputFormat(classOf\[TableOutputFormat\])
jobConf.set(TableOutputFormat.OUTPUT\_TABLE,"test")
try{
import spark.\_
spark.sql("select'7', 88").rdd.map(row => {val name= row(0).asInstanceOf\[String\]
val id = row(1).asInstanceOf\[Integer\]
val put = new Put(Bytes.toBytes(id))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
(new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
} finally {sc.stop()
}
}
}
提交到 DataWorks 由于大于 50m 通过 odps 客户端提交
add jar SparkHbase-1.0-SNAPSHOT -f;
进入数据开发新建 spark 节点
添加配置 需要配置 spark.hadoop.odps.cupid.vpc.domain.list 这里的 hbase 域名需要 hbase 所有的机器,少一台可能会造成网络不通
{
“regionId”:”cn-beijing”,
“vpcs”:[
{
"vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
"zones":\[
{
"urls":\[
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":2181
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},{
"domain":"hb-2zecxg2ltnpeg8me4-cor\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
}
\]
}
\]
}
]
}
Hbase 增强版
环境准备 Hbase 增强版的端口是 30020、10600、16020. 同时 Hbase 有白名单限制我们需要把对应的 MaxCompute 的 IP 加入到 Hbase 的白名单。设置对应 vpc 的安全组 找到对应的 vpc id 然后添加安全组设置端口
添加 Hbase 的白名单
100.104.0.0/16
创建 Hbase 表
create ‘test’,’cf’
编写 Spark 程序 需要的 Hbase 依赖,引用的包必须是阿里云增强版的依赖
<dependency>
<groupId>com.aliyun.hbase</groupId>
<artifactId>alihbase-client</artifactId>
<version>2.0.8</version>
</dependency>
编写代码
object McToHbase {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("spark\_sql\_ddl")
.config("spark.sql.catalogImplementation", "odps")
.config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
.config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
.getOrCreate()
val sc = spark.sparkContext
try{spark.sql("select'7','long'").rdd.foreachPartition { iter =>
val config = HBaseConfiguration.create()
// 集群的连接地址 (VPC 内网地址) 在控制台页面的数据库连接界面获得
config.set("hbase.zookeeper.quorum", ":30020");
import spark.\_
// xml\_template.comment.hbaseue.username\_password.default
config.set("hbase.client.username", "");
config.set("hbase.client.password", "");
val tableName = TableName.valueOf("test")
val conn = ConnectionFactory.createConnection(config)
val table = conn.getTable(tableName);
val puts = new util.ArrayList\[Put\]()
iter.foreach(
row => {val id = row(0).asInstanceOf\[String\]
val name = row(1).asInstanceOf\[String\]
val put = new Put(Bytes.toBytes(id))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
puts.add(put)
table.put(puts)
}
)
}
} finally {
sc.stop()
}
}
}
注意 hbase clinet 会报 org.apache.spark.SparkException: Task not serializable 原因是 spark 会把序列化对象以将其发送给其他的 worker 解决方案
– 使类可序列化
- 仅在 map 中传递的 lambda 函数中声明实例。
- 将 NotSerializable 对象设置为静态对象,并在每台计算机上创建一次。
- 调用 rdd.forEachPartition 并在其中创建
Serializable 对象,如下所示:
rdd.forEachPartition(iter-> {NotSerializable notSerializable = new NotSerializable();
// … 现在处理 iter});
提交到 DataWorks 由于大于 50m 通过 odps 客户端提交
add jar SparkHbase-1.0-SNAPSHOT -f;
进入数据开发新建 spark 节点
添加配置 需要配置 spark.hadoop.odps.cupid.vpc.domain.list 注意:1. 这个里需要添加增强版 java api 访问地址,这里必须采用 ip 的形式。ip 通过直接 ping 该地址获取,这里的 ip 是 172.16.0.10 添加端口 16000
2. 这里的 hbase 域名需要 hbase 所有的机器,少一台可能会造成网络不通
{
“regionId”:”cn-beijing”,
“vpcs”:[
{
"vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
"zones":\[
{
"urls":\[
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":30020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16000
},
{
"domain":"hb-2zecxg2ltnpeg8me4-master\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},
{
"domain":"hb-2zecxg2ltnpeg8me4-cor\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},{
"domain":"hb-2zecxg2ltnpeg8me4-cor\*-\*\*\*.hbase.rds.aliyuncs.com",
"port":16020
},
{"domain":"172.16.0.10","port":16000}
\]
}
\]
}
]
}
大家如果对 MaxCompute 有更多咨询或者建议,欢迎扫码加入 MaxCompute 开发者社区钉钉群,或点击链接 申请加入。