乐趣区

关于hive:Hive优化

一、批改引擎

默认解决引擎是 MapReduce

能够批改解决引擎位 Spark

批改为 Spark 引擎后速度晋升 10 多倍

二、参数设置

2.1、启动时提醒上面须要设置

In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>

reduce 数量由以下三个参数决定,
mapred.reduce.tasks(强制指定 reduce 的工作数量)

hive.exec.reducers.bytes.per.reducer(每个 reduce 工作解决的数据量,默认为 1000^3=1G)

hive.exec.reducers.max(每个工作最大的 reduce 数,默认为 999)

计算 reducer 数的公式很简略 N =min(hive.exec.reducers.max,总输出数据量 / hive.exec.reducers.bytes.per.reducer)

只有一个 reduce 的场景:
a、没有 group by 的汇总
b、order by
c、笛卡尔积

三、Fetch 抓取

Fetch 抓取是指,Hive 中对某些状况的查问能够不用应用 MapReduce 计算。例如:SELECT * FROM employees; 在这种状况下,Hive 能够简略地读取 employee 对应的存储目录下的文件,而后输入查问后果到控制台。
在 hive-default.xml.template 文件中 hive.fetch.task.conversion 默认是 more,老版本 hive 默认是 minimal,该属性批改为 more 当前,在全局查找、字段查找、limit 查找等都不走 mapreduce。

<property>
    <name>hive.fetch.task.conversion</name>
    <value>more</value>
    <description>
      Expects one of [none, minimal, more].
      Some select queries can be converted to single FETCH task minimizing latency.
      Currently the query should be single sourced not having any subquery and should not have
      any aggregations or distincts (which incurs RS), lateral views and joins.
      0. none : disable hive.fetch.task.conversion
      1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
      2. more  : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
    </description>
</property>

案例实战:

1)把 hive.fetch.task.conversion 设置成 none,而后执行查问语句,都会执行 mapreduce 程序。hive (default)> set hive.fetch.task.conversion=none;
hive (default)> select * from emp;
hive (default)> select ename from emp;
hive (default)> select ename from emp limit 3;
2)把 hive.fetch.task.conversion 设置成 more,而后执行查问语句,如下查问形式都不会执行 mapreduce 程序。hive (default)> set hive.fetch.task.conversion=more;
hive (default)> select * from emp;
hive (default)> select ename from emp;
hive (default)> select ename from emp limit 3;

四、本地模式

大多数的 Hadoop Job 是须要 Hadoop 提供的残缺的可扩展性来解决大数据集的。不过,有时 Hive 的输出数据量是十分小的。在这种状况下,为查问触发执行工作耗费的工夫可能会比理论 job 的执行工夫要多的多。对于大多数这种状况,Hive 能够通过本地模式在单台机器上解决所有的工作。对于小数据集,执行工夫能够显著被缩短。
用户能够通过设置 hive.exec.mode.local.auto 的值为 true,来让 Hive 在适当的时候主动启动这个优化。

set hive.exec.mode.local.auto=true;  // 开启本地 mr
// 设置 local mr 的最大输出数据量,当输出数据量小于这个值时采纳 local  mr 的形式,默认为 134217728,即 128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
// 设置 local mr 的最大输出文件个数,当输出文件个数小于这个值时采纳 local mr 的形式,默认为 4
set hive.exec.mode.local.auto.input.files.max=10;

案例实战:

1)开启本地模式,并执行查问语句
hive (default)> set hive.exec.mode.local.auto=true; 
hive (default)> select * from emp cluster by deptno;
Time taken: 1.328 seconds, Fetched: 14 row(s)
2)敞开本地模式,并执行查问语句
hive (default)> set hive.exec.mode.local.auto=false; 
hive (default)> select * from emp cluster by deptno;
Time taken: 20.09 seconds, Fetched: 14 row(s)

五、表的优化

5.1、小表 join 大表

  将 key 绝对扩散,并且数据量小的表放在 join 的右边,这样能够无效缩小内存溢出谬误产生的几率;再进一步,能够应用 mapjoin 让小的维度表(1000 条以下的记录条数)先进内存。在 map 端实现 reduce。
  理论测试发现:新版的 hive 曾经对小表 JOIN 大表和大表 JOIN 小表进行了优化。小表放在右边和左边曾经没有显著区别。

案例实操
1.需要
    测试大表 JOIN 小表和小表 JOIN 大表的效率

2.建大表、小表和 JOIN 后表的语句
// 创立大表
create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
// 创立小表
create table smalltable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
// 创立 join 后表的语句
create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

3.别离向大表和小表中导入数据
hive (default)> load data local inpath '/uardata1/hivetest/bigtable' into table bigtable;
hive (default)>load data local inpath '/uardata1/hivetest/smalltable' into table smalltable;

4.敞开 mapjoin 性能(默认是关上的)# 因为 mapjoin 将小表缓存到内存中,所以此处为了测试大表,小表 join, 将 mapjoin 敞开
set hive.auto.convert.join = false;

5.执行小表 JOIN 大表语句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
left join bigtable  b
on b.id = s.id;
# 三次测试耗时
Time taken: 11.304 seconds
Time taken: 17.587 seconds
Time taken: 10.252 seconds

6.执行大表 JOIN 小表语句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable  b
left join smalltable  s
on s.id = b.id;
# 三次测试耗时
Time taken: 14.343 seconds
Time taken: 11.867 seconds
Time taken: 13.149 seconds

5.2、大表 join 大表

5.2.1、空 Key

  有时 join 超时是因为某些 key 对应的数据太多,而雷同 key 对应的数据都会发送到雷同的 reducer 上,从而导致内存不够。此时咱们应该仔细分析这些异样的 key,很多状况下,这些 key 对应的数据是异样数据,咱们须要在 SQL 语句中进行过滤。例如 key 对应的字段空,操作如下:

(1)创立原始数据表、空 id 表、合并后数据表
// 创立原始表
create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
// 创立空 id 表
create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
// 创立 join 后表的语句
create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';(2)别离加载原始数据和空 id 数据到对应表中
hive (default)> load data local inpath '/uardata1/hivetest/ori' into table ori;
hive (default)> load data local inpath '/uardata1/hivetest/nullid' into table nullidtable;(3)测试不过滤空 id
hive (default)> insert overwrite table jointable 
select n.* from nullidtable n left join ori o on n.id = o.id;

Time taken: 23.255 seconds
Time taken: 20.177 seconds(4)测试过滤空 id   # 这个理论业务中应用很少,如果某个字段不存在就将数据过滤,其余字段的数据失常,这样会导致数据一个是样本少了,另外其余字段的数据信息变少
hive (default)> insert overwrite table jointable 
select n.* from (select * from nullidtable where id is not null) n  left join ori o on n.id = o.id;
Time taken: 7.946 seconds
Time taken: 7.882 seconds

5.2.2、空 key 转换

  有时尽管某个 key 为空对应的数据很多,然而相应的数据不是异样数据,必须要蕴含在 join 的后果中,此时咱们能够表 a 中 key 为空的字段赋一个随机的值,使得数据随机平均地分不到不同的 reducer 上。例如:

5.2.2.1、不随机散布空 null 值:

(1)设置 5 个 reduce 个数
set mapreduce.job.reduces = 5;(2)JOIN 两张表
insert overwrite table jointable
select n.* from nullidtable n left join ori b on n.id = b.id;


Time taken: 23.528 seconds
Time taken: 21.05 seconds

5.2.2.2、随机散布空 null 值

(1)设置 5 个 reduce 个数
set mapreduce.job.reduces = 5;(2)JOIN 两张表
# 此处将空 key 转换为随机数据,与原有数据的 key 保障不会抵触的同时,对空 key 散列散布
insert overwrite table jointable
select n.* from nullidtable n full join ori o on 
case when n.id is null then concat('hive', rand()) else n.id end = o.id;

5.3、MapJoin

  如果不指定 MapJoin 或者不合乎 MapJoin 的条件,那么 Hive 解析器会将 Join 操作转换成 Common Join,即:在 Reduce 阶段实现 join。容易产生数据歪斜。能够用 MapJoin 把小表全副加载到内存在 map 端进行 join,防止 reducer 解决。

5.3.1、开启 MapJoin

(1)设置主动抉择 Mapjoin
set hive.auto.convert.join = true; 默认为 true(2)大表小表的阈值设置(默认 25M 一下认为是小表):set hive.mapjoin.smalltable.filesize=25000000;

5.3.2、MapJoin 工作机制

5.3.3、实例

(1)开启 Mapjoin 性能
set hive.auto.convert.join = true; 默认为 true(2)执行小表 JOIN 大表语句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s
join bigtable  b
on s.id = b.id;
Time taken: 4.817 seconds(3)执行大表 JOIN 小表语句
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable  b
join smalltable  s
on s.id = b.id;

Time taken: 5.915 seconds

与 5.1、5.2、相比,工夫进步一倍。大小表的前后地位没有什么影响

5.4、Group By

  默认状况下,Map 阶段同一 Key 数据分发给一个 reduce,当一个 key 数据过大时就歪斜了。
  并不是所有的聚合操作都须要在 Reduce 端实现,很多聚合操作都能够 先在 Map 端进行局部聚合,最初在 Reduce 端得出最终后果。

5.4.1、开启 Map 端聚合参数设置

(1)是否在 Map 端进行聚合,默认为 True
hive.map.aggr = true(2)在 Map 端进行聚合操作的条目数目
hive.groupby.mapaggr.checkinterval = 100000(3)有数据歪斜的时候进行负载平衡(默认是 false)hive.groupby.skewindata = true

  当选项设定为 true,生成的查问打算会有两个 MR Job。第一个 MR Job 中,Map 的输入后果会随机散布到 Reduce 中,每个 Reduce 做局部聚合操作,并输入后果,这样解决的后果是雷同的 Group By Key 有可能被散发到不同的 Reduce 中,从而达到负载平衡的目标;第二个 MR Job 再依据预处理的数据后果依照 Group By Key 散布到 Reduce 中(这个过程能够保障雷同的 Group By Key 被散布到同一个 Reduce 中),最初实现最终的聚合操作。

5.5、Count(Distinct) 去重统计

数据量小的时候无所谓,数据量大的状况下,因为 COUNT DISTINCT 操作须要用一个 Reduce Task 来实现,这一个 Reduce 须要解决的数据量太大,就会导致整个 Job 很难实现,个别 COUNT DISTINCT 应用先 GROUP BY 再 COUNT 的形式替换:

5.5.1、案例一

1.    创立一张大表
hive (default)> create table bigtable(id bigint, time bigint, uid string, keyword
string, url_rank int, click_num int, click_url string) row format delimited
fields terminated by '\t';

2.加载数据
hive (default)> load data local inpath '/uardata1/hivetest/bigtable' into table
 bigtable;
 
3.设置 5 个 reduce 个数
set mapreduce.job.reduces = 5;

4.执行去重 id 查问
hive (default)> select count(distinct id) from bigtable;
OK
99947
Time taken: 3.284 seconds, Fetched: 1 row(s)

5.采纳 GROUP by 去重 id
hive (default)> select count(id) from (select id from bigtable group by id) a;
OK
99947
Time taken: 3.28 seconds, Fetched: 1 row(s)

尽管会多用一个 Job 来实现,但在数据量大的状况下,这个相对是值得的。

5.6、笛卡尔积

  尽量避免笛卡尔积,join 的时候不加 on 条件,或者有效的 on 条件,Hive 只能应用 1 个 reducer 来实现笛卡尔积。

5.7 行列过滤

  • 列解决:在 SELECT 中,只拿须要的列,如果有,尽量应用分区过滤,少用 SELECT *。
  • 行解决:在分区剪裁中,当应用外关联时,如果将副表的过滤条件写在 Where 前面,那么就会先全表关联,之后再过滤,比方:

5.7.1、案例一

1.测试先关联两张表,再用 where 条件过滤
hive (default)> select o.id from bigtable b
join ori o on o.id = b.id
where o.id <= 10;
Time taken: 3.282 seconds, Fetched: 100 row(s)

2.通过子查问后,再关联表
hive (default)> select b.id from bigtable b
join (select id from ori where id <= 10) o on b.id = o.id;
Time taken: 4.232 seconds, Fetched: 100 row(s)

5.8、动静分区调整

  关系型数据库中,对分区表 Insert 数据时候,数据库主动会 依据分区字段的值,将数据插入到相应的分区中 ,Hive 中也提供了相似的机制,即动静分区(Dynamic Partition), 只不过,应用 Hive 的动静分区,须要进行相应的配置

5.8.1、开启动静分区参数设置

(1)开启动静分区性能(默认 true,开启)hive.exec.dynamic.partition=true(2)设置为非严格模式(动静分区的模式,默认 strict,示意必须指定至多一个分区为动态分区,nonstrict 模式示意容许所有的分区字段都能够应用动静分区。)hive.exec.dynamic.partition.mode=nonstrict(3)在所有执行 MR 的节点上,最大一共能够创立多少个动静分区。hive.exec.max.dynamic.partitions=1000(4)在每个执行 MR 的节点上,最大能够创立多少个动静分区。该参数须要依据理论的数据来设定。比方:源数据中蕴含了一年的数据,即 day 字段有 365 个值,那么该参数就须要设置成大于 365,如果应用默认值 100,则会报错。hive.exec.max.dynamic.partitions.pernode=100(5)整个 MR Job 中,最大能够创立多少个 HDFS 文件。hive.exec.max.created.files=100000(6)当有空分区生成时,是否抛出异样。个别不须要设置。hive.error.on.empty.partition=false

5.8.2、案例一

需要:将 ori 中的数据依照工夫(如:20111230000008),插入到指标表 ori_partitioned_target 的相应分区中。

(1)创立分区表
create table ori_partitioned(id bigint, time bigint, uid string, keyword string,
 url_rank int, click_num int, click_url string) 
partitioned by (p_time bigint) 
row format delimited fields terminated by '\t';(2)加载数据到分区表中
hive (default)> load data local inpath '/uardata1/hivetest/ds1' into table
 ori_partitioned partition(p_time='20200730000010') ;
hive (default)> load data local inpath '/uardata1/hivetest/ds2' into table ori_partitioned partition(p_time='20200730000011') ;(3)创立指标分区表
create table ori_partitioned_target(id bigint, time bigint, uid string,
 keyword string, url_rank int, click_num int, click_url string) PARTITIONED BY (p_time STRING) row format delimited fields terminated by '\t';(4)设置动静分区
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.exec.max.dynamic.partitions = 1000;
set hive.exec.max.dynamic.partitions.pernode = 100;
set hive.exec.max.created.files = 100000;
set hive.error.on.empty.partition = false;

hive (default)> insert overwrite table ori_partitioned_target partition (p_time) 
select id, time, uid, keyword, url_rank, click_num, click_url, p_time from ori_partitioned;(5)查看指标分区表的分区状况
hive (default)> show partitions ori_partitioned_target;

关注我的公众号【宝哥大数据】,更多干货

退出移动版