I. Compression Optimization Design

When a MapReduce program runs, disk I/O, network transfer, and the shuffle and merge phases consume a large amount of time, especially with large data volumes and heavy workloads. Since disk I/O and network bandwidth are scarce resources in Hadoop, data compression is very helpful for saving resources and minimizing disk I/O and network traffic. If disk I/O or network bandwidth is limiting MapReduce job performance, enabling compression at any MapReduce stage can improve end-to-end processing time and reduce I/O and network traffic.

Compression is therefore a MapReduce optimization strategy: the output of the mapper or the reducer is compressed with a compression codec
to reduce disk I/O and speed up the MR job. Its pros and cons are as follows:

Advantages of compression:

  • Reduces the space needed to store files
  • Speeds up file transfer, which improves overall processing speed
  • Reduces the number of I/O reads and writes

Disadvantages of compression:

  • Data must be decompressed before it can be used, which adds CPU load; the more complex the compression algorithm, the longer decompression takes
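
Compression is controlled entirely through job configuration. As a minimal sketch (the property names are the standard Hadoop 3.x keys used in the examples later in this article; the choice of SnappyCodec here is only an example), these settings can be placed in mapred-site.xml to apply cluster-wide, or set per job in the driver code as the later examples do:

<!-- mapred-site.xml: compress the intermediate map output -->
<property>
    <name>mapreduce.map.output.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.map.output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
<!-- compress the final job output -->
<property>
    <name>mapreduce.output.fileoutputformat.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>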

II. Compression Support

1. Check which compression codecs Hadoop supports: hadoop checknative
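
A usage sketch (the exact set of libraries reported, and their paths, depends on how the Hadoop native libraries were built):

# Lists the native libraries (hadoop, zlib, snappy, lz4, bzip2, ...) and whether each is available
hadoop checknative
# Quick check for a single codec, e.g. Snappy
hadoop checknative | grep -i snappy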

2. Compression algorithms supported by Hadoop
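
For reference, the commonly used formats and their codec classes are summarized below (taken from the standard Hadoop codec set; the LZO class comes from the separately installed hadoop-lzo package):

  • DEFLATE: org.apache.hadoop.io.compress.DefaultCodec, extension .deflate, not splittable
  • Gzip: org.apache.hadoop.io.compress.GzipCodec, extension .gz, not splittable
  • Bzip2: org.apache.hadoop.io.compress.BZip2Codec, extension .bz2, splittable
  • LZO: com.hadoop.compression.lzo.LzopCodec, extension .lzo, splittable only after an index is built
  • Snappy: org.apache.hadoop.io.compress.SnappyCodec, extension .snappy, not splittable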

3. Comparison of the compression algorithms

Gzip

  • Pros: relatively high compression ratio among the four formats; supported by Hadoop out of the box, so gzip files can be processed just like plain text; has a Hadoop native library; most Linux systems ship with the gzip command, so it is easy to use
  • Cons: does not support splitting

Lzo

  • Pros: fairly fast compression and decompression with a reasonable compression ratio; supports splitting and is the most popular compression format in Hadoop; supports the Hadoop native library; the lzop command can be installed on Linux, which makes it easy to use
  • Cons: compression ratio is lower than gzip; not supported by Hadoop out of the box, so it must be installed; although LZO supports splitting, an index must be built for the .lzo file, otherwise Hadoop treats it as a single ordinary file (to get splits you must build the index and use an LZO-aware InputFormat)

Bzip2

  • Pros: supports splitting; very high compression ratio, higher than gzip; supported by Hadoop out of the box, although without a native library; Linux systems ship with the bzip2 command, so it is easy to use
  • Cons: compression and decompression are slow; no native library support

Snappy

  • Pros: very fast compression; supports the Hadoop native library
  • Cons: does not support splitting; low compression ratio; not supported by Hadoop out of the box, so it must be installed; Linux has no corresponding command-line tool

4. Compression ratio on the same amount of data

5. Compression and decompression time


From the comparison above: the higher the compression ratio, the longer compression takes, so an algorithm with a moderate balance of compression ratio and compression time should be chosen.

III. Gzip Compression

1. Generating Gzip-compressed files

1. Requirement: read plain text files and compress them into Gzip format

2. Approach

  1. Input reads the plain text files
  2. Map and Reduce pass each record through unchanged
  3. Configure the Output so that the results are compressed as Gzip

3. Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRWriteGzip
 * @Description Reads plain text data and compresses the output in Gzip format
 */
public class MRWriteGzip extends Configured implements Tool {

    // Build, configure, and submit a MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRWriteGzip.class);

        // Input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // Map: configure the Mapper
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce: configure the Reducer
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: calls run
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // Compress the final output in Gzip format
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
        // Call run and submit the job
        int status = ToolRunner.run(conf, new MRWriteGzip(), args);
        System.exit(status);
    }

    /**
     * Mapper class
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer class
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}
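
A possible way to run the job (the jar name and HDFS paths are placeholders for illustration only):

# args[0] is the plain-text input, args[1] the output directory
hadoop jar mr-compress.jar MRWriteGzip /input/data /output/gzip
# The part files in the output directory should now carry the .gz extension
hdfs dfs -ls /output/gzip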

2. Reading Gzip-compressed files

1. Requirement: read the Gzip-compressed files and restore them to plain text

2. Approach

  1. Input reads the compressed output of the previous step directly (TextInputFormat picks the codec from the .gz file extension, so no extra input configuration is needed)
  2. Map and Reduce pass each record through unchanged
  3. Output saves the result as plain text

3. Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRReadGzip
 * @Description Reads Gzip-compressed data and restores it to plain text
 */
public class MRReadGzip extends Configured implements Tool {

    // Build, configure, and submit a MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRReadGzip.class);

        // Input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // Map: configure the Mapper
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce: configure the Reducer
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: calls run
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // No output compression is needed when restoring plain text, so the settings stay commented out
//        conf.set("mapreduce.output.fileoutputformat.compress", "true");
//        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
        // Call run and submit the job
        int status = ToolRunner.run(conf, new MRReadGzip(), args);
        System.exit(status);
    }

    /**
     * Mapper class
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer class
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}

IV. Snappy Compression

1. Configuring Hadoop to support Snappy

Hadoop supports the Snappy compression algorithm, and it is one of the most commonly used. However, the officially published pre-built Hadoop packages do not include Snappy support, so to use Snappy you must download the Hadoop source code and compile it yourself with Snappy support enabled. For the build process, see the "Hadoop3编译安装" (Hadoop 3 build and install) guide.

2. Generating Snappy-compressed files: map output not compressed

1. Requirement: read plain text files and convert them into Snappy-compressed files

2. Approach

  1. Input reads the plain text files
  2. Map and Reduce pass each record through unchanged
  3. Configure the Output so that the results are compressed as Snappy

3. Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRWriteSnappy
 * @Description Reads plain text data and compresses the output in Snappy format
 */
public class MRWriteSnappy extends Configured implements Tool {

    // Build, configure, and submit a MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRWriteSnappy.class);

        // Input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // Map: configure the Mapper
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce: configure the Reducer
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: calls run
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // Compress the final output in Snappy format
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        // Call run and submit the job
        int status = ToolRunner.run(conf, new MRWriteSnappy(), args);
        System.exit(status);
    }

    /**
     * Mapper class
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer class
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}

3. Generating Snappy-compressed files: map output compressed

1. Requirement: read plain text files, convert them into Snappy-compressed files, and also compress the map output with Snappy

2. Approach: add the map-output compression settings to the code from the previous step

3. Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRMapOutputSnappy
 * @Description Reads plain text data and compresses the map output in Snappy format
 */
public class MRMapOutputSnappy extends Configured implements Tool {

    // Build, configure, and submit a MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRMapOutputSnappy.class);

        // Input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // Map: configure the Mapper
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce: configure the Reducer
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: calls run
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // Compress the intermediate map output in Snappy format
        conf.set("mapreduce.map.output.compress", "true");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        // Compress the final reduce output in Snappy format
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        // Call run and submit the job
        int status = ToolRunner.run(conf, new MRMapOutputSnappy(), args);
        System.exit(status);
    }

    /**
     * Mapper class
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer class
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}
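
The same settings can also be expressed through the typed configuration API instead of raw property strings. A sketch of the equivalent driver-side calls (an alternative form, not part of the original example; the fragment assumes the conf and job variables from the code above):

        // Inside main(), before ToolRunner.run(...): compress the map output with Snappy
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
                org.apache.hadoop.io.compress.SnappyCodec.class,
                org.apache.hadoop.io.compress.CompressionCodec.class);

        // Inside run(), after the output path is set: compress the final job output with Snappy
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setCompressOutput(job, true);
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputCompressorClass(
                job, org.apache.hadoop.io.compress.SnappyCodec.class);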

4. Reading Snappy-compressed files

1. Requirement: read the Snappy files generated in the previous step and restore them to plain text

2. Approach

  1. Input reads the Snappy files
  2. Map and Reduce pass each record through unchanged
  3. Output writes the result as plain text

3. Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRReadSnappy
 * @Description Reads Snappy-compressed data and restores it to plain text
 */
public class MRReadSnappy extends Configured implements Tool {

    // Build, configure, and submit a MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRReadSnappy.class);

        // Input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // Map: configure the Mapper
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce: configure the Reducer
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: calls run
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // Call run and submit the job
        int status = ToolRunner.run(conf, new MRReadSnappy(), args);
        System.exit(status);
    }

    /**
     * Mapper class
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer class
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}

V. LZO Compression

1. Configuring Hadoop to support LZO

Hadoop does not support LZO compression out of the box. It must be installed separately, and LZO support must be added when Hadoop is compiled; for the build process see the guide "Apache Hadoop3-1-3编译安装部署lzo压缩指南".
After the build is complete, apply the following configuration so that the current Hadoop installation supports LZO compression.

• Add the LZO support jar
cp hadoop-lzo-0.4.21-SNAPSHOT.jar /export/server/hadoop-3.1.4/share/hadoop/common/

• Distribute it to all nodes
cd /export/server/hadoop-3.1.4/share/hadoop/common/
scp hadoop-lzo-0.4.21-SNAPSHOT.jar node2:$PWD
scp hadoop-lzo-0.4.21-SNAPSHOT.jar node3:$PWD

• Edit core-site.xml
<property>
    <name>io.compression.codecs</name>
    <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
</property>
<property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

• Sync core-site.xml to all other nodes
cd /export/server/hadoop-3.1.4/etc/hadoop
scp core-site.xml node2:$PWD
scp core-site.xml node3:$PWD

• Restart the Hadoop cluster

2. Generating LZO-compressed files

1. Requirement: read plain text files and generate LZO-compressed result files

2. Approach

  1. Input reads the plain text files
  2. Map and Reduce pass each record through unchanged
  3. Configure the Output so that the results are compressed as LZO

3. Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRWriteLzo
 * @Description Reads plain text data and compresses the output in LZO format
 */
public class MRWriteLzo extends Configured implements Tool {

    // Build, configure, and submit a MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRWriteLzo.class);

        // Input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // Map: configure the Mapper
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce: configure the Reducer
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: calls run
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // Compress the final output in LZO format
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "com.hadoop.compression.lzo.LzopCodec");
        // Call run and submit the job
        int status = ToolRunner.run(conf, new MRWriteLzo(), args);
        System.exit(status);
    }

    /**
     * Mapper class
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer class
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}
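
As noted in the comparison earlier, an .lzo file only becomes splittable after an index is built for it. A possible indexing step using the indexer class shipped with the hadoop-lzo package (the jar path and output path below are placeholders; this is a sketch, not part of the original example):

# Builds the .index files next to each .lzo output file (runs as a MapReduce job)
hadoop jar /export/server/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar \
    com.hadoop.compression.lzo.DistributedLzoIndexer /output/lzo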

3. Reading LZO-compressed files

1. Requirement: read the LZO-compressed files and restore them to plain text

2. Implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @ClassName MRReadLzo
 * @Description Reads LZO-compressed data and restores it to plain text
 */
public class MRReadLzo extends Configured implements Tool {

    // Build, configure, and submit a MapReduce job
    public int run(String[] args) throws Exception {
        // Build the job
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MRReadLzo.class);

        // Input: configure the input path
        Path inputPath = new Path(args[0]);
        TextInputFormat.setInputPaths(job, inputPath);

        // Map: configure the Mapper
        job.setMapperClass(MrMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce: configure the Reducer
        job.setReducerClass(MrReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Output: configure the output path
        Path outputPath = new Path(args[1]);
        TextOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : -1;
    }

    // Program entry point: calls run
    public static void main(String[] args) throws Exception {
        // Holds all configuration for this program
        Configuration conf = new Configuration();
        // No output compression is needed when restoring plain text, so the settings stay commented out
//        conf.set("mapreduce.output.fileoutputformat.compress", "true");
//        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
        // Call run and submit the job
        int status = ToolRunner.run(conf, new MRReadLzo(), args);
        System.exit(status);
    }

    /**
     * Mapper class
     */
    public static class MrMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        private NullWritable outputKey = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            context.write(this.outputKey, value);
        }
    }

    /**
     * Reducer class
     */
    public static class MrReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Emit each record unchanged
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
}