HBase Secondary Index Approaches
[TOC]
Approach using an HBase Coprocessor
Test case requirement: on the source table LJK_TEST, use mycf:name as the secondary index.
Step 1
Create an index table:
create 'INDEX_LJK_TEST','mycf'
Step 2
Write the code:
public class SecondIndexObserver extends BaseRegionObserver {
    private static final String INDEX_TABLE_NAME = "INDEX_LJK_TEST";
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("mycf");
    private static final byte[] COLUMN_NAME = Bytes.toBytes("name");
    private static final byte[] COLUMN_ID = Bytes.toBytes("id");
    private Configuration configuration = HBaseConfiguration.create();

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // Close the index table handle after every call so that it does not leak.
        try (HTable hTable = new HTable(configuration, INDEX_TABLE_NAME)) {
            List<Cell> cells = put.get(COLUMN_FAMILY, COLUMN_NAME);
            for (Cell cell : cells) {
                // Index row key = value of mycf:name; mycf:id stores the source row key.
                Put indexPut = new Put(CellUtil.cloneValue(cell));
                indexPut.addColumn(COLUMN_FAMILY, COLUMN_ID, CellUtil.cloneRow(cell));
                hTable.put(indexPut);
            }
        }
    }
}
Step 3
Upload the jar to HDFS and attach the coprocessor to the table LJK_TEST.
alter 'LJK_TEST','coprocessor'=>'/user/LJK/hbase.server.test-1.0-SNAPSHOT.jar|com.sunsharing.SecondIndexObserver||'
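The attribute value passed to alter is, as far as I understand the HBase shell format, four pipe-separated fields: jar path, observer class name, priority, and arguments. The last two are left empty here, which explains the trailing `||`. A minimal sketch of assembling that string follows; the class `CoprocessorSpecSketch` and its `build` method are hypothetical helpers for illustration, not part of HBase.

```java
// Hypothetical helper, not an HBase API: it only shows how the four fields line up.
final class CoprocessorSpecSketch {
    // Joins jar path | class name | priority | arguments with pipe separators.
    static String build(String jarPath, String className, String priority, String args) {
        return String.join("|", jarPath, className, priority, args);
    }

    public static void main(String[] args) {
        // Empty priority and arguments reproduce the value used in the alter command.
        System.out.println(build(
                "/user/LJK/hbase.server.test-1.0-SNAPSHOT.jar",
                "com.sunsharing.SecondIndexObserver",
                "",
                ""));
    }
}
```

With both optional fields empty, the result ends in two consecutive pipes, matching the string given to the shell.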
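Step 4
Test: put a row into the source table and check that the index table matches the expected result.
The index table gains one corresponding row: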
hbase(main):004:0> put 'LJK_TEST','003','mycf:name','LJK3'
0 row(s) in 0.0930 seconds
hbase(main):006:0> scan 'INDEX_LJK_TEST'
ROW COLUMN+CELL
LJK3 column=mycf:id, timestamp=1562055903019, value=003
1 row(s) in 0.0110 seconds
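To make the read path concrete, here is a minimal in-memory sketch of how the two tables relate. It uses plain Java maps as stand-ins for LJK_TEST and INDEX_LJK_TEST and does not call the HBase client; the class `IndexLookupSketch` and its methods are assumptions made up for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// In-memory stand-in for the two tables; this is a model, not HBase client code.
final class IndexLookupSketch {
    // Source table: row key -> value of mycf:name.
    final Map<String, String> sourceTable = new TreeMap<>();
    // Index table: value of mycf:name -> source row key (the mycf:id column).
    final Map<String, String> indexTable = new TreeMap<>();

    // Mirrors the observer: the index row is written before the source row.
    void put(String rowKey, String name) {
        indexTable.put(name, rowKey);
        sourceTable.put(rowKey, name);
    }

    // Read path: a lookup by name becomes a single get on the index table.
    Optional<String> findRowKeyByName(String name) {
        return Optional.ofNullable(indexTable.get(name));
    }

    public static void main(String[] args) {
        IndexLookupSketch tables = new IndexLookupSketch();
        tables.put("003", "LJK3");
        System.out.println(tables.findRowKeyByName("LJK3")); // prints Optional[003]
    }
}
```

One consequence of this layout is worth noting: because the index row key is the name alone, a second source row with the same name replaces the earlier entry on a default read. If names are not unique, a composite index row key (for example name plus source row key) is a common alternative, though the original code does not do this.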
Building a secondary index with Hadoop MapReduce
Test case requirement: on the source table LJK_TEST, use mycf:name as the secondary index.
Step 1
Write the code:
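import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class MrIndexBuilder {
    static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
        private String columnFamily;
        private String qualifier;
        private String indexTableName;

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            List<Cell> columnCells = value.getColumnCells(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            for (Cell cell : columnCells) {
                // Index row key = indexed value; <cf>:id stores the source row key.
                byte[] indexRow = CellUtil.cloneValue(cell);
                Put put = new Put(indexRow);
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("id"), key.get());
                // MultiTableOutputFormat routes each Put to the table named by the output key.
                context.write(new ImmutableBytesWritable(Bytes.toBytes(indexTableName)), put);
            }
        }

        @Override
        protected void setup(Context context) {
            Configuration configuration = context.getConfiguration();
            this.columnFamily = configuration.get("cf");
            this.qualifier = configuration.get("qa");
            this.indexTableName = configuration.get("indexTableName");
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        if (args.length < 4) {
            throw new RuntimeException("Invalid arguments: 4 are required (source table name, index table name, source column family, source qualifier to index).");
        }
        String tableName = args[0];
        String indexTableName = args[1];
        String columnFamily = args[2];
        String indexQualifier = args[3];
        // Pass the job parameters to the mapper through the configuration.
        conf.set("cf", columnFamily);
        conf.set("qa", indexQualifier);
        conf.set("indexTableName", indexTableName);
        Job mrIndexBuilder = Job.getInstance(conf, "MrIndexBuilder");
        mrIndexBuilder.setJarByClass(MrIndexBuilder.class);
        mrIndexBuilder.setMapperClass(MyMapper.class);
        mrIndexBuilder.setInputFormatClass(TableInputFormat.class);
        mrIndexBuilder.setOutputFormatClass(MultiTableOutputFormat.class);
        // Map-only job: the mapper writes the index Puts directly.
        mrIndexBuilder.setNumReduceTasks(0);
        Scan scan = new Scan();
        scan.setCaching(500);
        // Do not fill the block cache during a full-table scan.
        scan.setCacheBlocks(false);
        TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class, ImmutableBytesWritable.class,
                Put.class, mrIndexBuilder);
        boolean succeeded = mrIndexBuilder.waitForCompletion(true);
        if (!succeeded) {
            throw new IOException("Job failed!");
        }
    }
}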
Step 2
Package the code as a jar, copy it to one of the servers in the HBase cluster, and run the command:
HADOOP_CLASSPATH=`hbase classpath` hadoop jar hbase.server.test-1.0-SNAPSHOT.jar com.sunsharing.MrIndexBuilder LJK_TEST INDEX_LJK_TEST mycf name
Step 3
Verify that the result matches expectations:
hbase(main):021:0> scan 'INDEX_LJK_TEST'
ROW COLUMN+CELL
LJK column=mycf:id, timestamp=1562657562219, value=002
LJK3 column=mycf:id, timestamp=1562657562219, value=003
LJK4 column=mycf:id, timestamp=1562657562219, value=004
LJK5 column=mycf:id, timestamp=1562657562219, value=005
LJK6 column=mycf:id, timestamp=1562657562219, value=006
LJK7 column=mycf:id, timestamp=1562657562219, value=007
LJK8 column=mycf:id, timestamp=1562657562219, value=008
7 row(s) in 0.3670 seconds
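The transformation the map-only job applies can be sketched without a cluster: every source row (row key, name) is inverted into an index row (name, row key). The block below is a plain-Java model of that inversion, assuming the source rows implied by the scan output above; `BatchIndexBuildSketch` and `buildIndex` are hypothetical names, and no MapReduce or HBase API is involved.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the mapper's inversion; not MapReduce or HBase code.
final class BatchIndexBuildSketch {
    // Inverts (row key -> name) into (name -> row key), one index entry per source row.
    static Map<String, String> buildIndex(Map<String, String> sourceRows) {
        // TreeMap keeps index rows sorted by key, as a scan of the index table would return them.
        Map<String, String> index = new TreeMap<>();
        for (Map.Entry<String, String> row : sourceRows.entrySet()) {
            index.put(row.getValue(), row.getKey());
        }
        return index;
    }

    public static void main(String[] args) {
        // Source rows inferred from the scan output of INDEX_LJK_TEST.
        Map<String, String> source = new TreeMap<>();
        source.put("002", "LJK");
        for (int i = 3; i <= 8; i++) {
            source.put("00" + i, "LJK" + i);
        }
        // Print each index row as: name -> source row key.
        buildIndex(source).forEach((name, id) -> System.out.println(name + " -> " + id));
    }
}
```

Unlike the coprocessor, which only sees new writes, this batch pass covers rows that already existed before the index was introduced, which is the usual reason to run it.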
Phoenix secondary index approach
This approach is the simplest: first create a table mapped into Phoenix, then create a global secondary index on it.
CREATE TABLE LJK_TEST (ID VARCHAR NOT NULL PRIMARY KEY, "mycf"."name" VARCHAR);
CREATE INDEX COVER_LJKTEST_INDEX ON LJK_TEST ("mycf"."name");
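ES (Elasticsearch) secondary index approach
This approach can handle essentially every case; details to be added.
Appendix
Lily HBase secondary index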