一.MapReduce性能优化概述

1. MapReduce的利用场景

Hadoop蕴含了GFS的开源实现HDFS(Hadoop distributed file system)和MapReduce框架的开源实现。Hadoop失去了企业界及学术界关注,Yahoo、Facebook、Cloudera、Twitter、Intel、华为等诸多公司和技术个人对Hadoop给予了大力支持。Cloudera对Apache Hadoop及相干组件的版本兼容性进行了整合、性能优化、功能测试,推出了其企业版的开源Hadoop。Intel推出了高效、平安及易于治理的Hadoop企业版。Hadoop因为其开源性质,已成为目前钻研、优化云计算框架的重要样本和根底。其中MapReduce框架很适宜解决文档剖析、倒排索引建设等类型的利用,然而在列存储、索引建设、连贯计算、迭代计算、科学计算及调度算法方面性能须要进一步优化。

2. 优缺点与需要

1. 长处

  1. Mapreduce易于编程
    它简略的实现一些接口,就能够实现一个分布式程序,这个程序能够散布到大量的便宜的pc机器上运行。也就是说你写一个分布式程序,跟写一个简略的串行程序是截然不同的。就是因为这个个性使的Mapreduce编程变得十分风行。
  2. 良好的扩展性
    我的项目中当你的计算资源得不到满足的时候,你能够通过简略的通过减少机器来扩大它的计算能力
  3. 高容错性
    Mapreduce的设计初衷就是使程序可能部署在便宜的pc机器上,这就要求它具备很高的容错性。比方一个机器挂了,它能够把下面的计算工作转移到另一个节点上运行,不至于这个工作运行失败,而且这个过程不须要人工参加,而齐全是由hadoop外部实现的。
  4. 适宜PB级以上海量数据的离线解决

2. 毛病


MapReduce尽管有很多的劣势,然而也有它不善于的。这里的“不善于”,不代表不能做,而是在有些场景下实现的成果差,并不适宜用MapReduce来解决,次要体现在以下后果方面:

  1. 实时计算: MapReduce次要解决的数据来自于文件系统,所以无奈像Oracle或MySQL那样在毫米或秒级内返回后果,如果须要大数据量的毫秒级响应,能够思考联合实时存储系统来实现,利用HBase、Kudu等.
  2. 流计算: 流计算的输出数据是动静的,而MapReduce次要的输出来自于HDFS等文件系统,数据是动态的,不能动态变化,这是因为MapReduce本身的设计特点决定了数据源必须是动态的。如果须要解决流式数据能够用Storm,Spark Steaming、Flink等流计算框架。
  3. DGA(有向无环图)计算: 多个应用程序存在依赖关系,后一个应用程序的输出为前一个的输入。在这种状况下,MapReduce并不是不能做,而是应用后,每个MapReduce作业的输入后果都会写入磁盘,会造成大量的词频IO导致性能十分低下,此时能够思考用Spark等迭代计算框架。

综合以上问题,MapReduce在解决离线分布式计算的过程中,次要思考如何晋升性能,放慢分布式计算过程。

3. 需要

基于整个MapReduce所存在的毛病,因为MapReduce整体构造曾经固定,所以整体的优化计划只能从以下两点来思考实现:

  1. 利用列存储思维,优化存储构造列存储在数据仓库、OLAP (on-line analytical processing)联机剖析解决等利用上能够进步查问性能。利用列存储思维对MapReduce框架进行优化,面临正当的数据结构设计及数据压缩等挑战

  1. 利用硬件资源优化连贯算法,进步每个阶段的连贯效率,MapReduce框架解决连贯操作的过程比较复杂,面临数据歪斜、分布式环境数据传输及须要多个MapReduce作业等挑战,优化MapReduce每个阶段中的资源能够充沛的利用硬件资源性能来晋升MapReduce的效率。

二. IO性能优化:文件类型

1. 优化计划

  1. 针对HDFS最后是为拜访大文件而开发的, 所以会呈现对大量小文件的存储效率不高问题, MapReduce在读取小文件进行解决时,也存在资源节约导致计算效率不高的问题采纳 SequenceFile和MapFile设计一个 HDFS中合并存储小文件的计划。该计划的次要思维是将小文件序列化存入一个 SequenceFIle/MapFile 容器,合并成大文件, 并建设相应的索引文件, 无效升高文件数目和进步拜访效率. 通过和现有的 Hadoop Archives(HAR files)文件归档解决小文件问题的计划比照, 试验结果表明, 基于SequenceFile或者MapFile的存储小文件计划能够更为无效的进步小文件存储性能和缩小HDFS文件系统的节点内存耗费
  2. 针对一般按行存储文本文件,MapReduce在解决实现聚合、过滤等性能时,性能绝对较差,针对行式存储的数据处理性能差的问题,能够抉择应用列式存储的计划来实现数据聚合解决,升高数据传输及读写的IO,进步整体MapReduce计算解决的性能

2. SequenceFile

1. 介绍

SequenceFile是hadoop里用来存储序列化的键值对即二进制的一种文件格式。SequenceFile文件也能够作为MapReduce作业的输出和输入,hive和spark也反对这种格局。
它有如下几个长处:

  • 以二进制的KV模式存储数据,与底层交互更加敌对,性能更快,所以能够在HDFS里存储图像或者更加简单的构造作为KV对。
  • SequenceFile反对压缩和分片。 当你压缩为一个SequenceFile时,并不是将整个文件压缩成一个独自的单元,而是压缩文件里的record或者block of records(块)。因而SequenceFile是可能反对分片的,即便应用的压缩形式如Snappy, Lz4 or Gzip不反对分片,也能够利用SequenceFIle来实现分片。
  • SequenceFile也能够用于存储多个小文件。因为Hadoop自身就是用来解决大型文件的,小文件是不适宜的,所以用一个SequenceFile来存储很多小文件就能够进步解决效率,也能节俭Namenode内存,因为Namenode只需一个SequenceFile的metadata,而不是为每个小文件创建独自的metadata。
  • 因为数据是以SequenceFile模式存储,所以两头输入文件即map输入也会用SequenceFile来存储,能够进步整体的IO开销性能

2. 存储特点

  1. sequenceFile文件是Hadoop用来存储二进制模式的[Key,Value]对而设计的一种立体文件(Flat File)。
  2. 能够把SequenceFile当做是一个容器,把所有的文件打包到SequenceFile类中能够高效的对小文件进行存储和解决。
  3. SequenceFile文件并不依照其存储的Key进行排序存储,SequenceFile的外部类Writer提供了append性能。
  4. SequenceFile中的Key和Value能够是任意类型Writable或者是自定义Writable。
  5. 存储构造上,SequenceFile次要由一个Header后跟多条Record组成,Header次要蕴含了Key classname,value classname,存储压缩算法,用户自定义元数据等信息,此外,还蕴含了一些同步标识,用于疾速定位到记录的边界。每条Record以键值对的形式进行存储,用来示意它的字符数组能够一次解析成:记录的长度、Key的长度、Key值和value值,并且Value值的构造取决于该记录是否被压缩。
  6. 在recourds中,又分为是否压缩格局。当没有被压缩时,key与value应用Serialization序列化写入SequenceFile。当抉择压缩格局时,record的压缩格局与没有压缩其实不尽相同,除了value的bytes被压缩,key是不被压缩的
  7. 在Block中,它使所有的信息进行压缩,压缩的最小大小由配置文件中io.seqfile.compress.blocksize配置项决定。

3. SequenceFile 工具类

  1. SequenceFileOutputFormat
    用于将MapReduce的后果输入为SequenceFile文件
  2. SequenceFileInputFormat
    用于读取SequenceFile文件

4. 生成SequenceFile

  1. 需要:将一般文件转换为SequenceFile文件
  2. 思路

    • 应用TextInputFormat读取一般文件文件
    • Map阶段对读取文件的每一行进行输入
    • Reduce阶段间接输入每条数据
    • 应用SequenceFileOutputFormat将后果保留为SequenceFile
  3. 代码实现

    • Driver类
    import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import java.io.IOException;import java.util.Iterator;/** * @ClassName MrWriteToSequenceFile * @Description TODO 读取文本文件,转换为SequenceFile文件 * @Create By     itcast */public class MrWriteToSequenceFile extends Configured implements Tool { //构建、配置、提交一个 MapReduce的Job public int run(String[] args) throws Exception {     // 实例化作业     Job job = Job.getInstance(this.getConf(), "MrWriteToSequenceFile");     // 设置作业的主程序     job.setJarByClass(this.getClass());     // 设置作业的输出为TextInputFormat(一般文本)     job.setInputFormatClass(TextInputFormat.class);     // 设置作业的输出门路     FileInputFormat.addInputPath(job, new Path(args[0]));     // 设置Map端的实现类     job.setMapperClass(WriteSeqFileAppMapper.class);     // 设置Map端输出的Key类型     job.setMapOutputKeyClass(NullWritable.class);     // 设置Map端输出的Value类型     job.setMapOutputValueClass(Text.class);     // 设置作业的输入为SequenceFileOutputFormat     job.setOutputFormatClass(SequenceFileOutputFormat.class);     // 应用SequenceFile的块级别压缩     SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);     // 设置Reduce端的实现类     job.setReducerClass(WriteSeqFileAppReducer.class);     // 设置Reduce端输入的Key类型     job.setOutputKeyClass(NullWritable.class);     // 设置Reduce端输入的Value类型     job.setOutputValueClass(Text.class);     // 从参数中获取输入门路     Path outputDir = new Path(args[1]);     // 如果输入门路已存在则删除     outputDir.getFileSystem(this.getConf()).delete(outputDir, true);     // 设置作业的输入门路     FileOutputFormat.setOutputPath(job, outputDir);// 提交作业并期待执行实现     return job.waitForCompletion(true) ? 0 : 1; } //程序入口,调用run public static void main(String[] args) throws Exception {     //用于治理以后程序的所有配置     Configuration conf = new Configuration();     int status = ToolRunner.run(conf, new MrWriteToSequenceFile(), args);     System.exit(status); }}
    • Mapper类
    /** * 定义Mapper类 */public static class WriteSeqFileAppMapper extends Mapper<LongWritable, Text,NullWritable, Text>{ private NullWritable outputKey; @Override protected void setup(Context context) throws IOException, InterruptedException {     this.outputKey = NullWritable.get(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {     context.write(outputKey, value); } @Override protected void cleanup(Context context) throws IOException, InterruptedException {     this.outputKey = null; }}
    • Reduce类
    /** * 定义Reduce类 */public static class WriteSeqFileAppReducer extends Reducer<NullWritable,Text,NullWritable,Text>{ private NullWritable outputKey; @Override protected void setup(Context context) throws IOException, InterruptedException {     this.outputKey = NullWritable.get(); } @Override protected void reduce(NullWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {     Iterator<Text> iterator = value.iterator();     while (iterator.hasNext()) {         context.write(outputKey, iterator.next());     } } @Override protected void cleanup(Context context) throws IOException, InterruptedException {     this.outputKey = null; }}

    5. 读取SequenceFile

    • 需要: 将上一步转换好的SequenceFile再解析转换为一般文本文件内容
    • 思路:

      1. 应用SequenceFileInputformat读取SequenceFile
      2. Map阶段间接输入每一条数据
      3. Reduce阶段间接输入每一条数据
      4. 应用TextOutputFormat将后果保留为一般文本文件
    • 代码实现

      • Driver 类
    import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import java.io.IOException;import java.util.Iterator;/** * @ClassName MrReadFromSequenceFile * @Description TODO 读取SequenceFile文件,转换为一般文本文件 * @Create By     itcast */public class MrReadFromSequenceFile extends Configured implements Tool { //构建、配置、提交一个 MapReduce的Job public int run(String[] args) throws Exception {     // 实例化作业     Job job = Job.getInstance(this.getConf(), "MrReadFromSequenceFile");     // 设置作业的主程序     job.setJarByClass(this.getClass());     // 设置作业的输出为SequenceFileInputFormat(SequenceFile文本)     job.setInputFormatClass(SequenceFileInputFormat.class);     // 设置作业的输出门路     SequenceFileInputFormat.addInputPath(job, new Path(args[0]));     // 设置Map端的实现类     job.setMapperClass(ReadSeqFileAppMapper.class);     // 设置Map端输出的Key类型     job.setMapOutputKeyClass(NullWritable.class);     // 设置Map端输出的Value类型     job.setMapOutputValueClass(Text.class);     // 设置作业的输入为TextOutputFormat     job.setOutputFormatClass(TextOutputFormat.class);     // 设置Reduce端的实现类     job.setReducerClass(ReadSeqFileAppReducer.class);     // 设置Reduce端输入的Key类型     job.setOutputKeyClass(NullWritable.class);     // 设置Reduce端输入的Value类型     job.setOutputValueClass(Text.class);     // 从参数中获取输入门路     Path outputDir = new Path(args[1]);     // 如果输入门路已存在则删除     outputDir.getFileSystem(this.getConf()).delete(outputDir, true);     // 设置作业的输入门路      TextOutputFormat.setOutputPath(job, outputDir);     // 提交作业并期待执行实现     return job.waitForCompletion(true) ? 0 : 1; } //程序入口,调用run public static void main(String[] args) throws Exception {     //用于治理以后程序的所有配置     Configuration conf = new Configuration();     int status = ToolRunner.run(conf, new MrReadFromSequenceFile(), args);     System.exit(status); }}
    • Mapper类
    /** * 定义Mapper类 */public static class ReadSeqFileAppMapper extends Mapper<NullWritable, Text, NullWritable, Text> { private NullWritable outputKey; @Override protected void setup(Context context) throws IOException, InterruptedException {     this.outputKey = NullWritable.get(); } @Override protected void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException {     context.write(outputKey, value); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { this.outputKey = null;}
    • Reducer类
    /** * 定义Reduce类 */public static class ReadSeqFileAppReducer extends Reducer<NullWritable,Text,NullWritable,Text>{ private NullWritable outputKey; @Override protected void setup(Context context) throws IOException, InterruptedException {     this.outputKey = NullWritable.get(); } @Override protected void reduce(NullWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {     Iterator<Text> iterator = value.iterator();     while (iterator.hasNext()) {         context.write(outputKey, iterator.next());     } } @Override protected void cleanup(Context context) throws IOException, InterruptedException {     this.outputKey = null; }}

3. MapFile

1. 介绍

能够了解MapFile是排序后的SequenceFile, 通过观察其构造能够看到MapFile由两局部组成别离是data和index。data既存储数据的文件,index作为文件的数据索引,次要记录了每个Record的Key值,以及该Record在文件中的偏移地位。在MapFile被拜访的时候,索引文件会被加载到内存,通过索引映射关系能够迅速定位到指定Record所在文件地位,因而,绝对SequenceFile而言,MapFile的检索效率是最高的,毛病是会耗费一部分内存来存储index数据。

MapFile的数据存储构造:

2. MapFile工具类

  1. MapFileOutputFormat:用于将MapReduce的后果输入为MapFile
  2. MapReduce中没有封装MapFile的读取输出类,可依据状况抉择以下计划来实现

    • 自定义InputFormat,应用MapFileOutputFormat中的getReader办法获取读取对象
    • 应用SequenceFileInputFormat对MapFile的数据进行解析
  3. 生成MapFile文件

    • 需要: 将一般文件转换为MapFile文件
    • 思路:

      • Input读取一个一般文件
      • Map阶段构建随机值作为Key,构建有序
      • Output生成MapFile文件
    • 实现
    import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import java.io.IOException;import java.util.Iterator;import java.util.Random;/** * @ClassName MrWriteToMapFile * @Description TODO 读取文本文件,转换为MapFile文件 * @Create By     itcast */public class MrWriteToMapFile extends Configured implements Tool { //构建、配置、提交一个 MapReduce的Job public int run(String[] args) throws Exception {     Configuration conf = getConf();     // 实例化作业     Job job = Job.getInstance(conf, "MrWriteToMapFile");     // 设置作业的主程序     job.setJarByClass(this.getClass());     // 设置作业的输出为TextInputFormat(一般文本)     job.setInputFormatClass(TextInputFormat.class);     // 设置作业的输出门路     FileInputFormat.addInputPath(job, new Path(args[0]));     // 设置Map端的实现类     job.setMapperClass(WriteMapFileAppMapper.class);     // 设置Map端输出的Key类型     job.setMapOutputKeyClass(IntWritable.class);     // 设置Map端输出的Value类型     job.setMapOutputValueClass(Text.class);     // 设置作业的输入为MapFileOutputFormat    job.setOutputFormatClass(MapFileOutputFormat.class);     // 设置Reduce端的实现类     job.setReducerClass(WriteMapFileAppReducer.class);     // 设置Reduce端输入的Key类型     job.setOutputKeyClass(IntWritable.class);     // 设置Reduce端输入的Value类型     job.setOutputValueClass(Text.class);     // 从参数中获取输入门路     Path outputDir = new Path(args[1]);     // 如果输入门路已存在则删除     outputDir.getFileSystem(conf).delete(outputDir, true);     // 设置作业的输入门路     MapFileOutputFormat.setOutputPath(job, outputDir);     // 提交作业并期待执行实现     return job.waitForCompletion(true) ? 0 : 1; } //程序入口,调用run public static void main(String[] args) throws Exception {     //用于治理以后程序的所有配置     Configuration conf = new Configuration();     int status = ToolRunner.run(conf, new MrWriteToMapFile(), args);     System.exit(status); } /**  * 定义Mapper类  */ public static class WriteMapFileAppMapper extends Mapper<LongWritable, Text, IntWritable, Text>{     //定义输入的Key,每次随机生成一个值     private IntWritable outputKey = new IntWritable();     @Override     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {         //随机生成一个数值         Random random = new Random();         this.outputKey.set(random.nextInt(100000));         context.write(outputKey, value);     } /**  * 定义Reduce类  */ public static class WriteMapFileAppReducer extends Reducer<IntWritable,Text,IntWritable,Text>{     @Override     protected void reduce(IntWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {         Iterator<Text> iterator = value.iterator();         while (iterator.hasNext()) {             context.write(key, iterator.next());         }     } }}
    1. 读取MapFile文件

      • 需要:将MapFile解析为一般文件内容
      • 思路:

        • input读取MapFile,留神,Hadoop没有提供MapFileInputFormat,所以应用SequenceFileInputFormat来解析,或者能够自定义InputFormat
        • Map和Reduce间接输入
        • Output将后果保留为一般文件
      • 实现
    import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import java.io.IOException;import java.util.Iterator;/** * @ClassName MrReadFromMapFile * @Description TODO 读取MapFile文件,转换为一般文本文件 * @Create By     itcast */public class MrReadFromMapFile extends Configured implements Tool { //构建、配置、提交一个 MapReduce的Job public int run(String[] args) throws Exception {     // 实例化作业     Job job = Job.getInstance(this.getConf(), "MrReadFromMapFile");     // 设置作业的主程序     job.setJarByClass(this.getClass());     // 设置作业的输出为SequenceFileInputFormat(Hadoop没有间接提供MapFileInput)     job.setInputFormatClass(SequenceFileInputFormat.class);     // 设置作业的输出门路     SequenceFileInputFormat.addInputPath(job, new Path(args[0]));     // 设置Map端的实现类     job.setMapperClass(ReadMapFileAppMapper.class);     // 设置Map端输出的Key类型     job.setMapOutputKeyClass(NullWritable.class);     // 设置Map端输出的Value类型     job.setMapOutputValueClass(Text.class);     // 设置作业的输入为SequenceFileOutputFormat     job.setOutputFormatClass(TextOutputFormat.class);     // 设置Reduce端的实现类     job.setReducerClass(ReadMapFileAppReducer.class);     // 设置Reduce端输入的Key类型      job.setOutputKeyClass(NullWritable.class);     // 设置Reduce端输入的Value类型     job.setOutputValueClass(Text.class);     // 从参数中获取输入门路     Path outputDir = new Path(args[1]);     // 如果输入门路已存在则删除     outputDir.getFileSystem(this.getConf()).delete(outputDir, true);     // 设置作业的输入门路     TextOutputFormat.setOutputPath(job, outputDir);     // 提交作业并期待执行实现     return job.waitForCompletion(true) ? 0 : 1; } //程序入口,调用run public static void main(String[] args) throws Exception {     //用于治理以后程序的所有配置     Configuration conf = new Configuration();     int status = ToolRunner.run(conf, new MrReadFromMapFile(), args);     System.exit(status); } /**  * 定义Mapper类  */ public static class ReadMapFileAppMapper extends Mapper<IntWritable, Text, NullWritable, Text> {     private NullWritable outputKey = NullWritable.get();     @Override     protected void map(IntWritable key, Text value, Context context) throws IOException, InterruptedException {         context.write(outputKey, value);     } } /**  * 定义Reduce类 */ public static class ReadMapFileAppReducer extends Reducer<NullWritable,Text,NullWritable,Text>{     @Override     protected void reduce(NullWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {         Iterator<Text> iterator = value.iterator();         while (iterator.hasNext()) {             context.write(key, iterator.next());         }     } }}

4. ORCFile

1. ORC介绍

ORC(OptimizedRC File)文件格式是一种Hadoop生态圈中的列式存储格局,源自于RC(RecordColumnar File),它的产生早在2013年初,最后产生自Apache Hive,用于升高Hadoop数据存储空间和减速Hive查问速度。它并不是一个单纯的列式存储格局,依然是首先依据行组宰割
整个表,在每一个行组内进行按列存储。ORC文件是自描述的,它的元数据应用Protocol Buffers序列化,并且文件中的数据尽可能的压缩以升高存储空间的耗费,目前也被Spark SQL、Presto等查问引擎反对。2015年ORC我的项目被Apache我的项目基金会晋升为Apache顶级我的项目。

ORC文件中保留了三个层级的统计信息,别离为文件级别、stripe级别和row group级别的,他们都能够用来依据Search ARGuments(谓词下推条件)判断是否能够跳过某些数据,在统计信息中都蕴含成员数和是否有null值,并且对于不同类型的数据设置一些特定的统计信息。

性能测试:

  • 原始Text格局,未压缩 : 38.1 G
  • ORC格局,默认压缩(ZLIB): 11.5 G
  • Parquet格局,默认压缩(Snappy):14.8 G
  • 测试比照:简单数据Join关联测试

2. ORCFile工具类

1. 增加ORC与MapReduce集成的Maven依赖
 <dependency>            <groupId>org.apache.orc</groupId>            <artifactId>orc-mapreduce</artifactId>            <version>1.6.3</version> </dependency>
2. 生成ORC文件
  1. 需要: 将一般文件转换为ORC文件
  2. 实现思路:

    • Input阶段读取一般文件
    • Map:阶段间接输入数据,没有Reduce阶段
    • Output阶段应用OrcOutputFormat保留为ORC文件类型
  3. 实现

    import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import org.apache.orc.OrcConf;import org.apache.orc.TypeDescription;import org.apache.orc.mapred.OrcStruct;import org.apache.orc.mapreduce.OrcOutputFormat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;/** * @ClassName WriteOrcFileApp * @Description TODO 用于读取一般文本文件转换为ORC文件 */public class WriteOrcFileApp extends Configured implements Tool { // 作业名称 private static final String JOB_NAME = WriteOrcFileApp.class.getSimpleName(); //构建日志监听 private static final Logger LOG = LoggerFactory.getLogger(WriteOrcFileApp.class); //定义数据的字段信息 private static final String SCHEMA = "struct<id:string,type:string,orderID:string,bankCard:string,cardType:string,ctime:string,utime:string,remark:string>"; /**  * 重写Tool接口的run办法,用于提交作业  * @param args  * @return  * @throws Exception  */ public int run(String[] args) throws Exception {     // 设置Schema     OrcConf.MAPRED_OUTPUT_SCHEMA.setString(this.getConf(), SCHEMA);     // 实例化作业     Job job = Job.getInstance(this.getConf(), JOB_NAME);     // 设置作业的主程序     job.setJarByClass(WriteOrcFileApp.class);     // 设置作业的Mapper类     job.setMapperClass(WriteOrcFileAppMapper.class);     // 设置作业的输出为TextInputFormat(一般文本)     job.setInputFormatClass(TextInputFormat.class);     // 设置作业的输入为OrcOutputFormat     job.setOutputFormatClass(OrcOutputFormat.class);  // 设置作业应用0个Reduce(间接从map端输入)     job.setNumReduceTasks(0);     // 设置作业的输出门路     FileInputFormat.addInputPath(job, new Path(args[0]));     // 从参数中获取输入门路     Path outputDir = new Path(args[1]);     // 如果输入门路已存在则删除     outputDir.getFileSystem(this.getConf()).delete(outputDir, true);     // 设置作业的输入门路     OrcOutputFormat.setOutputPath(job, outputDir);     // 提交作业并期待执行实现     return job.waitForCompletion(true) ? 0 : 1; } //程序入口,调用run public static void main(String[] args) throws Exception {     //用于治理以后程序的所有配置     Configuration conf = new Configuration();     int status = ToolRunner.run(conf, new WriteOrcFileApp(), args);     System.exit(status); } /**  * 实现Mapper类  */ public static class WriteOrcFileAppMapper extends Mapper<LongWritable, Text, NullWritable, OrcStruct> {     //获取字段形容信息     private TypeDescription schema = TypeDescription.fromString(SCHEMA);     //构建输入的Key     private final NullWritable outputKey = NullWritable.get();     //构建输入的Value为ORCStruct类型     private final OrcStruct outputValue = (OrcStruct) OrcStruct.createValue(schema);     public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException {         //将读取到的每一行数据进行宰割,失去所有字段         String[] fields = value.toString().split(",",8);         //将所有字段赋值给Value中的列         outputValue.setFieldValue(0, new Text(fields[0]));           outputValue.setFieldValue(1, new Text(fields[1]));         outputValue.setFieldValue(2, new Text(fields[2]));         outputValue.setFieldValue(3, new Text(fields[3]));         outputValue.setFieldValue(4, new Text(fields[4]));         outputValue.setFieldValue(5, new Text(fields[5]));         outputValue.setFieldValue(6, new Text(fields[6]));         outputValue.setFieldValue(7, new Text(fields[7]));         //输入KeyValue         output.write(outputKey, outputValue);     } }}

问题:
报错:短少orc-mapreduce的jar包依赖

解决:将orc-MapReduce的jar包增加到Hadoop的环境变量中,所有NodeManager节点都要增加

cp  orc-shims-1.6.3.jar  orc-core-1.6.3.jar  orc-mapreduce-1.6.3.jar aircompressor-0.15.jar  hive-storage-api-2.7.1.jar /export/server/hadoop-3.1.4/share/hadoop/mapreduce/

3. 读取ORC文件
  1. 需要: 读取ORC文件,还原成一般文本文件
  2. 思路:

    • Input阶段读取上一步当中生成的ORC文件
    • Map阶段间接读取输入
    • Output阶段将后果保留为一般文本文件
  3. 实现

    import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import org.apache.orc.mapred.OrcStruct;import org.apache.orc.mapreduce.OrcInputFormat;import java.io.IOException;/** * @ClassName ReadOrcFileApp * @Description TODO 读取ORC文件进行解析还原成一般文本文件 */public class ReadOrcFileApp extends Configured implements Tool { // 作业名称 private static final String JOB_NAME = WriteOrcFileApp.class.getSimpleName(); /**  * 重写Tool接口的run办法,用于提交作业  * @param args  * @return  * @throws Exception  */ public int run(String[] args) throws Exception { // 实例化作业     Job job = Job.getInstance(this.getConf(), JOB_NAME);     // 设置作业的主程序     job.setJarByClass(ReadOrcFileApp.class);     // 设置作业的输出为OrcInputFormat     job.setInputFormatClass(OrcInputFormat.class);     // 设置作业的输出门路     OrcInputFormat.addInputPath(job, new Path(args[0]));     // 设置作业的Mapper类     job.setMapperClass(ReadOrcFileAppMapper.class);     // 设置作业应用0个Reduce(间接从map端输入)     job.setNumReduceTasks(0);     // 设置作业的输出为TextOutputFormat     job.setOutputFormatClass(TextOutputFormat.class);     // 从参数中获取输入门路     Path outputDir = new Path(args[1]);     // 如果输入门路已存在则删除     outputDir.getFileSystem(this.getConf()).delete(outputDir, true);     // 设置作业的输入门路     FileOutputFormat.setOutputPath(job, outputDir);     // 提交作业并期待执行实现     return job.waitForCompletion(true) ? 0 : 1; } //程序入口,调用run public static void main(String[] args) throws Exception {     //用于治理以后程序的所有配置     Configuration conf = new Configuration();     int status = ToolRunner.run(conf, new ReadOrcFileApp(), args);     System.exit(status); } /**  * 实现Mapper类  */ public static class ReadOrcFileAppMapper extends Mapper<NullWritable, OrcStruct, NullWritable, Text> {     private NullWritable outputKey;     private Text outputValue;     @Override  protected void setup(Context context) throws IOException, InterruptedException {         outputKey = NullWritable.get();         outputValue = new Text();     }     public void map(NullWritable key, OrcStruct value, Context output) throws IOException, InterruptedException {         //将ORC中的每条数据转换为Text对象         this.outputValue.set(                 value.getFieldValue(0).toString()+","+                         value.getFieldValue(1).toString()+","+                         value.getFieldValue(2).toString()+","+                         value.getFieldValue(3).toString()+","+                         value.getFieldValue(4).toString()+","+                         value.getFieldValue(5).toString()+","+                         value.getFieldValue(6).toString()+","+                         value.getFieldValue(7).toString()         );         //输入后果         output.write(outputKey, outputValue);     } }}