Input data file: AvgTemperature.txt

DATE,HOUR,COND,PRES,HUM,TMP,AQI,PM2.5,PM10
20160602,00,霾,1984,130,9,390,348,300
20160802,01,霾,1163,81,8,393,368,302
20160706,02,霾,1079,108,17,360,394,306
20160706,03,霾,1116,79,6,339,387,303
20160502,04,霾,1198,98,16,357,325,307
20160602,05,霾,1762,126,9,324,316,301
20160408,06,霾,1996,131,3,349,344,301
20160604,07,霾,1952,119,26,347,300,309
20160105,08,霾,1410,81,8,350,395,307
20160104,09,霾,1718,130,4,352,335,308
20160501,10,霾,1714,119,27,310,336,307
20160601,11,霾,1660,130,23,311,364,302
20160606,12,霾,1598,96,12,369,346,309
20160602,13,霾,1673,127,2,343,346,303
20160706,14,霾,1578,122,8,360,323,307
20160707,15,霾,1237,118,12,384,384,301
20160205,16,霾,1231,78,9,361,357,302
20160605,17,霾,1166,86,30,350,388,307
20160506,18,霾,1426,94,2,378,372,305
20160805,19,霾,1874,144,20,376,327,302
20160405,20,霾,1778,94,22,360,335,304
20160104,21,霾,1055,64,22,376,361,305
20160304,22,霾,1349,78,15,367,384,308
20160203,23,霾,2004,110,2,359,371,304
20160603,24,霾,1375,115,19,308,301,308
20160402,25,霾,1201,69,5,387,342,305
20160707,26,霾,1272,112,23,348,333,307
20160702,27,霾,1738,60,12,393,300,303
20160301,28,霾,1752,107,12,364,331,301
20160704,29,霾,1442,65,9,332,369,308
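Each record is one comma-separated line; both jobs below consume only columns 0 (DATE), 5 (TMP), and 6 (AQI), and group records by the first six characters of DATE. A minimal stand-alone sketch of that layout (the class name RecordLayoutDemo is just for illustration, not part of the assignment):

// Stand-alone sketch: how one record of AvgTemperature.txt splits into the
// column indices the MapReduce jobs below depend on.
public class RecordLayoutDemo {
    public static void main(String[] args) {
        String line = "20160602,00,霾,1984,130,9,390,348,300"; // first data row above
        String[] items = line.split(",");
        System.out.println("DATE  = " + items[0]);                 // 20160602
        System.out.println("month = " + items[0].substring(0, 6)); // 201606, the grouping key
        System.out.println("TMP   = " + items[5]);                 // 9, consumed by problem 1
        System.out.println("AQI   = " + items[6]);                 // 390, consumed by problem 2
    }
}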

Problem 1: write a monthly average temperature statistics program

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class AvgTemperature {

    public static class StatMapper extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable intValue = new IntWritable();
        private Text dateKey = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] items = value.toString().split(",");
            String date = items[0];
            String tmp = items[5];
            if (!"DATE".equals(date) && !"N/A".equals(tmp)) { // skip the header row and rows with no reading
                dateKey.set(date.substring(0, 6)); // key on the month, e.g. 201606
                intValue.set(Integer.parseInt(tmp));
                context.write(dateKey, intValue);
            }
        }
    }

    public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int tmp_sum = 0;
            int count = 0;
            for (IntWritable val : values) {
                tmp_sum += val.get();
                count++;
            }
            int tmp_avg = tmp_sum / count; // integer division: the mean is truncated
            result.set(tmp_avg);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "AvgTemperature");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setJarByClass(AvgTemperature.class);
        job.setMapperClass(StatMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(StatReducer.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        TextInputFormat.setInputPaths(job, args[0]);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setNumReduceTasks(Integer.parseInt(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
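Note that tmp_sum / count is integer division, so each monthly mean is truncated toward zero (for January 2016, (8 + 4 + 22) / 3 yields 11). If fractional degrees matter, a reducer along the following lines could be substituted; this is just a sketch, not part of the original submission, and main() would also need job.setOutputValueClass(DoubleWritable.class):

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: same aggregation as StatReducer, but the mean keeps its fraction.
public class AvgReducerWithFraction extends Reducer<Text, IntWritable, Text, DoubleWritable> {
    private final DoubleWritable result = new DoubleWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (IntWritable val : values) {
            sum += val.get();
            count++;
        }
        result.set((double) sum / count); // e.g. 34 / 3 = 11.33... instead of 11
        context.write(key, result);
    }
}

Once packaged into a jar, the job is launched in the usual way, e.g. hadoop jar avgtemperature.jar AvgTemperature <input> <output> <numReduceTasks> (the jar name here is a placeholder); main() reads exactly those three arguments.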

Run result:

201601    11
201602    5
201603    13
201604    10
201605    15
201606    16
201607    12
201608    14
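These figures check out against the sample file: the three January 2016 rows carry temperatures 8, 4, and 22, and (8 + 4 + 22) / 3 truncates to the 11 shown for 201601.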

Problem 2: write a monthly average air quality (AQI) statistics program

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * @Author Natasha
 * @Description Monthly average AQI statistics
 * @Date 2020/10/30 20:37
 **/
public class AirQuality {

    public static class AirQualityMapper extends Mapper<Object, Text, Text, IntWritable> {
        private Text text = new Text();
        private IntWritable intWritable = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] item = value.toString().split(",");
            String date = item[0];
            String aqi = item[6];
            if (!"DATE".equals(date) && !"N/A".equals(aqi)) { // skip the header row and rows with no reading
                text.set(date.substring(0, 6)); // key on the month, e.g. 201606
                intWritable.set(Integer.parseInt(aqi));
                context.write(text, intWritable);
            }
        }
    }

    public static class AirQualityReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable res = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
            int aqi = 0;
            int cnt = 0;
            for (IntWritable iw : value) {
                aqi += iw.get();
                cnt++;
            }
            int aqi_avg = aqi / cnt; // integer division: the mean is truncated
            res.set(aqi_avg);
            context.write(key, res);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "AirQuality");
        job.setJarByClass(AirQuality.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(AirQualityMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(AirQualityReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(Integer.parseInt(args[2]));
        TextInputFormat.setInputPaths(job, args[0]);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
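Because this reducer computes a mean, it cannot simply be reused as a combiner: averaging partial averages does not give the global average. A combiner-friendly variant carries the sum and count through explicitly. The sketch below is not the author's code; it assumes the mapper is changed to emit the value as new Text(aqi + ",1"), with job.setMapOutputValueClass(Text.class), job.setCombinerClass(AqiSumCountCombiner.class), and job.setReducerClass(AqiAvgReducer.class):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: combiner-safe averaging, written as static nested classes of the
// job class, matching the style above. Values are "sum,count" pairs as Text.

// The combiner folds partial pairs without distorting the mean.
public static class AqiSumCountCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0, count = 0;
        for (Text v : values) {
            String[] parts = v.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(key, new Text(sum + "," + count)); // still a partial pair
    }
}

// The final reducer divides once, after all partial pairs are merged.
public static class AqiAvgReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0, count = 0;
        for (Text v : values) {
            String[] parts = v.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(key, new Text(String.valueOf(sum / (double) count)));
    }
}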

Run result:

201601    359
201602    360
201603    365
201604    365
201605    348
201606    342
201607    359
201608    384
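Again consistent with the input: the January 2016 AQI readings are 350, 352, and 376, whose mean of 359.33 truncates to the 359 reported for 201601.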