在理论生产环境中,有这样一种场景:用户数据位于 HDFS 中,业务须要定期将这部分海量数据导入 HBase 零碎,以执行随机查问更新操作。这种场景如果调用写入 API 进行解决,极有可能会给 RegionServer 带来较大的写入压力:
•引起 RegionServer 频繁 flush,进而一直 compact、split,影响集群稳定性。
•引起 RegionServer 频繁 GC,影响集群稳定性。
•耗费大量 CPU 资源、带宽资源、内存资源以及 IO 资源,与其余业务产生资源竞争。
•在某些场景下,比方均匀 KV 大小比拟大的场景,会耗尽 RegionServer 的解决线程,导致集群阻塞。
鉴于存在上述问题,HBase 提供了另一种将数据写入 HBase 集群的办法——BulkLoad。BulkLoad 首先应用 MapReduce 将待写入集群数据转换为 HFile 文件,再间接将这些 HFile 文件加载到在线集群中。显然,BulkLoad 计划没有将写申请发送给 RegionServer 解决,能够无效防止上述一系列问题。
BulkLoad 外围流程
从 HBase 的视角来看,BulkLoad 次要由两个阶段组成:
1)HFile 生成阶段。这个阶段会运行一个 MapReduce 工作,MapReduce 的 mapper 须要本人实现,将 HDFS 文件中的数据读出来组装成一个复合 KV,其中 Key 是 rowkey,Value 能够是 KeyValue 对象、Put 对象甚至 Delete 对象;MapReduce 的 reducer 由 HBase 负责,通过办法 HFileOutputFormat2.configureIncrementalLoad() 进行配置,这个办法次要负责以下事项。
•依据表信息配置一个全局有序的 partitioner。
•将 partitioner 文件上传到 HDFS 集群并写入 DistributedCache。
•设置 reduce task 的个数为指标表 Region 的个数。
•设置输入 key/value 类满足 HFileOutputFormat 所规定的格局要求。
•依据类型设置 reducer 执行相应的排序(KeyValueSortReducer 或者 PutSortReducer)。
这个阶段会为每个 Region 生成一个对应的 HFile 文件。
2)HFile 导入阶段。HFile 准备就绪之后,就能够应用工具 completebulkload 将 HFile 加载到在线 HBase 集群。completebulkload 工具次要负责以下工作。
•顺次查看第一步生成的所有 HFile 文件,将每个文件映射到对应的 Region。
•将 HFile 文件挪动到对应 Region 所在的 HDFS 文件目录下。
•告知 Region 对应的 RegionServer,加载 HFile 文件对外提供服务。
如果在 BulkLoad 的两头过程中 Region 产生了决裂,completebulkload 工具会主动将对应的 HFile 文件依照新生成的 Region 边界切分成多个 HFile 文件,保障每个 HFile 都能与指标表以后的 Region 绝对应。但这个过程须要读取 HFile 内容,因此并不高效。须要尽量减少 HFile 生成阶段和 HFile 导入阶段的提早,最好可能在 HFile 生成之后立即执行 HFile 导入。
基于 BulkLoad 两阶段的工作原理,BulkLoad 的外围流程如图所示。
BulkLoad 根底案例
在 hbase 上创立一张表:
create ‘test_log’,’ext’
执行 BulkLoad 代码:
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object BulkLoad1 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setAppName("HbaseBulkLoad")
val spark = SparkSession.builder
.config(sparkConf)
.getOrCreate()
val sc = spark.sparkContext
val datas = List(("abc", ("ext", "type", "login")),
("ccc", ("ext", "type", "logout"))
)
val dataRdd = sc.parallelize(datas)
val output = dataRdd.map {
x => {val rowKey = Bytes.toBytes(x._1)
val immutableRowKey = new ImmutableBytesWritable(rowKey)
val colFam = x._2._1
val colName = x._2._2
val colValue = x._2._3
val kv = new KeyValue(
rowKey,
Bytes.toBytes(colFam),
Bytes.toBytes(colName),
Bytes.toBytes(colValue.toString)
)
(immutableRowKey, kv)
}
}
val hConf = HBaseConfiguration.create()
hConf.addResource("hbase_site.xml")
val hTableName = "test_log"
hConf.set("hbase.mapreduce.hfileoutputformat.table.name",hTableName)
val tableName = TableName.valueOf(hTableName)
val conn = ConnectionFactory.createConnection(hConf)
val table = conn.getTable(tableName)
val regionLocator = conn.getRegionLocator(tableName)
val hFileOutput = "/tmp/h_file"
output.saveAsNewAPIHadoopFile(hFileOutput,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
hConf)
val bulkLoader = new LoadIncrementalHFiles(hConf)
bulkLoader.doBulkLoad(new Path(hFileOutput),conn.getAdmin,table,regionLocator)
}
}
提交 spark 执行:
spark-submit \
–master yarn \
–conf spark.yarn.tokens.hbase.enabled=true \
–deploy-mode client \
–class BulkLoad1
–executor-memory 512m
–driver-memory 512m
–total-executor-cores 2
/home/hadoop/hadoop-2.8.5/files/Spark_study.jar
在 hbase shell 上查看:
scan ‘test_log’