共计 9477 个字符,预计需要花费 24 分钟才能阅读完成。
- . 讲一下第一个项目
-
hive 中 shuffle 的优化
-
压缩
压缩可以使磁盘上存储的数据量变小,通过降低 I / O 来提高查询速度。对 hive 产生的一系列 MR 中间过程启用压缩
set hive.exec.compress.intermediate=true; set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
对最终输出结果压缩(写到 hdfs、本地磁盘的文件)
set hive.exec.compress.output=true; set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
-
join 优化
- map join
-
如果关联查询两张表中有一张小表默认 map join,将小表加入内存
hive.mapjoin.smalltable.filesize=25000000 默认大小
hive.auto.convert.join=true 默认开启
如果没有开启使用 mapjoin,使用语句制定小表使用 mapjoin
```sql
select /*+ MAPJOIN(time_dim) */ count(1) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
```
2. smb join
Sort-Merge-Bucket join
解决大表与大表 join 速度慢问题
通过分桶字段的的 hash 值对桶的个数取余进行分桶
3. 倾斜连接
```xml
<!-- hive.optimize.skewjoin: 是否为连接表中的倾斜键创建单独的执行计划。它基于存储在元数据中的倾斜键。在编译时,Hive 为倾斜键和其他键值生成各自的查询计 划。-->
<property> <name>hive.optimize.skewjoin</name>
<value>true</value>
</property> <property>
<!-- hive.skewjoin.key: 决定如何确定连接中的倾斜键。在连接操作中,如果同一键值所对应的数据行数超过该参数值,则认为该键是一个倾斜连接键。-->
<name>hive.skewjoin.key</name>
<value>100000</value>
</property>
<!-- hive.skewjoin.mapjoin.map.tasks: 指定倾斜连接中,用于 Map 连接作业的任务数。该参数应该与 hive.skewjoin.mapjoin.min.split 一起使用,执行细粒度的控制。-->
<property> <name>hive.skewjoin.mapjoin.map.tasks</name>
<value>10000</value>
</property>
<!-- hive.skewjoin.mapjoin.min.split: 通过指定最小 split 的大小,确定 Map 连接作业的任务数。该参数应该与 hive.skewjoin.mapjoin.map.tasks 一起使用,执行细粒度的控制。-->
<property>
<name>hive.skewjoin.mapjoin.min.split</name>
<value>33554432</value>
</property>
```
- Hive 在集群过程中怎么解决数据倾斜
本质原因:key 的分布不均导致的
Map 端部分聚合,相当于 Combiner
hive.map.aggr=true
有数据倾斜的时候进行负载均衡
hive.groupby.skewindata=true
当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
- sqoop 要将数据库中的所有表执行导入, 怎么操作? 哪些参数? 增量导入?
全量导入
[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
> --connect jdbc:mysql://linux03.ibf.com:3306/mydb \
> --username root \
> --password 123456 \
> --table user
增量导入
bin/sqoop import \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
--password 123456 \
--table user \
--fields-terminated-by '\t' \
--target-dir /sqoop/incremental \
-m 1 \
--direct \
--check-column id \
--incremental append \
--last-value 3
- hive 导致数据倾斜的可能性(哪些操作会导致) –> 分桶 join key 分布不均匀 大量空值导致如何解决?
根据 key 操作到时结果分布不均都可能导致数据倾斜,如 group by key
order by 使用全局排序最终只会在一个 reducer 上运行所有数据,导致数据倾斜
大量 NULL
hive 的 NULL 有时候是必须的:
1)hive 中 insert 语句必须列数匹配,不支持不写入,没有值的列必须使用 null 占位。
2)hive 表的数据文件中按分隔符区分各个列。空列会保存 NULL(n)来保留列位置。但外部表加载某些数据时如果列不够,如表 13 列,文件数据只有 2 列,则在表查询时表中的末尾剩余列无数据对应,自动显示为 NULL。
所以,NULL 转化为空字符串,可以节省磁盘空间,实现方法有几种
1)建表时直接指定(两种方式)
a、用语句
ROW FORMAT SERDE‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’with serdeproperties('serialization.null.format' = '')
实现,注意两者必须一起使用,如
CREATE TABLE hive_tb (id int,name STRING)
PARTITIONED BY (`day` string,`type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck', `hour` tinyint)
ROW FORMAT SERDE‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’WITH SERDEPROPERTIES (‘field.delim’='/t’,‘escape.delim’='//’,‘serialization.null.format'=''
) STORED AS TEXTFILE;
b、或者通过 ROW FORMAT DELIMITED NULL DEFINED AS '' 如
CREATE TABLE hive_tb (id int,name STRING)
PARTITIONED BY (`day` string,`type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck', `hour` tinyint)
ROW FORMAT DELIMITED
NULL DEFINED AS ''
STORED AS TEXTFILE;
2)修改已存在的表
alter table hive_tb set serdeproperties('serialization.null.format' = '');
- hive 中如何增加一列数据?
新增一列
hive > alter table log_messages add coloumns(
app_name string comment 'Application name',
session_id long comment 'The current session id'
);
-- 增加列的表的最后一个字段之后,在分区字段之前添加。
如果在表中新增一列 new_column,则在原表上直插入 new_column 这一列数据不可行
如果新增一列是分区,则可以新增数据到该分区下
insert into table clear partition(date='20150828',hour='18') select id,url,guid from tracklogs where date='20150828' and hour='18';
- 运行 spark
-
有没有 hive 处理过 json? 有哪些函数?
-
建表时制定 jar 包处理 json 数据
- 首先添加 jar 包
-
ADD JAR hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar;
2. 建表
```
hive (default)> ADD JAR hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar;
Added [hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar] to class path
Added resources: [hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar]
hive (default)> create table spark_people_json(
>
> `name` string,
>
> `age` int)
>
> ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
>
> STORED AS TEXTFILE;
OK
Time taken: 4.445 seconds
```
2. 记录下如果只是某个字段为 json,想要获取里面的某个值怎么操作?1. get_json_object()
只能获取一个字段
```sql
select get_json_object('{"shop":{"book":[{"price":43.3,"type":"art"},{"price":30,"type":"technology"}],"clothes":{"price":19.951,"type":"shirt"}},"name":"jane","age":"23"}', '$.shop.book[0].type');
```
2. json_tuple()
可以获取多个字段
```sql
select json_tuple('{"name":"jack","server":"www.qq.com"}','server','name')
```
3. 自行编写 UDF
- sparkstreaming 正在运行的程序如何去中止? 怎么安全停止? 代码做了更新, 如何让正在运行的和更新后的代码做一个交替?
- sparkstreaming 和 kafka 集成中 精确一次的数据消费如何实现?
使用直接连接方式
-
消息语义有几种?
- at least once — 消息绝不会丢,但可能会重复传输
- at most once — 消息可能会丢,但绝不会重复传输
- exactly once — 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的
- kafka 的消费者怎么去保证精确一次的消费?
-
sparkstreaming 和 kafka 集成有几种方式?
- Receiver-based Approach
- Direct Approach (No Receivers) native Offsets
- 怎么实现 sparkstreaming?
- 项目中有几个人 如何分配?
- 所在项目中的存储架构?
- 开发工具用的是什么?(什么情况用什么工具 /xshell/idea)
- 代码怎么去做的管理(git)?
- hive 中的分析函数?
- 常见的字符串用哪些函数?
-
hive 中如何去统计每周一, 每个月的第一天的 pv?
获取指定日期月份的第一天、年份的第一天
select trunc('2019-02-24', 'YYYY'); select trunc('2019-02-24', 'MM');
指定日期下周的指定周几
select next_day('2019-02-24', 'TU');
按指定格式返回指定日期增加几个月后的日期
select add_months('2019-02-28', 1); select add_months('2019-02-24 21:15:16', 2, 'YYYY-MM-dd HH:mm:ss');
select count(guid) from table group by trunc(date, ‘MM’)
select count(guid) from table group by next_day(‘2019-06-08’, ‘MONDAY’);
- dataset 和 dataframe 区别?
Spark2.x 之后,官方已经将(DataFrame)/Dataset(数据集)API 的进行了 统一,DataFrame 仅 是 Dataset 中每个元素为 Row 类型的时候
不同之处在于 Dataset 是 strongly typed(强类型的),而 dataframe 则是 untypedrel(弱类型的)
-
项目中 hive 的元数据在哪儿保存?
- 若没有制定将 metastore 保存到指定的数据库,则 metastore 默认存放在 hive 自带的 deybe 数据库中,这就是安装 hive 的嵌入模式
- 若在设置中制定外部数据库,则保存在该数据库中,本地模式和远程模式使用这种方式保存 metastore
-
元数据怎么保证他的安全性?
修改元数据所用的用户名和密码
<property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>123456</value> </property>
在 mysql 端设置 metastore 数据库的访问权限
-
sqoop 导入导出有几种方式? 增量导出?
导入
全量导入 ``` [hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \ > --connect jdbc:mysql://linux03.ibf.com:3306/test_db \ > --username root \ > --password root \ > --table toHdfs \ > --target-dir /toHdfs \ > --direct \ > --delete-target-dir \ > --fields-terminated-by '\t' \ > -m 1 ``` 增量导入 append ```sh bin/sqoop import \ --connect jdbc:mysql://linux03.ibf.com:3306/mydb \ --username root \ --password 123456 \ --table user \ --fields-terminated-by '\t' \ --target-dir /sqoop/incremental \ -m 1 \ --direct \ --check-column id \ --incremental append \ --last-value 3 ``` 增量导入 lastmodified 表中必须有一列指示时间 ``` sqoop import \ --connect jdbc:mysql://master:3306/test \ --username hive \ --password 123456 \ --table customertest \ --check-column last_mod \ --incremental lastmodified \ --last-value "2016-12-15 15:47:29" \ -m 1 \ --append ```
导出
插入 默认情况下,sqoop-export 将新行添加到表中;每行输入记录都被转换成一条 INSERT 语句,将此行记录添加到目标数据库表中。如果数据库中的表具有约束条件(例如,其值必须唯一的主键列)并且已有数据存在,则必须注意避免插入违反这些约束条件的记录。如果 INSERT 语句失败,导出过程将失败。此模式主要用于将记录导出到可以接收这些结果的空表中。更新 如果指定了 --update-key 参数,则 Sqoop 将改为修改数据库中表中现有的数据。每个输入记录都将转化为 UPDATE 语句修改现有数据。语句修改的行取决于 --update-key 指定的列名,如果数据库中的表中不存在的数据,那么也不会插入。根据目标数据库,如果要更新已存在于数据库中的行,或者如果行尚不存在则插入行,则还可以 --update-mode 使用 allowinsert 模式指定参数
- sparkstreaming 按批处理 hdfs 做保存 小文件过多问题?
使用窗口函数,指定足够长的窗口处理数据,总而使数据量足够大(最好在一个 block 大小左右),完成后使用 foreachRDD 将数据写出到 HDFS
-
hive 中的负责数据类型有哪些?
- 数值型
TINYINT
SMALLINT
INT/INTEGER
BIGINT
FLOAT
DOUBLE
DECIMAL
- 字符型
string
varchar
char
- 日期型
TIMESTAMP
DATE
- 数值型
- hive 中数据的导入导出, 如何加载? 保存在哪儿?
导入:
load data [local] inpath '路径' overwrite into table 表名
导出:
insert overwrite [local] directory '/home/hadoop/data' select * from emp_p;
local:加 local 是从本地加载,不加 local 是从 hdfs 加载
-
RDD 的创建方式?
- 并行化在 driver 端 已有的数据集,不能 parallelize executor 端的数据
scala> var data = Array(1, 2, 3, 4, 5) data: Array[Int] = Array(1, 2, 3, 4, 5) scala> val rdd = sc.parallelize(data) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26
- 引用或者读取一个外部存储系统的数据集,比如像 HDFS,Hbase,或者任何 Hadoop inputFormat 的子类的数据集
scala> sc.textFile("student.log") res0: org.apache.spark.rdd.RDD[String] = student.log MapPartitionsRDD[1] at textFile at <console>:25
- 从已经存在的 RDD,调用 transformation 算子,产生新的子 RDD
- 运行中 driver 的内存溢出 怎么处理?
- 有一个正在运行的 sparkstreaming 的程序, 如何进行升级切换?
- 一个应用程序正在运行, 在不停机的情况下, 要修改它的计算位移怎么操作?
-
hadoop 的压缩格式
bin/hadoop checknative -a
[chen@centos01 hadoop-2.6.0-cdh5.14.2]$ bin/hadoop checknative -a 19/06/05 19:15:45 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native 19/06/05 19:15:45 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library Native library checking: hadoop: true /opt/modules/hadoop-2.6.0-cdh5.14.2/lib/native/libhadoop.so.1.0.0 zlib: true /lib64/libz.so.1 snappy: true /opt/modules/hadoop-2.6.0-cdh5.14.2/lib/native/libsnappy.so.1 lz4: true revision:10301 bzip2: true /lib64/libbz2.so.1 openssl: true /usr/lib64/libcrypto.so
-
sparksql 处理完的 dataframe 结果要保存在数据库中, 具体应该怎么做?
spark .read .table("mydb.emp") .write .mode(SaveMode.Ignore) .jdbc("jdbc:mysql://centos01:3306/mydb", "emp", prop)
- mapreduce 过程中 shuffle 的优化?
shuffle 过程:map 端:环形缓冲区(到 80%)--》溢写(分区,排序)--》combiner --》compress --》reduce 端:--》merge --》排序 --》group
combiner 可选择开启,在 map 端进行一次小 reduce
compress 可选择开区,将结果压缩,减少 IO
shuffle 中分区时采用 HashPartitioner,相同的 key 会进入同一个 reduce,key 分布不均会导致数据倾斜,参考数据倾斜优化过程
-
hive 二次排序的问题?
order by:全局有序,最终数据会进入一个 reduce 中,不推荐使用
sort by:局部有序,每个 reduce 中的数据局有序
distribute by
通过 distribute by 设置分区,使用 sort by 设置分区内排序 distribute by 经常与 sort by 在一起使用
cluster by:distribute by 和 sort by 条件一致时 使用 cluster by
二次排序在 by 后面加上字段名 a, b, c …,hive 会先按 a 排序,若 a 相同按 b 排序,若 b 相同按 c 排序
select * from score order by score.s_id asc, score.s_score desc;
score.s_id score.c_id score.s_score 01 03 99 01 02 90 01 01 80 02 03 80 02 01 70 02 02 60 03 03 80 03 02 80 03 01 80 04 01 50 04 02 30 04 03 20 05 02 87 05 01 76 06 03 34 06 01 31 07 03 98 07 02 89 Time taken: 96.333 seconds, Fetched: 18 row(s)
可以知道为什么不用 order by 排序了
- rowkey 怎么设计, 为什么这么设计?
- sparkstreaming 窗口函数 开窗怎么开的, 开的多长
- sparkstreaming 窗口函数的三个参数是怎么设计的?
- 特别是离线数据清洗这块, 之前讲的 mapreduce 急需更新
- region 动态分割时, 集群的状态? 说一下对 storm 的理解?