在理论生产环境中,有这样一种场景:用户数据位于HDFS中,业务须要定期将这部分海量数据导入HBase零碎,以执行随机查问更新操作。这种场景如果调用写入API进行解决,极有可能会给RegionServer带来较大的写入压力:
•引起RegionServer频繁flush,进而一直compact、split,影响集群稳定性。
•引起RegionServer频繁GC,影响集群稳定性。
•耗费大量CPU资源、带宽资源、内存资源以及IO资源,与其余业务产生资源竞争。
•在某些场景下,比方均匀KV大小比拟大的场景,会耗尽RegionServer的解决线程,导致集群阻塞。
鉴于存在上述问题,HBase提供了另一种将数据写入HBase集群的办法——BulkLoad。BulkLoad首先应用MapReduce将待写入集群数据转换为HFile文件,再间接将这些HFile文件加载到在线集群中。显然,BulkLoad计划没有将写申请发送给RegionServer解决,能够无效防止上述一系列问题。
BulkLoad外围流程
从HBase的视角来看,BulkLoad次要由两个阶段组成:
1)HFile生成阶段。这个阶段会运行一个MapReduce工作,MapReduce的mapper须要本人实现,将HDFS文件中的数据读出来组装成一个复合KV,其中Key是rowkey,Value能够是KeyValue对象、Put对象甚至Delete对象;MapReduce的reducer由HBase负责,通过办法HFileOutputFormat2.configureIncrementalLoad()进行配置,这个办法次要负责以下事项。
•依据表信息配置一个全局有序的partitioner。
•将partitioner文件上传到HDFS集群并写入DistributedCache。
•设置reduce task的个数为指标表Region的个数。
•设置输入key/value类满足HFileOutputFormat所规定的格局要求。
•依据类型设置reducer执行相应的排序(KeyValueSortReducer或者PutSortReducer)。
这个阶段会为每个Region生成一个对应的HFile文件。
2)HFile导入阶段。HFile准备就绪之后,就能够应用工具completebulkload将HFile加载到在线HBase集群。completebulkload工具次要负责以下工作。
•顺次查看第一步生成的所有HFile文件,将每个文件映射到对应的Region。
•将HFile文件挪动到对应Region所在的HDFS文件目录下。
•告知Region对应的RegionServer,加载HFile文件对外提供服务。
如果在BulkLoad的两头过程中Region产生了决裂,completebulkload工具会主动将对应的HFile文件依照新生成的Region边界切分成多个HFile文件,保障每个HFile都能与指标表以后的Region绝对应。但这个过程须要读取HFile内容,因此并不高效。须要尽量减少HFile生成阶段和HFile导入阶段的提早,最好可能在HFile生成之后立即执行HFile导入。
基于BulkLoad两阶段的工作原理,BulkLoad的外围流程如图所示。
BulkLoad根底案例
在hbase上创立一张表:
create ‘test_log’,’ext’
执行BulkLoad代码:
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object BulkLoad1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("HbaseBulkLoad")
val spark = SparkSession.builder
.config(sparkConf)
.getOrCreate()
val sc = spark.sparkContext
val datas = List(
("abc", ("ext", "type", "login")),
("ccc", ("ext", "type", "logout"))
)
val dataRdd = sc.parallelize(datas)
val output = dataRdd.map {
x => {
val rowKey = Bytes.toBytes(x._1)
val immutableRowKey = new ImmutableBytesWritable(rowKey)
val colFam = x._2._1
val colName = x._2._2
val colValue = x._2._3
val kv = new KeyValue(
rowKey,
Bytes.toBytes(colFam),
Bytes.toBytes(colName),
Bytes.toBytes(colValue.toString)
)
(immutableRowKey, kv)
}
}
val hConf = HBaseConfiguration.create()
hConf.addResource("hbase_site.xml")
val hTableName = "test_log"
hConf.set("hbase.mapreduce.hfileoutputformat.table.name",hTableName)
val tableName = TableName.valueOf(hTableName)
val conn = ConnectionFactory.createConnection(hConf)
val table = conn.getTable(tableName)
val regionLocator = conn.getRegionLocator(tableName)
val hFileOutput = "/tmp/h_file"
output.saveAsNewAPIHadoopFile(hFileOutput,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
hConf)
val bulkLoader = new LoadIncrementalHFiles(hConf)
bulkLoader.doBulkLoad(new Path(hFileOutput),conn.getAdmin,table,regionLocator)
}
}
提交spark执行:
spark-submit \
–master yarn \
–conf spark.yarn.tokens.hbase.enabled=true \
–deploy-mode client \
–class BulkLoad1
–executor-memory 512m
–driver-memory 512m
–total-executor-cores 2
/home/hadoop/hadoop-2.8.5/files/Spark_study.jar
在hbase shell上查看:
scan ‘test_log’
发表回复