一.MapReduce 性能优化概述
1. MapReduce 的利用场景
Hadoop 蕴含了 GFS 的开源实现 HDFS(Hadoop distributed file system)和 MapReduce 框架的开源实现。Hadoop 失去了企业界及学术界关注,Yahoo、Facebook、Cloudera、Twitter、Intel、华为等诸多公司和技术个人对 Hadoop 给予了大力支持。Cloudera 对 Apache Hadoop 及相干组件的版本兼容性进行了整合、性能优化、功能测试,推出了其企业版的开源 Hadoop。Intel 推出了高效、平安及易于治理的 Hadoop 企业版。Hadoop 因为其开源性质,已成为目前钻研、优化云计算框架的重要样本和根底。其中 MapReduce 框架很适宜解决文档剖析、倒排索引建设等类型的利用,然而在列存储、索引建设、连贯计算、迭代计算、科学计算及调度算法方面性能须要进一步优化。
2. 优缺点与需要
1. 长处
- Mapreduce 易于编程
它简略的实现一些接口,就能够实现一个分布式程序,这个程序能够散布到大量的便宜的 pc 机器上运行。也就是说你写一个分布式程序,跟写一个简略的串行程序是截然不同的。就是因为这个个性使的 Mapreduce 编程变得十分风行。 - 良好的扩展性
我的项目中当你的计算资源得不到满足的时候,你能够通过简略的通过减少机器来扩大它的计算能力 - 高容错性
Mapreduce 的设计初衷就是使程序可能部署在便宜的 pc 机器上,这就要求它具备很高的容错性。比方一个机器挂了,它能够把下面的计算工作转移到另一个节点上运行,不至于这个工作运行失败,而且这个过程不须要人工参加,而齐全是由 hadoop 外部实现的。 - 适宜 PB 级以上海量数据的离线解决
2. 毛病
MapReduce 尽管有很多的劣势,然而也有它不善于的。这里的“不善于”,不代表不能做,而是在有些场景下实现的成果差,并不适宜用 MapReduce 来解决,次要体现在以下后果方面:
- 实时计算: MapReduce 次要解决的数据来自于文件系统,所以无奈像 Oracle 或 MySQL 那样在毫米或秒级内返回后果,如果须要大数据量的毫秒级响应,能够思考联合实时存储系统来实现,利用 HBase、Kudu 等.
- 流计算: 流计算的输出数据是动静的,而 MapReduce 次要的输出来自于 HDFS 等文件系统,数据是动态的,不能动态变化,这是因为 MapReduce 本身的设计特点决定了数据源必须是动态的。如果须要解决流式数据能够用 Storm,Spark Steaming、Flink 等流计算框架。
- DGA(有向无环图)计算: 多个应用程序存在依赖关系,后一个应用程序的输出为前一个的输入。在这种状况下,MapReduce 并不是不能做,而是应用后,每个 MapReduce 作业的输入后果都会写入磁盘,会造成大量的词频 IO 导致性能十分低下,此时能够思考用 Spark 等迭代计算框架。
综合以上问题,MapReduce 在解决离线分布式计算的过程中,次要思考如何晋升性能,放慢分布式计算过程。
3. 需要
基于整个 MapReduce 所存在的毛病,因为 MapReduce 整体构造曾经固定,所以整体的优化计划只能从以下两点来思考实现:
- 利用列存储思维,优化存储构造列存储在数据仓库、OLAP (on-line analytical processing)联机剖析解决等利用上能够进步查问性能。利用列存储思维对 MapReduce 框架进行优化,面临正当的 数据结构设计及数据压缩等挑战。
- 利用硬件资源优化连贯算法,进步每个阶段的连贯效率,MapReduce 框架解决连贯操作的过程比较复杂,面临数据歪斜、分布式环境数据传输及须要多个 MapReduce 作业等挑战,优化 MapReduce 每个阶段中的资源能够充沛的利用硬件资源性能来晋升 MapReduce 的效率。
二. IO 性能优化:文件类型
1. 优化计划
- 针对 HDFS 最后是为拜访大文件而开发的, 所以 会呈现对大量小文件的存储效率不高问题 , MapReduce 在读取小文件进行解决时,也存在资源节约导致计算效率不高的问题采纳 SequenceFile 和 MapFile 设计一个 HDFS 中合并存储小文件的计划。该计划的次要思维是 将小文件序列化存入一个 SequenceFIle/MapFile 容器, 合并成大文件, 并建设相应的索引文件, 无效升高文件数目和进步拜访效率 . 通过和现有的 Hadoop Archives(HAR files) 文件归档解决小文件问题的计划比照, 试验结果表明, 基于 SequenceFile 或者 MapFile 的存储小文件计划能够更为无效的进步小文件存储性能和缩小 HDFS 文件系统的节点内存耗费
- 针对一般按行存储文本文件,MapReduce 在解决实现聚合、过滤等性能时,性能绝对较差,针对行式存储的数据处理性能差的问题,能够抉择应用列式存储的计划来实现数据聚合解决,升高数据传输及读写的 IO,进步整体 MapReduce 计算解决的性能
2. SequenceFile
1. 介绍
SequenceFile 是 hadoop 里用来存储序列化的键值对即二进制的一种文件格式。SequenceFile 文件也能够作为 MapReduce 作业的输出和输入,hive 和 spark 也反对这种格局。
它有如下几个长处:
- 以二进制的 KV 模式存储数据,与底层交互更加敌对,性能更快,所以能够在 HDFS 里存储图像或者更加简单的构造作为 KV 对。
- SequenceFile 反对压缩和分片。 当你压缩为一个 SequenceFile 时,并不是将整个文件压缩成一个独自的单元,而是压缩文件里的 record 或者 block of records(块)。因而 SequenceFile 是可能反对分片的,即便应用的压缩形式如 Snappy, Lz4 or Gzip 不反对分片,也能够利用 SequenceFIle 来实现分片。
- SequenceFile 也能够用于存储多个小文件。因为 Hadoop 自身就是用来解决大型文件的,小文件是不适宜的,所以用一个 SequenceFile 来存储很多小文件就能够进步解决效率,也能节俭 Namenode 内存,因为 Namenode 只需一个 SequenceFile 的 metadata,而不是为每个小文件创建独自的 metadata。
- 因为数据是以 SequenceFile 模式存储,所以两头输入文件即 map 输入也会用 SequenceFile 来存储,能够进步整体的 IO 开销性能
2. 存储特点
- sequenceFile 文件是 Hadoop 用来存储二进制模式的 [Key,Value] 对而设计的一种立体文件(Flat File)。
- 能够把 SequenceFile 当做是一个容器,把所有的文件打包到 SequenceFile 类中能够高效的对小文件进行存储和解决。
- SequenceFile 文件并不依照其存储的 Key 进行排序存储,SequenceFile 的外部类 Writer 提供了 append 性能。
- SequenceFile 中的 Key 和 Value 能够是任意类型 Writable 或者是自定义 Writable。
- 存储构造上,SequenceFile 次要由一个 Header 后跟多条 Record 组成,Header 次要蕴含了 Key classname,value classname,存储压缩算法,用户自定义元数据等信息,此外,还蕴含了一些同步标识,用于疾速定位到记录的边界。每条 Record 以键值对的形式进行存储,用来示意它的字符数组能够一次解析成:记录的长度、Key 的长度、Key 值和 value 值,并且 Value 值的构造取决于该记录是否被压缩。
- 在 recourds 中,又分为是否压缩格局。当没有被压缩时,key 与 value 应用 Serialization 序列化写入 SequenceFile。当抉择压缩格局时,record 的压缩格局与没有压缩其实不尽相同,除了 value 的 bytes 被压缩,key 是不被压缩的
- 在 Block 中,它使所有的信息进行压缩,压缩的最小大小由配置文件中 io.seqfile.compress.blocksize 配置项决定。
3. SequenceFile 工具类
- SequenceFileOutputFormat
用于将 MapReduce 的后果输入为 SequenceFile 文件 - SequenceFileInputFormat
用于读取 SequenceFile 文件
4. 生成 SequenceFile
- 需要: 将一般文件转换为 SequenceFile 文件
-
思路
- 应用 TextInputFormat 读取一般文件文件
- Map 阶段对读取文件的每一行进行输入
- Reduce 阶段间接输入每条数据
- 应用 SequenceFileOutputFormat 将后果保留为 SequenceFile
-
代码实现
- Driver 类
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.Iterator; /** * @ClassName MrWriteToSequenceFile * @Description TODO 读取文本文件,转换为 SequenceFile 文件 * @Create By itcast */ public class MrWriteToSequenceFile extends Configured implements Tool { // 构建、配置、提交一个 MapReduce 的 Job public int run(String[] args) throws Exception { // 实例化作业 Job job = Job.getInstance(this.getConf(), "MrWriteToSequenceFile"); // 设置作业的主程序 job.setJarByClass(this.getClass()); // 设置作业的输出为 TextInputFormat(一般文本)job.setInputFormatClass(TextInputFormat.class); // 设置作业的输出门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 设置 Map 端的实现类 job.setMapperClass(WriteSeqFileAppMapper.class); // 设置 Map 端输出的 Key 类型 job.setMapOutputKeyClass(NullWritable.class); // 设置 Map 端输出的 Value 类型 job.setMapOutputValueClass(Text.class); // 设置作业的输入为 SequenceFileOutputFormat job.setOutputFormatClass(SequenceFileOutputFormat.class); // 应用 SequenceFile 的块级别压缩 SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK); // 设置 Reduce 端的实现类 job.setReducerClass(WriteSeqFileAppReducer.class); // 设置 Reduce 端输入的 Key 类型 job.setOutputKeyClass(NullWritable.class); // 设置 Reduce 端输入的 Value 类型 job.setOutputValueClass(Text.class); // 从参数中获取输入门路 Path outputDir = new Path(args[1]); // 如果输入门路已存在则删除 outputDir.getFileSystem(this.getConf() ).delete(outputDir, true); // 设置作业的输入门路 FileOutputFormat.setOutputPath(job, outputDir); // 提交作业并期待执行实现 return job.waitForCompletion(true) ? 0 : 1; } // 程序入口,调用 run public static void main(String[] args) throws Exception { // 用于治理以后程序的所有配置 Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new MrWriteToSequenceFile(), args); System.exit(status); } }
- Mapper 类
/** * 定义 Mapper 类 */ public static class WriteSeqFileAppMapper extends Mapper<LongWritable, Text,NullWritable, Text>{ private NullWritable outputKey; @Override protected void setup(Context context) throws IOException, InterruptedException {this.outputKey = NullWritable.get(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(outputKey, value); } @Override protected void cleanup(Context context) throws IOException, InterruptedException {this.outputKey = null;}}
- Reduce 类
/** * 定义 Reduce 类 */ public static class WriteSeqFileAppReducer extends Reducer<NullWritable,Text,NullWritable,Text>{ private NullWritable outputKey; @Override protected void setup(Context context) throws IOException, InterruptedException {this.outputKey = NullWritable.get(); } @Override protected void reduce(NullWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {Iterator<Text> iterator = value.iterator(); while (iterator.hasNext()) {context.write(outputKey, iterator.next()); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException {this.outputKey = null;} }
5. 读取 SequenceFile
- 需要: 将上一步转换好的 SequenceFile 再解析转换为一般文本文件内容
-
思路:
- 应用 SequenceFileInputformat 读取 SequenceFile
- Map 阶段间接输入每一条数据
- Reduce 阶段间接输入每一条数据
- 应用 TextOutputFormat 将后果保留为一般文本文件
-
代码实现
- Driver 类
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.Iterator; /** * @ClassName MrReadFromSequenceFile * @Description TODO 读取 SequenceFile 文件,转换为一般文本文件 * @Create By itcast */ public class MrReadFromSequenceFile extends Configured implements Tool { // 构建、配置、提交一个 MapReduce 的 Job public int run(String[] args) throws Exception { // 实例化作业 Job job = Job.getInstance(this.getConf(), "MrReadFromSequenceFile"); // 设置作业的主程序 job.setJarByClass(this.getClass()); // 设置作业的输出为 SequenceFileInputFormat(SequenceFile 文本)job.setInputFormatClass(SequenceFileInputFormat.class); // 设置作业的输出门路 SequenceFileInputFormat.addInputPath(job, new Path(args[0])); // 设置 Map 端的实现类 job.setMapperClass(ReadSeqFileAppMapper.class); // 设置 Map 端输出的 Key 类型 job.setMapOutputKeyClass(NullWritable.class); // 设置 Map 端输出的 Value 类型 job.setMapOutputValueClass(Text.class); // 设置作业的输入为 TextOutputFormat job.setOutputFormatClass(TextOutputFormat.class); // 设置 Reduce 端的实现类 job.setReducerClass(ReadSeqFileAppReducer.class); // 设置 Reduce 端输入的 Key 类型 job.setOutputKeyClass(NullWritable.class); // 设置 Reduce 端输入的 Value 类型 job.setOutputValueClass(Text.class); // 从参数中获取输入门路 Path outputDir = new Path(args[1]); // 如果输入门路已存在则删除 outputDir.getFileSystem(this.getConf()).delete(outputDir, true); // 设置作业的输入门路 TextOutputFormat.setOutputPath(job, outputDir); // 提交作业并期待执行实现 return job.waitForCompletion(true) ? 0 : 1; } // 程序入口,调用 run public static void main(String[] args) throws Exception { // 用于治理以后程序的所有配置 Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new MrReadFromSequenceFile(), args); System.exit(status); } }
- Mapper 类
/** * 定义 Mapper 类 */ public static class ReadSeqFileAppMapper extends Mapper<NullWritable, Text, NullWritable, Text> { private NullWritable outputKey; @Override protected void setup(Context context) throws IOException, InterruptedException {this.outputKey = NullWritable.get(); } @Override protected void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(outputKey, value); } @Override protected void cleanup(Context context) throws IOException, InterruptedException {this.outputKey = null;}
- Reducer 类
/** * 定义 Reduce 类 */ public static class ReadSeqFileAppReducer extends Reducer<NullWritable,Text,NullWritable,Text>{ private NullWritable outputKey; @Override protected void setup(Context context) throws IOException, InterruptedException {this.outputKey = NullWritable.get(); } @Override protected void reduce(NullWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {Iterator<Text> iterator = value.iterator(); while (iterator.hasNext()) {context.write(outputKey, iterator.next()); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException {this.outputKey = null;} }
3. MapFile
1. 介绍
能够了解 MapFile 是排序后的 SequenceFile, 通过观察其构造能够看到 MapFile 由两局部组成别离是 data 和 index。data 既存储数据的文件,index 作为文件的数据索引,次要记录了每个 Record 的 Key 值,以及该 Record 在文件中的偏移地位。在 MapFile 被拜访的时候,索引文件会被加载到内存,通过索引映射关系能够迅速定位到指定 Record 所在文件地位,因而,绝对 SequenceFile 而言,MapFile 的检索效率是最高的,毛病是会耗费一部分内存来存储 index 数据。
MapFile 的数据存储构造:
2. MapFile 工具类
- MapFileOutputFormat:用于将 MapReduce 的后果输入为 MapFile
-
MapReduce 中没有封装 MapFile 的读取输出类,可依据状况抉择以下计划来实现
- 自定义 InputFormat,应用 MapFileOutputFormat 中的 getReader 办法获取读取对象
- 应用 SequenceFileInputFormat 对 MapFile 的数据进行解析
-
生成 MapFile 文件
- 需要: 将一般文件转换为 MapFile 文件
-
思路:
- Input 读取一个一般文件
- Map 阶段构建随机值作为 Key,构建有序
- Output 生成 MapFile 文件
- 实现
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.Iterator; import java.util.Random; /** * @ClassName MrWriteToMapFile * @Description TODO 读取文本文件,转换为 MapFile 文件 * @Create By itcast */ public class MrWriteToMapFile extends Configured implements Tool { // 构建、配置、提交一个 MapReduce 的 Job public int run(String[] args) throws Exception {Configuration conf = getConf(); // 实例化作业 Job job = Job.getInstance(conf, "MrWriteToMapFile"); // 设置作业的主程序 job.setJarByClass(this.getClass()); // 设置作业的输出为 TextInputFormat(一般文本)job.setInputFormatClass(TextInputFormat.class); // 设置作业的输出门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 设置 Map 端的实现类 job.setMapperClass(WriteMapFileAppMapper.class); // 设置 Map 端输出的 Key 类型 job.setMapOutputKeyClass(IntWritable.class); // 设置 Map 端输出的 Value 类型 job.setMapOutputValueClass(Text.class); // 设置作业的输入为 MapFileOutputFormat job.setOutputFormatClass(MapFileOutputFormat.class); // 设置 Reduce 端的实现类 job.setReducerClass(WriteMapFileAppReducer.class); // 设置 Reduce 端输入的 Key 类型 job.setOutputKeyClass(IntWritable.class); // 设置 Reduce 端输入的 Value 类型 job.setOutputValueClass(Text.class); // 从参数中获取输入门路 Path outputDir = new Path(args[1]); // 如果输入门路已存在则删除 outputDir.getFileSystem(conf).delete(outputDir, true); // 设置作业的输入门路 MapFileOutputFormat.setOutputPath(job, outputDir); // 提交作业并期待执行实现 return job.waitForCompletion(true) ? 0 : 1; } // 程序入口,调用 run public static void main(String[] args) throws Exception { // 用于治理以后程序的所有配置 Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new MrWriteToMapFile(), args); System.exit(status); } /** * 定义 Mapper 类 */ public static class WriteMapFileAppMapper extends Mapper<LongWritable, Text, IntWritable, Text>{ // 定义输入的 Key, 每次随机生成一个值 private IntWritable outputKey = new IntWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 随机生成一个数值 Random random = new Random(); this.outputKey.set(random.nextInt(100000)); context.write(outputKey, value); } /** * 定义 Reduce 类 */ public static class WriteMapFileAppReducer extends Reducer<IntWritable,Text,IntWritable,Text>{ @Override protected void reduce(IntWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {Iterator<Text> iterator = value.iterator(); while (iterator.hasNext()) {context.write(key, iterator.next()); } } } }
-
读取 MapFile 文件
- 需要:将 MapFile 解析为一般文件内容
-
思路:
- input 读取 MapFile,留神,Hadoop 没有提供 MapFileInputFormat,所以应用 SequenceFileInputFormat 来解析,或者能够自定义 InputFormat
- Map 和 Reduce 间接输入
- Output 将后果保留为一般文件
- 实现
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; import java.util.Iterator; /** * @ClassName MrReadFromMapFile * @Description TODO 读取 MapFile 文件,转换为一般文本文件 * @Create By itcast */ public class MrReadFromMapFile extends Configured implements Tool { // 构建、配置、提交一个 MapReduce 的 Job public int run(String[] args) throws Exception { // 实例化作业 Job job = Job.getInstance(this.getConf(), "MrReadFromMapFile"); // 设置作业的主程序 job.setJarByClass(this.getClass()); // 设置作业的输出为 SequenceFileInputFormat(Hadoop 没有间接提供 MapFileInput)job.setInputFormatClass(SequenceFileInputFormat.class); // 设置作业的输出门路 SequenceFileInputFormat.addInputPath(job, new Path(args[0])); // 设置 Map 端的实现类 job.setMapperClass(ReadMapFileAppMapper.class); // 设置 Map 端输出的 Key 类型 job.setMapOutputKeyClass(NullWritable.class); // 设置 Map 端输出的 Value 类型 job.setMapOutputValueClass(Text.class); // 设置作业的输入为 SequenceFileOutputFormat job.setOutputFormatClass(TextOutputFormat.class); // 设置 Reduce 端的实现类 job.setReducerClass(ReadMapFileAppReducer.class); // 设置 Reduce 端输入的 Key 类型 job.setOutputKeyClass(NullWritable.class); // 设置 Reduce 端输入的 Value 类型 job.setOutputValueClass(Text.class); // 从参数中获取输入门路 Path outputDir = new Path(args[1]); // 如果输入门路已存在则删除 outputDir.getFileSystem(this.getConf()).delete(outputDir, true); // 设置作业的输入门路 TextOutputFormat.setOutputPath(job, outputDir); // 提交作业并期待执行实现 return job.waitForCompletion(true) ? 0 : 1; } // 程序入口,调用 run public static void main(String[] args) throws Exception { // 用于治理以后程序的所有配置 Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new MrReadFromMapFile(), args); System.exit(status); } /** * 定义 Mapper 类 */ public static class ReadMapFileAppMapper extends Mapper<IntWritable, Text, NullWritable, Text> {private NullWritable outputKey = NullWritable.get(); @Override protected void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(outputKey, value); } } /** * 定义 Reduce 类 */ public static class ReadMapFileAppReducer extends Reducer<NullWritable,Text,NullWritable,Text>{ @Override protected void reduce(NullWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {Iterator<Text> iterator = value.iterator(); while (iterator.hasNext()) {context.write(key, iterator.next()); } } } }
4. ORCFile
1. ORC 介绍
ORC(OptimizedRC File)文件格式是一种 Hadoop 生态圈中的列式存储格局,源自于 RC(RecordColumnar File),它的产生早在 2013 年初,最后产生自 Apache Hive,用于升高 Hadoop 数据存储空间和减速 Hive 查问速度。它并不是一个单纯的列式存储格局,依然是首先依据行组宰割
整个表,在每一个行组内进行按列存储。ORC 文件是自描述的,它的元数据应用 Protocol Buffers 序列化,并且文件中的数据尽可能的压缩以升高存储空间的耗费,目前也被 Spark SQL、Presto 等查问引擎反对。2015 年 ORC 我的项目被 Apache 我的项目基金会晋升为 Apache 顶级我的项目。
ORC 文件中保留了三个层级的统计信息,别离为 文件级别、stripe 级别和 row group 级别 的,他们都能够用来依据 Search ARGuments(谓词下推条件)判断是否能够跳过某些数据,在统计信息中都蕴含成员数和是否有 null 值,并且对于不同类型的数据设置一些特定的统计信息。
性能测试:
- 原始 Text 格局,未压缩 : 38.1 G
- ORC 格局,默认压缩(ZLIB): 11.5 G
- Parquet 格局,默认压缩(Snappy):14.8 G
- 测试比照:简单数据 Join 关联测试
2. ORCFile 工具类
1. 增加 ORC 与 MapReduce 集成的 Maven 依赖
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
<version>1.6.3</version>
</dependency>
2. 生成 ORC 文件
- 需要: 将一般文件转换为 ORC 文件
-
实现思路:
- Input 阶段读取一般文件
- Map: 阶段间接输入数据,没有 Reduce 阶段
- Output 阶段应用 OrcOutputFormat 保留为 ORC 文件类型
-
实现
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.orc.OrcConf; import org.apache.orc.TypeDescription; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapreduce.OrcOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * @ClassName WriteOrcFileApp * @Description TODO 用于读取一般文本文件转换为 ORC 文件 */ public class WriteOrcFileApp extends Configured implements Tool { // 作业名称 private static final String JOB_NAME = WriteOrcFileApp.class.getSimpleName(); // 构建日志监听 private static final Logger LOG = LoggerFactory.getLogger(WriteOrcFileApp.class); // 定义数据的字段信息 private static final String SCHEMA = "struct<id:string,type:string,orderID:string,bankCard:string,cardType:string,ctime:string,utime:string,remark:string>"; /** * 重写 Tool 接口的 run 办法,用于提交作业 * @param args * @return * @throws Exception */ public int run(String[] args) throws Exception { // 设置 Schema OrcConf.MAPRED_OUTPUT_SCHEMA.setString(this.getConf(), SCHEMA); // 实例化作业 Job job = Job.getInstance(this.getConf(), JOB_NAME); // 设置作业的主程序 job.setJarByClass(WriteOrcFileApp.class); // 设置作业的 Mapper 类 job.setMapperClass(WriteOrcFileAppMapper.class); // 设置作业的输出为 TextInputFormat(一般文本)job.setInputFormatClass(TextInputFormat.class); // 设置作业的输入为 OrcOutputFormat job.setOutputFormatClass(OrcOutputFormat.class); // 设置作业应用 0 个 Reduce(间接从 map 端输入)job.setNumReduceTasks(0); // 设置作业的输出门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 从参数中获取输入门路 Path outputDir = new Path(args[1]); // 如果输入门路已存在则删除 outputDir.getFileSystem(this.getConf()).delete(outputDir, true); // 设置作业的输入门路 OrcOutputFormat.setOutputPath(job, outputDir); // 提交作业并期待执行实现 return job.waitForCompletion(true) ? 0 : 1; } // 程序入口,调用 run public static void main(String[] args) throws Exception { // 用于治理以后程序的所有配置 Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new WriteOrcFileApp(), args); System.exit(status); } /** * 实现 Mapper 类 */ public static class WriteOrcFileAppMapper extends Mapper<LongWritable, Text, NullWritable, OrcStruct> { // 获取字段形容信息 private TypeDescription schema = TypeDescription.fromString(SCHEMA); // 构建输入的 Key private final NullWritable outputKey = NullWritable.get(); // 构建输入的 Value 为 ORCStruct 类型 private final OrcStruct outputValue = (OrcStruct) OrcStruct.createValue(schema); public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException { // 将读取到的每一行数据进行宰割,失去所有字段 String[] fields = value.toString().split(",",8); // 将所有字段赋值给 Value 中的列 outputValue.setFieldValue(0, new Text(fields[0])); outputValue.setFieldValue(1, new Text(fields[1])); outputValue.setFieldValue(2, new Text(fields[2])); outputValue.setFieldValue(3, new Text(fields[3])); outputValue.setFieldValue(4, new Text(fields[4])); outputValue.setFieldValue(5, new Text(fields[5])); outputValue.setFieldValue(6, new Text(fields[6])); outputValue.setFieldValue(7, new Text(fields[7])); // 输入 KeyValue output.write(outputKey, outputValue); } } }
问题:
报错:短少 orc-mapreduce 的 jar 包依赖
解决:将 orc-MapReduce 的 jar 包增加到 Hadoop 的环境变量中,所有 NodeManager 节点都要增加
cp orc-shims-1.6.3.jar orc-core-1.6.3.jar orc-mapreduce-1.6.3.jar
aircompressor-0.15.jar hive-storage-api-2.7.1.jar /export/server/hadoop-3.1.4/share/hadoop/mapreduce/
3. 读取 ORC 文件
- 需要: 读取 ORC 文件,还原成一般文本文件
-
思路:
- Input 阶段读取上一步当中生成的 ORC 文件
- Map 阶段间接读取输入
- Output 阶段将后果保留为一般文本文件
-
实现
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.orc.mapred.OrcStruct; import org.apache.orc.mapreduce.OrcInputFormat; import java.io.IOException; /** * @ClassName ReadOrcFileApp * @Description TODO 读取 ORC 文件进行解析还原成一般文本文件 */ public class ReadOrcFileApp extends Configured implements Tool { // 作业名称 private static final String JOB_NAME = WriteOrcFileApp.class.getSimpleName(); /** * 重写 Tool 接口的 run 办法,用于提交作业 * @param args * @return * @throws Exception */ public int run(String[] args) throws Exception { // 实例化作业 Job job = Job.getInstance(this.getConf(), JOB_NAME); // 设置作业的主程序 job.setJarByClass(ReadOrcFileApp.class); // 设置作业的输出为 OrcInputFormat job.setInputFormatClass(OrcInputFormat.class); // 设置作业的输出门路 OrcInputFormat.addInputPath(job, new Path(args[0])); // 设置作业的 Mapper 类 job.setMapperClass(ReadOrcFileAppMapper.class); // 设置作业应用 0 个 Reduce(间接从 map 端输入)job.setNumReduceTasks(0); // 设置作业的输出为 TextOutputFormat job.setOutputFormatClass(TextOutputFormat.class); // 从参数中获取输入门路 Path outputDir = new Path(args[1]); // 如果输入门路已存在则删除 outputDir.getFileSystem(this.getConf()).delete(outputDir, true); // 设置作业的输入门路 FileOutputFormat.setOutputPath(job, outputDir); // 提交作业并期待执行实现 return job.waitForCompletion(true) ? 0 : 1; } // 程序入口,调用 run public static void main(String[] args) throws Exception { // 用于治理以后程序的所有配置 Configuration conf = new Configuration(); int status = ToolRunner.run(conf, new ReadOrcFileApp(), args); System.exit(status); } /** * 实现 Mapper 类 */ public static class ReadOrcFileAppMapper extends Mapper<NullWritable, OrcStruct, NullWritable, Text> { private NullWritable outputKey; private Text outputValue; @Override protected void setup(Context context) throws IOException, InterruptedException {outputKey = NullWritable.get(); outputValue = new Text();} public void map(NullWritable key, OrcStruct value, Context output) throws IOException, InterruptedException { // 将 ORC 中的每条数据转换为 Text 对象 this.outputValue.set(value.getFieldValue(0).toString()+","+ value.getFieldValue(1).toString()+","+ value.getFieldValue(2).toString()+","+ value.getFieldValue(3).toString()+","+ value.getFieldValue(4).toString()+","+ value.getFieldValue(5).toString()+","+ value.getFieldValue(6).toString()+","+ value.getFieldValue(7).toString()); // 输入后果 output.write(outputKey, outputValue); } } }