乐趣区

关于mapreduce:四Hadoop之MapReduce实战小例子

输入数据文件 AvgTemperature.txt

DATE,HOUR,COND,PRES,HUM,TMP,AQI,PM2.5,PM10
20160602,00, 霾,1984,130,9,390,348,300
20160802,01, 霾,1163,81,8,393,368,302
20160706,02, 霾,1079,108,17,360,394,306
20160706,03, 霾,1116,79,6,339,387,303
20160502,04, 霾,1198,98,16,357,325,307 
20160602,05, 霾,1762,126,9,324,316,301
20160408,06, 霾,1996,131,3,349,344,301
20160604,07, 霾,1952,119,26,347,300,309
20160105,08, 霾,1410,81,8,350,395,307
20160104,09, 霾,1718,130,4,352,335,308
20160501,10, 霾,1714,119,27,310,336,307
20160601,11, 霾,1660,130,23,311,364,302
20160606,12, 霾,1598,96,12,369,346,309
20160602,13, 霾,1673,127,2,343,346,303
20160706,14, 霾,1578,122,8,360,323,307
20160707,15, 霾,1237,118,12,384,384,301
20160205,16, 霾,1231,78,9,361,357,302
20160605,17, 霾,1166,86,30,350,388,307
20160506,18, 霾,1426,94,2,378,372,305
20160805,19, 霾,1874,144,20,376,327,302
20160405,20, 霾,1778,94,22,360,335,304
20160104,21, 霾,1055,64,22,376,361,305
20160304,22, 霾,1349,78,15,367,384,308
20160203,23, 霾,2004,110,2,359,371,304
20160603,24, 霾,1375,115,19,308,301,308
20160402,25, 霾,1201,69,5,387,342,305
20160707,26, 霾,1272,112,23,348,333,307
20160702,27, 霾,1738,60,12,393,300,303
20160301,28, 霾,1752,107,12,364,331,301
20160704,29, 霾,1442,65,9,332,369,308

第一题:编写月平均气温统计程序

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class AvgTemperature {

    public static class StatMapper extends Mapper<Object, Text, Text, IntWritable> {private IntWritable intValue = new IntWritable();
        private Text dateKey = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] items = value.toString().split(",");
            String date = items[0];
            String tmp = items[5];
            if(!"DATE".equals(date) && !"N/A".equals(tmp)){// 排除第一行阐明以及未取到数据的行
                dateKey.set(date.substring(0, 6));
                intValue.set(Integer.parseInt(tmp));
                context.write(dateKey, intValue);
            }
        }
    }

    public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int tmp_sum = 0;
            int count = 0;
            for(IntWritable val : values){tmp_sum += val.get();
                count++;
            }
            int tmp_avg = tmp_sum/count;
            result.set(tmp_avg);
            context.write(key, result);
        }
    }

    public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();
        Job job = new Job(conf, "AvgTemperature");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setJarByClass(AvgTemperature.class);

        job.setMapperClass(StatMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(StatReducer.class);
        job.setPartitionerClass(HashPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        TextInputFormat.setInputPaths(job, args[0]);
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

运行结果:

201601    11
201602    5
201603    13
201604    10
201605    15
201606    16
201607    12
201608    14

第二题:编写每日空气质量统计程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

import java.io.IOException;

/**
 * @Author Natasha
 * @Description
 * @Date 2020/10/30 20:37
 **/
public class AirQuality {
    public static class AirQualityMapprer extends Mapper<Object, Text, Text, IntWritable>{private Text text = new Text();
        private IntWritable intWritable = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] item = value.toString().split(",");

            String date = item[0];
            String kongqi = item[6];
            if(!"DATE".equals(date) && !"N/A".equals(kongqi)){// 排除第一行阐明以及未取到数据的行
                text.set(date.substring(0, 6));
                intWritable.set(Integer.parseInt(kongqi));
                context.write(text, intWritable);
            }
        }
    }

    public static class AirQualityReducer extends Reducer<Text, IntWritable, Text, IntWritable>{private IntWritable res = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
            int aqi = 0;
            int cnt = 0;
            for(IntWritable iw : value){aqi += iw.get();
                cnt++;
            }
            int aqi_avg = aqi/cnt;
            res.set(aqi_avg);
            context.write(key, res);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();
        Job job = new Job(conf, "AirQuality");

        job.setJarByClass(AirQuality.class);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(AirQualityMapprer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setPartitionerClass(HashPartitioner.class);

        job.setReducerClass(AirQualityReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        TextInputFormat.setInputPaths(job, args[0]);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

运行结果:

201601    359
201602    360
201603    365
201604    365
201605    348
201606    342
201607    359
201608    384
退出移动版