在理论生产环境中,将计算和存储进行拆散,是咱们进步集群吞吐量、确保集群规模程度可扩大的次要办法之一,并且通过集群的扩容、性能的优化,确保在数据大幅增长时,存储不能称为零碎的瓶颈。大数据培训
具体到咱们理论的我的项目需要中,有一个典型的场景,通常会将 Hive 中的局部数据,比方热数据,存入到 HBase 中,进行冷热拆散解决。
咱们采纳 Spark 读取 Hive 表数据存入 HBase 中,这里次要有两种形式:
通过 HBase 的 put API 进行数据的批量写入
通过生成 HFile 文件,而后通过 BulkLoad 形式将数据存入 HBase
HBase 的原生 put 形式,通过 HBase 集群的 region server 向 HBase 插入数据,然而当数据量十分大时,region 会进行 split、compact 等解决,并且这些解决十分占用计算资源和 IO 开销,影响性能和集群的稳定性。
HBase 的数据最终是以 HFile 的模式存储到 HDFS 上的,如果咱们能间接将数据生成为 HFile 文件,而后将 HFile 文件保留到 HBase 对应的表中,能够防止上述的很多问题,效率会绝对更高。
本篇文章次要介绍如何应用 Spark 生成 HFile 文件,而后通过 BulkLoad 形式将数据导入到 HBase 中,并附批量 put 数据到 HBase 以及间接存入数据到 HBase 中的理论利用示例。
1. 生成 HFile,BulkLoad 导入
1.1 数据样例
{“id”:”1″,”name”:”jack”,”age”:”18″}
{“id”:”2″,”name”:”mike”,”age”:”19″}
{“id”:”3″,”name”:”kilos”,”age”:”20″}
{“id”:”4″,”name”:”tom”,”age”:”21″}
…
1.2 示例代码
/**
- @Author bigdatalearnshare
*/
object App {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkSession = SparkSession
.builder()
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.master("local[*]")
.getOrCreate()
val rowKeyField = "id"
val df = sparkSession.read.format("json").load("/people.json")
val fields = df.columns.filterNot(_ == "id").sorted
val data = df.rdd.map { row =>
val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)
val kvs = fields.map { field =>
new KeyValue(rowKey, Bytes.toBytes("hfile-fy"), Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
}
(new ImmutableBytesWritable(rowKey), kvs)
}.flatMapValues(x => x).sortByKey()
val hbaseConf = HBaseConfiguration.create(sparkSession.sessionState.newHadoopConf())
hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "hfile")
val connection = ConnectionFactory.createConnection(hbaseConf)
val tableName = TableName.valueOf("hfile")
// 没有 HBase 表则创立
creteHTable(tableName, connection)
val table = connection.getTable(tableName)
try {val regionLocator = connection.getRegionLocator(tableName)
val job = Job.getInstance(hbaseConf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
val savePath = "hdfs://linux-1:9000/hfile_save"
delHdfsPath(savePath, sparkSession)
job.getConfiguration.set("mapred.output.dir", savePath)
data.saveAsNewAPIHadoopDataset(job.getConfiguration)
val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
bulkLoader.doBulkLoad(new Path(savePath), connection.getAdmin, table, regionLocator)
} finally {
//WARN LoadIncrementalHFiles: Skipping non-directory hdfs://linux-1:9000/hfile_save/_SUCCESS 不影响, 间接把文件移到 HBASE 对应 HDFS 地址了
table.close()
connection.close()}
sparkSession.stop()
}
def creteHTable(tableName: TableName, connection: Connection): Unit = {
val admin = connection.getAdmin
if (!admin.tableExists(tableName)) {val tableDescriptor = new HTableDescriptor(tableName)
tableDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("hfile-fy")))
admin.createTable(tableDescriptor)
}
}
def delHdfsPath(path: String, sparkSession: SparkSession) {
val hdfs = FileSystem.get(sparkSession.sessionState.newHadoopConf())
val hdfsPath = new Path(path)
if (hdfs.exists(hdfsPath)) {//val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
hdfs.delete(hdfsPath, true)
}
}
}
1.3 注意事项
上述示例代码能够依据理论业务需要作相应调整,但有一个问题须要特地留神:
通过 Spark 读取过去的数据生成 HFile 时,要确保 HBase 的主键、列族、列依照有序排列。否则,会抛出以下异样:
Caused by: java.io.IOException: Added a key not lexically larger than previous. Current cell = 1/hfile-fy:age/1588230543677/Put/vlen=2/seqid=0, lastCell = 1/hfile-fy:name/1588230543677/Put/vlen=4/seqid=0
2. 批量 put
2.1 数据样例
val rowKeyField = “id”
val df = sparkSession.read.format(“json”).load(“/stats.json”)
val fields = df.columns.filterNot(_ == “id”)
df.rdd.foreachPartition {partition =>
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "batch_put")
val conn = ConnectionFactory.createConnection(hbaseConf)
val table = conn.getTable(TableName.valueOf("batch_put"))
val res = partition.map { row =>
val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)
val put = new Put(rowKey)
val family = Bytes.toBytes("hfile-fy")
fields.foreach { field =>
put.addColumn(family, Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
}
put
}.toList
Try(table.put(res)).getOrElse(table.close())
table.close()
conn.close()
}
在理论利用中,咱们也能够将常常一起查问的数据拼接在一起存入一个列中,比方将上述的 pv 和 uv 拼接在一起应用,能够升高 KeyValue 带来的结构化开销。
3.saveAsNewAPIHadoopDataset
val hbaseConf = sparkSession.sessionState.newHadoopConf()
hbaseConf.set(“hbase.zookeeper.quorum”, “linux-1:2181,linux-2:2181,linux-3:2181”)
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, “direct”)
val job = Job.getInstance(hbaseConf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val rowKeyField = “id”
val df = sparkSession.read.format(“json”).load(“/stats.json”)
val fields = df.columns.filterNot(_ == “id”)
df.rdd.map {row =>
val put = new Put(Bytes.toBytes(row.getAs(rowKeyField).toString))
val family = Bytes.toBytes("hfile-fy")
fields.foreach { field =>
put.addColumn(family, Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
}
(new ImmutableBytesWritable(), put)
}.saveAsNewAPIHadoopDataset(job.getConfiguration)
以上次要介绍了 3 种利用 Spark 将数据导入 HBase 的形式。其中,通过生成 HFile 文件,而后以 BulkLoad 导入的形式更适宜于大数据量的操作。
此外,如果咱们在应用 Spark(或者其余计算引擎)读取 HBase 表数据时,如果效率绝对低,比方:Spark 读取 HBase 时会依据 region 的数量生成对应数量的 task,导致雷同数据量下,会比间接读取 Hive 数据慢,也能够通过间接读取 HFile 的形式来解决。当然,理论利用还要联合具体的场景,波及的技术等。