Hadoop 踩坑记(四)

Hbase客户端编程(Eclipse)

环境

关于 Hbase 的安装与配置以及 Eclipse 的配置请参考前两篇文章

本系列选用的 hbase 版本为 1.4.13

本系列选用的 hadoop 版本为 2.8.5

请注意包名、服务器等个性化配置

引入jar包

需要将 Hbase 中与客户端相关的 jar 包引入到 Build Path

理论上只需要将 org.apache.hadoop.hbase.* 相关包引入即可,但实际操作中还是遇到了缺少情况,因此将 hbase 的 lib 目录下的所有 jar 包都引入了

这种粗暴的方法源于本人对于 java 开发的了解太少,经验丰富的朋友应该可以按需引入。

表的创建与删除

一个示例的 Student 表结构如下图所示

代码如下

package wit;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HColumnDescriptor;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Admin;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;public class HBaseTest {//声明静态配置 HBaseConfiguration   static Configuration cfg=HBaseConfiguration.create();    //创建学生表   public static void createStdTable() throws Exception {   cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");      //数据表名    String tablename="Student";    //列簇名列表    String[] columnFamilys= new String[] {"Std","Course"};    //建立连接    Connection con = ConnectionFactory.createConnection(cfg);    //获得Admin对象    Admin admin = con.getAdmin();    //获得表对象    TableName tName  = TableName.valueOf(tablename);    //判断表是否存在        if (admin.tableExists(tName)) {            System.out.println("table Exists!");            System.exit(0);        }        else{        HTableDescriptor tableDesc = new HTableDescriptor(tName);        //添加列簇        for(String cf:columnFamilys)        {          HColumnDescriptor cfDesc = new HColumnDescriptor(cf);          if(cf.equals("Course"))//设置课程的最大历史版本            cfDesc.setMaxVersions(3);          tableDesc.addFamily(cfDesc);        }        //创建表            admin.createTable(tableDesc);            System.out.println("create table success!");        }        admin.close();        con.close();    }   public static void  main (String [] agrs) throws Throwable {      try {        createStdTable();      }      catch (Exception e) {        e.printStackTrace();      }  }}

其中

cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");

导入了 hbase-site.xml 文件中的服务器配置,理论上应该要将该文件引用为项目配置,但本人水平所限,没有做到,因此采用代码中手动配置的方法,后面的代码中也一样采用了此种方式。

运行上述代码,得到 create table success! 的提示即为创建成功

删除表的代码如下

package wit;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Admin;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;public class Delete {//声明静态配置 HBaseConfiguration   static Configuration cfg=HBaseConfiguration.create();   public static void DeleteTable() throws Exception{   cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");   Connection con = ConnectionFactory.createConnection(cfg);   //获得表对象   TableName tablename = TableName.valueOf("Student");    //获得Admin对象   Admin admin = con.getAdmin();   if(admin.tableExists(tablename)){     try       {        admin.disableTable(tablename);        admin.deleteTable(tablename);       }catch(Exception ex){         ex.printStackTrace();       }    } }   public static void  main (String [] agrs) throws Throwable {    try {      DeleteTable();    }    catch (Exception e) {      e.printStackTrace();    }  }}

表模式的修改

增加新的列

下面的代码会在表中新增列 Test

public static void  AddStdColFamily () throws Throwable {    Connection con = ConnectionFactory.createConnection(cfg);    //获得表对象    TableName tablename = TableName.valueOf("Student");    //获得Admin对象    Admin admin = con.getAdmin();    HColumnDescriptor newCol = new HColumnDescriptor("Test");    newCol.setMaxVersions(3);    if(admin.tableExists(tablename)){      try        {         admin.disableTable(tablename);         admin.addColumn(tablename, newCol);        }catch(Exception ex){          ex.printStackTrace();       }    }    admin.enableTable(tablename);    admin.close();    con.close();}

修改列簇属性

修改 Test 列簇的最大历史版本数为 5

public static void  ModifyStdColFamily () throws Throwable {  Connection con = ConnectionFactory.createConnection(cfg);  //获得表对象  TableName tablename = TableName.valueOf("Student");  //获得Admin对象   Admin admin = con.getAdmin();   HColumnDescriptor modCol = new HColumnDescriptor("Test");   modCol.setMaxVersions(5);   if(admin.tableExists(tablename)){     try     {       admin.disableTable(tablename);       admin.modifyColumn(tablename, modCol);      }catch(Exception ex){          ex.printStackTrace();      }     }      admin.enableTable(tablename);      admin.close();      con.close();}  

删除列

删除 Test 列

public static void  DeleteStdColFamily() throws Throwable {   Connection con = ConnectionFactory.createConnection(cfg);   //获得表对象   TableName tablename = TableName.valueOf("Student");   //获得Admin对象   Admin admin = con.getAdmin(); if(admin.tableExists(tablename)){     try     {       admin.disableTable(tablename);       admin.deleteColumn(tablename, Bytes.toBytes("Test"));      }catch(Exception ex){         ex.printStackTrace();       }    }    admin.enableTable(tablename);    admin.close();    con.close();}

在表中插入和修改数据(略)

与MapReduce集成

读取 hdfs 文件后写入数据表

hdfs 文件 Std.txt 内容为

200215125, Jim, Male, 2008-12-09, CS, 89, 78, 56200215126, Marry, Female, 2001-2-09, AI , 79, 72, 66200215127, Marker, Male, 2003-12-19, CE, 78, 48, 36

这里需要注意的是,此文件不能有空行,否则读取数据时会报错

代码如下,Map 过程读取 Std.txt 文件中的每一行,然后将学生的学号设置为 key,学生的其它信息设置为 value 后,写出到中间结果。Reduce 过程负责将 Map 形成的中间结果写入到 HBase 的 Student 表中,故 Reduce 继承至TableReducer,在 main 函数中使用

package wit;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class StdHdfsToHBase {  public static class HDFSMap extends Mapper<Object, Text, Text, Text> {      //实现map函数,读取hdfs上的std.txt文件    public void map(Object key, Text value, Context context)    throws IOException, InterruptedException{    //取出学生的学号为rowKey    String stdRowKey = value.toString().split(",")[0];     System.out.println(stdRowKey);    //学号后面的学生信息为value    String stdInfo =value.toString().substring(stdRowKey.length()+1);     System.out.println(stdInfo);    context.write(new Text(stdRowKey), new Text(stdInfo));}   }  public static class HDFSReducer extends TableReducer<Text, Text,ImmutableBytesWritable>{     @Override    protected void reduce(Text key, Iterable<Text> values, Context context)    throws IOException, InterruptedException {        Put put = new Put(key.getBytes());        for (Text val : values) {         String[] stdInfo = val.toString().split(",");             put.addColumn("Std".getBytes(), "Name".getBytes(), stdInfo[0].getBytes());             put.addColumn("Std".getBytes(), "gender".getBytes(), stdInfo[1].getBytes());             put.addColumn("Std".getBytes(), "birth".getBytes(), stdInfo[2].getBytes());             put.addColumn("Std".getBytes(), "dept".getBytes(), stdInfo[3].getBytes());             put.addColumn("Course".getBytes(), "math".getBytes(), Bytes.toBytes(Long.parseLong(stdInfo[4])));             put.addColumn("Course".getBytes(), "arts".getBytes(), Bytes.toBytes(Long.parseLong(stdInfo[5])));             put.addColumn("Course".getBytes(), "phy".getBytes(), Bytes.toBytes(Long.parseLong(stdInfo[6])));             //写入学生信息到HBase表           context.write(new ImmutableBytesWritable(key.getBytes()), put);           }    }}  public static void main(String[] args) throws    IOException, ClassNotFoundException, InterruptedException {      Configuration conf = HBaseConfiguration.create();      conf.set("hbase.zookeeper.quorum", "hadoop1-ali,hadoop2-hw");      Job job = Job.getInstance(conf, "StdHdfsToHBase");      job.setJarByClass(StdHdfsToHBase.class);      // 设置Map      job.setMapperClass(HDFSMap.class);       job.setMapOutputKeyClass(Text.class);          job.setMapOutputValueClass(Text.class);          //设置Reducer      TableMapReduceUtil.initTableReducerJob("Student",      HDFSReducer.class, job);          job.setOutputKeyClass(ImmutableBytesWritable.class);          job.setOutputValueClass(Put.class);      //设置std.txt的输入目录      FileInputFormat.addInputPath(job, new      Path("hdfs://hadoop1-ali:9000/input/std"));      System.exit(job.waitForCompletion(true) ? 0 : 1);  }}

原文来自 陈十一的博客