Input data file AvgTemperature.txt:
DATE,HOUR,COND,PRES,HUM,TMP,AQI,PM2.5,PM10
20160602,00, 霾,1984,130,9,390,348,300
20160802,01, 霾,1163,81,8,393,368,302
20160706,02, 霾,1079,108,17,360,394,306
20160706,03, 霾,1116,79,6,339,387,303
20160502,04, 霾,1198,98,16,357,325,307
20160602,05, 霾,1762,126,9,324,316,301
20160408,06, 霾,1996,131,3,349,344,301
20160604,07, 霾,1952,119,26,347,300,309
20160105,08, 霾,1410,81,8,350,395,307
20160104,09, 霾,1718,130,4,352,335,308
20160501,10, 霾,1714,119,27,310,336,307
20160601,11, 霾,1660,130,23,311,364,302
20160606,12, 霾,1598,96,12,369,346,309
20160602,13, 霾,1673,127,2,343,346,303
20160706,14, 霾,1578,122,8,360,323,307
20160707,15, 霾,1237,118,12,384,384,301
20160205,16, 霾,1231,78,9,361,357,302
20160605,17, 霾,1166,86,30,350,388,307
20160506,18, 霾,1426,94,2,378,372,305
20160805,19, 霾,1874,144,20,376,327,302
20160405,20, 霾,1778,94,22,360,335,304
20160104,21, 霾,1055,64,22,376,361,305
20160304,22, 霾,1349,78,15,367,384,308
20160203,23, 霾,2004,110,2,359,371,304
20160603,24, 霾,1375,115,19,308,301,308
20160402,25, 霾,1201,69,5,387,342,305
20160707,26, 霾,1272,112,23,348,333,307
20160702,27, 霾,1738,60,12,393,300,303
20160301,28, 霾,1752,107,12,364,331,301
20160704,29, 霾,1442,65,9,332,369,308
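Each record is one comma-separated line with nine fields. Both jobs below rely on fixed column positions: index 0 is DATE, index 5 is TMP, index 6 is AQI. As a minimal plain-Java sketch of how one record breaks into those fields (the class name RecordParseDemo is ours, not part of the original programs):

// Minimal sketch: split one sample record into its fields.
// The column indices mirror those used by the MapReduce jobs below.
public class RecordParseDemo {
    public static void main(String[] args) {
        String line = "20160602,00, 霾,1984,130,9,390,348,300";
        String[] items = line.split(",");
        String month = items[0].substring(0, 6); // yyyyMM key, e.g. 201606
        int tmp = Integer.parseInt(items[5]);    // TMP column
        int aqi = Integer.parseInt(items[6]);    // AQI column
        System.out.println(month + " TMP=" + tmp + " AQI=" + aqi);
    }
}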
Problem 1: Write a monthly average temperature statistics program
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class AvgTemperature {

    public static class StatMapper extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable intValue = new IntWritable();
        private Text dateKey = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] items = value.toString().split(",");
            String date = items[0];
            String tmp = items[5];
            // Skip the header row and rows with missing data (N/A)
            if (!"DATE".equals(date) && !"N/A".equals(tmp)) {
                dateKey.set(date.substring(0, 6)); // group by month (yyyyMM)
                intValue.set(Integer.parseInt(tmp));
                context.write(dateKey, intValue);
            }
        }
    }

    public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int tmpSum = 0;
            int count = 0;
            for (IntWritable val : values) {
                tmpSum += val.get();
                count++;
            }
            result.set(tmpSum / count); // truncated integer average
            context.write(key, result);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "AvgTemperature"); // new Job(conf, ...) is deprecated
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setJarByClass(AvgTemperature.class);
        job.setMapperClass(StatMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(StatReducer.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        TextInputFormat.setInputPaths(job, args[0]);
        job.setNumReduceTasks(Integer.parseInt(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Run results:
201601 11
201602 5
201603 13
201604 10
201605 15
201606 16
201607 12
201608 14
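These monthly averages can be sanity-checked locally without a cluster. The following plain-Java sketch (an addition, not part of the original submission; the class name LocalAvgTemperature is hypothetical) applies the same logic as the job: group records by the six-digit month prefix and take the truncated integer mean of the TMP column, with the input file path assumed to arrive as args[0].

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

// Local sketch: replicate the job's month-average logic in memory.
public class LocalAvgTemperature {
    public static void main(String[] args) throws Exception {
        Map<String, int[]> stats = new TreeMap<>(); // month -> {sum, count}
        for (String line : Files.readAllLines(Paths.get(args[0]))) {
            String[] items = line.split(",");
            if ("DATE".equals(items[0]) || "N/A".equals(items[5])) continue;
            int[] s = stats.computeIfAbsent(items[0].substring(0, 6), k -> new int[2]);
            s[0] += Integer.parseInt(items[5]);
            s[1]++;
        }
        // Integer division reproduces the job's truncated averages.
        stats.forEach((month, s) -> System.out.println(month + "\t" + s[0] / s[1]));
    }
}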
Problem 2: Write a daily air quality statistics program (note that, like Problem 1, the implementation below keys on the yyyyMM prefix, so the reported averages are per month)
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * @Author Natasha
 * @Description
 * @Date 2020/10/30 20:37
 **/
public class AirQuality {

    public static class AirQualityMapper extends Mapper<Object, Text, Text, IntWritable> {
        private Text text = new Text();
        private IntWritable intWritable = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] item = value.toString().split(",");
            String date = item[0];
            String kongqi = item[6]; // AQI column
            // Skip the header row and rows with missing data (N/A)
            if (!"DATE".equals(date) && !"N/A".equals(kongqi)) {
                text.set(date.substring(0, 6)); // group by month (yyyyMM)
                intWritable.set(Integer.parseInt(kongqi));
                context.write(text, intWritable);
            }
        }
    }

    public static class AirQualityReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable res = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int aqiSum = 0;
            int cnt = 0;
            for (IntWritable iw : values) {
                aqiSum += iw.get();
                cnt++;
            }
            res.set(aqiSum / cnt); // truncated integer average
            context.write(key, res);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "AirQuality"); // new Job(conf, ...) is deprecated
        job.setJarByClass(AirQuality.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(AirQualityMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(AirQualityReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(Integer.parseInt(args[2]));
        TextInputFormat.setInputPaths(job, args[0]);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
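One detail worth noting: both reducers compute the average with integer division (tmpSum / count and aqiSum / cnt), so results are truncated toward zero rather than rounded; an exact mean of 359.9 is reported as 359. If rounded averages were preferred, a small variant (an alternative sketch, not the original code) would be:

// Variant: round instead of truncate; using DoubleWritable for the
// output value would preserve the fractional part entirely.
int aqiAvg = (int) Math.round((double) aqiSum / cnt);
res.set(aqiAvg);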
Run results:
201601 359
201602 360
201603 365
201604 365
201605 348
201606 342
201607 359
201608 384