1、谈谈 Hadoop 序列化和反序列化及自定义 bean 对象实现序列化?
1)序列化和反序列化
(1)序列化就是把内存中的对象,转换成字节序列(或其余数据传输协定)以便于存储(长久化)和网络传输。
(2)反序列化就是将收到字节序列(或其余数据传输协定)或者是硬盘的长久化数据,转换成内存中的对象。
(3)Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额定的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,hadoop 本人开发了一套序列化机制(Writable),精简、高效。
2)自定义 bean 对象要想序列化传输步骤及注意事项:
(1)必须实现 Writable 接口
(2)反序列化时,须要反射调用空参构造函数,所以必须有空参结构
(3)重写序列化办法
(4)重写反序列化办法
(5)留神反序列化的程序和序列化的程序完全一致
(6)要想把结果显示在文件中,须要重写 toString(),且用 ”\t” 离开,不便后续用
(7)如果须要将自定义的 bean 放在 key 中传输,则还须要实现 comparable 接口,因为 mapreduce 框中的 shuffle 过程肯定会对 key 进行排序
2、FileInputFormat 切片机制(☆☆☆☆☆)
job 提交流程源码详解
waitForCompletion()
submit();
// 1、建设连贯
connect();
// 1)创立提交 job 的代理
new Cluster(getConfiguration());
//(1)判断是本地 yarn 还是近程
initialize(jobTrackAddr, conf);
// 2、提交 job
submitter.submitJobInternal(Job.this, cluster)
// 1)创立给集群提交数据的 Stag 门路
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 2)获取 jobid,并创立 job 门路
JobID jobId = submitClient.getNewJobID();
// 3)拷贝 jar 包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片布局文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
// 5)向 Stag 门路写 xml 配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);
// 6)提交 job, 返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
3、在一个运行的 Hadoop 工作中,什么是 InputSplit?(☆☆☆☆☆)
FileInputFormat 源码解析 (input.getSplits(job))
(1)找到你数据存储的目录。
(2)开始遍历解决(布局切片)目录下的每一个文件。
(3)遍历第一个文件 ss.txt。
a)获取文件大小 fs.sizeOf(ss.txt);。
b)计算切片大小 computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M。
c)默认状况下,切片大小 =blocksize。
d)开始切,造成第 1 个切片:ss.txt—0:128M 第 2 个切片 ss.txt—128:256M 第 3 个切片 ss.txt—256M:300M(每次切片时,都要判断切完剩下的局部是否大于块的 1.1 倍,不大于 1.1 倍就划分一块切片)。
e)将切片信息写到一个切片布局文件中。
f)整个切片的外围过程在 getSplit() 办法中实现。
g)数据切片只是在逻辑上对输出数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit 只记录了分片的元数据信息,比方起始地位、长度以及所在的节点列表等。
h)留神:block 是 HDFS 上物理上存储的存储的数据,切片是对数据逻辑上的划分。
(4)提交切片布局文件到 yarn 上,yarn 上的 MrAppMaster 就能够依据切片布局文件计算开启 maptask 个数。
4、如何断定一个 job 的 map 和 reduce 的数量?
1)map 数量
splitSize=max{minSize,min{maxSize,blockSize}}
map 数量由解决的数据分成的 block 数量决定 default_num = total_size / split_size;
2)reduce 数量
reduce 的数量 job.setNumReduceTasks(x);x 为 reduce 的数量。不设置的话默认为 1。
5、Maptask 的个数由什么决定?
一个 job 的 map 阶段 MapTask 并行度(个数),由客户端提交 job 时的切片个数决定。
6、MapTask 和 ReduceTask 工作机制(☆☆☆☆☆)(也可答复 MapReduce 工作原理)
MapTask 工作机制
(1)Read 阶段:Map Task 通过用户编写的 RecordReader,从输出 InputSplit 中解析出一个个 key/value。
(2)Map 阶段:该节点次要是将解析出的 key/value 交给用户编写 map()函数解决,并产生一系列新的 key/value。大数据培训
(3)Collect 收集阶段:在用户编写 map() 函数中,当数据处理实现后,个别会调用 OutputCollector.collect()输入后果。在该函数外部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。
(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。须要留神的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
(5)Combine 阶段:当所有数据处理实现后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
ReduceTask 工作机制
(1)Copy 阶段:ReduceTask 从各个 MapTask 上近程拷贝一片数据,并针对某一片数据,如果其大小超过肯定阈值,则写到磁盘上,否则间接放到内存中。
(2)Merge 阶段:在近程拷贝数据的同时,ReduceTask 启动了两个后盾线程对内存和磁盘上的文件进行合并,以避免内存应用过多或磁盘上文件过多。
(3)Sort 阶段:依照 MapReduce 语义,用户编写 reduce()函数输出数据是按 key 进行汇集的一组数据。为了将 key 雷同的数据聚在一起,Hadoop 采纳了基于排序的策略。因为各个 MapTask 曾经实现对本人的处理结果进行了部分排序,因而,ReduceTask 只需对所有数据进行一次归并排序即可。
(4)Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。
7、形容 mapReduce 有几种排序及排序产生的阶段(☆☆☆☆☆)
1)排序的分类:
(1)局部排序:
MapReduce 依据输出记录的键对数据集排序。保障输入的每个文件外部排序。
(2)全排序:
如何用 Hadoop 产生一个全局排序的文件?最简略的办法是应用一个分区。但该办法在解决大型文件时效率极低,因为一台机器必须解决所有输入文件,从而齐全丢失了 MapReduce 所提供的并行架构。
代替计划:首先创立一系列排好序的文件;其次,串联这些文件;最初,生成一个全局排序的文件。次要思路是应用一个分区来形容输入的全局排序。例如:能够为待剖析文件创建 3 个分区,在第一分区中,记录的单词首字母 a -g,第二分区记录单词首字母 h -n, 第三分区记录单词首字母 o -z。
(3)辅助排序:(GroupingComparator 分组)
Mapreduce 框架在记录达到 reducer 之前按键对记录排序,但键所对应的值并没有被排序。甚至在不同的执行轮次中,这些值的排序也不固定,因为它们来自不同的 map 工作且这些 map 工作在不同轮次中实现工夫各不相同。一般来说,大多数 MapReduce 程序会防止让 reduce 函数依赖于值的排序。然而,有时也须要通过特定的办法对键进行排序和分组等以实现对值的排序。
(4)二次排序:
在自定义排序过程中,如果 compareTo 中的判断条件为两个即为二次排序。
2)自定义排序 WritableComparable
bean 对象实现 WritableComparable 接口重写 compareTo 办法,就能够实现排序
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
3)排序产生的阶段:
(1)一个是在 map side 产生在 spill 后 partition 前。
(2)一个是在 reduce side 产生在 copy 后 reduce 前。
8、形容 mapReduce 中 shuffle 阶段的工作流程,如何优化 shuffle 阶段(☆☆☆☆☆)
分区,排序,溢写,拷贝到对应 reduce 机器上,减少 combiner,压缩溢写的文件。
9、形容 mapReduce 中 combiner 的作用是什么,个别应用情景,哪些状况不须要,及和 reduce 的区别?
1)Combiner 的意义就是对每一个 maptask 的输入进行部分汇总,以减小网络传输量。
2)Combiner 可能利用的前提是不能影响最终的业务逻辑,而且,Combiner 的输入 kv 应该跟 reducer 的输出 kv 类型要对应起来。
3)Combiner 和 reducer 的区别在于运行的地位。
Combiner 是在每一个 maptask 所在的节点运行;
Reducer 是接管全局所有 Mapper 的输入后果。
10、如果没有定义 partitioner,那数据在被送达 reducer 前是如何被分区的?
如果没有自定义的 partitioning,则默认的 partition 算法,即依据每一条数据的 key 的 hashcode 值摸运算(%)reduce 的数量,失去的数字就是“分区号“。
11、MapReduce 呈现单点负载多大,怎么负载平衡?(☆☆☆☆☆)
通过 Partitioner 实现
12、MapReduce 怎么实现 TopN?(☆☆☆☆☆)
能够自定义 groupingcomparator,对后果进行最大值排序,而后再 reduce 输入时,管制只输入前 n 个数。就达到了 topn 输入的目标。
13、Hadoop 的缓存机制(Distributedcache)(☆☆☆☆☆)
分布式缓存一个最重要的利用就是在进行 join 操作的时候,如果一个表很大,另一个表很小,咱们就能够将这个小表进行播送解决,即每个计算节点上都存一份,而后进行 map 端的连贯操作,通过我的试验验证,这种状况下解决效率大大高于个别的 reduce 端 join,播送解决就使用到了分布式缓存的技术。
DistributedCache 将拷贝缓存的文件到 Slave 节点在任何 Job 在节点上执行之前,文件在每个 Job 中只会被拷贝一次,缓存的归档文件会被在 Slave 节点中解压缩。将本地文件复制到 HDFS 中去,接着 Client 会通过 addCacheFile() 和 addCacheArchive()办法通知 DistributedCache 在 HDFS 中的地位。当文件寄存到文地时,JobClient 同样取得 DistributedCache 来创立符号链接,其模式为文件的 URI 加 fragment 标识。当用户须要取得缓存中所有无效文件的列表时,JobConf 的办法 getLocalCacheFiles() 和 getLocalArchives()都返回一个指向本地文件门路对象数组。
14、如何应用 mapReduce 实现两个表的 join?(☆☆☆☆☆)
1)reduce side join : 在 map 阶段,map 函数同时读取两个文件 File1 和 File2,为了辨别两种起源的 key/value 数据对,对每条数据打一个标签(tag), 比方:tag=0 示意来自文件 File1,tag=2 示意来自文件 File2。
2)map side join : Map side join 是针对以下场景进行的优化:两个待连贯表中,有一个表十分大,而另一个表十分小,以至于小表能够间接寄存到内存中。这样,咱们能够将小表复制多份,让每个 map task 内存中存在一份(比方寄存到 hash table 中),而后只扫描大表:对于大表中的每一条记录 key/value,在 hash table 中查找是否有雷同的 key 的记录,如果有,则连贯后输入即可。
15、什么样的计算不能用 mr 来提速?
1)数据量很小。
2)繁冗的小文件。
3)索引是更好的存取机制的时候。
4)事务处理。
5)只有一台机器的时候。
16、ETL 是哪三个单词的缩写
Extraction-Transformation-Loading 的缩写,中文名称为数据提取、转换和加载。