Hadoop 踩坑记 (四)
Hbase 客户端编程 (Eclipse)
环境
关于 Hbase 的安装与配置以及 Eclipse 的配置请参考前两篇文章
本系列选用的 hbase 版本为 1.4.13
本系列选用的 hadoop 版本为 2.8.5
请注意包名、服务器等个性化配置
引入 jar 包
需要将 Hbase 中与客户端相关的 jar 包引入到 Build Path
理论上只需要将 org.apache.hadoop.hbase.*
相关包引入即可,但实际操作中还是遇到了缺少情况,因此将 hbase 的 lib
目录下的所有 jar 包都引入了
这种粗暴的方法源于本人对于 java 开发的了解太少,经验丰富的朋友应该可以按需引入。
表的创建与删除
一个示例的 Student 表结构如下图所示
代码如下
package wit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
public class HBaseTest {
// 声明静态配置 HBaseConfiguration
static Configuration cfg=HBaseConfiguration.create();
// 创建学生表
public static void createStdTable() throws Exception {cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");
// 数据表名
String tablename="Student";
// 列簇名列表
String[] columnFamilys= new String[] {"Std","Course"};
// 建立连接
Connection con = ConnectionFactory.createConnection(cfg);
// 获得 Admin 对象
Admin admin = con.getAdmin();
// 获得表对象
TableName tName = TableName.valueOf(tablename);
// 判断表是否存在
if (admin.tableExists(tName)) {System.out.println("table Exists!");
System.exit(0);
}
else{HTableDescriptor tableDesc = new HTableDescriptor(tName);
// 添加列簇
for(String cf:columnFamilys)
{HColumnDescriptor cfDesc = new HColumnDescriptor(cf);
if(cf.equals("Course"))// 设置课程的最大历史版本
cfDesc.setMaxVersions(3);
tableDesc.addFamily(cfDesc);
}
// 创建表
admin.createTable(tableDesc);
System.out.println("create table success!");
}
admin.close();
con.close();}
public static void main (String [] agrs) throws Throwable {
try {createStdTable();
}
catch (Exception e) {e.printStackTrace();
}
}
}
其中
cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");
导入了 hbase-site.xml
文件中的服务器配置,理论上应该要将该文件引用为项目配置,但本人水平所限,没有做到,因此采用代码中手动配置的方法,后面的代码中也一样采用了此种方式。
运行上述代码,得到 create table success!
的提示即为创建成功
删除表的代码如下
package wit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
public class Delete {
// 声明静态配置 HBaseConfiguration
static Configuration cfg=HBaseConfiguration.create();
public static void DeleteTable() throws Exception{cfg.set("hbase.zookeeper.quorum","hadoop1-ali,hadoop2-hw");
Connection con = ConnectionFactory.createConnection(cfg);
// 获得表对象
TableName tablename = TableName.valueOf("Student");
// 获得 Admin 对象
Admin admin = con.getAdmin();
if(admin.tableExists(tablename)){
try
{admin.disableTable(tablename);
admin.deleteTable(tablename);
}catch(Exception ex){ex.printStackTrace();
}
}
}
public static void main (String [] agrs) throws Throwable {
try {DeleteTable();
}
catch (Exception e) {e.printStackTrace();
}
}
}
表模式的修改
增加新的列
下面的代码会在表中新增列 Test
public static void AddStdColFamily () throws Throwable {Connection con = ConnectionFactory.createConnection(cfg);
// 获得表对象
TableName tablename = TableName.valueOf("Student");
// 获得 Admin 对象
Admin admin = con.getAdmin();
HColumnDescriptor newCol = new HColumnDescriptor("Test");
newCol.setMaxVersions(3);
if(admin.tableExists(tablename)){
try
{admin.disableTable(tablename);
admin.addColumn(tablename, newCol);
}catch(Exception ex){ex.printStackTrace();
}
}
admin.enableTable(tablename);
admin.close();
con.close();}
修改列簇属性
修改 Test 列簇的最大历史版本数为 5
public static void ModifyStdColFamily () throws Throwable {Connection con = ConnectionFactory.createConnection(cfg);
// 获得表对象
TableName tablename = TableName.valueOf("Student");
// 获得 Admin 对象
Admin admin = con.getAdmin();
HColumnDescriptor modCol = new HColumnDescriptor("Test");
modCol.setMaxVersions(5);
if(admin.tableExists(tablename)){
try
{admin.disableTable(tablename);
admin.modifyColumn(tablename, modCol);
}catch(Exception ex){ex.printStackTrace();
}
}
admin.enableTable(tablename);
admin.close();
con.close();}
删除列
删除 Test 列
public static void DeleteStdColFamily() throws Throwable {Connection con = ConnectionFactory.createConnection(cfg);
// 获得表对象
TableName tablename = TableName.valueOf("Student");
// 获得 Admin 对象
Admin admin = con.getAdmin();
if(admin.tableExists(tablename)){
try
{admin.disableTable(tablename);
admin.deleteColumn(tablename, Bytes.toBytes("Test"));
}catch(Exception ex){ex.printStackTrace();
}
}
admin.enableTable(tablename);
admin.close();
con.close();}
在表中插入和修改数据 (略)
与 MapReduce 集成
读取 hdfs 文件后写入数据表
hdfs 文件 Std.txt 内容为
200215125, Jim, Male, 2008-12-09, CS, 89, 78, 56
200215126, Marry, Female, 2001-2-09, AI , 79, 72, 66
200215127, Marker, Male, 2003-12-19, CE, 78, 48, 36
这里需要注意的是,此文件不能有空行,否则读取数据时会报错
代码如下,Map 过程读取 Std.txt
文件中的每一行,然后将学生的学号设置为 key,学生的其它信息设置为 value 后,写出到中间结果。Reduce 过程负责将 Map 形成的中间结果写入到 HBase 的 Student 表中,故 Reduce 继承至 TableReducer,在 main 函数中使用
package wit;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class StdHdfsToHBase {
public static class HDFSMap extends Mapper<Object, Text, Text, Text> {
// 实现 map 函数,读取 hdfs 上的 std.txt 文件
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException
{
// 取出学生的学号为 rowKey
String stdRowKey = value.toString().split(",")[0];
System.out.println(stdRowKey);
// 学号后面的学生信息为 value
String stdInfo =
value.toString().substring(stdRowKey.length()+1);
System.out.println(stdInfo);
context.write(new Text(stdRowKey), new Text(stdInfo));
}
}
public static class HDFSReducer extends TableReducer<Text, Text,
ImmutableBytesWritable>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {Put put = new Put(key.getBytes());
for (Text val : values) {String[] stdInfo = val.toString().split(",");
put.addColumn("Std".getBytes(), "Name".getBytes(),
stdInfo[0].getBytes());
put.addColumn("Std".getBytes(), "gender".getBytes(),
stdInfo[1].getBytes());
put.addColumn("Std".getBytes(), "birth".getBytes(),
stdInfo[2].getBytes());
put.addColumn("Std".getBytes(), "dept".getBytes(),
stdInfo[3].getBytes());
put.addColumn("Course".getBytes(), "math".getBytes(),
Bytes.toBytes(Long.parseLong(stdInfo[4])));
put.addColumn("Course".getBytes(), "arts".getBytes(),
Bytes.toBytes(Long.parseLong(stdInfo[5])));
put.addColumn("Course".getBytes(), "phy".getBytes(),
Bytes.toBytes(Long.parseLong(stdInfo[6])));
// 写入学生信息到 HBase 表
context.write(new ImmutableBytesWritable(key.getBytes()), put);
}
}
}
public static void main(String[] args) throws
IOException, ClassNotFoundException, InterruptedException {Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop1-ali,hadoop2-hw");
Job job = Job.getInstance(conf, "StdHdfsToHBase");
job.setJarByClass(StdHdfsToHBase.class);
// 设置 Map
job.setMapperClass(HDFSMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置 Reducer
TableMapReduceUtil.initTableReducerJob("Student",
HDFSReducer.class, job);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
// 设置 std.txt 的输入目录
FileInputFormat.addInputPath(job, new
Path("hdfs://hadoop1-ali:9000/input/std"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
原文来自 陈十一的博客