关于hbase:大数据开发技术之如何将数据导入到HBase

在理论生产环境中,将计算和存储进行拆散,是咱们进步集群吞吐量、确保集群规模程度可扩大的次要办法之一,并且通过集群的扩容、性能的优化,确保在数据大幅增长时,存储不能称为零碎的瓶颈。大数据培训

具体到咱们理论的我的项目需要中,有一个典型的场景,通常会将Hive中的局部数据,比方热数据,存入到HBase中,进行冷热拆散解决。

咱们采纳Spark读取Hive表数据存入HBase中,这里次要有两种形式:

通过HBase的put API进行数据的批量写入

通过生成HFile文件,而后通过BulkLoad形式将数据存入HBase

HBase的原生put形式,通过HBase集群的region server向HBase插入数据,然而当数据量十分大时,region会进行split、compact等解决,并且这些解决十分占用计算资源和IO开销,影响性能和集群的稳定性。

HBase的数据最终是以HFile的模式存储到HDFS上的,如果咱们能间接将数据生成为HFile文件,而后将HFile文件保留到HBase对应的表中,能够防止上述的很多问题,效率会绝对更高。

本篇文章次要介绍如何应用Spark生成HFile文件,而后通过BulkLoad形式将数据导入到HBase中,并附批量put数据到HBase以及间接存入数据到HBase中的理论利用示例。

1. 生成HFile,BulkLoad导入

1.1 数据样例

{“id”:”1″,”name”:”jack”,”age”:”18″}
{“id”:”2″,”name”:”mike”,”age”:”19″}
{“id”:”3″,”name”:”kilos”,”age”:”20″}
{“id”:”4″,”name”:”tom”,”age”:”21″}

1.2 示例代码

/**

  • @Author bigdatalearnshare
    */

object App {

def main(args: Array[String]): Unit = {

System.setProperty("HADOOP_USER_NAME", "root")


val sparkSession = SparkSession
  .builder()
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .master("local[*]")
  .getOrCreate()

val rowKeyField = "id"

val df = sparkSession.read.format("json").load("/people.json")


val fields = df.columns.filterNot(_ == "id").sorted


val data = df.rdd.map { row =>
  val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)


  val kvs = fields.map { field =>
    new KeyValue(rowKey, Bytes.toBytes("hfile-fy"), Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
  }


  (new ImmutableBytesWritable(rowKey), kvs)
}.flatMapValues(x => x).sortByKey()

val hbaseConf = HBaseConfiguration.create(sparkSession.sessionState.newHadoopConf())
hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "hfile")
val connection = ConnectionFactory.createConnection(hbaseConf)


val tableName = TableName.valueOf("hfile")


//没有HBase表则创立
creteHTable(tableName, connection)


val table = connection.getTable(tableName)


try {
  val regionLocator = connection.getRegionLocator(tableName)


  val job = Job.getInstance(hbaseConf)


  job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setMapOutputValueClass(classOf[KeyValue])


  HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)


  val savePath = "hdfs://linux-1:9000/hfile_save"
  delHdfsPath(savePath, sparkSession)


  job.getConfiguration.set("mapred.output.dir", savePath)


  data.saveAsNewAPIHadoopDataset(job.getConfiguration)


  val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
  bulkLoader.doBulkLoad(new Path(savePath), connection.getAdmin, table, regionLocator)


} finally {
  //WARN LoadIncrementalHFiles: Skipping non-directory hdfs://linux-1:9000/hfile_save/_SUCCESS 不影响,间接把文件移到HBASE对应HDFS地址了
  table.close()
  connection.close()
}


sparkSession.stop()

}

def creteHTable(tableName: TableName, connection: Connection): Unit = {

val admin = connection.getAdmin


if (!admin.tableExists(tableName)) {
  val tableDescriptor = new HTableDescriptor(tableName)
  tableDescriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("hfile-fy")))
  admin.createTable(tableDescriptor)
}

}

def delHdfsPath(path: String, sparkSession: SparkSession) {

val hdfs = FileSystem.get(sparkSession.sessionState.newHadoopConf())
val hdfsPath = new Path(path)


if (hdfs.exists(hdfsPath)) {
  //val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
  hdfs.delete(hdfsPath, true)
}

}
}

1.3 注意事项

上述示例代码能够依据理论业务需要作相应调整,但有一个问题须要特地留神:

通过Spark读取过去的数据生成HFile时,要确保HBase的主键、列族、列依照有序排列。否则,会抛出以下异样:

Caused by: java.io.IOException: Added a key not lexically larger than previous. Current cell = 1/hfile-fy:age/1588230543677/Put/vlen=2/seqid=0, lastCell = 1/hfile-fy:name/1588230543677/Put/vlen=4/seqid=0

2. 批量put

2.1数据样例

val rowKeyField = “id”
val df = sparkSession.read.format(“json”).load(“/stats.json”)
val fields = df.columns.filterNot(_ == “id”)

df.rdd.foreachPartition { partition =>

  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "linux-1:2181,linux-2:2181,linux-3:2181")
  hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "batch_put")


  val conn = ConnectionFactory.createConnection(hbaseConf)
  val table = conn.getTable(TableName.valueOf("batch_put"))


  val res = partition.map { row =>
    val rowKey = Bytes.toBytes(row.getAs(rowKeyField).toString)
    val put = new Put(rowKey)
    val family = Bytes.toBytes("hfile-fy")


    fields.foreach { field =>
      put.addColumn(family, Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
    }


    put
  }.toList


  Try(table.put(res)).getOrElse(table.close())


  table.close()
  conn.close()

}

在理论利用中,咱们也能够将常常一起查问的数据拼接在一起存入一个列中,比方将上述的pv和uv拼接在一起应用,能够升高KeyValue带来的结构化开销。

3.saveAsNewAPIHadoopDataset

val hbaseConf = sparkSession.sessionState.newHadoopConf()
hbaseConf.set(“hbase.zookeeper.quorum”, “linux-1:2181,linux-2:2181,linux-3:2181”)

hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, “direct”)
val job = Job.getInstance(hbaseConf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

val rowKeyField = “id”

val df = sparkSession.read.format(“json”).load(“/stats.json”)

val fields = df.columns.filterNot(_ == “id”)

df.rdd.map { row =>

val put = new Put(Bytes.toBytes(row.getAs(rowKeyField).toString))


val family = Bytes.toBytes("hfile-fy")


fields.foreach { field =>
  put.addColumn(family, Bytes.toBytes(field), Bytes.toBytes(row.getAs(field).toString))
}


(new ImmutableBytesWritable(), put)

}.saveAsNewAPIHadoopDataset(job.getConfiguration)

以上次要介绍了3种利用Spark将数据导入HBase的形式。其中,通过生成HFile文件,而后以BulkLoad导入的形式更适宜于大数据量的操作。

此外,如果咱们在应用Spark(或者其余计算引擎)读取HBase表数据时,如果效率绝对低,比方:Spark读取HBase时会依据region的数量生成对应数量的task,导致雷同数据量下,会比间接读取Hive数据慢,也能够通过间接读取HFile的形式来解决。当然,理论利用还要联合具体的场景,波及的技术等。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理