HBase Secondary Index Solutions


Using an HBase Coprocessor

Test case: on the source table LJK_TEST, use mycf:name as the secondary index column.

Step 1

Create an index table.

create 'INDEX_LJK_TEST','mycf'

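Step 2

Write the observer. On every put to the source table it writes a matching row to the index table, keyed by the value of mycf:name.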

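import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

// Targets the HBase 1.x coprocessor API (BaseRegionObserver was removed in HBase 2.0).
public class SecondIndexObserver extends BaseRegionObserver {
    private static final TableName INDEX_TABLE_NAME = TableName.valueOf("INDEX_LJK_TEST");
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("mycf");
    private static final byte[] COLUMN_NAME = Bytes.toBytes("name");
    private static final byte[] COLUMN_ID = Bytes.toBytes("id");

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
                       Durability durability) throws IOException {
        // Cells in this Put that write mycf:name; empty if the Put does not touch that column.
        List<Cell> cells = put.get(COLUMN_FAMILY, COLUMN_NAME);
        if (cells.isEmpty()) {
            return;
        }
        // Obtain the index table from the coprocessor environment; try-with-resources closes it.
        try (Table indexTable = e.getEnvironment().getTable(INDEX_TABLE_NAME)) {
            for (Cell cell : cells) {
                // Index row key = value of mycf:name; mycf:id = row key of the source row.
                Put indexPut = new Put(CellUtil.cloneValue(cell));
                indexPut.addColumn(COLUMN_FAMILY, COLUMN_ID, CellUtil.cloneRow(cell));
                indexTable.put(indexPut);
            }
        }
    }
}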

Step 3

Upload the jar to HDFS and attach the coprocessor to the table LJK_TEST.

alter 'LJK_TEST','coprocessor'=>'/user/LJK/hbase.server.test-1.0-SNAPSHOT.jar|com.sunsharing.SecondIndexObserver||'
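
The attribute value passed to alter is a pipe-separated string with four fields: jar path, fully qualified class name, priority, and arguments. In the command above the last two are left empty, which is why the value ends in "||". A minimal sketch that splits such a value into its parts, only to make the layout explicit; the CoprocessorSpec class is hypothetical, written for illustration, and is not part of HBase:

```java
/** Hypothetical helper (not an HBase class) that splits a table coprocessor attribute value. */
final class CoprocessorSpec {
    final String jarPath;
    final String className;
    final String priority;   // empty when no priority is given
    final String arguments;  // empty when no arguments are given

    private CoprocessorSpec(String[] fields) {
        this.jarPath = fields[0];
        this.className = fields[1];
        this.priority = fields[2];
        this.arguments = fields[3];
    }

    /** This sketch expects all four fields to be present, even if some are empty. */
    static CoprocessorSpec parse(String value) {
        // A limit of -1 keeps trailing empty fields such as the "||" at the end.
        String[] fields = value.split("\\|", -1);
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 pipe-separated fields, got " + fields.length);
        }
        return new CoprocessorSpec(fields);
    }

    public static void main(String[] args) {
        CoprocessorSpec spec = parse(
            "/user/LJK/hbase.server.test-1.0-SNAPSHOT.jar|com.sunsharing.SecondIndexObserver||");
        System.out.println("jar       = " + spec.jarPath);
        System.out.println("class     = " + spec.className);
        System.out.println("priority  = '" + spec.priority + "'");
        System.out.println("arguments = '" + spec.arguments + "'");
    }
}
```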

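Step 4

Test: write a row to the source table and check that the index table contains the expected entry.

The index table gains one corresponding row.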

hbase(main):004:0> put 'LJK_TEST','003','mycf:name','LJK3'
0 row(s) in 0.0930 seconds

hbase(main):006:0> scan 'INDEX_LJK_TEST'
ROW                       COLUMN+CELL                                                           
 LJK3                     column=mycf:id, timestamp=1562055903019, value=003                    
1 row(s) in 0.0110 seconds
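
Reading through the index takes two lookups: first the index table by name to obtain the row key, then the source table by that row key. The sketch below models both tables as in-memory sorted maps, a stand-in for HBase so that it runs without a cluster; IndexLookupModel and its method names are hypothetical. It also illustrates a limitation of the observer above: it only adds index rows, so changing a row's name leaves the old index entry behind, and the reader has to re-check the source table.

```java
import java.util.Map;
import java.util.TreeMap;

/** In-memory model of the source table and index table; this is not HBase client code. */
final class IndexLookupModel {
    // Row key -> value of mycf:name (stands in for LJK_TEST).
    private final Map<String, String> dataTable = new TreeMap<>();
    // Name -> row key stored in mycf:id (stands in for INDEX_LJK_TEST).
    private final Map<String, String> indexTable = new TreeMap<>();

    /** Mirrors the observer: write the index row first, then the source row. */
    void put(String rowKey, String name) {
        indexTable.put(name, rowKey);
        dataTable.put(rowKey, name);
    }

    /** Two-step read: index table first, then the source table. Returns null if nothing matches. */
    String findRowKeyByName(String name) {
        String rowKey = indexTable.get(name);
        if (rowKey == null) {
            return null;
        }
        // Re-check the source row so that stale index entries are filtered out.
        return name.equals(dataTable.get(rowKey)) ? rowKey : null;
    }

    boolean indexContains(String name) {
        return indexTable.containsKey(name);
    }

    public static void main(String[] args) {
        IndexLookupModel model = new IndexLookupModel();
        model.put("003", "LJK3");
        System.out.println(model.findRowKeyByName("LJK3")); // 003

        model.put("003", "LJK9");                           // rename the same row
        System.out.println(model.indexContains("LJK3"));    // true: the old entry is still there
        System.out.println(model.findRowKeyByName("LJK3")); // null: filtered by the re-check
    }
}
```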

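Building the secondary index with Hadoop MapReduce

Test case: on the source table LJK_TEST, use mycf:name as the secondary index column.

Step 1

Write the job. It scans the source table and, in a map-only job, writes one index row per indexed cell.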

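import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class MrIndexBuilder {

    static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
        private byte[] columnFamily;
        private byte[] qualifier;
        private ImmutableBytesWritable indexTableName;

        @Override
        protected void setup(Context context) {
            // Read the job arguments once per task instead of once per row.
            Configuration configuration = context.getConfiguration();
            this.columnFamily = Bytes.toBytes(configuration.get("cf"));
            this.qualifier = Bytes.toBytes(configuration.get("qa"));
            this.indexTableName =
                new ImmutableBytesWritable(Bytes.toBytes(configuration.get("indexTableName")));
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            for (Cell cell : value.getColumnCells(columnFamily, qualifier)) {
                // Index row key = indexed value; id column = row key of the source row.
                Put put = new Put(CellUtil.cloneValue(cell));
                put.addColumn(columnFamily, Bytes.toBytes("id"), key.copyBytes());
                // MultiTableOutputFormat uses the output key as the destination table name.
                context.write(indexTableName, put);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 4) {
            throw new IllegalArgumentException("Expected 4 arguments: source table, index table, "
                + "column family of the source table, and qualifier of the column to index.");
        }

        String tableName = args[0];
        String indexTableName = args[1];
        String columnFamily = args[2];
        String indexQualifier = args[3];

        // Set the arguments before creating the Job, which takes a copy of the configuration.
        Configuration conf = HBaseConfiguration.create();
        conf.set("cf", columnFamily);
        conf.set("qa", indexQualifier);
        conf.set("indexTableName", indexTableName);

        Job job = Job.getInstance(conf, "MrIndexBuilder");
        job.setJarByClass(MrIndexBuilder.class);
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        job.setNumReduceTasks(0); // map-only: mappers write Puts straight to the index table

        Scan scan = new Scan();
        scan.setCaching(500);       // rows fetched per RPC
        scan.setCacheBlocks(false); // a full scan should not fill the block cache

        // Also sets TableInputFormat as the input format and MyMapper as the mapper class.
        TableMapReduceUtil.initTableMapperJob(tableName, scan, MyMapper.class,
            ImmutableBytesWritable.class, Put.class, job);
        if (!job.waitForCompletion(true)) {
            throw new IOException("MapReduce job failed.");
        }
    }
}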

Step 2

Package the code as a jar, copy it to any server in the HBase cluster, and run:

HADOOP_CLASSPATH=`hbase classpath` hadoop jar hbase.server.test-1.0-SNAPSHOT.jar com.sunsharing.MrIndexBuilder LJK_TEST INDEX_LJK_TEST  mycf name

Step 3

Verify that the result matches expectations.

hbase(main):021:0> scan 'INDEX_LJK_TEST'
ROW                               COLUMN+CELL                                                                                    
 LJK                              column=mycf:id, timestamp=1562657562219, value=002                                             
 LJK3                             column=mycf:id, timestamp=1562657562219, value=003                                             
 LJK4                             column=mycf:id, timestamp=1562657562219, value=004                                             
 LJK5                             column=mycf:id, timestamp=1562657562219, value=005                                             
 LJK6                             column=mycf:id, timestamp=1562657562219, value=006                                             
 LJK7                             column=mycf:id, timestamp=1562657562219, value=007                                             
 LJK8                             column=mycf:id, timestamp=1562657562219, value=008                                             
7 row(s) in 0.3670 seconds
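
Both the coprocessor and the MapReduce job use the indexed value alone as the index row key, so two source rows with the same name map to the same index row and the later write replaces the earlier one. One common way around this, shown here as an assumption and not something the code above does, is to append the source row key to the index row key and read with a prefix scan. The sketch uses a TreeMap in place of an HBase table; CompositeIndexModel and the \u0000 separator are illustrative choices, and it assumes names never contain that separator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** In-memory model of an index table with composite row keys; this is not HBase client code. */
final class CompositeIndexModel {
    private static final char SEPARATOR = '\u0000';

    // Index row key (name + SEPARATOR + source row key) -> source row key.
    private final NavigableMap<String, String> indexTable = new TreeMap<>();

    /** Each (name, rowKey) pair gets its own index row, so equal names no longer collide. */
    void index(String name, String rowKey) {
        indexTable.put(name + SEPARATOR + rowKey, rowKey);
    }

    /** Prefix scan: returns every source row key whose index row starts with name + SEPARATOR. */
    List<String> findRowKeys(String name) {
        String start = name + SEPARATOR;
        String stop = name + (char) (SEPARATOR + 1); // first possible key after the prefix range
        return new ArrayList<>(indexTable.subMap(start, true, stop, false).values());
    }

    public static void main(String[] args) {
        CompositeIndexModel index = new CompositeIndexModel();
        index.index("LJK", "001");
        index.index("LJK", "002");
        index.index("LJK3", "003");
        System.out.println(index.findRowKeys("LJK"));  // [001, 002]
        System.out.println(index.findRowKeys("LJK3")); // [003]
    }
}
```

In HBase the same idea would use byte-wise row key ordering and a scan bounded by the prefix; the string ordering of the TreeMap only approximates that.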

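Phoenix secondary index approach

This is the simplest approach: first create a Phoenix table that maps to the HBase table, then create a global secondary index on it. The column family and qualifier are lowercase in HBase, so they must be double-quoted; unquoted identifiers are upper-cased by Phoenix. The index must also name the same table that was created.

CREATE TABLE LJK_TEST (ID VARCHAR NOT NULL PRIMARY KEY, "mycf"."name" VARCHAR);

CREATE INDEX COVER_LJKTEST_INDEX ON LJK_TEST ("mycf"."name");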
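
Once the table is mapped, a lookup by name can be issued over JDBC. The following is a minimal sketch under stated assumptions: the Phoenix client jar is on the classpath, the class PhoenixIndexQuery and its methods are hypothetical names, and the ZooKeeper quorum passed to jdbcUrl is whatever the cluster uses. The query references only ID and "mycf"."name", which is the kind of query a global index on the name column can serve.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; it needs the Phoenix client jar on the classpath at run time. */
final class PhoenixIndexQuery {
    // The lowercase family and qualifier are double-quoted, as in the CREATE TABLE statement.
    static final String LOOKUP_SQL =
        "SELECT ID FROM LJK_TEST WHERE \"mycf\".\"name\" = ?";

    /** Builds a Phoenix JDBC URL from a ZooKeeper quorum such as "zk-host:2181". */
    static String jdbcUrl(String zkQuorum) {
        return "jdbc:phoenix:" + zkQuorum;
    }

    /** Returns the IDs of all rows whose name column equals the given value. */
    static List<String> findIds(Connection connection, String name) throws SQLException {
        List<String> ids = new ArrayList<>();
        try (PreparedStatement statement = connection.prepareStatement(LOOKUP_SQL)) {
            statement.setString(1, name);
            try (ResultSet rows = statement.executeQuery()) {
                while (rows.next()) {
                    ids.add(rows.getString("ID"));
                }
            }
        }
        return ids;
    }
}
```

A caller would open a connection with DriverManager.getConnection(PhoenixIndexQuery.jdbcUrl(...)) and pass it to findIds together with a name such as "LJK3". Running EXPLAIN on the same query in Phoenix is a way to check whether the index is actually chosen.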

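Elasticsearch (ES) secondary index approach

This approach can handle almost every query scenario. Details to be added.

Appendix

Lily HBase secondary index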
