HBase Secondary Index Solutions
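Using an HBase Coprocessor
Test case: on the source table LJK_TEST, use mycf:name as a secondary index.
Step 1
Create an index table. Its row key will be the name value, and its mycf:id column will hold the row key of the source row: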
create 'INDEX_LJK_TEST','mycf'
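With this layout, a lookup by name becomes two point reads instead of a full scan of LJK_TEST. The sketch below shows that read path. It models both tables as in-memory sorted maps, which is an assumption for illustration; no HBase client calls are involved. `findRowKeyByName` and `findRowByName` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Sketch: each table is modeled as a sorted map rowKey -> ("family:qualifier" -> value). */
final class IndexLookupSketch {
    /** Read 1: the index row keyed by the name holds the source row key in mycf:id. */
    static Optional<String> findRowKeyByName(Map<String, Map<String, String>> indexTable, String name) {
        Map<String, String> indexRow = indexTable.get(name);
        return indexRow == null ? Optional.empty() : Optional.ofNullable(indexRow.get("mycf:id"));
    }

    /** Read 2: fetch the full source row using the key found by read 1. */
    static Optional<Map<String, String>> findRowByName(Map<String, Map<String, String>> sourceTable,
                                                       Map<String, Map<String, String>> indexTable,
                                                       String name) {
        return findRowKeyByName(indexTable, name).map(sourceTable::get);
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> source = new TreeMap<>();
        Map<String, Map<String, String>> index = new TreeMap<>();
        source.put("003", Collections.singletonMap("mycf:name", "LJK3"));
        index.put("LJK3", Collections.singletonMap("mycf:id", "003"));
        System.out.println(findRowByName(source, index, "LJK3")); // Optional[{mycf:name=LJK3}]
    }
}
```

In real HBase, each of the two reads would be a single `Get`. The source table is never scanned.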
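Step 2
Write a RegionObserver (HBase 1.x API) that writes each mycf:name value to the index table on every Put to LJK_TEST: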
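```java
package com.sunsharing;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.*;              // Cell, CellUtil, TableName
import org.apache.hadoop.hbase.client.*;       // Durability, Put, Table
import org.apache.hadoop.hbase.coprocessor.*;  // BaseRegionObserver, ObserverContext, RegionCoprocessorEnvironment
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class SecondIndexObserver extends BaseRegionObserver {
    private static final TableName INDEX_TABLE = TableName.valueOf("INDEX_LJK_TEST");
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("mycf");
    private static final byte[] COLUMN_NAME = Bytes.toBytes("name");
    private static final byte[] COLUMN_ID = Bytes.toBytes("id");

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
                       WALEdit edit, Durability durability) throws IOException {
        List<Cell> cells = put.get(COLUMN_FAMILY, COLUMN_NAME);
        if (cells.isEmpty()) return; // this Put does not touch mycf:name
        // Borrow a table handle from the coprocessor environment and always close it,
        // instead of creating (and leaking) a new HTable on every Put.
        try (Table indexTable = e.getEnvironment().getTable(INDEX_TABLE)) {
            for (Cell cell : cells) {
                // Index row key = name value; mycf:id = row key of the source row
                Put indexPut = new Put(CellUtil.cloneValue(cell));
                indexPut.addColumn(COLUMN_FAMILY, COLUMN_ID, CellUtil.cloneRow(cell));
                indexTable.put(indexPut);
            }
        }
    }
}
```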
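Without the HBase plumbing, the observer maps each `mycf:name` value to the row key of the Put that carried it. The plain-Java sketch below models that mapping. It assumes the index table can be represented as name -> latest mycf:id value, which is what a default scan shows. It is not HBase code, and `onPut` and row `009` are hypothetical.

The sketch makes one consequence of this design visible. The index row key is the name alone, so two source rows with the same name share one index row, and the later Put wins.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of SecondIndexObserver's logic, with no HBase classes.
 * Assumption: the index table is modeled as name -> latest mycf:id value.
 */
final class ObserverLogicSketch {
    /** Mirrors prePut: one index entry per mycf:name value, keyed by the value alone. */
    static void onPut(Map<String, String> indexTable, String sourceRowKey, List<String> nameValues) {
        for (String name : nameValues) {
            // Same name => same index row key => the newer Put replaces the older mapping
            indexTable.put(name, sourceRowKey);
        }
    }

    public static void main(String[] args) {
        Map<String, String> index = new LinkedHashMap<>();
        onPut(index, "003", Arrays.asList("LJK3"));
        onPut(index, "009", Arrays.asList("LJK3")); // hypothetical row 009 with the same name
        System.out.println(index); // {LJK3=009}: row 003 can no longer be found via the index
    }
}
```

If names are not unique, a common variant makes the index row key the name plus the source row key and finds matches with a prefix scan. The original code does not do this.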
Step 3
Upload the jar to HDFS and attach the coprocessor to table LJK_TEST:
alter 'LJK_TEST','coprocessor'=>'/user/LJK/hbase.server.test-1.0-SNAPSHOT.jar|com.sunsharing.SecondIndexObserver||'
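The `coprocessor` attribute value packs four `|`-separated fields into one string: jar path, observer class, priority, and arguments. This example leaves the last two empty. The hypothetical helper below only splits such a string, to make the format explicit; it is not an HBase API. The comment on the priority field reflects HBase's documented default and should be checked against your version.

```java
/**
 * Hypothetical helper that splits an HBase table coprocessor attribute of the form
 * jarPath|className|priority|arguments into its fields.
 */
final class CoprocessorSpec {
    final String jarPath;    // where the jar lives (HDFS in this article)
    final String className;  // fully qualified observer class (mandatory)
    final String priority;   // empty => HBase applies its default user priority
    final String arguments;  // optional key=value pairs; empty in this article

    private CoprocessorSpec(String jarPath, String className, String priority, String arguments) {
        this.jarPath = jarPath;
        this.className = className;
        this.priority = priority;
        this.arguments = arguments;
    }

    static CoprocessorSpec parse(String spec) {
        // A positive limit keeps trailing empty fields, so "a|b||" yields four fields
        String[] f = spec.split("\\|", 4);
        if (f.length < 3 || f[1].trim().isEmpty()) {
            throw new IllegalArgumentException("expected jarPath|className|priority[|arguments]: " + spec);
        }
        return new CoprocessorSpec(f[0].trim(), f[1].trim(), f[2].trim(), f.length == 4 ? f[3] : "");
    }

    public static void main(String[] args) {
        CoprocessorSpec s = parse(
            "/user/LJK/hbase.server.test-1.0-SNAPSHOT.jar|com.sunsharing.SecondIndexObserver||");
        System.out.println(s.className + " from " + s.jarPath);
    }
}
```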
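Step 4
Test: insert a row into the source table and check that the index table contains the expected entry.
The index table now contains one corresponding row: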
```
hbase(main):004:0> put 'LJK_TEST','003','mycf:name','LJK3'
0 row(s) in 0.0930 seconds
hbase(main):006:0> scan 'INDEX_LJK_TEST'
ROW COLUMN+CELL
LJK3 column=mycf:id, timestamp=1562055903019, value=003
1 row(s) in 0.0110 seconds
```
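Building the Secondary Index with Hadoop MapReduce
Test case: on the source table LJK_TEST, use mycf:name as a secondary index.
Step 1
Write a map-only job that scans the source table and writes one index row per mycf:name value. The coprocessor only indexes Puts made after it is attached; this job also indexes rows that are already in the table.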
```java
package com.sunsharing;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class MrIndexBuilder {
    // Map-only: each source row becomes index Puts, routed by MultiTableOutputFormat
    static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
        private byte[] columnFamily;
        private byte[] qualifier;
        private ImmutableBytesWritable indexTable;

        @Override
        protected void setup(Context context) {
            Configuration conf = context.getConfiguration();
            this.columnFamily = Bytes.toBytes(conf.get("cf"));
            this.qualifier = Bytes.toBytes(conf.get("qa"));
            // MultiTableOutputFormat uses the output key as the destination table name
            this.indexTable = new ImmutableBytesWritable(Bytes.toBytes(conf.get("indexTableName")));
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            List<Cell> cells = value.getColumnCells(columnFamily, qualifier);
            for (Cell cell : cells) {
                // Index row key = indexed value; cf:id = source row key
                Put put = new Put(CellUtil.cloneValue(cell));
                put.addColumn(columnFamily, Bytes.toBytes("id"), value.getRow());
                context.write(indexTable, put);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 4) {
            throw new IllegalArgumentException("Expected 4 arguments: <source table> <index table> "
                    + "<column family> <indexed qualifier>");
        }
        Configuration conf = HBaseConfiguration.create();
        conf.set("cf", args[2]);
        conf.set("qa", args[3]);
        conf.set("indexTableName", args[1]);

        Job job = Job.getInstance(conf, "MrIndexBuilder");
        job.setJarByClass(MrIndexBuilder.class);

        Scan scan = new Scan();
        scan.setCaching(500);        // fetch 500 rows per RPC
        scan.setCacheBlocks(false);  // a full-table scan should not pollute the block cache
        scan.addColumn(Bytes.toBytes(args[2]), Bytes.toBytes(args[3])); // read only the indexed column

        // Sets TableInputFormat, the mapper and the map output key/value classes
        TableMapReduceUtil.initTableMapperJob(args[0], scan, MyMapper.class,
                ImmutableBytesWritable.class, Put.class, job);
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        job.setNumReduceTasks(0);    // map-only: Puts go straight to the output format

        if (!job.waitForCompletion(true)) {
            throw new IOException("MrIndexBuilder job failed");
        }
    }
}
```
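The job's data flow is easy to lose among the Hadoop types. The sketch below models it in plain Java. It is an illustration only: `IndexRecord`, `map` and `write` are hypothetical stand-ins, not Hadoop or HBase APIs.

The mapper turns each source row into one record per indexed value. The output format then routes each record to the table named by its key. This is why the mapper emits the index table name as its output key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Plain-Java model of MrIndexBuilder's map-only data flow (illustration only, no Hadoop). */
final class MapOnlyIndexSketch {
    /** Stands in for the (table name, Put) pair the mapper emits. */
    static final class IndexRecord {
        final String table, indexRowKey, sourceRowKey;
        IndexRecord(String table, String indexRowKey, String sourceRowKey) {
            this.table = table;
            this.indexRowKey = indexRowKey;
            this.sourceRowKey = sourceRowKey;
        }
    }

    /** Mirrors MyMapper.map: one record per cell of the indexed column. */
    static List<IndexRecord> map(String indexTable, String sourceRowKey, List<String> indexedValues) {
        List<IndexRecord> out = new ArrayList<>();
        for (String value : indexedValues) {
            out.add(new IndexRecord(indexTable, value, sourceRowKey));
        }
        return out;
    }

    /** Stand-in for MultiTableOutputFormat: the record's table name picks the destination. */
    static void write(Map<String, SortedMap<String, String>> tables, List<IndexRecord> records) {
        for (IndexRecord r : records) {
            tables.computeIfAbsent(r.table, t -> new TreeMap<>()).put(r.indexRowKey, r.sourceRowKey);
        }
    }

    public static void main(String[] args) {
        Map<String, SortedMap<String, String>> tables = new HashMap<>();
        // Two source rows as they appear in the verification scan: 002 -> LJK, 003 -> LJK3
        write(tables, map("INDEX_LJK_TEST", "002", Collections.singletonList("LJK")));
        write(tables, map("INDEX_LJK_TEST", "003", Collections.singletonList("LJK3")));
        System.out.println(tables); // {INDEX_LJK_TEST={LJK=002, LJK3=003}}
    }
}
```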
Step 2
Package the code as a jar, copy it to any server in the HBase cluster, and run:
HADOOP_CLASSPATH=`hbase classpath` hadoop jar hbase.server.test-1.0-SNAPSHOT.jar com.sunsharing.MrIndexBuilder LJK_TEST INDEX_LJK_TEST mycf name
Step 3
Verify that the result matches expectations:
```
hbase(main):021:0> scan 'INDEX_LJK_TEST'
ROW COLUMN+CELL
LJK column=mycf:id, timestamp=1562657562219, value=002
LJK3 column=mycf:id, timestamp=1562657562219, value=003
LJK4 column=mycf:id, timestamp=1562657562219, value=004
LJK5 column=mycf:id, timestamp=1562657562219, value=005
LJK6 column=mycf:id, timestamp=1562657562219, value=006
LJK7 column=mycf:id, timestamp=1562657562219, value=007
LJK8 column=mycf:id, timestamp=1562657562219, value=008
7 row(s) in 0.3670 seconds
```
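Eyeballing a scan works for seven rows. For larger tables, a check like the one below can flag index rows that no longer match their source row. Neither the observer nor the MapReduce job deletes old index rows, so changing or deleting a row's name leaves such entries behind.

This is a hypothetical sketch. It assumes both tables have already been read into memory (for example via scans). `staleEntries` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical consistency check over in-memory copies of both tables:
 * sourceNames maps source row key -> mycf:name; index maps index row key (name) -> mycf:id.
 */
final class IndexConsistencyCheck {
    /** Returns index row keys whose source row is missing or no longer carries that name. */
    static List<String> staleEntries(Map<String, String> sourceNames, Map<String, String> index) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, String> entry : index.entrySet()) {
            String currentName = sourceNames.get(entry.getValue()); // null if the source row is gone
            if (!entry.getKey().equals(currentName)) {
                stale.add(entry.getKey());
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        Map<String, String> source = new TreeMap<>();
        source.put("002", "LJK");
        source.put("003", "LJK3");
        Map<String, String> index = new TreeMap<>();
        index.put("LJK", "002");
        index.put("LJK3", "003");
        System.out.println(staleEntries(source, index)); // []
    }
}
```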
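Phoenix Secondary Index
This is the simplest approach: first create a Phoenix table mapped onto the existing HBase table, then create a global secondary index on it. The HBase row key maps to the ID primary-key column. The family and column names are double-quoted because Phoenix upper-cases unquoted identifiers, while the HBase names are lowercase.
CREATE TABLE LJK_TEST (ID VARCHAR NOT NULL PRIMARY KEY, "mycf"."name" VARCHAR);
CREATE INDEX COVER_LJKTEST_INDEX ON LJK_TEST("mycf"."name");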
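Unlike the hand-built index tables in the two previous approaches, a Phoenix global index is itself an HBase table. Its row key starts with the indexed value and ends with the data table's primary key, so rows that share a name do not overwrite each other.

The sketch below is a simplified model of that layout. It ignores salting, nulls, sort order and fixed-width types. `indexRowKey` is a hypothetical name, not a Phoenix API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Simplified model (assumption: one VARCHAR indexed column, VARCHAR primary key, no salting,
 * no nulls, ascending order) of a Phoenix global-index row key: indexed value, a zero-byte
 * separator, then the data table's primary key.
 */
final class PhoenixIndexKeySketch {
    static byte[] indexRowKey(String indexedValue, String primaryKey) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] value = indexedValue.getBytes(StandardCharsets.UTF_8);
        out.write(value, 0, value.length);
        out.write(0); // separator terminating the variable-length VARCHAR value
        byte[] pk = primaryKey.getBytes(StandardCharsets.UTF_8);
        out.write(pk, 0, pk.length); // last column: no trailing separator
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] a = indexRowKey("LJK3", "003");
        byte[] b = indexRowKey("LJK3", "009"); // hypothetical second row with the same name
        System.out.println(Arrays.equals(a, b)); // false: each row keeps its own index entry
    }
}
```

Because all keys for one name share the same prefix, a lookup by name is a short range scan over the index table.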
Elasticsearch (ES) Secondary Index
This approach can handle almost any query pattern; details to be added.
Appendix
Lily HBase secondary index