Hadoop 踩坑记(四)
Hbase客户端编程(Eclipse)
环境
关于 Hbase 的安装与配置以及 Eclipse 的配置请参考前两篇文章
本系列选用的 hbase 版本为 1.4.13
本系列选用的 hadoop 版本为 2.8.5
请注意包名、服务器等个性化配置
引入jar包
需要将 Hbase 中与客户端相关的 jar 包引入到 Build Path
理论上只需要将 org.apache.hadoop.hbase.*
相关包引入即可,但实际操作中还是遇到了缺少情况,因此将 hbase 的 lib
目录下的所有 jar 包都引入了
这种粗暴的方法源于本人对于 java 开发的了解太少,经验丰富的朋友应该可以按需引入。
表的创建与删除
一个示例的 Student 表结构如下图所示
代码如下
package wit;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.HColumnDescriptor;import org.apache.hadoop.hbase.HTableDescriptor;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Admin;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;public class HBaseTest {//声明静态配置 HBaseConfiguration static Configuration cfg=HBaseConfiguration.create(); //创建学生表 public static void createStdTable() throws Exception { cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw"); //数据表名 String tablename="Student"; //列簇名列表 String[] columnFamilys= new String[] {"Std","Course"}; //建立连接 Connection con = ConnectionFactory.createConnection(cfg); //获得Admin对象 Admin admin = con.getAdmin(); //获得表对象 TableName tName = TableName.valueOf(tablename); //判断表是否存在 if (admin.tableExists(tName)) { System.out.println("table Exists!"); System.exit(0); } else{ HTableDescriptor tableDesc = new HTableDescriptor(tName); //添加列簇 for(String cf:columnFamilys) { HColumnDescriptor cfDesc = new HColumnDescriptor(cf); if(cf.equals("Course"))//设置课程的最大历史版本 cfDesc.setMaxVersions(3); tableDesc.addFamily(cfDesc); } //创建表 admin.createTable(tableDesc); System.out.println("create table success!"); } admin.close(); con.close(); } public static void main (String [] agrs) throws Throwable { try { createStdTable(); } catch (Exception e) { e.printStackTrace(); } }}
其中
cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");
导入了 hbase-site.xml
文件中的服务器配置,理论上应该要将该文件引用为项目配置,但本人水平所限,没有做到,因此采用代码中手动配置的方法,后面的代码中也一样采用了此种方式。
运行上述代码,得到 create table success!
的提示即为创建成功
删除表的代码如下
package wit;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.TableName;import org.apache.hadoop.hbase.client.Admin;import org.apache.hadoop.hbase.client.Connection;import org.apache.hadoop.hbase.client.ConnectionFactory;public class Delete {//声明静态配置 HBaseConfiguration static Configuration cfg=HBaseConfiguration.create(); public static void DeleteTable() throws Exception{ cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw"); Connection con = ConnectionFactory.createConnection(cfg); //获得表对象 TableName tablename = TableName.valueOf("Student"); //获得Admin对象 Admin admin = con.getAdmin(); if(admin.tableExists(tablename)){ try { admin.disableTable(tablename); admin.deleteTable(tablename); }catch(Exception ex){ ex.printStackTrace(); } } } public static void main (String [] agrs) throws Throwable { try { DeleteTable(); } catch (Exception e) { e.printStackTrace(); } }}
表模式的修改
增加新的列
下面的代码会在表中新增列 Test
public static void AddStdColFamily () throws Throwable { Connection con = ConnectionFactory.createConnection(cfg); //获得表对象 TableName tablename = TableName.valueOf("Student"); //获得Admin对象 Admin admin = con.getAdmin(); HColumnDescriptor newCol = new HColumnDescriptor("Test"); newCol.setMaxVersions(3); if(admin.tableExists(tablename)){ try { admin.disableTable(tablename); admin.addColumn(tablename, newCol); }catch(Exception ex){ ex.printStackTrace(); } } admin.enableTable(tablename); admin.close(); con.close();}
修改列簇属性
修改 Test 列簇的最大历史版本数为 5
public static void ModifyStdColFamily () throws Throwable { Connection con = ConnectionFactory.createConnection(cfg); //获得表对象 TableName tablename = TableName.valueOf("Student"); //获得Admin对象 Admin admin = con.getAdmin(); HColumnDescriptor modCol = new HColumnDescriptor("Test"); modCol.setMaxVersions(5); if(admin.tableExists(tablename)){ try { admin.disableTable(tablename); admin.modifyColumn(tablename, modCol); }catch(Exception ex){ ex.printStackTrace(); } } admin.enableTable(tablename); admin.close(); con.close();}
删除列
删除 Test 列
public static void DeleteStdColFamily() throws Throwable { Connection con = ConnectionFactory.createConnection(cfg); //获得表对象 TableName tablename = TableName.valueOf("Student"); //获得Admin对象 Admin admin = con.getAdmin(); if(admin.tableExists(tablename)){ try { admin.disableTable(tablename); admin.deleteColumn(tablename, Bytes.toBytes("Test")); }catch(Exception ex){ ex.printStackTrace(); } } admin.enableTable(tablename); admin.close(); con.close();}
在表中插入和修改数据(略)
与MapReduce集成
读取 hdfs 文件后写入数据表
hdfs 文件 Std.txt 内容为
200215125, Jim, Male, 2008-12-09, CS, 89, 78, 56200215126, Marry, Female, 2001-2-09, AI , 79, 72, 66200215127, Marker, Male, 2003-12-19, CE, 78, 48, 36
这里需要注意的是,此文件不能有空行,否则读取数据时会报错
代码如下,Map 过程读取 Std.txt
文件中的每一行,然后将学生的学号设置为 key,学生的其它信息设置为 value 后,写出到中间结果。Reduce 过程负责将 Map 形成的中间结果写入到 HBase 的 Student 表中,故 Reduce 继承至TableReducer,在 main 函数中使用
package wit;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class StdHdfsToHBase { public static class HDFSMap extends Mapper<Object, Text, Text, Text> { //实现map函数,读取hdfs上的std.txt文件 public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ //取出学生的学号为rowKey String stdRowKey = value.toString().split(",")[0]; System.out.println(stdRowKey); //学号后面的学生信息为value String stdInfo =value.toString().substring(stdRowKey.length()+1); System.out.println(stdInfo); context.write(new Text(stdRowKey), new Text(stdInfo));} } public static class HDFSReducer extends TableReducer<Text, Text,ImmutableBytesWritable>{ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Put put = new Put(key.getBytes()); for (Text val : values) { String[] stdInfo = val.toString().split(","); put.addColumn("Std".getBytes(), "Name".getBytes(), stdInfo[0].getBytes()); put.addColumn("Std".getBytes(), "gender".getBytes(), stdInfo[1].getBytes()); put.addColumn("Std".getBytes(), "birth".getBytes(), stdInfo[2].getBytes()); put.addColumn("Std".getBytes(), "dept".getBytes(), stdInfo[3].getBytes()); put.addColumn("Course".getBytes(), "math".getBytes(), Bytes.toBytes(Long.parseLong(stdInfo[4]))); put.addColumn("Course".getBytes(), "arts".getBytes(), Bytes.toBytes(Long.parseLong(stdInfo[5]))); put.addColumn("Course".getBytes(), "phy".getBytes(), Bytes.toBytes(Long.parseLong(stdInfo[6]))); //写入学生信息到HBase表 context.write(new ImmutableBytesWritable(key.getBytes()), put); } }} public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "hadoop1-ali,hadoop2-hw"); Job job = Job.getInstance(conf, "StdHdfsToHBase"); job.setJarByClass(StdHdfsToHBase.class); // 设置Map job.setMapperClass(HDFSMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置Reducer TableMapReduceUtil.initTableReducerJob("Student", HDFSReducer.class, job); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Put.class); //设置std.txt的输入目录 FileInputFormat.addInputPath(job, new Path("hdfs://hadoop1-ali:9000/input/std")); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
原文来自 陈十一的博客