hive 介绍
由 Facebook 开源的, 用于解决海量结构化日志的数据统计的项目
本质: 将 HQL 转化为 MapReduce 程序
Hive 的其实时 HDFS 上的目录和文件
Hive 的安装模式
- 嵌入模式
元数据信息被保存在自带的 Deybe 数据库中
只允许创建一个连接
多用于 Demo
- 本地模式
元数据信息被保存在 MySQL 数据库
MySQL 数据库与 Hive 运行在同一台物理机器上
多用于开发和测试
- 远程模式
元数据信息被保存在 MySQL 数据库
MySQL 数据库与 Hive 运行在不同台物理机器上
用于实际生成环境
Linux 下 MySQL 安装
1) 卸载
$ rpm -qa | grep mysql
$ sudo rpm -e mysql-libs-5.1.71-1.el6.x86_64 –nodeps
2) 安装
可选 择将缓存替换, 然后再安装 $ sudo cp -r /opt/software/x86_64/ /var/cache/yum/
$ sudo yum install -y mysql-server mysql mysql-devel
3) 启动 mysql 服务
$ sudo service mysqld start
4) 设置密码
$ /usr/bin/mysqladmin -u root password ‘ 新密码 ’
5) 开机启动
$ sudo chkconfig mysqld on
6) 授权 root 的权限及设置远程登录
登录
$ mysql -u root -p
授权
mysql> grant all privileges on *.* to 'root'@'%' identified by '密码';
mysql> grant all privileges on *.* to 'root'@'linux01' identified by '密码'; -- 必须有这一句,% 包括所有
all privileges 所有权限
. 所有数据库的所有表
‘root’@’%’ 在任意主机以 root 身份登录
‘root’@’linux03.ibf.com’ 在 linux03 主机以 root 登录
by ‘root’ 使用 root 作为密码
7)刷新授权
mysql> flush privileges;
8)测试, 在 windows 中是否可以登录
mysql -h linux03.ibf.com -u root -p
hive 环境搭建: 本地模式
必须先安装 HDFS 和 Yarn
1)安装:
$ tar -zxvf /opt/software/hive-0.13.1-bin.tar.gz -C /opt/modules/
重命名 hive 文件夹名字
$ cd /opt/modules
$ mv apache-hive-0.13.1-bin/ hive-0.13.1/
2)在 HDFS 上 创建 tmp 目录和 hive 仓库
$ bin/hdfs dfs -mkdir -p /user/hive/warehouse
$ bin/hdfs dfs -mkdir /tmp #已存在
$ bin/hdfs dfs -chmod g+w /user/hive/warehouse
$ bin/hdfs dfs -chmod g+w /tmp
3)修改配置
$ cd hive-0.13.1/
$ cp conf/hive-default.xml.template conf/hive-site.xml
$ cp conf/hive-log4j.properties.template conf/hive-log4j.properties
$ cp conf/hive-env.sh.template conf/hive-env.sh
3-1)修改 hive-env.sh
JAVA_HOME=/opt/modules/jdk1.7.0_67 #添加
HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HIVE_CONF_DIR=/opt/modules/hive-0.13.1/conf
3-2)修改 hive.site.xml
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://linux01:3306/metastore?createDatabaseIfNotExist=true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
3-3)修改日志配置 hive-log4j.properties
hive.log.dir=/opt/modules/hive-0.13.1/logs
3-4)拷贝 jdbc 驱动到 hive 的 lib 目录
$ cp /opt/software/mysql-connector-java-5.1.34-bin.jar /opt/modules/hive-0.13.1/lib/
4)确定 yarn 和 hdfs 启动
$ jps
6468 ResourceManager
6911 Jps
6300 RunJar
6757 NodeManager
2029 NameNode
2153 DataNode
此时使用 bin/hive 可以进入 hive
hive 启动及基本使用
进入 hive 目录
$ cd /opt/modules/hive-0.13.1/
进入 hive
bin/hive
基本命令
show databases;
create database mydb;
use mydb;
show tables;
创建表及加载数据
create table student (
id int comment 'id of student',
name string comment 'name of student',
age int comment 'age of student',
gender string comment 'sex of student',
addr string
)
comment 'this is a demo'
row format delimited fields terminated by '\t';
表默认创建在 /user/hive/warehouse 里
通过 hive.metastore.warhouse.dir 配置
- 查看表
desc student; 查看表字段
或
desc formatted student; 可以查看元数据
-
此时 mysql 的 metastore 数据库状况
mysql> select * from TBLS;
+--------+-------------+-------+------------------+-------+-----------+-------+----------+---------------+--------------------+--------------------+ | TBL_ID | CREATE_TIME | DB_ID | LAST_ACCESS_TIME | OWNER | RETENTION | SD_ID | TBL_NAME | TBL_TYPE | VIEW_EXPANDED_TEXT | VIEW_ORIGINAL_TEXT | +--------+-------------+-------+------------------+-------+-----------+-------+----------+---------------+--------------------+--------------------+ | 1 | 1556132119 | 6 | 0 | chen | 0 | 1 | student | MANAGED_TABLE | NULL | NULL | +--------+-------------+-------+------------------+-------+-----------+-------+----------+---------------+--------------------+--------------------+ 1 row in set (0.00 sec)
mysql> select * from COLUMNS_V2;
+-------+-----------------+-------------+-----------+-------------+ | CD_ID | COMMENT | COLUMN_NAME | TYPE_NAME | INTEGER_IDX | +-------+-----------------+-------------+-----------+-------------+ | 1 | NULL | addr | string | 4 | | 1 | age of student | age | int | 2 | | 1 | sex of student | gender | string | 3 | | 1 | id of student | id | int | 0 | | 1 | name of student | name | string | 1 | +-------+-----------------+-------------+-----------+-------------+ 5 rows in set (0.00 sec)
- 加载数据(在家目录下创建 student.log)
load data local inpath ‘/home/hadoop/student.log’ into table student;
- 从 hdfs 上加载(加载完,hdfs 上的 student.data 到 表目录下)
load data inpath ‘/input/student.data’ into table student;
在命令行内设置配置
重启无效
set hive.cli.print.header=true; #列名
set hive.cli.print.current.db=true; #表名
reset; 重置
重启有效
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
与 Linux 交互
!ls
!pwd
与 hadoop 交互
dfs -ls /
dfs -mkdir /hive
hive 的脚本
-e 执行 sql
-f 执行 sql 文件
-S 静默执行
hive -e
$ bin/hive -e “select *from test_db.emp_p”
hive -f
$ bin/hive -S -f /home/hadoop/emp.sql > ~/result.txt
删除表
drop table user;
清空表
truncate table user;
表类型
内部表(管理表 MANAGED_TABLE)
create table emp(
empId int,
empString string,
job string,
salary float,
deptId int
)
row format delimited fields terminated by '\t';
load data inpath '/input/dept.txt' into table dept;
# 或从本地加载
load data local inpath '/home/hadoop/dept.txt' into table dept;
外部表(EXTERNAL_TABLE)
create external table emp_ex (
empId int,
empName string,
job string,
salary float,
deptId int
)
row format delimited fields terminated by '\t'
location '/hive/table/emp';
把数据移动到表所在位置
hive (mydb)> dfs -mv /input/emp.txt /hive/table/emp/emp.txt
服务器加载
hive (mydb)> load data local inpath '/home/hadoop/emp.data' into table emp;
或者直接使用 dfs 命令移动数据到 hive 表目录下
hive (mydb)> dfs -put /home/hadoop/emp.data /hello/table/emp;
内部表和外部表区别
- 创建表
外部表创建表的时候,需要用 external
- 删除表
外部表在删除表的时候只会删除表的元数据 (metadata) 信息不会删除表数据(data)
内部表删除时会将元数据信息和表数据同时删除
- 内部表数据由 Hive 自身管理,外部表数据由 HDFS 管理
- 内部表数据存储的位置是 hive.metastore.warehouse.dir(默认:/user/hive/warehouse),外部表数据的存储位置由自己制定;
分区表
创建分区表
create table emp_part(
empno int,
empname string,
empjob string,
mgrno int,
birthday string,
salary float,
bonus float,
deptno int
)
partitioned by (province string)
row format delimited fields terminated by '\t';
向分区表加载数据
显式指定分区值
load data local inpath '/home/user01/emp.txt' into table emp_part partition (province='CHICAGO');
分区操作
查看分区
show partitions emp_part;
添加分区
alter table emp_part add partition (province='shanghai');
删除分区
alter table emp_part drop partition (province='shanghai');
向分区添加数据
load data local inpath '本地路径' into table emp_part partition (province='shanghai');
查询分区数据
select * from emp_part where province='henan';
二级分区
创建二级分区
create table emp_second(
id int ,
name string,
job string,
salary float,
dept int
)
partitioned by (day string,hour string)
row format delimited fields terminated by '\t';
添加分区
alter table emp_second add partition (day='20180125',hour='16');
删除分区
alter table emp_second drop partition (day='20180125');
添加数据的时候指定分区(没有该分区会创建)
load data local inpath '/home/hadoop/emp.log' into table emp_second partition (day='20180125',hour='17');
桶表
连接两个在相同列上划分了桶的表,使用 map side join 实现
使 sampling 更高效
需设置 set hive.enforce.bucketing=true
create table bucketed_users(id int, name string)
clustered by (id) into 4 buckets
某个数据被分到哪个桶根据指定列的 hash 值对桶数取余得到
导入方式总结
本地导入
load data local inpath ‘ 本地路径 ’ into table 表名
bin/hdfs dfs -put 本地路径 hdfs 路径(hive 的表位置)
hdfs 上导入
load data inpath ‘hdfs 路径 ’ into table 表名
覆盖写
load data inpath ‘hdfs 路径 ’ overwrite into table 表名
load data local inpath ‘ 本地路径 ’ overwrite into table 表名
通过 insert 语句将 select 的结果 插入到一张表中
insert into table test_tb select * from emp_p;
创建表时加载数据
create external table test_tb (
id int,
name string
)
row format delimited fields terminated by '\t';
location "/hive/test_tb";
sqoop 方式
导出方式
hive 脚本
hive -e
bin/hive -e “use test_db;select * from emp_p” > /home/hadoop/result.txt
hive -f 执行 sql 文件
bin/hive -f 路径 >> /home/hadoop/result.txt
hive>
- 导出到本地 (默认分隔符是 ASSII 001)
insert overwrite local directory ‘/home/hadoop/data’ select * from emp_p;
insert overwrite local directory ‘/home/hadoop/data’ row format delimited fields terminated by ‘^’ select * from emp_p;
- 到出 HDFS
hive > insert overwrite directory ‘/data’ select * from emp_p;
- export 和 import (HDFS)
hive > export table emp_p to ‘/input/export’ ;
hive > import table emp_imp from ‘hdfs_path’ ;
HQL
http://hive.apache.org/
常用语法
通配 * 指定字段
select id,name from emp;
where
条件查询
select * from emp_p where salary > 10000;
between and
select * from emp_p where sal between 10000 and 15000;
is null| is not null
select * from user where email is not null;
in () | not in ()
select * from emp_p where did in (1,2,3);
聚合函数
count max min sum avg
select count(1) personOfDept from emp_p group by job;
select sum(sal) from emp_p;
distinct
select distinct id from emp_part;
select distinct name, province from emp_part;
子查询
select eid,ename,salary ,did from emp where emp.did in (select did from dept where dname=’ 人事部 ’);
表连接
emp.eid emp.ename emp.salary emp.did
1001 jack 10000.0 1
1002 tom 2000.0 2
1003 lily 20000.0 3
1004 aobama 10000.0 5
1005 yang 10000.0 6
dept.did dept.dname dept.dtel
1 人事部 021-456
2 财务部 021-234
3 技术部 021-345
4 BI 部 021-31
5 产品部 021-232
积
select * from dept, emp;
select * from emp, dept where emp.did=dept.did;
join
select t1.eid, t1.ename, t1.salary,t2.did ,t2.dname from emp t1 join dept t2 on t1.did=t2.did;
外连查询
left join
select eid,ename, salary,t2.did, t2.dname from emp t1 left join dept t2 on t1.did = t2.did;
right join
select eid,ename, salary,t2.did, t2.dname from emp t1 right join dept t2 on t1.did = t2.did;
全连接
select eid,ename, salary,t2.did, t2.dname from emp t1 full join dept t2 on t1.did = t2.did;
hql 的四种排序
全局排序 Order By (对所有的数据进行排序)
select * from emp_part order by salary;
设置 reduce 个数为 3, 也只有一个文件
set mapreduce.job.reduces=3;
内部排序 sort by (每个 reduce 内部进行排序)
底层 时在 reduce 函数之前完成的
设置 reduce 个数
set mapreduce.job.reduces=2;
insert overwrite local directory ‘/home/hadoop/result’ select * from emp_part sort by salary; # 默认 reduce 个数为 1, 这种情况下和 order by 一样
分区排序(通过 distribute by 设置分区,使用 sort by 设置分区内排序)
set mapreduce.job.reduces=3;
这里使用部分分区, 薪资排序
insert overwrite local directory ‘/home/hadoop/result’ select * from emp_part distribute by deptno sort by salary;
Cluster By (distribute by 和 sort by 条件一致时 使用 cluster by)
连接方式
配置
修改 hive-site.xml
<property>
<name>hive.server2.long.polling.timeout</name>
<value>5000</value>
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>bigdata.ibf.com</value>
</property>
mysql 数据库中创建一个普通用户
1)创建用户
CREATE USER 'hadoop'@'centos01.bigdata.com' IDENTIFIED BY '123456';
2)授权访问(hive 的存储元数据的数据库)GRANT ALL ON metastore.* TO 'hadoop'@'centos01.bigdata.com' IDENTIFIED BY '123456';
GRANT ALL ON metastore.* TO 'hadoop'@'%' IDENTIFIED BY '123456';
3)刷新授权
flush privileges;
beeline
启动服务
$ bin/hiveserver2 &
或
$bin/hive --service hiveserver2 &
连接
$ bin/beeline
beeline>!connect jdbc:hive2://bigdata.ibf.com:10000
输入 mysql 的用户名
输入 mysql 密码
简单日志流量案例
1 需求及分析
需求
分析统计每天每小时的 PV 数和 UV 数
分析
创建数据源表
创建分区表(天, 小时)/ 加载数据
数据清洗
创建 hive 表
字段过滤
id url guid 字段补全(无) 字段格式化(无)
数据分析
pv:count(url) uv:count(distinct guid)
保存结果
日期(天) 小时 PV UV
导出结果
导出到 MySQL
2 具体实现
数据原表
1) 创建原表
create database if not exists hive_db;
user hive_db;
create table tracklogs(
id string,
url string,
referer string,
keyword string,
type string,
guid string,
pageId string,
moduleId string,
linkId string,
attachedInfo string,
sessionId string,
trackerU string,
trackerType string,
ip string,
trackerSrc string,
cookie string,
orderCode string,
trackTime string,
endUserId string,
firstLink string,
sessionViewNo string,
productId string,
curMerchantId string,
provinceId string,
cityId string,
fee string,
edmActivity string,
edmEmail string,
edmJobId string,
ieVersion string,
platform string,
internalKeyword string,
resultSum string,
currentPage string,
linkPosition string,
buttonPosition string
)
partitioned by (date string,hour string)
row format delimited fields terminated by ‘t’;
2) 加载数据
load data local inpath ‘/opt/datas/2015082818′ into table tracklogs partition(date=’20150828′,hour=’18’);
load data local inpath ‘/opt/datas/2015082819′ into table tracklogs partition(date=’20150828′,hour=’19’);
分析
1) 建立数据分析表
create table clear (
id string,
url string,
guid string
)
partitioned by (date string, hour string)
row format delimited fields terminated by ‘t’;
2) 过滤数据
insert into table clear partition(date=’20150828′,hour=’18’) select id,url,guid from tracklogs where date=’20150828′ and hour=’18’;
insert into table clear partition(date=’20150828′,hour=’19’) select id,url,guid from tracklogs where date=’20150828′ and hour=’19’;
3) 指标分析
pv : select date,hour,count(url) as pv from clear group by date,hour;
uv: select date,hour, count(distinct guid) as uv from clear group by date,hour;
保存结果到 result
create table result as select date,hour, count(url) pv, count(distinct guid) as uv from clear group by date,hour;
创建表时没指定分隔符则默认分隔符为 001
导出结果到 mysql
# 创建表
create table result(
day varchar(30),
hour varchar(30),
pv varchar(30) not null,
uv varchar(30) not null,
primary key(day,hour)
);
# 导出数据
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop export \
–connect jdbc:mysql://linux03.ibf.com:3306/mydb \
–username root \
–password root \
–table result \
–export-dir /user/hive/warehouse/hive_db.db/result \
–input-fields-terminated-by ‘001’ \
-m 1
动态分区表
开启动态分区
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
打开动态分区后,动态分区的模式,有 strict 和 nonstrict 两个值可选,strict 要求至少包含一个静态分区列,nonstrict 则无此要求。
创建表
create table clear_dynamic (
id string,
url string,
guid string
)
partitioned by (date string, hour string)
row format delimited fields terminated by ‘t’;
动态加载数据
直接加载 20180129 的所有 hour 的数据
insert into table clear_dynamic partition(date=’20180129′,hour) select id,url,guid,hour from tracklogs where date=’20180129′;
根据 hour 自动分区
以前是这样写
insert into table clear partition(date=’20150828′,hour=’18’) select id,url,guid from tracklogs where date=’20150828′ and hour=’18’;
insert into table clear partition(date=’20150828′,hour=’19’) select id,url,guid from tracklogs where date=’20150828′ and hour=’19’;
使用脚本动态加载到 hive 表中
20180129/
2018012900
2018012901
2018012902
2018012903
2018012904
2018012905
1) 编写 shell_脚本(bin/hive -e “”)
2) 测试脚本
show partitions tracklogs; #查看分区
alter table tracklogs drop partition(date=’20150828′,hour=’18’); 删除分区
alter table tracklogs drop partition(date=’20150828′,hour=’19’);
select count(1) from tracklogs; #查看记录数
3) 使用 shell 脚本使用(bin/hive -f)
4)测试
show partitions tracklogs; #查看分区
alter table tracklogs drop partition(date=’20150828′,hour=’18’); 删除分区
alter table tracklogs drop partition(date=’20150828′,hour=’19’);
select count(1) from tracklogs; #查看记录数
Hive 函数
用户自定义函数, 用于实现 hive 中不能实现的业务逻辑处理
类型:
UDF: 一进一出
UDAF: 多进一出 sum,count 等
UDTF: 一进多出 行列转换
编写 UDF:
编写 UDF 必须继承 UDF
必须至少实现一个 evaluale 方法
必须要有返回类型, 可以是 null
建议使用 hadoop 序列化类型
需求: 日期转换
31/Aug/2015:00:04:37 +0800 –> 2015-08-31 00:04:37
实现步骤
1) 自定义类实现 UDF 类
2) 打包不要指定主类
3) 添加到 hive 中
maven 中导入 hadoop 的包和 hive 的包
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.2</version>
</dependency>
具体实现范例
package com.myudf;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class DateFormate extends UDF {SimpleDateFormat inputDate = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH);
SimpleDateFormat outDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 31/Aug/2015:00:04:37 +0800 --> 2015-08-31 00:04:37
public Text evaluate(Text str) {if(str == null) {return null;}
if(StringUtils.isBlank(str.toString())) {return null;}
Date date = null;
String val = null;
try {date = inputDate.parse(str.toString());
val = outDate.format(date);
} catch (ParseException e) {e.printStackTrace();
}
return new Text(val);
}
public static void main(String[] args) {Text val = new DateFormate().evaluate(new Text("31/Aug/2015:00:04:37 +0800"));
System.out.println(val);
}
}
hive (test_db)>add jar /home/hadoop/DDD.jar;
hive (test_db)> CREATE TEMPORARY FUNCTION removequote as ‘com.myudf.date.RemoveQuoteUDF’;
hive (test_db)> show functions;
hive 压缩格式
压缩格式
bzip2, gzip, lzo, snappy 等
压缩比:bzip2>gzip>lzo bzip2
压缩解压速度:lzo>gzip>bzip2 lzo
hadoop 支持的压缩格式
bin/hadoop checknative -a
http://google.github.io/snappy/
配置压缩
- 编译 hadoop 源码:
mvn package -Pdist,native,docs -DskipTests -Dtar -Drequire.snappy
- 替换 $HADOOP_HOME/lib/native
关闭 hadoop 相关进程
解压 cdh5.xxx-snappy-lib-native.tar.gz 到 $HADOOP_HOME/lib
$ tar -zxvf native-hadoop-cdh5.14.2.tar.gz -C /opt/modules/hadoop-2.6.0-cdh5.14.2/lib
- 在次检查支持解压
可以观察到已经支持
$ bin/hadoop checknative -a
- 配置 hadoop (jobhistory 可以查看所有配置信息)
mapred-site.xml 配置
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
- 测试
运行 pi 程序: $ bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0-cdh5.3.6.jar pi 1 2
通过主机:19888 观察该任务的 configuration 中压缩配置
- 通过 hive 配置压缩格式
shuffle 阶段启用压缩
set hive.exec.compress.output=true;
set mapred.output.compress=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
reduce 输出的结果文件进行压缩
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
hive 中的文件存储格式
格式
create table (...)
row format delimited fields terminated by ''
STORED AS file_format
文件格式如下
TEXTFILE
RCFILE
ORC
PARQUET
AVRO
INPUTFORMAT
常用的文件格式(默认是 textfile)
| ORC — (Note: Available in Hive 0.11.0 and later)
| PARQUET –Parquet 就是基于 Dremel 的数据模型和算法实现的。这个比较常见
数据存储类型
按行存储
写的快
按列存储
读得快
- 可以跳过不符合条件的数据,只读取需要的数据,降低 IO 数据量。
- 压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码进一步节约存储空间。
- 只读取需要的列,支持向量运算,能够获取更好的扫描性能。
验证存储格式及压缩
使用给定的日志文件(18.1MB)
使用不同的存储格式,存储相同的数据,判断文件大小
在 MapReduce 的 shuffle 阶段启用压缩(对中间数据进行压缩可以减少 map 和 reduce task 间的数据传输量。对于 IO 型作业,可以加快速度。)
set hive.exec.compress.intermediate=true;
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
对输出结果压缩
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
创建表 file_text,并加载数据
create table if not exists file_text(
t_time string,
t_url string,
t_uuid string,
t_refered_url string,
t_ip string,
t_user string,
t_city string
)
row format delimited fields terminated by '\t'
stored as textfile;
load data local inpath '/home/hadoop/page_views.data' into table file_text;
对比默认格式和 file_orc_snappy 数据大小比较
create table if not exists file_orc_snappy(
t_time string,
t_url string,
t_uuid string,
t_refered_url string,
t_ip string,
t_user string,
t_city string
)
row format delimited fields terminated by '\t'
stored as ORC
tblproperties("orc.compression"="Snappy");
insert into table file_orc_snappy select * from file_text;
-- 不能通过 load 来加载,因为 load 本质是 hdfs 的 put,这样不能压缩,必须要 insert 这样走 MapReduce 才能让压缩发挥作用
对比默认格式和 parquet 格式 数据大小比较
create table if not exists file_parquet(
t_time string,
t_url string,
t_uuid string,
t_refered_url string,
t_ip string,
t_user string,
t_city string
)
row format delimited fields terminated by '\t'
stored as parquet;
insert into table file_parquet select * from file_text;
对比默认格式和 parquet 格式,snappy 压缩 数据大小比较
create table if not exists file_parquet_snappy(
t_time string,
t_url string,
t_uuid string,
t_refered_url string,
t_ip string,
t_user string,
t_city string
)
row format delimited fields terminated by '\t'
stored as parquet
tblproperties("parquet.compression"="Snappy");
insert into table file_parquet_snappy select * from file_text;
hive (mydb)> dfs -du -s -h /user/hive/warehouse/mydb.db/file_parquet_snappy;
hive (mydb)> dfs -du -s -h /user/hive/warehouse/mydb.db/file_parquet;
hive 中使用正则加载数据
通过正则匹配,加载复杂格式日志文件
1 正则
2 根据日志加载数据
日志
“27.38.5.159” “-” “31/Aug/2015:00:04:53 +0800” “GET /course/view.php?id=27 HTTP/1.1” “200” “7877” – “http://www.ibf.com/user.php?act=mycourse&testsession=1637” “Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36” “-” “learn.ibf.com”
创建表
CREATE TABLE apachelog (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_set string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES ("input.regex" = "(\"[^]*\") (\"-|[^]*\") (\"[^\]]*\") (\"[^\]]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^]*) (\"[^]*\") (\"[^\"]*\") (\"-|[^]*\") (\"[^]*\")"
)
STORED AS TEXTFILE;
load data local inpath '/home/hadoop/moodle.ibf.access.log' into table apachelog;
Hive 优化
大表拆分为小表
外部表 + 分区表
存储格式 & 数据压缩
SQL 优化
并行执行
//Whether to execute jobs in parallel
set hive.exec.parallel=true;
//How many jobs at most can be executed in parallel
set hive.exec.parallel.thread.number=8;# 可以调大, 提高并行效率
mapreduce
- Reduce 数目
set mapreduce.job.reduces=1
- JVM 重用
mapreduce.job.jvm.numtasks=1 默认 1 个
- 推测执行
hive 配置,默认为 true
set hive.mapred.reduce.tasks.speculative.execution=true;
hadoop
mapreduce.map.speculative true
mapreduce.reduce.speculative true
设置输出文件合并
Size of merged files at the end of the job
将小文件合并避免降低 hdfs 存储大量小文件而降低性能
set hive.merge.size.per.task=256000000;
严格模式
set hive.mapred.mode=strict; nonstrict 默认
严格模式下,
分区表,必须加分区字段过滤条件
对 order by, 必须使用 limit
限制笛卡尔积的查询(join 的时候不使用 on,而使用 where)
hive join
map join
如果关联查询两张表中有一张小表默认 map join,将小表加入内存
hive.mapjoin.smalltable.filesize=25000000 默认大小
hive.auto.convert.join=true 默认开启
如果没有开启使用 mapjoin,使用语句制定小表使用 mapjoin
select /+ MAPJOIN(time_dim) / count(1) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
reduce join
对两张大表 join
对关联的 key 进行分组
smb join
Sort-Merge-Bucket join
解决大表与大表 join 速度慢问题
通过分桶字段的的 hash 值对桶的个数取余进行分桶
set hive.enforce.bucketing=true;
create table 表名 (
字段
)
clustered by(分桶字段) into 分桶数量 buckets;
如
create table student(
id int,
age int,
name string
)
clustered by (id) into 4 bucket
row format delimited fields terminated by ‘,’;
小文件处理
- 重建表,建表时减少 reduce 的数量
- 通过参数调节,设置 map/reduce 的数量
// 每个 Map 最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;
// 一个节点上 split 的至少的大小(这个值决定了多个 DataNode 上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;
// 一个交换机下 split 的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;
// 执行 Map 前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
// 设置 map 端输出进行合并,默认为 true
set hive.merge.mapfiles = true
// 设置 reduce 端输出进行合并,默认为 false
set hive.merge.mapredfiles = true
// 设置合并文件的大小
set hive.merge.size.per.task = 256000000
// 当输出文件平均大小小于设定值时,启动合并操作。这一设定只有当 hive.merge.mapfiles 或 hive.merge.mapredfiles 设定为 true 时,才会对相应的操作有效。
set hive.merge.smallfiles.avgsize=16000000
数据倾斜
本质原因:key 的分布不均导致的
Map 端部分聚合,相当于 Combiner
hive.map.aggr=true
有数据倾斜的时候进行负载均衡
hive.groupby.skewindata=true
当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
hive 案例:日志分析
名词
1) UV:count(distinct guid)
访问您网站的一台电脑客户端为一个访客。00:00-24:00 内相同的客户端只被计算一次。
2) PV:Page View— count(url)
即页面浏览量或点击量,用户每次刷新即被计算一次。
3) 登录人数:
登录网站访问的人数 [会员],endUserId 有值的数量
4) 游客数:
没有登录访问的人数,endUserId 为空的数量
5) 平均访问时长:
访客平均在网站停留的时间
trackTime --> max - min
6) 二跳率:pv>1 的访问量 / 总访问量
平均浏览 2 个页面及以上 (pv>1) 的用户数 / 用户总数(discont guid) 点击 1 次
二跳率的概念是当网站页面展开后,用户在页面上产生的首次点击被称为“二跳”,二跳的次数即为“二跳量”。二跳量与浏览量的比值称为页面的二跳率。
count(case when pv >=2 then guid else null end) / discont (guid)
7) 独立 IP:—count(distinct ip)
独立 IP 表示,拥有特定唯一 IP 地址的计算机访问您的网站的次数,因为这种统计方式比较容易实现,具有较高的真实性,所以成为大多数机构衡量网站流量的重要指标。比如你是 ADSL 拨号上网的,你拨一次号都自动分配一个 ip,这样你进入了本站,那就算一个 ip,当你断线了而没清理 cookies,之后又拨 了一次号,又自动分配到一个 ip,你再进来了本站,那么又统计到一个 ip,但是 UV(独立访客)没有变,因为 2 次都是你进入了本站。
日期 | uv | pv | 登录人数 | 游客人数 | 平均访问时间 | 二跳率 | 独立 IP 数 |
---|---|---|---|---|---|---|---|
窗口函数分析函数
over 语句
准备测试数据
hive (db_analogs)> create database ts;
hive (db_analogs)> use ts;
hive (ts)> create table testscore(gender string,satscore int, idnum int) row format delimited fields terminated by '\t';
hive (ts)> load data local inpath '/opt/datas/TESTSCORES.csv' into table testscore;
OVER with standard aggregates: COUNT、SUM、MIN/MAX、AVG
需求 1:
按照性别分组,satscore 分数排序(降序),最后一列显示所在分组中的最高分
Female 1000 37070397 1590
Female 970 60714297 1590
Female 910 30834797 1590
Male 1600 39196697 1600
Male 1360 44327297 1600
Male 1340 55983497 1600
答案 sql:
hive (ts)> select gender,satscore,idnum,max(satscore) over(partition by gender order by satscore desc) maxs from testscore;
注 意:
partition by 是分组用的
分析函数
要求 topN
按照性别分组,satscore 排序(降序),最后一列显示在分组中的名次
需求 1:
分数相同名次不同,名次后面根据行数增长
Female 1590 23573597 1
Female 1520 40177297 2
Female 1520 73461797 3
Female 1490 9589297 4
Female 1390 99108497 5
Female 1380 23048597 6 # 分数相同
Female 1380 81994397 7 # 分数相同
需求 2:
分数相同名次相同,名次后面根据行数增长
Female 1590 23573597 1
Female 1520 40177297 2
Female 1520 73461797 2
Female 1490 9589297 4
Female 1390 99108497 5
Female 1380 23048597 6 #分数相同
Female 1380 81994397 6 # 分数相同
需求 3:
分数相同名次相同,名次连续增长
Female 1590 23573597 1
Female 1520 40177297 2
Female 1520 73461797 2
Female 1490 9589297 3
Female 1390 99108497 4
Female 1380 23048597 5
Female 1380 81994397 5
SQL
sql1
hive (ts)> select gender,satscore,idnum,row_number() over(partition by gender order by satscore desc) maxs from testscore;
-- ROW_NUMBER() 从 1 开始,按照顺序,生成分组内记录的序列
sql2
select gender,satscore,idnum,rank() over(partition by gender order by satscore desc) maxs from testscore;
-- RANK() 生成数据项在分组中的排名,排名相等会在名次中留下空位
sql3
select gender,satscore,idnum,dense_rank() over(partition by gender order by satscore desc) maxs from testscore;
-- DENSE_RANK() 生成数据项在分组中的排名,排名相等会在名次中不会留下空位
窗口函数
# 当有 order by,而没有指定窗口子句时,窗口子句默认为 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW(从起点到当前行的范围)
# 当 order by 和窗口子句都没有时,窗口子句默认 ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING(从起点到后面的终点)
UNBOUNDED PRECEDING
UNBOUNDED FOLLOWING
1 PRECEDING
1 FOLLOWING
CURRENT ROW
窗口对比
select gender,satscore,idnum,sum(satscore) over(partition by gender order by satscore desc RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) sums from testscore;
select gender,satscore,idnum,sum(satscore) over(partition by gender order by satscore desc RANGE BETWEEN UNBOUNDED PRECEDING AND unbounded following) sums from testscore;
select gender,satscore,idnum,sum(satscore) over(partition by gender order by satscore desc RANGE BETWEEN 1 PRECEDING AND CURRENT ROW) sums from testscore;
当前行数据幅度 + 1 后范围内
LAG
落后值(上 n 个值),在不指定落后个数的情况下,默认为落后一个值(数据从上向下显示,落后即当前值之前显示的值)
场景: 分析用户页面浏览顺序
sql
hive (ts)> select gender,satscore,idnum, lag(satscore) over(partition by gender order by satscore desc) as lastvalue from testscore;
要求
gender satscore idnum lastvalue
Female 1590 23573597 NULL # 此处为 null,可以为其指定默认值
Female 1520 40177297 1590 # 显示当前 satscore 的上一条记录的值
Female 1520 73461797 1520 # 显示当前 satscore 的上一条记录的值
Female 1490 9589297 1520
Female 1390 99108497 1490
LEAD
与 LAG 相反(下 n 搁置),用法同理,前面的值(领先值),默认为领先一个值(数据从上向下显示,领先即当前值之后显示的值)
sql
hive (ts)> select gender,satscore,idnum, lead(satscore, 1, 0) over(partition by gender order by satscore desc) as nextvalue from testscore;
结果
gender satscore idnum nextvalue
...
Female 1060 59149297 1060
Female 1060 46028397 1000
Female 1000 37070397 970
Female 970 60714297 910
Female 910 30834797 0