输出数据文件 AvgTemperature.txt
:
DATE,HOUR,COND,PRES,HUM,TMP,AQI,PM2.5,PM10
20160602,00, 霾,1984,130,9,390,348,300
20160802,01, 霾,1163,81,8,393,368,302
20160706,02, 霾,1079,108,17,360,394,306
20160706,03, 霾,1116,79,6,339,387,303
20160502,04, 霾,1198,98,16,357,325,307
20160602,05, 霾,1762,126,9,324,316,301
20160408,06, 霾,1996,131,3,349,344,301
20160604,07, 霾,1952,119,26,347,300,309
20160105,08, 霾,1410,81,8,350,395,307
20160104,09, 霾,1718,130,4,352,335,308
20160501,10, 霾,1714,119,27,310,336,307
20160601,11, 霾,1660,130,23,311,364,302
20160606,12, 霾,1598,96,12,369,346,309
20160602,13, 霾,1673,127,2,343,346,303
20160706,14, 霾,1578,122,8,360,323,307
20160707,15, 霾,1237,118,12,384,384,301
20160205,16, 霾,1231,78,9,361,357,302
20160605,17, 霾,1166,86,30,350,388,307
20160506,18, 霾,1426,94,2,378,372,305
20160805,19, 霾,1874,144,20,376,327,302
20160405,20, 霾,1778,94,22,360,335,304
20160104,21, 霾,1055,64,22,376,361,305
20160304,22, 霾,1349,78,15,367,384,308
20160203,23, 霾,2004,110,2,359,371,304
20160603,24, 霾,1375,115,19,308,301,308
20160402,25, 霾,1201,69,5,387,342,305
20160707,26, 霾,1272,112,23,348,333,307
20160702,27, 霾,1738,60,12,393,300,303
20160301,28, 霾,1752,107,12,364,331,301
20160704,29, 霾,1442,65,9,332,369,308
第一题:编写月平均气温统计程序
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class AvgTemperature {
public static class StatMapper extends Mapper<Object, Text, Text, IntWritable> {private IntWritable intValue = new IntWritable();
private Text dateKey = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] items = value.toString().split(",");
String date = items[0];
String tmp = items[5];
if(!"DATE".equals(date) && !"N/A".equals(tmp)){// 排除第一行阐明以及未取到数据的行
dateKey.set(date.substring(0, 6));
intValue.set(Integer.parseInt(tmp));
context.write(dateKey, intValue);
}
}
}
public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int tmp_sum = 0;
int count = 0;
for(IntWritable val : values){tmp_sum += val.get();
count++;
}
int tmp_avg = tmp_sum/count;
result.set(tmp_avg);
context.write(key, result);
}
}
public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();
Job job = new Job(conf, "AvgTemperature");
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setJarByClass(AvgTemperature.class);
job.setMapperClass(StatMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setReducerClass(StatReducer.class);
job.setPartitionerClass(HashPartitioner.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
TextInputFormat.setInputPaths(job, args[0]);
job.setNumReduceTasks(Integer.parseInt(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
运行后果:
201601 11
201602 5
201603 13
201604 10
201605 15
201606 16
201607 12
201608 14
第二题:编写每日空气质量统计程序
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import java.io.IOException;
/**
* @Author Natasha
* @Description
* @Date 2020/10/30 20:37
**/
public class AirQuality {
public static class AirQualityMapprer extends Mapper<Object, Text, Text, IntWritable>{private Text text = new Text();
private IntWritable intWritable = new IntWritable();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {String[] item = value.toString().split(",");
String date = item[0];
String kongqi = item[6];
if(!"DATE".equals(date) && !"N/A".equals(kongqi)){// 排除第一行阐明以及未取到数据的行
text.set(date.substring(0, 6));
intWritable.set(Integer.parseInt(kongqi));
context.write(text, intWritable);
}
}
}
public static class AirQualityReducer extends Reducer<Text, IntWritable, Text, IntWritable>{private IntWritable res = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
int aqi = 0;
int cnt = 0;
for(IntWritable iw : value){aqi += iw.get();
cnt++;
}
int aqi_avg = aqi/cnt;
res.set(aqi_avg);
context.write(key, res);
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();
Job job = new Job(conf, "AirQuality");
job.setJarByClass(AirQuality.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(AirQualityMapprer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setPartitionerClass(HashPartitioner.class);
job.setReducerClass(AirQualityReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(Integer.parseInt(args[2]));
TextInputFormat.setInputPaths(job, args[0]);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
运行后果:
201601 359
201602 360
201603 365
201604 365
201605 348
201606 342
201607 359
201608 384